089、aiohttp 异步 HTTP:协程客户端、并发请求、WebSocket 通信
089、aiohttp 异步 HTTP协程客户端、并发请求、WebSocket 通信从一次线上事故说起去年冬天我负责的一个数据采集服务突然在凌晨三点报警——任务队列堆积响应延迟从200ms飙升到15秒。查日志发现罪魁祸首是requests库的同步调用阻塞了事件循环。当时代码里混用了asyncio和同步HTTP请求每个请求都要等I/O完成才能继续并发量一上来直接拖垮整个进程。这个教训让我彻底转向aiohttp。如果你还在用requests配合asyncio做异步赶紧停手——requests的阻塞会破坏整个事件循环的调度。aiohttp才是asyncio生态里真正的HTTP客户端和服务端解决方案。协程客户端从零开始先看一个最基础的GET请求。别急着复制注意看异常处理那块我踩过坑。importasyncioimportaiohttpasyncdeffetch_url(url):# 这里必须用async with管理session否则连接池泄漏asyncwithaiohttp.ClientSession()assession:try:# timeout参数别漏默认没有超时生产环境会挂死asyncwithsession.get(url,timeoutaiohttp.ClientTimeout(total10))asresponse:# 注意response.text()也是协程别漏了awaitreturnawaitresponse.text()exceptaiohttp.ClientErrorase:# 别这样写print(e) 然后return None会丢失上下文raiseRuntimeError(f请求失败:{url})fromeasyncdefmain():htmlawaitfetch_url(https://httpbin.org/get)print(html[:200])asyncio.run(main())这里有个容易忽略的细节ClientSession内部维护连接池每次async with session.get()都会从池中取连接。如果你在循环里反复创建session等于每次新建连接池性能还不如同步。正确做法是全局复用session。并发请求控制节奏的艺术并发请求不是简单地把所有任务扔给asyncio.gather。我见过最蠢的写法是同时发起1000个请求结果被对方服务器封IP。控制并发量是必修课。方案一信号量控制importasyncioimportaiohttp semaphoreasyncio.Semaphore(10)# 最多10个并发asyncdefbounded_fetch(session,url):asyncwithsemaphore:asyncwithsession.get(url)asresponse:returnawaitresponse.text()asyncdefmain():urls[fhttps://httpbin.org/delay/{i}foriinrange(1,20)]asyncwithaiohttp.ClientSession()assession:tasks[bounded_fetch(session,url)forurlinurls]resultsawaitasyncio.gather(*tasks,return_exceptionsTrue)# return_exceptionsTrue很重要否则一个失败全部取消forresultinresults:ifisinstance(result,Exception):print(f任务失败:{result})方案二使用asyncio.Queue做生产者消费者当请求数量不确定或者需要动态添加时队列模式更灵活。asyncdefworker(session,queue,results):whileTrue:urlawaitqueue.get()ifurlisNone:# 终止信号breaktry:asyncwithsession.get(url)asresponse:dataawaitresponse.text()results.append(data)exceptExceptionase:results.append(e)finally:queue.task_done()asyncdefmain():queueasyncio.Queue()results[]urls[fhttps://httpbin.org/get?n{i}foriinrange(100)]asyncwithaiohttp.ClientSession()assession:# 启动5个workerworkers[asyncio.create_task(worker(session,queue,results))for_inrange(5)]# 放入任务forurlinurls:awaitqueue.put(url)# 等待所有任务完成awaitqueue.join()# 发送终止信号for_inworkers:awaitqueue.put(None)awaitasyncio.gather(*workers)WebSocket通信实时性的正确姿势aiohttp的WebSocket支持比websockets库更轻量但坑也不少。最典型的是心跳检测——如果不做连接会在几分钟后静默断开。asyncdefwebsocket_client():asyncwithaiohttp.ClientSession()assession:asyncwithsession.ws_connect(wss://echo.websocket.org)asws:# 这里踩过坑必须手动处理ping/pongasyncdefheartbeat():whileTrue:awaitasyncio.sleep(30)try:awaitws.ping()exceptException:breakheartbeat_taskasyncio.create_task(heartbeat())try:# 发送消息awaitws.send_str(Hello, WebSocket!)# 接收消息注意msg的类型判断asyncformsginws:ifmsg.typeaiohttp.WSMsgType.TEXT:print(f收到:{msg.data})elifmsg.typeaiohttp.WSMsgType.CLOSED:breakelifmsg.typeaiohttp.WSMsgType.ERROR:breakfinally:heartbeat_task.cancel()awaitws.close()WebSocket的异常处理比HTTP复杂。连接断开、超时、协议错误都可能发生。我的经验是在ws_connect时设置receive_timeout和heartbeat参数而不是手动实现心跳。# 更稳健的写法asyncwithsession.ws_connect(wss://example.com/ws,heartbeat30,# 自动发送pingreceive_timeout60,# 接收超时max_msg_size1024*1024# 限制消息大小防止内存溢出)asws:asyncformsginws:# 处理消息调试技巧那些年我踩过的坑连接池耗尽默认连接池大小是100如果并发超过这个数新请求会排队。用ClientSession(connectoraiohttp.TCPConnector(limit50))调整。SSL证书验证内网环境经常遇到自签名证书别直接设置verify_sslFalse应该用connector aiohttp.TCPConnector(sslFalse)这样只影响当前连接器。Cookie持久化ClientSession默认不保存cookie需要手动设置cookie_jaraiohttp.CookieJar()。重试机制aiohttp没有内置重试需要自己实现。我通常用tenacity库配合但注意重试时不要重复创建session。fromtenacityimportretry,stop_after_attempt,wait_exponentialretry(stopstop_after_attempt(3),waitwait_exponential(multiplier1,min2,max10))asyncdeffetch_with_retry(session,url):asyncwithsession.get(url)asresponse:response.raise_for_status()returnawaitresponse.text()个人经验建议别把aiohttp当成万能工具。如果你的应用只是偶尔发几个HTTP请求用requests更简单。aiohttp的真正价值在于高并发场景——比如爬虫、实时数据流、微服务网关。生产环境中一定要给ClientSession设置timeout和connector参数。默认配置只适合开发环境。另外建议把session作为应用级别的单例通过依赖注入传递而不是在每个函数里创建。最后测试异步代码时用pytest-asyncio别用unittest。异步调试比同步复杂得多善用asyncio.run()的debug模式设置PYTHONASYNCIODEBUG1环境变量它能帮你发现未等待的协程和阻塞调用。记住异步不是魔法它只是让I/O等待时不阻塞其他任务。如果你的代码里还有time.sleep()赶紧换成asyncio.sleep()。