什么是 A2A 协议A2AAgent-to-Agent是一种智能体通信协议基于 JSON-RPC 2.0 格式通过统一的端点路由所有方法内置鉴权和会话管理机制。在前面的章节中我们配置了智能体的 URL、会话维持方式和认证信息。接下来我们将通过代码实现可以与之通信的 A2A API 服务完整演示鉴权、会话分配和消息处理三个核心流程。搭建 A2A API 服务下面我们一步步搭建一个 A2A API 服务完整演示鉴权、会话分配和消息处理三个核心流程。第一步创建项目并安装依赖mkdira2a-democda2a-demo pipinstallfastapi uvicorn a2a-sdka2a-sdk是 Google 官方提供的 A2A Python SDK包含TaskState、TaskStatus、Message等核心类型保证响应数据结构与 A2A 规范一致。第二步保存 API 密钥和会话数据创建main.py先定义 API 密钥和会话存储。这一步相当于搭建服务的基础设施。importuuidfromfastapiimportFastAPI,Request,Headerfromfastapi.responsesimportJSONResponse appFastAPI(titleA2A Demo)# 演示用 API 密钥生产环境应写入配置文件或数据库API_KEYsk-test-api-key# 会话存储生产环境应使用 Redissessions:dict[str,dict]{}代码说明API_KEY服务端预设的 API 密钥客户端通过Authorization: Bearer头部传递sessions存储已初始化的会话key 为agentSessionIdvalue 为会话信息第三步实现鉴权验证鉴权是 A2A 协议的第一道关卡。客户端在请求头的Authorization字段中携带 Bearer Token服务端需要验证这个 Token 是否合法。defverify_auth(request:Request)-bool:authrequest.headers.get(Authorization,)returnauthfBearer{API_KEY}代码说明从请求 Header 中获取Authorization字段判断值是否为Bearer {预设密钥}匹配则通过否则拒绝第四步创建统一消息入口A2A 协议的所有方法都通过POST /agent/message端点处理。这里我们编写入口函数串联整个请求处理流程。app.post(/agent/message)asyncdefhandle_message(request:Request,agent_session_id:str|NoneHeader(None,aliasagent-session-id),):# 1. 鉴权ifnotverify_auth(request):returnJSONResponse(status_code401,content{jsonrpc:2.0,id:None,error:{code:401,message:Invalid auth}},)bodyawaitrequest.json()methodbody.get(method)req_idbody.get(id)# 2. 分配 Sessionifmethodinitialize:returnawaithandle_initialize(req_id)# 3. 验证 Sessioninitialize 之外的请求都需要ifnotagent_session_idoragent_session_idnotinsessions:returnJSONResponse(status_code401,content{jsonrpc:2.0,id:req_id,error:{code:401,message:Invalid session}},)# 4. 处理消息ifmethodmessage/stream:returnawaithandle_message_stream(body,req_id)returnJSONResponse(status_code400,content{jsonrpc:2.0,id:req_id,error:{code:-32601,message:fUnknown method:{method}}},)代码说明agent_session_id从 Header 中提取initialize方法不需要此参数通过body.get(method)判断请求类型路由到不同的处理方法所有响应遵循 JSON-RPC 2.0 格式第五步实现会话分配当鸿蒙智能体首次连接时会调用initialize方法获取会话 ID。服务端生成一个唯一 ID 并返回后续请求都需要携带这个 ID。asyncdefhandle_initialize(req_id:int|str|None):session_iduuid.uuid4().hexsessions[session_id]{status:active}return{jsonrpc:2.0,id:req_id,result:{agentSessionId:session_id,agentSessionTtl:604800,# 会话有效期7 天},}代码说明uuid.uuid4().hex生成 32 位随机字符串作为会话 IDagentSessionTtl告知客户端该会话的有效期秒客户端需要在过期前重新初始化会话信息存入sessions字典供后续请求验证第六步实现消息处理会话建立后鸿蒙智能体会发送message/stream请求将用户输入的消息体传给我们。我们从请求中提取用户文本构造 A2A 规范的响应返回。froma2a.typesimportTaskState,TaskStatus,Message,TextPartasyncdefhandle_message_stream(body:dict,req_id:int|str|None):# 从请求中提取用户输入paramsbody.get(params,{})task_idparams.get(id)user_text(params.get(message,{}).get(parts,[{}])[0].get(text,))# 使用 a2a-sdk 构建消息messageMessage(roleagent,parts[TextPart(textf你说了{user_text})],)return{jsonrpc:2.0,id:req_id,result:{taskId:task_id,kind:status-update,final:True,status:TaskStatus(stateTaskState.completed,messagemessage).model_dump(),},}代码说明从params.message.parts[0].text路径提取用户的文本输入TaskState.completedSDK 提供的任务状态枚举表示处理完成TaskStatusSDK 提供的状态模型包含state、message、timestamp字段MessageTextPartSDK 提供的消息模型roleagent表示由智能体回复完整代码将以上所有步骤组合在一起就是完整的main.pyimportuuidfromfastapiimportFastAPI,Request,Headerfromfastapi.responsesimportJSONResponsefroma2a.typesimportTaskState,TaskStatus,Message,TextPart appFastAPI(titleA2A Demo)API_KEYsk-test-api-keysessions:dict[str,dict]{}defverify_auth(request:Request)-bool:authrequest.headers.get(Authorization,)returnauthfBearer{API_KEY}asyncdefhandle_initialize(req_id:int|str|None):session_iduuid.uuid4().hexsessions[session_id]{status:active}return{jsonrpc:2.0,id:req_id,result:{agentSessionId:session_id,agentSessionTtl:604800,},}asyncdefhandle_message_stream(body:dict,req_id:int|str|None):paramsbody.get(params,{})user_text(params.get(message,{}).get(parts,[{}])[0].get(text,))messageMessage(roleagent,parts[TextPart(textf你说了{user_text})],)return{jsonrpc:2.0,id:req_id,result:{taskId:params.get(id),kind:status-update,final:True,status:TaskStatus(stateTaskState.completed,messagemessage).model_dump(),},}app.post(/agent/message)asyncdefhandle_message(request:Request,agent_session_id:str|NoneHeader(None,aliasagent-session-id),):ifnotverify_auth(request):returnJSONResponse(status_code401,content{jsonrpc:2.0,id:None,error:{code:401,message:Invalid auth}},)bodyawaitrequest.json()methodbody.get(method)req_idbody.get(id)ifmethodinitialize:returnawaithandle_initialize(req_id)ifnotagent_session_idoragent_session_idnotinsessions:returnJSONResponse(status_code401,content{jsonrpc:2.0,id:req_id,error:{code:401,message:Invalid session}},)ifmethodmessage/stream:returnawaithandle_message_stream(body,req_id)returnJSONResponse(status_code400,content{jsonrpc:2.0,id:req_id,error:{code:-32601,message:fUnknown method:{method}}},)运行测试启动服务python main.py打开新终端依次执行以下命令验证三个核心流程1. 不携带密钥 → 鉴权失败curl-XPOST http://localhost:8080/agent/message\-HContent-Type: application/json\-d{jsonrpc:2.0,id:1,method:initialize}预期返回 401提示Invalid auth。2. 携带密钥调用 initialize → 获取会话 IDcurl-XPOST http://localhost:8080/agent/message\-HContent-Type: application/json\-HAuthorization: Bearer sk-test-api-key\-d{jsonrpc:2.0,id:1,method:initialize}预期返回agentSessionId和agentSessionTtl。3. 携带 Session ID 发送消息 → 得到回复SESSION_ID替换为上面返回的 agentSessionIdcurl-XPOST http://localhost:8080/agent/message\-HContent-Type: application/json\-Hagent-session-id:$SESSION_ID\-HAuthorization: Bearer sk-test-api-key\-d{ jsonrpc: 2.0, id: 2, method: message/stream, params: { id: task-001, sessionId: session-001, message: { role: user, parts: [{kind: text, text: 你好}] } } }预期返回state: completed消息内容为你说了你好。测试运行截图总结本文从零开始分六步搭建了一个完整的 A2A API 服务。每个步骤对应 A2A 协议的一个核心环节步骤对应代码说明鉴权verify_auth()验证Authorization: BearerToken消息入口handle_message()统一端点路由分发会话分配handle_initialize()生成并返回agentSessionId消息处理handle_message_stream()解析用户输入返回处理结果使用a2a-sdk提供的TaskState、TaskStatus、Message、TextPart等类型构建响应保证了数据结构与 A2A 规范的一致性。下一篇文章将详细介绍message/stream的 SSE 流式输出和tasks/cancel等进阶方法。