Linux线程4.0-线程同步-条件变量cond,深入生产者-消费者模型。
bit::Shadow✧(≖ ◡ ≖✿目录问题线程不同步下单线程占用CPU过度其他线程无法获取平均分配的资源。解决办法条件变量定义销毁等待唤醒注意对于pthread_cond_signal()唤醒顺序是不一定的。生产者——消费者 条件变量实现☆生产者——消费者模型生产者、消费者关系的321☆☆☆阻塞队列Blocking Queue 复习时非常值得重新完整的再实现一次结构图示队列EQueue()Pop()视频演示生产者消费者模型的意义复盘核心点条件变量POSIX信号量回顾接口sem_init() sem_detroy() sem_wait() sem_post()信号量与条件变量的对比条件变量的封装信号量的极简封装问题线程不同步下单线程占用CPU过度其他线程无法获取平均分配的资源。单线程对锁频繁地申请与释放由于此线程“距离近”CPU切换调度相对对而言耗费大。导致单个线程被过度调用而非我们预期的各个线程平均分配机会。而导致其他线程的“饥饿问题”。解决办法使用线程同步控制——条件变量 / POSIX信号量规范线程调取资源的顺序。条件变量让一个线程在某个条件不满足时“休眠等待”其他线程在适当时间对其唤醒。定义全局定义pthread_cond_t cond PTHREAD_COND_INITIALIZER;局部定义需要调用初始化函数int pthread_cond_init(pthread_cond_t* restrict cond, \ pthread_condattr_t* restrict condattr);C进阶4.0restrict关键字自C99引入表示该指针是访问其指向数据的唯一访问方式。使用pthread_cond_t cond; pthread_cond_init(cond, nullptr);销毁int pthread_cond_destroy(pthread_cond_t* cond);等待int pthread_cond_wait(pthread_cond_t* restrict cond, \ pthread_mutex_t* restrict mutex);唤醒适当条件下其他线程对目标线程wait的唤醒。//全部唤醒 int pthread_cond_broadcast(pthread_cond_t* cond); //指定cond唤醒 int pthread_cond_signal(pthread_cond_t* cond);注意对于pthread_cond_signal()唤醒顺序是不一定的。生产者——消费者 条件变量实现☆生产者——消费者模型生产者多位负责生产果实。消费者多位负责消耗果实。交易场所位于生产者消费者之间的中间商。生产者、消费者关系的3213种关系消费者之间竞争互斥。生产者之间竞争互斥。生产者与消费者之间同步同步资源情况互斥中间商不能同时访问2个角色生产者消费者都是由线程承担的角色。1个交易场所由特定结构构成的“内存”空间。☆☆☆阻塞队列Blocking Queue 复习时非常值得重新完整的再实现一次ps理论纯手搓。使用阻塞队列模拟实现生产者消费者模型。作为一种实现生产者、消费者之间通信的数据结构。其与普通的队列区别在于队列空时“获取端”会被阻塞消费者被阻塞暂停消耗队列满时“输入端”会被阻塞生产者被阻塞暂停生产。结构图示圆环区域内各个区间存储数据当实现线程间灵活通信。队列使用queue即 std::queueT, class Container std::dequeueT 做封装queue是一个容器适配器其底层是 dequeT 封装的结果。templatetypename T class BlockQueue { public: private: size_t _capacity; //仅是手动限定queue可自动扩容。这是使用 条件变量 而非 POSIX信号量 的原因。 std::queueT _BlockQueue;//无reserve pthread_cond_t _IsFull; pthread_cond_t _UnEnough; //锁为cond的等待 pthread_mutex_t _EQMutex; //生产的标志被用于消费者识别唤醒。 可以使用size()代替啊/err size有上限且逻辑错误 int psleep; int csleep; //消费者的锁 pthread_mutex_t _PpMutex; };EQueue()入队列生产者产生果实。主操作——主操作边界条件像:满 空 打开失败等等——加锁/解锁——外部依赖像EQueue线程-考虑Pop与线程的通信唤醒条件——内部依赖线程池的单例模式GetInstance() { if(inc nullptr) //..... inc newThreadPoolT()“内部调用构造函数”}若单if下可能快进程先new后进程判断nullptr失败。所以需要双重ifbool EQueue(T val) { /* 1.条件队列未满_capacity 2.入队要原子操作 3.唤醒标记作为消费者由于 空 而等待(阻塞)。待生产者生产后立即 唤醒 的标志 */ //入队 //在此处加锁 若锁错误则打印输出错误 pthread_mutex_lock(_EQMutex); if(_BlockQueue.size() _capacity) //可替换为while防止伪等待 健壮性考虑 { // 队列满了入的应该等待处理 // pthread_wait?/err printf(生产满了正在等待...\n); psleep; pthread_cond_wait(_IsFull, _EQMutex); //等待结束(必然是消费者的唤醒) printf(生产者被唤醒\n); psleep--; // 满了就应该等待这时若消费者消耗了就应该立即启动 } //入队列(定然未满) _BlockQueue.push(val); std::cout 生产了 _BlockQueue.back() std::endl; if(csleep) { std::cout 消费者被唤醒成功 std::endl; pthread_cond_signal(_UnEnough); } // pthread_mutex_lock使用自己封装的C Lock pthread_mutex_unlock(_EQMutex); // 解锁 return true; }Pop()出队列bool Pop() { //出队列 pthread_mutex_lock(_PpMutex); if(_BlockQueue.size() 0) // 可替换为while防止伪等待 { //小于等于0 要等待 printf(消耗殆尽正在等待...\n); csleep; pthread_cond_wait(_UnEnough, _PpMutex); csleep--; } // std::cout _BlockQueue.front() 被消耗 std::endl; _BlockQueue.pop(); if(psleep) { std::cout 生产者唤醒成功 std::endl; pthread_cond_signal(_IsFull);//唤醒生产者 若没有等待的生产者你不炸了所以要使用特定的sleep标记 } pthread_mutex_unlock(_PpMutex); return true; }视频演示生产者消费者gitee.com内test.cc、Thread_cond.hpp即可实现调试现象。生产者消费者模型的意义为什么要有生产端、消费端模型1.支持忙闲不均衡。2.提高效率。3.生产过程与消费过程解耦。复盘生产端 消费端cond条件变量实现了消费者、生产者之间的通信决定了何时唤醒对方。使用psleep、csleep来作为等待的标志。核心点掌握多线程下原子性精准加锁\解锁掌握设计模板主操作——主操作边界条件像:满 空 打开失败等等——加锁/解锁——外部依赖像EQueue线程-考虑Pop与线程的通信唤醒条件——内部依赖线程池的单例模式GetInstance() { if(inc nullptr) //..... inc newThreadPoolT()“内部调用构造函数”}若单if下可能快进程先new后进程判断nullptr失败。所以需要双重if熟稔于心通信psleep、csleep的精妙设计。static ThreadPoolT *GetInstance() { if (inc nullptr) { LockGuard lockguard(_lock); LOG(LogLevel::DEBUG) 获取单例....; if (inc nullptr) { LOG(LogLevel::DEBUG) 首次使用单例, 创建之....; inc new ThreadPoolT(); inc-Start(); } } return inc; }条件变量条件变量的初始化与锁完全一致均是 定义的地址, nullptr(默认属性) 。锁的lock、unlock条件变量的wait、signal、broadcast参数均是指针类型。使用数组模拟回环队列 》queue[size % capacity]POSIX信号量回顾信号量又称信号灯与信号(进程)无任何关系其本质是一个计数器衡量资源的预订机制信号量又有二元信号量以及多元信号量对信号量的P(--)操作V()操作均是原子的。接口sem_init() sem_detroy() sem_wait() sem_post()信号量的初始化int sem_init(sem_t* sem, int pshared, unsigned int value);sem要设置的信号量。pshared0表示线程间共享非0表示进程间共享。value信号量初始值。信号量的销毁int sem_destroy(sem_t* sem);信号量的等待/减少int sem_wait(sem_t* sem);//P(--)是信号量的 -- 操作代表信号量维护的资源即将被使用。若信号量的值为0则阻塞等待直到有资源可用。 // 同一信号量的V()操作。信号量的释放/增加int sem_post(sem_t* sem);//V()是信号量的操作代表信号量维护的可用资源又增加了。若存在于信号量处等待的线程线程会被释放。顺序无明确规定但通常是FIFO/优先级调度Equeue()由于sem_wait()自带的阻塞作用因此大大简化了代码.sem_post(Pop_sem);//V()通知消费者信号量。void EQueue(T val) { //入队 sem_wait(Equeue_Sem);//P(--) pthread_mutex_lock(_EQMutex); _BlockQueue.push(val); std::cout 生产了 _BlockQueue.back() std::endl; pthread_mutex_unlock(_EQMutex); // 解锁 sem_post(Pop_Sem);//V() }Pop()sem_wait(Pop_Sem);通知Pop()占用情况。void Pop() { sem_wait(Pop_Sem);//P(--) pthread_mutex_lock(mutex); std::cout 消费了 _BlockQueue.front() std::endl; _BlockQueue.pop(val); pthread_mutex_unlock(mutex); sem_post(Equeue_Sem); // V()生产者 }信号量与条件变量的对比对固定已知大小的资源块信号量进行维护相对于条件变量的“循环判断while(IsFull())”信号量内部已经封装适时的阻塞与释放另一方V()操作所以信号量变量常大于等于2个。对比之下条件变量用于了queue的“自动扩容机制”再使用信号量描述“容量”难以实现。信号量固定大小块状形态。条件变量整体状态。表示资源的特定信号量(像有效数据、剩余空间)情况——“信号量是资源的一种预订机制”。感谢支持长期连载欢迎关注条件变量的封装#pragma once #includepthread.h #includecstdio namespace CondModule { class Cond { public: Cond() {} ~Cond() { perror(Cond Destroy() 未被调用\n); } //条件变量 //pthread_cond_init wait signal broadcast void Init() { int n pthread_cond_init(_cond, nullptr); if (!n) perror(Cond init failed\n); } void Wait(pthread_mutex_t* mutex) { int n pthread_cond_wait(_cond, mutex); if (!n) perror(Wait failed\n); } void Signal(pthread_cond_t* cond) { int n pthread_cond_signal(cond); if (!n) perror(Signal failed\n); } void BroadCast(pthread_cond_t* cond) { int n pthread_cond_broadcast(cond); if (!n) perror(Cond BroadCast failed\n); } void Destroy() { int n pthread_cond_destroy(_cond); if (!n) perror(Cond Destory failed\n); } private: pthread_cond_t _cond; }; }; // namespace CondModule信号量的极简封装#pragma once #includesemaphore.h //wait post namespace SemModule { class Sem { public: Sem(unsigned int val) { sem_init(_sem,0,val); } ~Sem() { sem_destroy(_sem); } void Wait() { //P(--) sem_wait(_sem); } void Post() { sem_post(_sem); } private: sem_t _sem; }; }感谢支持持续更新欢迎关注