Skip to content

fastapi基础用法

更新时间: 2025/6/29 本章字数: 0 字 预计阅读时长: 0 分钟

本章总结

路由

提示

可以在主目录下新建一个api和一个BASEMODELS的文件夹 那么目录如下

.
│─ api
│─ BASEMODELS
└─ main.py

提示

在BASEMODELS下新建一个文件,index.py

pyhthon
from pydantic import BaseModel


class Index(BaseModel):
    username : str
    password : str

提示

在api下新建一个文件,index.py,然后把BASEMODELSIndex导入进去,这里接收的是post请求

请求参数为usernamepassword,如果都为123,则返回true,否则返回false

python
from fastapi import APIRouter
from BASEMODELS.index import Index


index = APIRouter()

@index.post("/")
async def logins(index:Index):
    if index.username == "123" and index.password == "123":
        return true
    return false

提示

接下来main.py就变成了这样子

python
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import  JSONResponse
from starlette.status import HTTP_404_NOT_FOUND, HTTP_405_METHOD_NOT_ALLOWED
import uvicorn
from api.index import index

app = FastAPI()

app.include_router(index, prefix="/api/index")

@app.exception_handler(HTTP_404_NOT_FOUND)
async def custom_404_handler(request, exc):
    return JSONResponse(
        status_code=HTTP_404_NOT_FOUND,
        content={"code": 404, "message": "The page you are visiting does not exist!"}
    )


@app.exception_handler(HTTP_405_METHOD_NOT_ALLOWED)
async def custom_405_handler(request, exc):
    return JSONResponse(
        status_code=HTTP_405_METHOD_NOT_ALLOWED,
        content={"code": 405, "message": "The request method is not allowed!"}
    )



if __name__ == "__main__":
    try:
        uvicorn.run(app, host="127.0.0.1", port=9987)
    except KeyboardInterrupt:
        print("Server stop")

封装

封装sql

提示

安装aiomysql

sh
pip install aiomysql

提示

接着封装一下

python
import aiomysql



class sqlaio:
    def __init__(self,host,user,passwd,db):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db

    async def init_pool(self, minsize=1, maxsize=10):
        self.pool = await aiomysql.create_pool(host=self.host, user=self.user,
                                               password=self.passwd, db=self.db,
                                               minsize=minsize, maxsize=maxsize,
                                               autocommit=True,
                                               port=3306,
                                               cursorclass=aiomysql.DictCursor)

    async def close_pool(self):
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()

    # 查询数据
    async def execute_query(self, query, params=None):
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query, params)
                result = await cur.fetchall()
                return result

    # 修改、更新、删除
    async def execute_update(self, update_query, params=None):
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(update_query, params)
                affected_rows = cur.rowcount
                return affected_rows

封装一下sqlite3

提示

封装一下sqlite3

python
class SQLiteHelper:
    def __init__(self, db_name):
        self.conn = sqlite3.connect(db_name)
        # 设置游标,以便查询结果以字典形式返回
        self.conn.row_factory = sqlite3.Row
        self.cursor = self.conn.cursor()

    def close(self):
        self.conn.close()

    def execute_sql(self, sql, params=None):
        if params:
            self.cursor.execute(sql, params)
        else:
            self.cursor.execute(sql)
        self.conn.commit()

    def query(self, sql, params=None):
        if params:
            self.cursor.execute(sql, params)
        else:
            self.cursor.execute(sql)
        rows = self.cursor.fetchall()
        dict_rows = []
        for row in rows:
            dict_row = {}
            for idx, col in enumerate(self.cursor.description):
                dict_row[col[0]] = row[idx]
            dict_rows.append(dict_row)
        return dict_rows