【从0到1构建一个ClaudeAgent】并
有些操作很慢Agent 不能干等着。例如长时间编译/构建make,mvn compile,gradle build或大数据处理hadoop,spark-submit等的一些工作Java实现代码javapublic class BackgroundTasksSystem { // --- 配置 --- private static final Path WORKDIR Paths.get(System.getProperty(user.dir)); private static final Gson gson new GsonBuilder().setPrettyPrinting().create(); // --- 后台任务管理器 --- static class BackgroundManager { // 任务存储 private final MapString, TaskInfo tasks new ConcurrentHashMap(); // 通知队列 private final QueueTaskNotification notificationQueue new ConcurrentLinkedQueue(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter new AtomicInteger(1); // 锁 private final Object lock new Object(); static class TaskInfo { String taskId; String status; // running, completed, timeout, error String result; String command; long startTime; Thread thread; // 关联的执行线程 } static class TaskNotification { String taskId; String status; String command; String result; } /** * 启动后台任务 * 立即返回任务 ID不等待命令完成 */ public String run(String command) { String taskId task_ taskIdCounter.getAndIncrement(); TaskInfo task new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread new Thread(() - executeTask(task), BackgroundTask- taskId); thread.setDaemon(true); task.thread thread; thread.start(); // 立即返回不阻塞 return String.format(Background task %s started: %s, taskId, command.substring(0, Math.min(command.length(), 80))); } /** * 线程目标执行子进程捕获输出推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb new ProcessBuilder(bash, -c, task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process pb.start(); boolean finished process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output Error: Timeout (300s); status timeout; } else { output new String(process.getInputStream().readAllBytes()).trim(); status completed; } } catch (Exception e) { output Error: e.getMessage(); status error; } // 更新任务状态 task.status status; task.result output.isEmpty() ? (no output) : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } } /** * 检查任务状态 * 如果指定 taskId检查单个任务否则列出所有任务 */ public String check(String taskId) { if (taskId ! null !taskId.isEmpty()) { TaskInfo task tasks.get(taskId); if (task null) { return Error: Unknown task taskId; } return String.format([%s] %s\n%s, task.status, task.command.substring(0, Math.min(task.command.length(), 60)), task.result ! null ? task.result : (running)); } else { StringBuilder sb new StringBuilder(); for (Map.EntryString, TaskInfo entry : tasks.entrySet()) { TaskInfo task entry.getValue(); sb.append(String.format(%s: [%s] %s\n, task.taskId, task.status, task.command.substring(0, Math.min(task.command.length(), 60)))); } return sb.length() 0 ? sb.toString().trim() : No background tasks.; } } /** * 清空通知队列并返回所有待处理的通知 */ public ListTaskNotification drainNotifications() { synchronized (lock) { ListTaskNotification notifications new ArrayList(); while (!notificationQueue.isEmpty()) { notifications.add(notificationQueue.poll()); } return notifications; } } /** * 获取所有任务 */ public MapString, TaskInfo getAllTasks() { return new HashMap(tasks); } } // 初始化后台管理器 private static final BackgroundManager BG_MANAGER new BackgroundManager(); // --- 工具枚举 --- public enum ToolType { BASH(bash, Run a shell command (blocking).), READ_FILE(read_file, Read file contents.), WRITE_FILE(write_file, Write content to file.), EDIT_FILE(edit_file, Replace exact text in file.), BACKGROUND_RUN(background_run, Run command in background thread. Returns task_id immediately.), // 新增 CHECK_BACKGROUND(check_background, Check background task status. Omit task_id to list all.); // 新增 public final String name; public final String description; ToolType(String name, String description) { this.name name; this.description description; } } // --- 工具处理器映射 --- private static final MapString, ToolExecutor TOOL_HANDLERS new HashMap(); static { // ... 省略基础工具注册 // 后台任务工具 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args - { String command (String) args.get(command); return BG_MANAGER.run(command); }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args - { String taskId (String) args.get(task_id); return BG_MANAGER.check(taskId); }); } // ... 省略相同的工具实现 // --- Agent 主循环集成后台任务通知--- public static void agentLoop(ListMapString, Object messages) { while (true) { try { // 在 LLM 调用前检查后台通知 ListBackgroundManager.TaskNotification notifications BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() !messages.isEmpty()) { StringBuilder notifText new StringBuilder(); notifText.append(background-results\n); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format([bg:%s] %s: %s\n, notif.taskId, notif.status, notif.result)); } notifText.append(/background-results); messages.add(Map.of( role, user, content, notifText.toString() )); messages.add(Map.of( role, assistant, content, Noted background results. )); // 异步结果注入将后台任务结果插入到对话中 // 结构化格式用XML标签包裹便于LLM解析 } // 显示当前活动任务 MapString, BackgroundManager.TaskInfo activeTasks BG_MANAGER.getAllTasks(); int runningTasks (int) activeTasks.values().stream() .filter(t - running.equals(t.status)) .count(); if (runningTasks 0) { System.out.printf([Active background tasks: %d]\n, runningTasks); } // ... 省略相同的 LLM 调用和工具执行逻辑 } catch (Exception e) { System.err.println(Error in agent loop: e.getMessage()); e.printStackTrace(); return; } } } }这段代码引入了后台任务系统解决了 Agent 在执行长时间任务时的阻塞问题关键洞察Agent 可以在命令执行时继续工作而不是被阻塞。异步任务处理架构核心思想从同步阻塞的任务执行升级为异步非阻塞的并发处理让Agent能够同时处理多个耗时任务实现并行计算能力大幅提升效率和响应性。java// 后台任务管理器 - 异步执行引擎 static class BackgroundManager { // 任务存储 private final MapString, TaskInfo tasks new ConcurrentHashMap(); // 通知队列 private final QueueTaskNotification notificationQueue new ConcurrentLinkedQueue(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter new AtomicInteger(1); // 并发安全使用线程安全集合 // 异步通信通过队列传递任务结果 // 唯一标识自动生成任务ID }解耦执行任务提交和执行分离立即返回控制权并发管理多个后台任务可以同时运行结果异步收集通过队列机制收集完成的任务结果线程安全使用并发集合确保多线程安全任务信息结构设计java// 任务信息实体 static class TaskInfo { String taskId; // 唯一标识 String status; // 状态running, completed, timeout, error String result; // 执行结果 String command; // 执行的命令 long startTime; // 开始时间 Thread thread; // 关联的执行线程 // 完整状态跟踪从启动到完成的全生命周期 // 线程关联可以控制或监控执行线程 // 时间戳支持超时和性能分析 } // 任务通知实体 static class TaskNotification { String taskId; String status; String command; String result; // 轻量传输只包含必要信息 // 结构化易于解析和处理 // 结果截断避免过大的通知消息 }状态驱动明确的任务状态生命周期结果持久任务结果可以多次查询线程管理可以跟踪和控制执行线程事件驱动通过通知机制传递完成事件异步任务启动机制java/** * 启动后台任务 * 立即返回任务 ID不等待命令完成 */ public String run(String command) { String taskId task_ taskIdCounter.getAndIncrement(); TaskInfo task new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread new Thread(() - executeTask(task), BackgroundTask- taskId); thread.setDaemon(true); // 守护线程不会阻止JVM退出 task.thread thread; thread.start(); // 立即返回不阻塞调用者 return String.format(Background task %s started: %s, taskId, command.substring(0, Math.min(command.length(), 80))); // 异步启动立即返回任务ID不等待命令完成 // 守护线程不会阻止程序正常退出 // 线程命名便于调试和监控 }立即返回不阻塞主线程立即返回控制权守护线程后台任务不会阻止JVM退出资源管理线程自动清理避免内存泄漏友好反馈返回任务ID和简化的命令描述任务执行与结果收集java/** * 线程目标执行子进程捕获输出推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb new ProcessBuilder(bash, -c, task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process pb.start(); boolean finished process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output Error: Timeout (300s); status timeout; } else { output new String(process.getInputStream().readAllBytes()).trim(); status completed; } } catch (Exception e) { output Error: e.getMessage(); status error; } // 更新任务状态 task.status status; task.result output.isEmpty() ? (no output) : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } }超时保护防止长时间运行的任务阻塞异常安全全面捕获执行异常内存管理截断大结果避免内存溢出事件驱动完成后立即通知主线程智能通知注入机制java// 在 LLM 调用前检查后台通知 ListBackgroundManager.TaskNotification notifications BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() !messages.isEmpty()) { StringBuilder notifText new StringBuilder(); notifText.append(background-results\n); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format([bg:%s] %s: %s\n, notif.taskId, notif.status, notif.result)); } notifText.append(/background-results); messages.add(Map.of( role, user, content, notifText.toString() )); messages.add(Map.of( role, assistant, content, Noted background results. )); // 自动注入自动将后台结果插入到对话中 // 结构化格式XML标签明确标识内容类型 // 对话完整添加assistant确认保持对话结构 // 时机智能在LLM调用前插入确保LLM能看到最新结果 }自动同步后台结果自动同步到主对话结构化格式便于LLM识别和解析对话集成无缝集成到现有对话流时机优化在决策前注入确保信息及时性工具集成架构java// 后台任务工具集 public enum ToolType { BACKGROUND_RUN(background_run, Run command in background thread. Returns task_id immediately.), CHECK_BACKGROUND(check_background, Check background task status. Omit task_id to list all.); // 异步执行立即返回不阻塞 // 状态查询支持单个和批量查询 // 语义清晰工具名明确表示异步特性 } // 工具处理器映射 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args - { String command (String) args.get(command); return BG_MANAGER.run(command); // 委托执行将命令转交给后台管理器 // 立即返回不等待任务完成 }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args - { String taskId (String) args.get(task_id); return BG_MANAGER.check(taskId); // 灵活查询支持单任务详查和列表概览 });接口统一与同步工具相同的调用方式异步语义工具名明确区分同步/异步灵活查询支持多种查询方式无缝集成与现有工具系统完全兼容架构演进与价值从 ContextCompactSystem 到 BackgroundTasksSystem 的升级维度ContextCompactSystemBackgroundTasksSystem执行模式同步串行异步并行吞吐量一次一个任务并发多个任务响应性阻塞等待立即响应资源利用单线程多线程并发任务类型短任务为主长短任务混合