openYuanrong 官网官网gitcode仓库仓库使用有状态函数构造树状作业图这种模式将“业务逻辑”与“任务编排”解耦构建高效的分布式系统。核心架构逻辑在这种模式下Driver 驱动程序 就像是公司的 CEO他只对接几位 Supervisor 主管。每位主管带一个 Workers 员工 团队。Driver负责高层决策和初始化 Supervisor。Supervisor负责创建 Worker、分配任务、监控状态、以及在 Worker 崩溃时进行重启Handle failures。Worker执行具体的重体力活。如果 Supervisor 被销毁那么由于引用计数的在维系有状态函数的生命周期它管理的 Workers 也将被销毁。状态函数可以嵌套多层进而构造一个树状结构。以数据训练为例构建双层结构第一层启动多个有状态函数。每个有状态函数负责一组特定的超参数例如学习率 (0.01)(0.001)。第二层每个有状态函数内部再创建一组 Worker 有状态函数。这些 Worker 共享同一组超参数但各自处理数据的不同切片DataShards并通过 all-reduce 或参数服务器同步梯度。资源管理建议在这个模式中资源计算变得复杂了。如果你有 32 个 CPU如果每个 Supervisor 占 1 个 CPU它创建 3 个各占 1 CPU 的 Worker。那么一个试验总共占用 4 个 CPU。你最多只能同时运行 (32 / 4 8) 个超参数试验。务必给 Supervisor 设置 num_cpus1 或更小否则如果你有大量超参数所有 CPU 都会被 Supervisor占满导致 Worker 无法启动发生资源死锁。使用示例importyrimporttime# 创建有状态函数类实例并设置其运行所需资源1核CPUoptyr.InvokeOptions(cpu1000)# --- 2. 底层执行实际训练的 Worker ---yr.instance(invoke_optionsopt)classTrainingWorker:deftrain(self,shard_id,hyperparams):# 模拟训练过程lrhyperparams[lr]print(fWorker{shard_id}正在使用 lr{lr}训练数据分片...)time.sleep(2)returnfLoss:{0.1/lr}# --- 1. 中层负责编排的 Supervisor ---yr.instance(invoke_optionsopt)classTrainingSupervisor:def__init__(self,hyperparams):self.hyperparamshyperparams self.workers[]defstart_training(self,num_workers):# 动态创建属于自己的 Worker 团队self.workers[TrainingWorker.invoke()foriinrange(num_workers)]# 调度所有 Worker 并行训练数据分片resultsyr.get([w.train.invoke(i,self.hyperparams)fori,winenumerate(self.workers)])returnf超参数{self.hyperparams}试验完成结果:{results}# --- 主控端 ---yr.init()# 并行启动两个不同超参数的试验两个 Supervisorconfigs[{lr:0.01},{lr:0.001}]supervisors[TrainingSupervisor.invoke(cfg)forcfginconfigs]# 所有的试验并行运行final_reportsyr.get([s.start_training.invoke(num_workers2)forsinsupervisors])forreportinfinal_reports:print(report)yr.finalize()