openYuanrong 官网官网gitcode仓库仓库使用 yr.wait 限制并发/待处理任务的数量如果发送作业的速率大于处理作业的速率会导致作业积压在作业队列中甚至出现 OOM。yr.wait()允许反压并且可以限制待处理作业的总数从而使作业队列不会无限扩展进而避免 OOM。注意该方法主要用于限制同一时间内允许执行作业的数量。该方法也可以用于限制作业并发的数量但这会损失分发作业的性能所以不建议这样用。openYuanrong 会根据资源的数量和作业需要的资源大小自动分发和调整并发作业的数量。使用示例importyrimporttime# 初始化 Rayyr.init()yr.invokedefheavy_computation_task(i):# 模拟耗时操作例如图像处理或模型推理time.sleep(1)returnfResult from task{i}# --- 配置参数 ---TOTAL_TASKS100MAX_CONCURRENT_TASKS20# 最大并行/在途任务数防止 OOMTIMEOUT10WAIT_NUM1# 存储正在执行的任务句柄 (Object Refs)pending_refs[]results[]print(f开始提交任务限制最大在途任务数为:{MAX_CONCURRENT_TASKS})foriinrange(TOTAL_TASKS):# 【核心逻辑】如果当前正在运行的任务达到了上限iflen(pending_refs)MAX_CONCURRENT_TASKS:# 使用 yr.wait 阻塞直到至少有一个任务完成# timeoutNone 表示无限等待直到有结果返回ready_refs,pending_refsyr.wait(pending_refs,wait_numWAIT_NUM,timeoutTIMEOUT)# 处理已经完成的结果forrefinready_refs:resultyr.get(ref)results.append(result)# print(f完成并清理内存: {result})# 提交新任务task_refheavy_computation_task.invoke(i)pending_refs.append(task_ref)ifi%100:print(f已提交任务{i}当前队列负载:{len(pending_refs)})# --- 收尾工作 ---# 提交完所有任务后等待最后剩下的任务完成print(所有任务已提交正在等待最后剩余的任务...)final_resultsyr.get(pending_refs)results.extend(final_results)print(f全部完成成功处理了{len(results)}个任务。)yr.finalize()