C++20:Coroutines实践(上):巧用异步文件操作库
引言在上一章中我们掌握了 C20 标准下需要实现的协程接口约定。就目前来说在没有标准库支持的情况下这些约定我们都需要自己实现。但是仅通过阅读标准文档或参考代码编写满足 C 协程约定的程序比较困难。因此我安排了两讲内容带你实战演练一下以一个异步文件系统操作库为例学习如何编写满足 C 协程约定的程序。这一章我们先明确模块架构完成基础类型模块和任务调度模块为后面实现基于协程的异步 I/O 调度打好基础今天的重点内容是任务调度模块。好话不多说就让我们从模块架构开始一步步实现任务调度模块课程配套代码https://github.com/samblg/cpp20-plus-indepth模块组织方式由于这是一个用 C 实现的异步文件操作库我们就将它命名为 asyncpp取 async即异步 asynchronous 这一单词的缩写和 cpp 的组合。这个基于 C 协程的库支持通用异步任务、I/O 异步任务以及异步文件系统操作主要用于 I/O 等任务而非计算任务。整个项目的模块架构图如下。我们用 C Modules 组装整个库我先带你了解一下里面的模块有哪些。asyncpp.core核心的基础类型模块主要用来定义基础的类型与 concepts。asyncpp.task通用异步任务模块实现了主线程内的异步任务框架包括 queue、loop、coroutine 和 asyncify 几个分区。asyncpp.io异步 I/O 模块实现了独立的异步 I/O 线程和任务处理框架用于独立异步处理 I/O包括 task、loop 和 asyncify 几个分区。asyncpp.fs异步文件系统模块基于 asyncpp.io 模块实现了异步的文件系统处理函数。对照示意图从下往上看所有模块都是基于 asyncpp.core 这个基础类型模块实现的。而 asyncpp.task 是库的核心模块asyncpp.io 在该核心模块的基础上提供了异步 I/O 的支持。有了清晰的模块划分我们先从基础类型模块——asyncpp.core 开始编写。基础类型模块基础类型模块提供了库中使用的基本类型的 Concepts因此我们重点关注这个 concepts 分区实现在 core/Concepts.cpp 中。export module asyncpp.core:concepts; import type_traits; namespace asyncpp::core { export template typename T concept Invocable std::is_invocable_vT; }在这段代码中我们定义了 Invocable 这个 concept用于判定 T 是否是可调用的。这个 concept 定义的约束为 std::is_invocable_v用于判定 T() 这个函数调用表达式是否合法。由于用户传入的类型可能是普通函数、成员函数、函数对象或者 lambda 表达式因此这里不能用 std::is_function_v因为这个 traits 只支持普通函数不支持其他的可作为函数调用的类型。接下来我们还要定义基础类型模块的导出模块代码在 core/Core.cpp 中。我们可以看到代码中导入并重新导出了所有的分区。export module asyncpp.core; export import :concepts;基础类型模块的工作告一段落接下来要实现的所有模块我们都会直接或间接使用基础类型模块中的 Concepts。任务调度模块接下来就到了重头戏——完成任务调度模块这是库的核心模块。为了让你更直接地了解 C20 以后可以怎么使用协程接下来我们基于协程约定实现异步任务的定义与调用。同时你也会看到协程的调度细节隐藏在封装的接口实现中这样可以降低协程的使用门槛。先说说设计思路。因为 asyncpp 主要用于 I/O 等任务而非计算任务所以我们模仿了 NodeJS 的实现——在主线程中实现任务循环所有的异步任务都会放入这个任务循环中执行并通过循环实现协程的调度。如果要实现真正的异步需要结合另外的工作线程来执行需要异步化的任务task 模块中提供了异步任务的提交接口提交后的实现我们在后续的 I/O 调度模块中完成。现在我们来实现任务调度模块的各个分区。queue 分区首先我们看一下 queue 分区 task/AsyncTaskQueue.cpp这是一个任务队列的实现。export module asyncpp.task:queue; import functional; import mutex; import vector; namespace asyncpp::task { export struct AsyncTask { // 异步任务处理函数类型 using Handler std::functionvoid(); // 异步任务处理函数 Handler handler; }; export class AsyncTaskQueue { public: static AsyncTaskQueue getInstance(); void enqueue(const AsyncTask item) { std::lock_guardstd::mutex guard(_queueMutex); _queue.push_back(item); } bool dequeue(AsyncTask* item) { std::lock_guardstd::mutex guard(_queueMutex); if (_queue.size() 0) { return false; } *item _queue.back(); _queue.pop_back(); return true; } size_t getSize() const { return _queue.size(); } private: // 支持单例模式通过default修饰符说明构造函数使用默认版本 AsyncTaskQueue() default; // 支持单例模式通过delete修饰符说明拷贝构造函数不可调用 AsyncTaskQueue(const AsyncTaskQueue) delete; // 支持单例模式通过delete修饰符说明赋值操作符不可调用 AsyncTaskQueue operator(const AsyncTaskQueue) delete; // 异步任务队列 std::vectorAsyncTask _queue; // 异步任务队列互斥锁用于实现线程同步确保队列操作的线程安全 std::mutex _queueMutex; }; AsyncTaskQueue AsyncTaskQueue::getInstance() { static AsyncTaskQueue queue; return queue; } }这段代码的核心部分是 AsyncTaskQueue 类主要实现了 enqueue 函数和 dequeue 函数。enqueue 函数负责将任务添加到任务队列尾部这里我们用到了互斥锁来实现线程同步。dequeue 则是从任务队列头部获取任务取出任务后会将任务数据从队列中清理掉防止重复执行任务。这里同样用了互斥锁来实现线程同步如果任务不存在会返回 false如果任务存在会将任务写入到参数传入的指针中并返回 true。loop 分区接下来是 loop 分区 task/AsyncTaskLoop.cpp实现了消息循环我们会在 loop 分区使用刚才实现的 queue 分区用作消息循环中的任务队列。后面是具体代码。export module asyncpp.task:loop; import :queue; import cstdint; import chrono; namespace asyncpp::task { export class AsyncTaskLoop { public: // 常量定义了任务循环的等待间隔时间单位为毫秒 static const int32_t SLEEP_MS 1000; static AsyncTaskLoop getInstance(); static void start() { getInstance().startLoop(); } private: // 支持单例模式通过default修饰符说明构造函数使用默认版本 AsyncTaskLoop() default; // 支持单例模式通过delete修饰符说明拷贝构造函数不可调用 AsyncTaskLoop(const AsyncTaskLoop) delete; // 支持单例模式通过delete修饰符说明赋值操作符不可调用 AsyncTaskLoop operator(const AsyncTaskLoop) delete; void startLoop() { while (true) { loopExecution(); std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS)); } } void loopExecution() { AsyncTask asyncEvent; if (!AsyncTaskQueue::getInstance().dequeue(asyncEvent)) { return; } asyncEvent.handler(); } }; AsyncTaskLoop AsyncTaskLoop::getInstance() { static AsyncTaskLoop eventLoop; return eventLoop; } }这段代码的核心是 AsyncTaskLoop 类主要实现了 start、startLoop 和 loopExecution 这三个成员函数我们依次来看看这些函数的作用。start 用于在当前线程启动任务循环实现是调用 startLoop调用后当前线程会阻塞直到出现需要执行的任务。startLoop 用来启动任务循环其实现是一个循环每次循环会调用 loopExecution 成员函数然后通过 this_thread 的 sleep 睡眠等待一段时间给其他线程让出 CPU。如果你足够细心刚才看代码时可能已经注意到了这里的时间定义成了一个常量。在真实的开发场景里这个时间会很短我们这里为了演示任务调度过程特意将时间设置为 1000ms这样输出过程会更加明显。loopExecution 用来执行任务其实现是从任务队列 AsyncTaskQueue 实例中获取最早的任务如果任务不存在就直接返回。coroutine 分区接下来是 coroutine 分区 task/Coroutine.cpp实现了 C 协程约定的几个类型与相关接口为使用协程进行任务调度提供关键支持。代码如下所示。export module asyncpp.task:coroutine; import coroutine; import functional; namespace asyncpp::task { // 协程类 export struct Coroutine { // 协程Promise定义 struct promise_type { Coroutine get_return_object() { return { ._handle std::coroutine_handlepromise_type::from_promise(*this) }; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() {} }; // 协程的句柄可用于构建Coroutine类并在业务代码中调用接口进行相关操作 std::coroutine_handlepromise_type _handle; }; // AsyncTaskSuspender类型声明 export template typename ResultType struct Awaitable; export using AsyncTaskResumer std::functionvoid(); export using CoroutineHandle std::coroutine_handleCoroutine::promise_type; export template typename ResultType using AsyncTaskSuspender std::functionvoid( AwaitableResultType*, AsyncTaskResumer, CoroutineHandle ); // Awaitable类型定义当任务函数返回类型不为void时 export template typename ResultType struct Awaitable { // co_await时需要执行的任务开发者可以在suspend实现中调用该函数执行用户期望的任务 std::functionResultType() _taskHandler; // 存储任务执行的结果会在await_resume中作为co_await表达式的值返回 ResultType _taskResult; // 存储开发者自定义的await_suspend实现会在await_suspend中调用 AsyncTaskSuspenderResultType _suspender; bool await_ready() { return false; } void await_suspend(CoroutineHandle h) { _suspender(this, [h] { h.resume(); }, h); } ResultType await_resume() { return _taskResult; } }; // Awaitable类型定义当任务函数返回类型为void时 export template struct Awaitablevoid { // co_await时需要执行的任务开发者可以在suspend实现中调用该函数执行用户期望的任务 std::functionvoid() _taskHandler; // 存储开发者自定义的await_suspend实现会在await_suspend中调用 AsyncTaskSuspendervoid _suspender; bool await_ready() { return false; } void await_suspend(CoroutineHandle h) { _suspender(this, [h] { h.resume(); }, h); } void await_resume() {} }; }在这段代码中我们定义了 C 协程支持的几个关键类型。首先是协程类型 Coroutine协程调用者一般需要通过该类型操作 coroutine_handle来实现协程的调度该定义包含了嵌套类型 promise_type 和协程句柄变量 _handle。接着在 Coroutine 中定义了 Promise 类型该对象在协程生命周期中一直存在因此可以在不同的线程或者函数之间传递协程的各类数据与状态。类型中的大多数接口没有特殊行为所以都用了默认实现空函数。其中比较特殊的是 get_return_object我们在上一讲说过协程调用者调用协程时获取到的返回值就是该函数的返回值。这里我们通过 coroutine_handle 的 from_promise 函数获取到 promise 对象对应的协程句柄调用 Coroutine 的构造函数生成 Coroutine 对象并返回因此协程函数的调用者获取到该对象后可以根据业务控制调度协程。接着我们定义了 CoroutineHandle 类型这是 std::coroutine_handle 的别名也就是协程的句柄。协程句柄是 C 提供的唯一的协程标准类型指向一次协程调用生成的协程帧因此可以访问到存储在协程帧上的 Promise 对象。协程句柄提供了协程调度的标准函数是协程调用者进行协程调度的基础。由于该类型是一个泛型类模板参数是 Promise 的类型而且会在后续代码中频繁使用为了方便我们为 std::coroutine_handle 定义了一个别名 CoroutineHandle。最后我们定义了 Awaitable 类型在协程中使用 co_await 进行休眠时需要该类型支持。Awaitable 对于实现协程调度至关重要其中 await_resume 和 await_suspend 的实现是重点。我们在此做出进一步分析。首先是 await_resume 的实现。假设用户需要通过 co_await 异步执行函数 f并在 f 结束后获取到 f 的返回值作为 co_await 表达式的值也就是我们希望实现的效果是auto result co_await Awaitable(f);当 f 执行结束后协程会被唤醒并将 f 的返回值赋给 result。另外考虑到函数 f 的返回类型为 void 的情况相当于没有返回值它与“返回值类型不为 void”时的实现完全不同不需要存储函数 f 的返回值。因此我在这里定义了一个 Awaitable 的特化版本——当函数 f 的返回类型为 void 时会使用该版本的 Awaitable 类。在该版本中不会存储函数 f 的返回值await_resume 的返回类型固定为 void并且不会返回任何值。接着是 await_suspend 的实现通过它我们就能控制在“何时、何处”唤醒被 co_await 休眠的协程。这里允许开发者通过 AsyncTaskSuspender 来实现 await_suspend 的具体行为。await_suspend 中会调用开发者实现的函数来唤醒休眠的协程。AsyncTaskSuspender 包含后面这三个参数开发者可以利用这些参数实现不同的调度机制。Awaiter 对象指针Awaitable*。协程的唤醒函数AsyncTaskResumer。协程的句柄CoroutineHandle。asyncify 分区接下来是 asyncify 分区 task/Asyncify.cpp该分区实现了 asyncify 工具函数用于将一个普通的函数 f 转换成一个返回 Awaitable 对象的函数 asyncF。通过这个分区实现的工具可以让库的用户更容易使用我们在上一节实现的 Coroutine。开发者通过 co_await 调用 asyncF就可以实现函数 f 的异步调用并在 f 执行完成后重新唤醒协程。如果你了解过 JavaScript可以将其类比成 ES6 中的 promsify。后面是代码实现。export module asyncpp.task:asyncify; export import :queue; export import :loop; export import :coroutine; import asyncpp.core; namespace asyncpp::task { using asyncpp::core::Invocable; // 默认的AsyncTaskSuspender当任务函数返回类型不为void时 template typename ResultType void defaultAsyncAwaitableSuspend( AwaitableResultType* awaitable, AsyncTaskResumer resumer, CoroutineHandle h ) { auto asyncTaskQueue AsyncTaskQueue::getInstance(); asyncTaskQueue.enqueue({ .handler [resumer, awaitable] { awaitable-_taskResult awaitable-_taskHandler(); resumer(); } }); } /* 默认的AsyncTaskSuspender当任务函数返回类型为void时的特化版本 * * 当f的返回类型为void时函数f没有返回值。因此我们定义了一个函数返回类型为void的特化版本 * 在该版本中构造的AsyncTask对象的handler调用用户函数f后直接调用resumer唤醒协程 * 不会将f的返回值存储到Awaitable对象中。 */ template void defaultAsyncAwaitableSuspendvoid( Awaitablevoid* awaitable, AsyncTaskResumer resumer, CoroutineHandle h ) { auto asyncTaskQueue AsyncTaskQueue::getInstance(); asyncTaskQueue.enqueue({ .handler [resumer, awaitable] { awaitable-_taskHandler(); resumer(); } }); } // 异步化工具函数支持将普通函数f异步化 export template Invocable T auto asyncify( T taskHandler, AsyncTaskSuspenderstd::invoke_result_tT suspender defaultAsyncAwaitableSuspendstd::invoke_result_tT ) { return Awaitablestd::invoke_result_tT { ._taskHandler taskHandler, ._suspender suspender }; } }在这段代码中我定义了两个版本的 defaultAsyncAwaitableSuspend 函数它就是 Coroutine 模块中 Awaitable 类型所需的 AsyncTaskSuspender 函数该函数的作用是在 co_await 休眠协程后执行用户函数 f 和唤醒协程。我们的实现其实很简单就是构造一个 AsyncTask 对象并添加到 AsyncTaskQueue 中。AsyncTask 对象的 handler 会执行用户函数 f将 f 的返回值存储到 awaitable 对象中最后调用 resumer 唤醒协程。接着我们定义了 asyncify 模版函数模板参数 T 必须符合 Invocable 约束也就是必须可调用对应了用户函数 f 的类型。该函数包含两个参数。taskHandler期望异步执行的函数 f。suspenderAwaitable 中用户可以自己设置的 AsyncTaskSuspender 函数。总结为了帮你解决难题熟悉怎样编写满足 C 协程约定的程序我们实现了一个异步文件系统操作库中的任务调度模块。其中 coroutine 分区实现了 C 协程约定的几个类型与相关接口为使用协程进行任务调度提供关键支持。一般来说提供异步调用的库的底层实现各有不同但是它们的目标是一致的就是在某个消息循环上提供异步调用的基础设施。而我们选择使用 C Coroutines 来实现高性能异步调度能力。在下一章我们将继续编程实战使用这一讲实现的任务调度模块继续实现基于协程的异步 I/O 调度。