SQLModel零基础教程(四)- 高级查询、事务、原生SQL
这里写目录标题前言一、阶段学习目标二、高级查询聚合、分组、分页2.1 基础环境模型复用前文用户订单场景2.2 聚合函数计数、求和、平均值、最大最小2.3 group_by分组 having过滤分组2.4 标准分页查询接口通用模板2.5 查询结果四种读取方法区别三、事务控制保证多操作原子性3.1 基础事务规则3.2 基础事务下单同时扣库存模拟多表修改3.3 begin() 自动事务上下文推荐写法3.4 Savepoint 保存点局部回滚不撤销全部事务四、批量操作优化解决循环add慢4.1 批量插入 add_all基础批量4.2 bulk_insert_mappings 无实例批量高性能4.3 批量更新ORM批量不逐条查询五、原生SQL安全执行text防SQL注入5.1 带参数查询返回元组5.2 原生结果映射为SQLModel对象5.3 原生增删改六、综合实战分组统计分页报表 事务七、阶段核心总结半天必掌握八、新手高频避坑指南前言前3篇我们学完单表CRUD、高级字段DTO、一对多/多对多关联查询日常基础增删改查完全够用。但真实项目中还有几类高频刚需场景报表统计求和、计数、平均值、分组统计、筛选分组结果分页列表接口标准分页、总条数统计数据一致性下单扣库存、转账等多操作必须原子执行事务批量导入/批量更新循环add性能极低需要批量API复杂统计SQLORM难以实现时安全执行原生SQL并映射模型。本文为系列第四阶段半天掌握高级统计查询、分页、事务机制、批量操作、原生SQL安全写法所有代码可直接运行适配后台报表、数据同步、金融类强一致性业务。一、阶段学习目标掌握聚合函数func.count/func.sum/func.avggroup_by分组having分组过滤标准分页实现offset/limit 统计总条数彻底理解Session事务commit提交、rollback回滚、保存点savepoint局部回滚批量插入、批量更新优化写法避免循环add性能灾难使用text()安全执行原生SQL参数防注入结果映射SQLModel模型区分all()/one()/one_or_none()/first()四种结果读取方法报表综合实战分组统计分页事务保证数据统一。二、高级查询聚合、分组、分页2.1 基础环境模型复用前文用户订单场景fromsqlmodelimportSQLModel,Field,create_engine,Session,select,textfromsqlalchemyimportfuncfromtypingimportOptional,Listfromdatetimeimportdatetime enginecreate_engine(sqlite:///stage4.db,echoFalse)classOrder(SQLModel,tableTrue):id:Optional[int]Field(defaultNone,primary_keyTrue)user_id:intgoods_name:strprice:floatField(gt0)num:intField(gt1)create_time:datetimeField(default_factorydatetime.utcnow)definit_table():SQLModel.metadata.create_all(bindengine)# 测试填充数据definit_test_data():withSession(engine)assession:data[Order(user_id1,goods_namePython教程,price59.9,num2),Order(user_id1,goods_name机械键盘,price199,num1),Order(user_id2,goods_name鼠标,price49.9,num3),Order(user_id2,goods_name显示器,price899,num1),Order(user_id1,goods_name耳机,price129,num2),]session.add_all(data)session.commit()init_table()init_test_data()2.2 聚合函数计数、求和、平均值、最大最小sqlalchemy.func提供全套统计函数搭配select直接查询数值# 1. 使用上下文管理器创建数据库会话# 优点代码块执行完毕后session 会自动关闭并释放数据库连接无需手动处理异常withSession(engine)assession:# 2. 构建聚合查询语句 (Aggregate Query)stmtselect(# func.count(Order.id): 统计订单总数。使用主键 id 进行 COUNT 操作效率最高# .label(total_order): 为这个聚合列指定一个别名方便后续通过属性名获取结果func.count(Order.id).label(total_order),# func.sum(Order.price * Order.num): 计算总销售额。# 在数据库层面直接进行列的乘法运算并求和避免了将大量数据拉到 Python 内存中计算func.sum(Order.price*Order.num).label(total_amount),# func.avg(Order.price): 计算所有订单的平均单价func.avg(Order.price).label(avg_price),# func.max(Order.price): 获取所有订单中的最高单价func.max(Order.price).label(max_price))# 3. 执行查询并获取单条结果# session.exec(stmt): 将构建好的 SQL 语句发送给数据库执行# .one(): 专门用于获取聚合查询的结果。# 因为聚合函数没有 GROUP BY 时只会返回一行数据使用 .one() 可以安全地提取这一行。# 返回的 res 是一个元组Row对象可以通过前面定义的 label 别名来访问对应的值ressession.exec(stmt).one()# 4. 打印统计结果# 通过 .label() 定义的别名可以像访问对象属性一样直接读取聚合结果print(订单总数,res.total_order)print(总交易额,res.total_amount)print(平均价格,res.avg_price)print(最高价格,res.max_price)2.3 group_by分组 having过滤分组where过滤原始行having过滤分组后的统计结果不可混用聚合函数在where中。withSession(engine)assession:# 按用户分组统计每个用户下单数量、总消费只保留消费300的用户stmtselect(Order.user_id,func.count(Order.id).label(order_cnt),func.sum(Order.price*Order.num).label(user_total)).group_by(Order.user_id).having(func.sum(Order.price*Order.num)300)rowssession.exec(stmt).all()forrowinrows:withSession(engine)assession:# 1. 构建分组聚合查询语句stmtselect(# 查询分组依据的字段用户IDOrder.user_id,# 统计每个用户的下单数量func.count(Order.id).label(order_cnt),# 计算每个用户的总消费金额单价 × 数量 的总和func.sum(Order.price*Order.num).label(user_total)# 2. 按用户ID进行分组# 相当于 SQL 中的 GROUP BY user_id# 执行后后续的聚合函数count, sum将针对每个用户单独计算# 3. 对分组后的结果进行过滤# 相当于 SQL 中的 HAVING SUM(price * num) 300# 注意HAVING 是在分组之后执行的用于过滤聚合计算的结果# 而 WHERE 是在分组之前执行的用于过滤原始行数据。# 这里只保留总消费金额大于 300 的用户记录).group_by(Order.user_id).having(func.sum(Order.price*Order.num)300)# 4. 执行查询并获取所有符合条件的结果# 因为经过了分组结果可能有多条每个符合条件的用户一条所以使用 .all() 获取列表rowssession.exec(stmt).all()# 5. 遍历结果并格式化打印# 每一行 (row) 都包含前面 select 中定义的字段可以直接通过别名访问forrowinrows:print(f用户{row.user_id}订单{row.order_cnt}笔消费{row.user_total})print(f用户{row.user_id}订单{row.order_cnt}笔消费{row.user_total})2.4 标准分页查询接口通用模板分页固定公式offset((page-1)*page_size).limit(page_size)搭配count获取总条数。defget_order_page(page:int1,page_size:int2):# 使用上下文管理器创建数据库会话确保执行完毕后自动释放连接withSession(engine)assession:# 1. 构建分页查询语句stmt(# .order(Order.create_time.desc()): 按照创建时间倒序排列最新的订单排在最前面# .offset((page-1)*page_size): 设置偏移量跳过前面的记录。# 核心分页公式(当前页码 - 1) * 每页条数。# 例如第1页跳过0条第2页跳过2条第3页跳过4条。# .limit(page_size): 限制本次查询最多返回的记录数即每页的条数select(Order).order_by(Order.create_time.desc()).offset((page-1)*page_size).limit(page_size))# 2. 执行查询并获取当前页的数据列表# 返回的是一个包含 Order 对象的列表itemssession.exec(stmt).all()# 3. 查询满足条件的总记录数用于前端计算总页数count_stmtselect(func.count(Order.id))# .scalar(): 专门用于获取聚合查询如 count, sum返回的单个值。# 它会自动提取结果集中的第一行第一列直接返回一个整数而不是一个 Row 对象totalsession.exec(count_stmt).one()# 4. 封装并返回标准的分页数据结构return{page:page,# 当前页码page_size:page_size,# 每页条数total:total,# 总记录数items:items# 当前页的数据列表}# 调用分页接口进行测试# 获取第1页每页2条数据print(get_order_page(page1,page_size2))2.5 查询结果四种读取方法区别方法适用场景报错规则.all()列表数据、分页永远返回列表空列表不报错.first()只取第一条返回第一条/None不抛异常.one()必须有且仅有1条0条/多条都会抛异常.one_or_none()最多一条无数据返回None多条报错.scalar()仅单个数值count/sum直接取出单个值三、事务控制保证多操作原子性3.1 基础事务规则Session默认自动事务修改仅存在缓存必须commit()才持久化任意操作异常执行rollback()撤销本次所有修改with Session.begin()自动管理提交回滚代码更简洁。3.2 基础事务下单同时扣库存模拟多表修改defcreate_order_with_stock():try:withSession(engine)assession:# 1. 新增订单new_orderOrder(user_id3,goods_nameU盘,price39.9,num1)session.add(new_order)# 2. 模拟扣减库存此处省略库存表逻辑# 手动抛出异常测试回滚# raise Exception(库存不足)session.commit()print(事务提交成功)exceptExceptionase:session.rollback()print(事务回滚所有操作撤销,e)create_order_with_stock()3.3 begin() 自动事务上下文推荐写法with session.begin()块结束无异常自动commit出现异常自动rollback无需手动写回滚代码withSession(engine)assession:withsession.begin():session.add(Order(user_id3,goods_name耳机,price89,num1))# 出错自动回滚不用手动rollback# 1 / 03.4 Savepoint 保存点局部回滚不撤销全部事务大循环批量导入时单条失败只回滚当前条目不撤销整批数据。withSession(engine)assession:withsession.begin():# 第一条正常插入session.add(Order(user_id4,goods_name笔记本,price4999,num1))# 创建保存点# 1. 开启一个嵌套事务保存点# 在关系型数据库中这相当于执行了 SAVEPOINT 命令。# 它允许你在当前的主事务中开启一个“子事务”或“检查点”spsession.begin_nested()try:# 异常数据session.add(Order(user_id4,goods_name平板,price-10,num1))exceptException:# 仅回滚保存点内操作第一条保留sp.rollback()四、批量操作优化解决循环add慢4.1 批量插入 add_all基础批量多条数据统一加入会话单次提交远优于循环add多次commitbatch[Order(user_id5,goods_name充电宝,price69,num1),Order(user_id5,goods_name支架,price19,num2)]withSession(engine)assession:session.add_all(batch)session.commit()4.2 bulk_insert_mappings 无实例批量高性能无需创建SQLModel实例直接传字典列表百万级数据推荐fromsqlalchemy.ormimportbulk_insert_mappings data_list[{user_id:6,goods_name:风扇,price:29.9,num:1},{user_id:6,goods_name:台灯,price:45,num:1}]withSession(engine)assession:bulk_insert_mappings(session,Order,data_list)session.commit()4.3 批量更新ORM批量不逐条查询fromsqlmodelimportupdatewithSession(engine)assession:stmtupdate(Order).where(Order.user_id1).values(priceOrder.price*0.9)session.exec(stmt)session.commit()五、原生SQL安全执行text防SQL注入业务复杂统计ORM难以实现时使用text()必须params传参拼接禁止字符串格式化拼接SQL杜绝注入漏洞。5.1 带参数查询返回元组withSession(engine)assession:sqltext(SELECT * FROM order WHERE user_id :uid AND price :min_price)ressession.exec(sql,params{uid:1,min_price:50}).all()forrowinres:print(row.goods_name,row.price)5.2 原生结果映射为SQLModel对象fromsqlalchemyimportselectassa_selectwithSession(engine)assession:# 1. 编写原生 SQL 语句# text(): 将普通的字符串转换为 SQLAlchemy 的文本 SQL 对象使其能被 ORM 识别。# :uid: 这是一个参数化占位符用于防止 SQL 注入攻击后续会通过 params() 传入实际值。sqltext(SELECT id, user_id, goods_name FROM order WHERE user_id:uid)# 2. 将原生 SQL 与 ORM 模型绑定# sa_select(Order): 创建一个以 Order 模型为目标的 select 语句sa_select 是 sqlalchemy.select 的别名。# .from_statement(sql): 告诉 ORM“不要自动生成 SQL而是使用我提供的原生 SQL 语句# 但请将查询出来的结果按照 Order 模型的字段映射成 ORM 对象”。# .params(uid1): 为原生 SQL 中的占位符 :uid 绑定具体的参数值 1。stmtsa_select(Order).from_statement(sql).params(uid1)# 3. 执行查询并获取结果列表# 返回的 order_list 是一个包含 Order ORM 对象的列表而不是普通的字典或元组order_listsession.exec(stmt).all()# 4. 将 ORM 对象转换为字典格式并打印# model_dump(): Pydantic V2 的方法将 ORM 对象序列化为 Python 字典。# 这样处理后数据就可以非常方便地转换为 JSON 格式返回给前端了。print([itemforiteminorder_list])5.3 原生增删改withSession(engine)assession:insert_sqltext(INSERT INTO order(user_id,goods_name,price,num,create_time) VALUES(:u,:g,:p,:n,:ct))session.exec(insert_sql,params{u:7,g:数据线,p:19.9,n:1,ct:datetime.now()})session.commit()六、综合实战分组统计分页报表 事务整合聚合、分页、事务、原生SQL完整业务流程deforder_report():# 事务包裹报表同步统计数据withSession(engine)assession:withsession.begin():# 1. 批量新增测试订单batch[Order(user_id8,goods_name键盘膜,price15,num1)]session.add_all(batch)# 2. 分组统计各用户消费group_stmtselect(Order.user_id,func.sum(Order.price*Order.num).label(total)).group_by(Order.user_id)group_datasession.exec(group_stmt).all()# 3. 分页订单列表page_dataget_order_page(page1,page_size3)# 4. 原生SQL查询高价订单high_sqltext(SELECT goods_name, price FROM order WHERE price200)high_orderssession.exec(high_sql).all()return{group_stat:group_data,page:page_data,high_price:high_orders}print(order_report())七、阶段核心总结半天必掌握聚合统计func系列函数配合group_by分组having过滤分组结果分页模板offset((page-1)*page_size)limitcount统计总条数事务核心commit持久化、rollback回滚session.begin()自动事务begin_nested()保存点局部回滚批量优化add_all、bulk_insert_mappings、update()批量更新避免循环操作原生SQL规范统一text()params传参禁止字符串拼接防注入支持映射模型结果读取分清all/first/one/one_or_none/scalar使用场景。八、新手高频避坑指南❌ 在where中使用聚合函数必须改用having❌ 分页不写order_by数据库返回顺序不稳定❌ 多步业务操作不包事务中途异常导致数据不一致❌ 原生SQL用f-string拼接参数存在SQL注入风险❌ 循环执行addcommit大批量数据性能极差❌ 事务异常忘记rollback会话残留脏数据✅ 报表统计优先ORM聚合极复杂SQL再使用text原生语句。