没有调度器的协程不是好协程——零基础深入浅出 C++20 协程
选取合适的 demo 是头等大事* 以协程为目标涉及到的新语法会简单说明不涉及的不旁征博引很多新语法都是有了某种需求才创建的理解这种需求本身比硬学语法规则更为重要* 若语法的原理非常简单也会简单展开讲讲有利于透过现象看本质用起来更得心应手上一篇文章里不光探讨了协程的本质还说明了一系列 C20 协程概念* 协程体* 协程状态* 承诺对象* 返回对象* 协程句柄及它们之间的关系并简单说明了接入 C20 协程时用户需要实现的类型、接口、及其含义。如果没有这些内容铺垫看本文时会有很多地方将会难以理解还没看过的小伙伴墙裂建议先看那篇。工具还是之前介绍过的 C Insights 和 Compile Explorer也在上一篇中介绍过了这里不再赘述。协程调度器话不多说直接上 demo#include coroutine #include iostream #include queue #include functional #include thread class SingleThreadScheduler { public: void schedule(std::functionvoid() task) { tasks.push(std::move(task)); } void run() { while (!tasks.empty()) { auto task tasks.front(); tasks.pop(); task(); } } private: std::queuestd::functionvoid() tasks; }; struct AsyncTask { struct promise_type { AsyncTask get_return_object() { return AsyncTask(std::coroutine_handlepromise_type::from_promise(*this)); } std::suspend_never initial_suspend() { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } }; std::coroutine_handlepromise_type handle; explicit AsyncTask(std::coroutine_handlepromise_type h) : handle(h) {} ~AsyncTask() { if (handle) handle.destroy(); } }; struct ScheduleAwaiter { SingleThreadScheduler* scheduler; bool await_ready() const { return false; } void await_suspend(std::coroutine_handle h) { scheduler-schedule([h] { h.resume(); }); } void await_resume() {} }; AsyncTask demo_coroutine(SingleThreadScheduler scheduler, int id) { std::cout Task id started on thread: std::this_thread::get_id() std::endl; co_await ScheduleAwaiter{scheduler}; std::cout Task id resumed on thread: std::this_thread::get_id() std::endl; co_await ScheduleAwaiter{scheduler}; std::cout Task id finish on thread: std::this_thread::get_id() std::endl; } int main() { SingleThreadScheduler scheduler; auto task1 demo_coroutine(scheduler, 1); auto task2 demo_coroutine(scheduler, 2); auto task3 demo_coroutine(scheduler, 3); std::cout init done std::endl; scheduler.run(); }这个例子演示了拥有三个协程任务的单线程协程调度器有如下输出Task 1 started on thread: 128258074408768 Task 2 started on thread: 128258074408768 Task 3 started on thread: 128258074408768 init done Task 1 resumed on thread: 128258074408768 Task 2 resumed on thread: 128258074408768 Task 3 resumed on thread: 128258074408768 Task 1 finish on thread: 128258074408768 Task 2 finish on thread: 128258074408768 Task 3 finish on thread: 128258074408768用户只需要调用SingleThreadScheduler::run方法就可以源源不断的驱动注册在其上的协程运行了demo 比较长下面分段看下。#include coroutine #include iostream #include queue #include functional #include thread调度器类型schedule 方法注册协程run 会阻塞当前线程、不停的运行其上的协程协程 resume 方法被包裹在 std::function 中放置在先进先出的队列里保证执行的先后顺序class SingleThreadScheduler { public: void schedule(std::functionvoid() task) { tasks.push(std::move(task)); } void run() { while (!tasks.empty()) { auto task tasks.front(); tasks.pop(); task(); } } private: std::queuestd::functionvoid() tasks; };协程返回对象的定义与之前大体一样包含了承诺对象与协程句柄承诺对象主要的变化是1) initial_suspend 不再挂起协程; 2) 增加了 return_void 接口; 3) 减少了 yield_value 接口;struct AsyncTask { struct promise_type { AsyncTask get_return_object() { return AsyncTask(std::coroutine_handlepromise_type::from_promise(*this)); } std::suspend_never initial_suspend() { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } }; std::coroutine_handlepromise_type handle; explicit AsyncTask(std::coroutine_handlepromise_type h) : handle(h) {} ~AsyncTask() { if (handle) handle.destroy(); } };专用的等待对象主要实现了 await_suspend 方法以便在协程挂起时、向调度器注册协程 resume 方法。增加这个等待对象一来可以挂起协程二来方便获取协程句柄及其 resume 方法struct ScheduleAwaiter { SingleThreadScheduler* scheduler; bool await_ready() const { return false; } void await_suspend(std::coroutine_handle h) { scheduler-schedule([h] { h.resume(); }); } void await_resume() {} };协程体接收调度器、返回返回对象内部 co_await 等待两次异步事件会产生两次中断每次中断前将 resume 注册到调度器以便之后唤醒时继续执行直到协程结束AsyncTask demo_coroutine(SingleThreadScheduler scheduler, int id) { std::cout Task id started on thread: std::this_thread::get_id() std::endl; co_await ScheduleAwaiter{scheduler}; std::cout Task id resumed on thread: std::this_thread::get_id() std::endl; co_await ScheduleAwaiter{scheduler}; std::cout Task id finish on thread: std::this_thread::get_id() std::endl; }程序入口初始化调度器与三个协程任务最后 run 搞定一切int main() { SingleThreadScheduler scheduler; auto task1 demo_coroutine(scheduler, 1); auto task2 demo_coroutine(scheduler, 2); auto task3 demo_coroutine(scheduler, 3); std::cout init done std::endl; scheduler.run(); }这里完善一条规则* 若协程体中有明确的 co_yield则承诺对象必需实现 yield_value 接口* 若协程体中有明确的 co_return xxx则承诺对象必需实现 return_value 接口* 若协程体中有明确的 co_return 或没有任何 co_return则承诺对象至少需要实现 return_void 接口。相比之前的例子没有显式的 co_yield 和 co_return这里承诺对象只需要实现 return_void 即可规范上说没实现的话可能导致未定义行为实测 clang 去掉没引发崩溃不过最好还是带上。老规矩下面有请 C Insights 上场看看编译器底层做的工作与之前相比有何差异查看代码内容比较多只捡关键的看下struct __demo_coroutineFrame { void (*resume_fn)(__demo_coroutineFrame *); void (*destroy_fn)(__demo_coroutineFrame *); std::__coroutine_traits_implAsyncTask::promise_type __promise; int __suspend_index; bool __initial_await_suspend_called; SingleThreadScheduler scheduler; int id; std::suspend_never __suspend_52_11; // initial_suspend ScheduleAwaiter __suspend_56_14; // 第一个 co_await ScheduleAwaiter __suspend_61_14; // 第二个 co_await std::suspend_always __suspend_52_11_1; // final_suspend };协程状态基本结构与之前一致除了返回类型、参数、栈变量外等待对象的数量与类型也发生了变更看起来编译器根据返回值类型推导直接得到了成员类型 (std::suspend_never、SchedulerAwaiter、suspend_always等)。下面进入协程的 resume 方法看看它是整个协程的核心/* This function invoked by coroutine_handle::resume() */ void __demo_coroutineResume(__demo_coroutineFrame * __f) { try {熟悉的 duff device 上场/* Create a switch to get to the correct resume point */ switch(__f-__suspend_index) { case 0: break; case 1: goto __resume_demo_coroutine_1; case 2: goto __resume_demo_coroutine_2; case 3: goto __resume_demo_coroutine_3; case 4: goto __resume_demo_coroutine_4; }promise_type::initial_suspend 返回 suspend_never 导致这里不挂起协程直接略过这个条件继续运行这也是 main 中 init done 输出位于 Task N start on thread 输出之后的原因在构建并返回返回对象前就会向下执行到第一个 co_await/* co_await insights.cpp:52 */ __f-__suspend_52_11 __f-__promise.initial_suspend(); if(!__f-__suspend_52_11.await_ready()) { __f-__suspend_52_11.await_suspend(std::coroutine_handleAsyncTask::promise_type::from_address(static_castvoid *(__f)).operator std::coroutine_handlevoid()); __f-__suspend_index 1; __f-__initial_await_suspend_called true; return; } __resume_demo_coroutine_1: __f-__suspend_52_11.await_resume(); std::operator(std::operator(std::operator(std::cout, Task ).operator(__f-id), started on thread: ), std::this_thread::get_id()).operator(std::endl);第一个 co_awaitScheduleAwaiter 会挂起协程挂起前调用的 ScheduleAwaiter::await_suspend 将 resume 添加到调度器队列以便下次唤醒/* co_await insights.cpp:56 */ __f-__suspend_56_14 ScheduleAwaiter{__f-scheduler}; if(!__f-__suspend_56_14.await_ready()) { __f-__suspend_56_14.await_suspend(std::coroutine_handleAsyncTask::promise_type::from_address(static_castvoid *(__f)).operator std::coroutine_handlevoid()); __f-__suspend_index 2; return; }再次被调度器调度到时根据状态值与 switch-case 直接跳转到这里执行。由于调度器内部使用先进先出队列因此三个协程任务是严格按顺序执行的__resume_demo_coroutine_2: __f-__suspend_56_14.await_resume(); std::operator(std::operator(std::operator(std::cout, Task ).operator(__f-id), resumed on thread: ), std::this_thread::get_id()).operator(std::endl);第二个 co_await如法炮制/* co_await insights.cpp:61 */ __f-__suspend_61_14 ScheduleAwaiter{__f-scheduler}; if(!__f-__suspend_61_14.await_ready()) { __f-__suspend_61_14.await_suspend(std::coroutine_handleAsyncTask::promise_type::from_address(static_castvoid *(__f)).operator std::coroutine_handlevoid()); __f-__suspend_index 3; return; } __resume_demo_coroutine_3: __f-__suspend_61_14.await_resume(); std::operator(std::operator(std::operator(std::cout, Task ).operator(__f-id), finish on thread: ), std::this_thread::get_id()).operator(std::endl);协程退出前没有 co_yield 或 co_return xxx 显示调用则默认调用 co_return 无参版本对应的就是 return_void 啦如果有未捕获的异常promise_type::unhandle_exception 将会被调用进而退出整个进程/* co_return insights.cpp:52 */ __f-__promise.return_void()/* implicit */; goto __final_suspend; } catch(...) { if(!__f-__initial_await_suspend_called) { throw ; } __f-__promise.unhandled_exception(); }协程继续运行promise_type::final_suspend 返回 suspend_always 会导致协程挂起配合返回对象的析构函数可以销毁协程__final_suspend: /* co_await insights.cpp:52 */ __f-__suspend_52_11_1 __f-__promise.final_suspend(); if(!__f-__suspend_52_11_1.await_ready()) { __f-__suspend_52_11_1.await_suspend(std::coroutine_handleAsyncTask::promise_type::from_address(static_castvoid *(__f)).operator std::coroutine_handlevoid()); __f-__suspend_index 4; return; }就不会走到这里协程体的自动销毁逻辑啰__resume_demo_coroutine_4: __f-destroy_fn(__f); }有上一篇文章的铺垫看起来没什么尿点下面来一张图总览下为了便于理解只画了一个协程任务的执行顺序跟着箭头方向和标号就能梳理清楚啦。final_suspend 与协程自清理上面例子中每个协程的返回对象需要保存在临时变量 task1/2/3 中不然在调度器运行时会因协程状态销毁而崩溃int main() { SingleThreadScheduler scheduler; demo_coroutine(scheduler, 1); demo_coroutine(scheduler, 2); demo_coroutine(scheduler, 3);