一、为什么要学ORM
半个月前刚开始学FastAPI的时候,我练习时写的接口都是返回假数据,比如:
@app.get("/users")asyncdefget_users():return[{"id":1,"name":"张三"}]实际项目肯定是需要从数据库读数据的。我开始啥也不会就直接写SQL字符串,比如:
sql=f"SELECT * FROM t_user WHERE id ={user_id}"后来进一步学习相关知识时了解到这样做会有SQL注入风险,而且每个接口都要重复写连接、关闭的逻辑,代码很冗余也不想敲。然后就知道了ORM这个概念。
ORM的意思很简单:就是把数据库的表当成Python的类,把表里的每一行数据当成这个类的一个对象。这样我在操作Python对象时,ORM自动帮我翻译成SQL语句。
二、安装依赖
我学习时用的MySQL数据库,做这个的时候需要安装两个包:
pipinstallsqlalchemy[asyncio]aiomysqlsqlalchemy[asyncio]:SQLAlchemy的异步版本aiomysql:异步MySQL驱动
这里有个小坑:[asyncio]不是文件名的一部分,而是告诉pip额外安装异步相关的依赖。我第一次搞的时候没加这个方括号,结果出问题代码跑不起来。
三、连接数据库
3.1 创建异步引擎
引擎就是连接数据库的入口。异步场景下要用create_async_engine:
fromsqlalchemy.ext.asyncioimportcreate_async_engine DATABASE_URL="mysql+aiomysql://root:root@localhost:3306/fastapi_review?charset=utf8"engine=create_async_engine(DATABASE_URL,echo=True,# 打印SQL语句,学习阶段一定要开pool_size=10,# 连接池大小max_overflow=20# 最大额外连接数)echo=True这个参数特别有用,它会在控制台打印所有生成的SQL。我学习的时候就喜欢用它,这样就能直观的看到ORM到底翻译成了什么样的SQL。
3.2 应用启动时建表
我需要在服务启动的时候,自动在数据库里创建表。代码如下:
@app.on_event("startup")asyncdefinit():awaitcreate_tables()asyncdefcreate_tables():asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)这里我一开始不明白:为什么要用run_sync?
后来查资料才明白:Base.metadata.create_all是SQLAlchemy早期的同步API。我现在用的是异步引擎,所以需要用run_sync把它"包"一层,让它能在异步环境里执行。简单说就是——异步引擎调用同步方法,需要这个桥接。
四、定义模型
4.1 基类设计
所有模型都要继承一个基类,我把它设计成了这样:
fromsqlalchemy.ormimportDeclarativeBase,Mapped,mapped_columnfromsqlalchemyimportDateTime,funcfromdatetimeimportdatetimeclassBase(DeclarativeBase):passDeclarativeBase是SQLAlchemy 2.0的新写法,取代了旧的declarative_base()函数。我觉得这种写法更直观。
我还给基类加了两个自动维护的时间字段:
classBase(DeclarativeBase):# 创建时间:插入时自动填充create_time:Mapped[datetime]=mapped_column(DateTime,default=datetime.now)# 更新时间:插入和更新时自动刷新update_time:Mapped[datetime]=mapped_column(DateTime,default=datetime.now,onupdate=func.now(),insert_default=func.now())func.now()是SQL函数,让数据库自己算当前时间,比Python的datetime.now更准确。这样我就不用每次插入数据时手动填时间了。
4.2 用户模型
我的用户模型长这样:
classUser(Base):__tablename__="t_user"id:Mapped[int]=mapped_column(primary_key=True,autoincrement=True,name="user_id",comment="用户ID")name:Mapped[str]=mapped_column(String(20),nullable=False,name="user_name",comment="用户名称")password:Mapped[str]=mapped_column(String(20),nullable=False,name="user_password",comment="用户密码")salary:Mapped[float]=mapped_column(Float(6,2),nullable=False,name="user_salary",comment="用户薪水")birthday:Mapped[datetime]=mapped_column(DateTime,nullable=False,name="user_birthday",comment="用户出生日期")这里有几个我踩过的坑:
__tablename__必须写,不然SQLAlchemy不知道表名叫什么name="user_id"是指定数据库里的真实列名。如果不写,默认就用Python属性名idMapped[int]这种写法是SQLAlchemy 2.0的类型注解风格,比旧版更清晰Float(6, 2)表示总共6位数字,小数占2位,比如1234.56
五、会话管理:我最花时间的部分
5.1 创建会话工厂
fromsqlalchemy.ext.asyncioimportasync_sessionmaker,AsyncSession AsyncSessionLocal=async_sessionmaker(bind=engine,class_=AsyncSession,expire_on_commit=False)expire_on_commit=False这个参数很重要。如果不加,提交后再访问对象属性可能会报错。异步场景下建议一定要设成False。
5.2 依赖注入函数
这部分我研究了很久,最终写成这样:
asyncdefget_session():asyncwithAsyncSessionLocal()assession:try:yieldsessionawaitsession.commit()exceptException:awaitsession.rollback()finally:awaitsession.close()为什么用yield?这是FastAPI依赖注入的固定写法:
yield之前的代码:请求进来时执行(创建会话)yield session:把会话交给路由函数使用yield之后的代码:请求结束后执行(提交或回滚,然后关闭)
try-except-finally确保不管发生什么,连接最终都会关闭。我一开始没加finally,后来意识到如果代码中途报错,连接就泄漏了。
使用的时候很简单:
@app.get("/users")asyncdefget_user_list(session:AsyncSession=Depends(get_session)):...FastAPI会自动调用get_session(),把生成的session传进来。
六、查询操作:我写了5种场景
6.1 查询全部
@app.get("/users")asyncdefget_user_list(session:AsyncSession=Depends(get_session)):stmt=Select(User)result=awaitsession.execute(stmt)user_list=result.scalars().all()returnuser_listscalars().all()返回所有记录的对象列表。如果只想取一条,用scalar()。
6.2 精确查询
@app.get("/users/")asyncdefget_user_by_name(name:str,session:AsyncSession=Depends(get_session)):stmt=Select(User).where(User.name==name)result=awaitsession.execute(stmt)returnresult.scalars().all()6.3 模糊查询
@app.get("/users/like/")asyncdefget_user_like_name(name:str,session:AsyncSession=Depends(get_session)):stmt=Select(User).where(User.name.like(f"%{name}%"))result=awaitsession.execute(stmt)returnresult.scalars().all()like是SQL的模糊匹配,%是通配符。
6.4 批量查询(IN)
@app.get("/users/in/")asyncdefget_user_in_ids(ids:str,session:AsyncSession=Depends(get_session)):id_list=[int(item)foriteminids.split(",")]stmt=Select(User).where(User.id.in_(id_list))result=awaitsession.execute(stmt)returnresult.scalars().all()注意SQLAlchemy里用in_(),后面有个下划线,因为in是Python关键字。
6.5 范围查询(BETWEEN)
@app.get("/users/between/{min_salary}/{max_salary}")asyncdefget_user_between_salary(min_salary:float,max_salary:float,session:AsyncSession=Depends(get_session)):stmt=Select(User).where(User.salary.between(min_salary,max_salary))result=awaitsession.execute(stmt)returnresult.scalars().all()6.6 主键快捷查询
@app.get("/users/{id}")asyncdefget_user_by_id(id:int,session:AsyncSession=Depends(get_session)):user=awaitsession.get(User,id)returnusersession.get(User, id)是最简洁的主键查询方式,等同于SELECT * FROM t_user WHERE user_id = ?。
6.7 分页查询
@app.get("/users/page/{page_no}/{page_size}")asyncdefget_user_by_page(page_no:int,page_size:int,session:AsyncSession=Depends(get_session)):stmt=Select(User).offset((page_no-1)*page_size).limit(page_size)result=awaitsession.execute(stmt)returnresult.scalars().all()分页公式:跳过(page_no - 1) * page_size条,取page_size条。
七、增删改
7.1 添加数据
我先用Pydantic定义了请求模型:
classUserRequest(BaseModel):name:strpassword:strsalary:floatbirthday:datetime然后这样添加:
@app.post("/users")asyncdefadd_user(user:UserRequest,session:AsyncSession=Depends(get_session)):user_object=User(name=user.name,password=user.password,salary=user.salary,birthday=user.birthday)session.add(user_object)return{"code":200,"message":"添加成功"}注意我没有手动commit(),因为get_session在yield之后会自动提交。
7.2 删除数据
@app.delete("/users/{id}")asyncdefdelete_user(id:int,session:AsyncSession=Depends(get_session)):stmt=Delete(User).where(User.id==id)awaitsession.execute(stmt)awaitsession.commit()return{"code":200,"message":"删除成功"}这里我手动commit()了,因为删除操作我想确保立即生效。
顺便提一下,我在注释里写了:实际项目中删除基本都是逻辑删除(改个状态字段,比如
is_deleted = True),不会真的删掉数据。这个我记着了,以后做项目要注意。
7.3 更新数据
@app.put("/users/{id}")asyncdefupdate_user(id:int,user_request:UserRequest,session:AsyncSession=Depends(get_session)):stmt=Select(User).where(User.id==id)result=awaitsession.execute(stmt)user=result.scalar_one_or_none()user.name=user_request.name user.password=user_request.password user.salary=user_request.salary user.birthday=user_request.birthdayreturn{"code":200,"message":"修改成功"}SQLAlchemy的更新很"Pythonic":直接改对象属性,提交时ORM会自动生成UPDATE语句。
八、我踩过的坑
| 坑 | 现象 | 解决 |
|---|---|---|
忘记await | 报错coroutine object | 所有session.execute()都要加await |
in写成in | 语法错误 | SQLAlchemy里是in_(),带下划线 |
| 会话没关闭 | 连接池耗尽 | 用async with或yield+finally |
commit时机混乱 | 数据没写入 | 统一交给get_session管理,特殊情况再手动commit |
| 更新时没查对象 | 不知道改谁 | 先Select查出对象,再改属性 |
九、总结
学完这些知识后,我个人感受是:
- ORM确实省事:不用自己手写SQL,操作Python对象就行
- 异步要处处加
await:从引擎到会话到执行,全程异步 - 会话生命周期最重要:打开→使用→提交/回滚→关闭,这个流程不能乱
- Pydantic+ORM是绝配:前端传JSON → Pydantic校验 → 转ORM对象 → 入库
下一步我打算学:
- 把代码拆到不同文件(config/models/schemas/dependencies/routers)
- 表与表之间的关联(比如用户和订单的一对多关系)
- Alembic数据库迁移工具