FastAPI + SQLAlchemy 2.0 通用CRUD操作手册 —— 从同步到异步,一次讲透
本文能帮你解决什么• 彻底搞懂 SQLAlchemy 2.0 的核心变化告别旧式Column写法• 搭建同步/异步双引擎掌握连接池的最佳配置• 亲手写出通用 CRUD从单条插入到复杂关联查询全涵盖• 给出“同步还是异步”的务实答案让你不再盲目跟风• 奉上我踩过的坑和排查清单出了问题可以直接照单抓药 内容主要脉络 问题与背景异步真的更快吗 核心原理从模型定义到引擎配置 实战 CRUD增删改查 高级查询 事务与异常显式边界 FastAPI 依赖注入 注意事项同步异步共存策略 常见翻车现场 第一部分问题与背景——你其实不用那么焦虑先说一个反直觉的结论异步不会自动让你的接口变快。如果你的接口只是查一下数据库然后返回计算量很小那同步和异步的性能差距微乎其微。真正让异步闪光的是高并发 I/O 密集场景——比如你的服务要同时请求多个外部 API、读写大量 WebSocket这时候异步才能把事件循环的优势发挥出来。很多团队一上来就非异步不可结果发现 SQLAlchemy 1.x 时代异步支持根本是半残废各种 hack维护得想哭。2.0 之后异步终于能打了但也有一堆前置条件。所以简单项目、低并发、团队对异步不熟同步完全够用。如果你正准备起一个新项目而且能保证全链路异步FastAPI async DB driver 异步任务队列那上异步很香。否则别给自己找麻烦。⚙️ 第二部分核心原理与基础配置SQLAlchemy 2.0 把声明式映射彻底革新了。现在模型定义长这样from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column class Base(DeclarativeBase): pass class User(Base): __tablename__ users id: Mapped[int] mapped_column(primary_keyTrue) name: Mapped[str] mapped_column(String(50)) version: Mapped[int] mapped_column(default1) # 乐观锁版本号老式的Column()也能用但我强烈建议你全部换上Mapped mapped_column类型提示更清晰IDE 提示也舒服多了。接下来是引擎配置同步和异步的差别主要在这# 同步引擎 from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker sync_engine create_engine( postgresql://user:passlocalhost/db, pool_size20, max_overflow10, pool_pre_pingTrue, ) SyncSessionLocal sessionmaker(bindsync_engine) # 异步引擎 from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession async_engine create_async_engine( postgresqlasyncpg://user:passlocalhost/db, pool_size20, max_overflow10, pool_pre_pingTrue, ) AsyncSessionLocal async_sessionmaker(async_engine, class_AsyncSession, expire_on_commitFalse)你可能会问为毛异步 URL 要变成postgresqlasyncpg因为asyncpg是真正的异步驱动psycopg2 是同步的混用必定翻车。MySQL 用户请选aiomysql。连接池参数pool_size和max_overflow在两种模式下意义一样记得加上pool_pre_pingTrue防止连接被数据库回收后应用还傻傻复用。️ 第三部分全程实战——CRUD 一把梭下面我会同步异步对照着写你会发现核心逻辑几乎一样区别只在于 async/await 关键字和会话对象。✅ CREATE 操作# 同步单条插入 def create_user_sync(session, name: str): user User(namename) session.add(user) session.commit() session.refresh(user) # 获取数据库生成字段 return user # 异步单条插入 async def create_user_async(session: AsyncSession, name: str): user User(namename) session.add(user) await session.commit() await session.refresh(user) # 千万别忘 await return user批量插入时add_all很直观但对于大数据量我更喜欢bulk_insert_mappings速度快得多。但注意异步下 bulk 操作不支持自动刷新和关系绑定用完记得手动 commit。️ async with session.begin() 事务管理# 异步批量插入最佳实践使用 async with session.begin() # 更新和删除同理写操作一律用 begin() 管事务读操作直接用裸 session因为读一般不需要事务边界 async def bulk_create_users(session: AsyncSession, names: list[str]): async with session.begin(): session.add_all([User(namen) for n in names]) # 事务已提交无需显式 commit # 当然同步时也可使用 with session.begin() 事务自动提交user 已持有数据库生成字段✅ READ 操作主键查询用session.get()简单粗暴。高级查询统一用select()构造再也没query()什么事了。from sqlalchemy import select, and_, or_, func from sqlalchemy.orm import selectinload # 异步条件查询 排序 分页 async def search_users(session: AsyncSession, keyword: str, page: int, size: int): stmt ( select(User) .where(User.name.ilike(f%{keyword}%)) .order_by(User.id.desc()) .limit(size) .offset((page - 1) * size) ) result await session.execute(stmt) return result.scalars().all()关联查询时强烈建议显式预加载用selectinload或joinedload把 N1 问题扼杀在摇篮里。异步下尤其注意懒加载属性一访问就会抛出MissingGreenlet错误血的教训啊。stmt select(User).options(selectinload(User.posts)).where(User.id uid)聚合查询用funcstmt select(func.count(User.id)).where(User.name.like(%小%)) total (await session.execute(stmt)).scalar() 再来一个复杂综合案例搜索用户及其文章统计需求搜用户名或邮箱包含关键词的活跃用户带出他们发表的文章数量按文章数降序分页。async def search_users_with_post_count( session: AsyncSession, keyword: str, page: int 1, size: int 10, ): # 子查询每篇文章所属作者 post_count_sub ( select(Post.author_id, func.count(Post.id).label(post_count)) .group_by(Post.author_id) .subquery() ) # 主查询用户 left join 文章统计 stmt ( select(User, func.coalesce(post_count_sub.c.post_count, 0).label(post_count)) .outerjoin(post_count_sub, User.id post_count_sub.c.author_id) .where( and_( User.is_active True, or_( User.username.ilike(f%{keyword}%), User.email.ilike(f%{keyword}%), ), ) ) .order_by(desc(post_count)) # 按别名排 .limit(size) .offset((page - 1) * size) ) result await session.execute(stmt) rows result.all() return [ {user: row.User, post_count: row.post_count} for row in rows ]这个案例一口气用上了子查询、outerjoin、and_/or_、ilike、coalesce、别名排序、分页基本上日常查询的天花板难度也就这样了。你把它拆开看每一块都是前面讲过的基础零件拼起来的。✅ UPDATE 操作方式一查出对象改属性commit。直观适合小更新。方式二批量更新update()直接发 SQL不经过 ORM 对象。适合批量改状态。from sqlalchemy import update async def deactivate_users(session: AsyncSession, user_ids: list[int]): async with session.begin(): stmt update(User).where(User.id.in_(user_ids)).values(activeFalse) result await session.execute(stmt) return result.rowcount再说个容易被忽略的乐观锁更新。利用版本号字段version修改时带上版本条件防止并发覆盖。stmt ( update(User) .where(User.id uid, User.version old_version) .values(namenew_name, versionold_version 1) ) result await session.execute(stmt) if result.rowcount 0: raise Exception(数据已被修改请重试)✅ DELETE 操作from sqlalchemy import delete async def remove_user(session: AsyncSession, uid: int): async with session.begin(): stmt delete(User).where(User.id uid) await session.execute(stmt) 第四部分事务与异常——别把连接池打爆了在 FastAPI 中我习惯用依赖注入管理会话生命周期这样每个请求都会自动获取、自动关闭。async def get_db(): async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close()关键一条显式事务边界。哪怕你用了依赖注入也建议在业务逻辑里用async with session.begin()包裹写操作这样出错了自动回滚绝对不会出现连接未释放的惨案。有时候你需要设置事务隔离级别特别是报表类查询加with_for_update()或修改 session 连接参数就行异步下完全一致。 第五部分同步异步共存——俩种方案一个原则项目大了难免有遗留模块还在同步新的又想用异步。怎么搞如果在异步路由里调用同步数据库方法可以用asyncio.to_thread把同步操作扔进线程池避免阻塞事件循环。import asyncio async def async_endpoint(): user await asyncio.to_thread(sync_get_user, user_id1) return user反过来在同步路由里调异步方法千万别用 asyncio.run()它会新建事件循环和 FastAPI 当前循环冲突结果就是玄学报错。要么老实用同步要么把整个路由改成 async。归根结底我的建议是新项目从一开始就全异步栈——FastAPI SQLAlchemy 2.0 async asyncpg/aiomysql。即使现在并发不高也能免去未来重构的痛苦。 第六部分常见翻车现场 排查指南1. 异步 refresh 前没 await现象获取不到数据库默认值或自增 ID。原因session.refresh()是协程必须await。2. MissingGreenlet 错误现象异步会话访问懒加载属性直接炸。解决把所有要用的关联全部selectinload或joinedload。别偷懒。3. 连接池耗尽现象突然所有请求卡住日志显示“等待连接超时”。大概率是事务没关闭连接一直占着。马上检查是否有commit/rollback缺失。4. 同步异步引擎混用现象在同一个模块里不小心把异步 session 传给了同步方法。对策严格分层文件名带_sync或_async后缀写清楚。 总结别让选择本身变成负担说到底CRUD 就是那几个固定动作2.0 的 API 也帮我们抹平了很多心智负担。同步还是异步真正要看的只有两件事你的并发模型和团队的肌肉记忆。选定一套栈写清楚规范然后沉下心写业务就好。