引言本篇文章是对于调度器scheduler的代码实现代码也是参考了sylar的服务器框架如果大家有兴趣可以看一下我的前两篇文章从零开始手写一个协程库一-CSDN博客从零开始手写一个协程库二-CSDN博客对于调度器的一个基本介绍首先调度器并不是调度协程调度器是最高级的也是只在主线程里面存在的而调度协程是在每个线程里面存在的调度协程的任务是帮助本线程处理任务的调度而调度器则是往这些调度协程里面塞任务的装置。我们这里是多线程对应多协程这样可以更好的提高我们的利用率。然后我们也准备了两种模式一种是主线程参与工作第二种是主线程不参与工作主线程只负责操控调度器来分发任务。所以名义上叫调度器实际上就是管理一堆的线程然后在线程里面放一个调度协程再加上自身的一个任务队列这就是调度器的本质具体的细节我们代码里面说~~~代码实现这个是我们大类的一个基本架构。首先作为调度器我们里面是由一个线程池和一个任务队列组成的。然后我们提供了一个scheduleLock函数这个函数的目的就是往任务队列里面添加任务。这个任务可以是协程可以是函数因为一个函数我们最后也会封装成一个协程。但是为了方便和准确我们还是用一个类模板的形式。首先我们需要做一个标记来判断我们需不需要唤醒线程因为如果之前的队列是空的那么所有的线程都处于idle模式我们需要唤醒如果不是空的说明已经有线程在工作了不需要唤醒。然后我们的idle函数的目的是让每个线程空闲的协程函数不休息执行这么一个函数这样子就不需要来回的切换状态效率可以大大地提高。然后就是我们地任务结构体这个结构体里面封装地几个函数都是为了更加方便地处理这个任务地形式不仅可以接受对象还可以接受对象指针不仅可以是协程还可以是函数不过大家要注意一个点就是无论我们调用哪一个函数我们都必须要指定一个线程号这个的目的就是指定我们需要执行这个任务的线程其他线程不可以执行这一个任务最后我们用了m_useCaller变量目的就是标记我们的主线程是不是参加工作class Scheduler { public: // threads指定线程池的的数量use_caller指定是否将主线程作为工作线程,name调度器的名称 Scheduler(size_t threads 1, bool use_caller true, const std::string name Scheduler); virtual ~Scheduler(); // 防止资源出现泄漏基类指针删除派生类对象的时候不完全销毁对象的问题 const std::string getName() const {return m_name;} public: // 获取正在运行的调度器 static Scheduler* GetThis(); protected: // 设置正在运行的调度器 void SetThis(); public: // 添加到任务队列里面 // FiberOrCb 调度任务类型可以是协程对象或函数指针 /* 这个函数的逻辑是因为调用了这个函数说明有任务进来了然后判断任务队列是不是空的 如果是空的说明线程们都睡觉在需要唤醒线程 如果不是空的说明有线程在运行不需要唤醒线程 */ template typename FiberOrCb void scheduleLock(FiberOrCb fc, int thread -1) { bool need_tickle; // 用于标记任务队列是否为空从而判断是否需要唤醒线程 { std::lock_guardstd::mutex lock(m_mutex); need_tickle m_tasks.empty(); // 创建Task的任务对象 ScheduleTask task(fc, thread); if (task.fiber || task.func) { // 任务的协程或者函数存在一个就可以了 m_tasks.push_back(task); } } if (need_tickle) { // 如果任务队列为空需要唤醒线程 tickle(); } } virtual void start(); // 启动线程池启动调度器 virtual void stop(); // 停止线程池停止调度器,等待所有调度任务完成之后返回 protected: virtual void tickle(); // 唤醒线程 // 线程函数 virtual void run(); // 空闲协程函数无任务调度的时候执行idle协程 virtual void idle(); // 是否可以关闭调度器判断是否有线程在运行是否有任务在列里面 virtual bool stopping(); // 返回是否有空闲的协程 // 当调度协程进入idle时空闲的线程1当调度协程返回空闲时线程数-1 virtual bool hasIdleThread() {return m_idleThreadCount 0;} private: // 任务 struct ScheduleTask { std::shared_ptrFiber fiber; std::functionvoid() func; int thread; // 指定任务需要运行的线程我们这里是多线程多协程 ScheduleTask() { fiber nullptr; func nullptr; thread -1; } /* 如果外部想保留任务的所有权就传值SchedulerTask(fiber, 0) 如果外部想把任务“扔”给调度器就传指针SchedulerTask(fiber, 0) 调度器内部通过 swap 直接接管零开销 */ ScheduleTask(std::shared_ptrFiber f, int thr) { fiber f; thread thr; } ScheduleTask(std::functionvoid() f, int thr) { func f; thread thr; } ScheduleTask(std::shared_ptrFiber* f, int thr) { fiber.swap(*f); // 交换指针避免引用计数增加 thread thr; } ScheduleTask(std::functionvoid()* f, int thr) { func.swap(*f); // 交换指针避免引用计数增加 thread thr; } // 重置 void reset() { fiber nullptr; func nullptr; thread -1; } }; private: std::string m_name; std::mutex m_mutex; std::vectorstd::shared_ptrThread m_threads; // 线程池 std::vectorScheduleTask m_tasks; // 任务队列 std::vectorint m_threadIds; // 线程id列表 size_t m_threadCount 0; // 需要额外创建的线程数量 std::atomicsize_t m_activeThreadCount{0}; // 正在运行的线程数量 std::atomicsize_t m_idleThreadCount{0}; // 空闲线程数量 bool m_useCaller; // 是否将主线程作为工作线程 std::shared_ptrFiber m_schedulerFiber; // 如果是-需要额外创建调度协程 int m_rootThread -1; // 如果是-记录主线程的线程id bool m_stopping false; // 是否停止调度器运行 };这个是构造函数和析构函数对于构造函数来说它必须要确定主线程的情况我们首先要设置调度器的位置在主线程里面用SetThis然后如果主线程参加工作首先我们要初始化环境同时创建主协程然后创建子协程这个协程是调度协程注意一下我们创建子协程的时候绑定了一个run函数这个函数就是调度函数还有一个点我们传递了false这个参数目的是因为这个就是调度协程自然不会受到调度所以是false。然后就是基本的赋值操作// 在主线程里面创建了主协程和调度协程 Scheduler::Scheduler(size_t threads, bool use_caller, const std::string name) : m_useCaller(use_caller), m_name(name) { // 首先判断线程的数量是否大于0并且调度器的对象是否是空指针然后调用SetThis进行设置 assert(threads 0 Scheduler::GetThis() nullptr); SetThis(); Thread::SetName(m_name); // 设置当前线程的名称为调度器的名称 // 使用主线程当作工作线程创建协程的主要原因是为了实现更加高效的任务调度和管理 if (use_caller) { // 表示当前线程会被当作工作线程 threads--; // 减少线程的数量因为主线程会被当作工作线程 // 创建主协程 Fiber::GetThis(); // 创建专门运行run方法的调度协程这个调度协程负责在当前线程中执行任务和分发循环 m_schedulerFiber.reset(new Fiber(std::bind(Scheduler::run, this)), 0, false); // 这里是false意味着协程退出之后将返回主协程(相当于任务调度结束返回到主协程) Fiber::SetSchedulerFiber(m_schedulerFiber.get()); // 设置协程的调度对象 m_rootThread Thread::GetThreadId(); // 获取当前线程的ID m_threadIds.push_back(m_rootThread); } m_threadCount threads; // 将剩余的线程数量赋值给m_threadCount } Scheduler::~Scheduler() { assert(stopping() true); if (GetThis() this) { t_scheduler nullptr; } if (debug) { std::cout Scheduler::~Scheduler() std::endl; } }然后我想补充一个易错的知识点这两行代码看起来差不多其实区别很大。第一个是指针代表的是每一个线程都可以通过这个指针来访问调度器。第二个是对象代表的是每一个线程都有一个属于他们自己的主协程。static thread_local Scheduler* t_scheduler nullptr; static thread_local std::shared_ptrFiber t_thread_fiber nullptr;对于线程池的启动函数start我唯一想说的就是每次塞进线程池里面的线程一定是执行run函数。大家可以把run函数理解成一个协程也就是调度协程我们每一次初始化一个线程肯定是要先给它一个调度的协程再插入任务的。// 判断这个函数是否可以退出 bool Scheduler::stopping() { std::lock_guardstd::mutex lock(m_mutex); return m_stopping m_tasks.empty() m_activeThreadCount 0; } void Scheduler::start() { std::lock_guardstd::mutex lock(m_mutex); assert(m_stopping false); // 调度器还未开启 assert(m_threads.empty()); // 线程池还未开启 m_threads.resize(m_threadCount); for (size_t i 0; i m_threadCount; i) { m_threads[i].reset(new Thread(std::bind(Scheduler::run, this), m_name _ std::to_string(i))); m_threadIds.push_back(m_threads[i]-getId()); } if (debug) { std::cout Scheduler::start() std::endl; } }接下来就是核心run也就是每个调度协程要干的事情。首先要设置一下调度器的位置因为调度器只存在于主线程所以对于子线程如果要找到调度器就必须先设置一个调度器指针副本thread_local)然后因为我们主线程里面的调度协程已经初始化了环境但是子线程里面的调度协程才刚刚创建所以也要初始化。然后就到了关键的地方首先我们先创建一个执行idle的指针还有一个任务对象用来接受要处理的任务。然后就是while循环这个循环如果没有调度器通知退出我们是不会主动退出的。在循环的一开始我们要做的就是取任务这个任务有两个标准一个是对应这个线程的id一个是这个任务没有被执行。同时我们需要设置一个tickle_me变量这个变量为true的条件就是遍历的过程中发现了任务但是这个任务要么是不属于这个线程要么是已经被其他线程正在执行因为我们假设是单核CPU所以这个线程在执行的时候其他线程相当于在睡觉或者是如果第一个就拿到了任务但是发现队伍里面还有任务因为这个任务不是end的前面一个那么就标记为true随后通知一下其他线程你们有任务了别睡了。然后是执行任务如果是函数就封装成协程用智能指针然后就是resume最后结束记得reset这个任务。但是我们没有这个任务呢首先主协程也就是调度协程会调用idle的resume然后在idle里又会yeild这个时候执行权又回到了主协程也就是调度协程这里一直循环直到有任务到来// 调度器的核心负责从任务队列中取出任务并通过协程执行 void Scheduler::run() { int thread_id Thread::GetThreadId(); // 获取当前线程的ID if (debug) { std::cout Scheduler::run() start thread_id std::endl; } SetThis(); // 设置调度器对象 // 如果当前线程不是主线程那么就创建一个协程用于执行任务和分发循环 if (thread_id ! m_rootThread) { Fiber::GetThis(); } std::shared_ptrFiber idle_fiber std::make_sharedFiber(std::bind(Scheduler::idle, this)); ScheduleTask task; while(true) { task.reset(); bool tickle_me false; // 是否唤醒了其他线程进行任务调度 { std::lock_guardstd::mutex lock(m_mutex); auto it m_tasks.begin(); // 遍历任务队列 while(it ! m_tasks.end()) { if (it-thread ! -1 it-thread ! thread_id) { // 我们只执行没有被分配到的线程并且任务分配时传递的id号要求和我们的线程一摸一样 tickle_me true; it; continue; } assert(it-fiber || it-func); task *it; m_tasks.erase(it); m_activeThreadCount; break; // 这里取到了任务就没有必要遍历到尾 } tickle_me tickle_me || (it ! m_tasks.end()); // 仍然存在未处理的任务 } // 具体的代码在io scheduler if (tickle_me) { tickle(); } // 执行任务 if (task.fiber) { { // resume协程resume返回时要么此时任务结束了要么半路yeild了总之任务完成了活跃线程-1 std::lock_guardstd::mutex lock(m_mutex); if (task.fiber-getState() ! Fiber::TERM) { task.fiber-resume(); } } m_activeThreadCount--; // 线程执行完任务之后就不再处于活跃状态 task.reset(); } else if (task.func) { // 封装成协程加入调度 std::shared_ptrFiber fiber std::make_sharedFiber(task.func, 0, true); { std::lock_guardstd::mutex lock(m_mutex); fiber-resume(); } m_activeThreadCount--; // 线程执行完任务之后就不再处于活跃状态 task.reset(); } else { // 无任务执行idle协程 // 系统关闭-》idle协程将从死循环中出来结束-》此时的idle协程状态为TERM-》再次进入将跳出循环并退出run() if (idle_fiber-getState() Fiber::TERM) { /* 如果调度器没有调度任务那么idle协程不断地resume和yield不会结束进入一个忙等待如果idle协程结束 一定是调度器停止了直到有任务才会执行上面的if/else在这里idle_fiber就是不断和主协程交互的子协程 */ if (debug) { std::cout Scheduler::run() ends in thread: thread_id std::endl; } break; } m_idleThreadCount; idle_fiber-resume(); m_idleThreadCount--; } } }这个也就是idle的代码void Scheduler::idle() { while(!stopping()) { if (debug) { std::cout Scheduler::idle(), sleeping in thread: Thread::GetThreadId() std::endl; sleep(1); Fiber::GetThis()-yield(); } } }最后我们要让调度器优雅的终止有几行十分优雅的代码std::vectorstd::shared_ptrThread thrs;从这一行开始我们的回收工作因为我们无法保证在回收的时候没有任务进来但是回收的时候一定需要加锁防止数据的竞争这就导致如果我们直接for加锁然后join可能如果有任务需要长时间执行我们线程又开始工作然后join一直阻塞导致这个锁一直释放不了。这个锁释放不了那些任务也可能执行不了然后就死锁了。为了避免这个状态我们在加锁的一瞬间swap一下把当前的所有线程全部转移到另一个容器里这个时候我们的join就不需要加锁了因为没有人会修改thr容器所以我们完全可以等待所有的线程执行完任务再回收不会占着锁不释放。void Scheduler::stop() { if (debug) { std::cout Scheduler::stop() in thread : Thread::GetThreadId() std::endl; } if (stopping()) { return; } m_stopping true; if (m_useCaller) { assert(GetThis() this); } else{ assert(GetThis() ! this); } for (size_t i 0; i m_threadCount; i) { tickle(); // 唤醒空闲线程 } if (m_schedulerFiber) { tickle(); // 唤醒调度器Fiber } if (m_schedulerFiber) { m_schedulerFiber-resume(); // 开始调度任务 if (debug) { std::cout Scheduler::stop() ends in thread: Thread::GetThreadId() std::endl; } } // 获取此时的线程通过swap不会增加引用计数的方式加入到thrs方便下面的join保持线程的正常退出 std::vectorstd::shared_ptrThread thrs; { std::lock_guardstd::mutex lock(m_mutex); thrs.swap(m_threads); } for (auto thr : thrs) { thr-join(); } if (debug) { std::cout Scheduler::stop() ends in thread: Thread::GetThreadId() std::endl; } }总结本篇文章到这里就结束了希望可以帮助大家理解调度器~~~