【Linux驱动开发】并发与竞争详解——原子操作、自旋锁、信号量与互斥体
前言今天我们来聊一聊Linux驱动开发中一个非常重要的话题——并发与竞争。如果你是从单片机比如STM32转过来学习Linux驱动的那么你对中断、临界区这些概念一定不陌生。但是在Linux这个多任务操作系统中并发的情况要复杂得多。如果你在编写驱动的时候不注意处理并发和竞争很可能会埋下一些难以排查的隐患轻则数据错乱重则系统崩溃。这篇文章会从最基础的概念讲起用通俗易懂的例子带你理解Linux内核提供的几种并发处理机制原子操作、自旋锁、信号量和互斥体。文章会详细介绍每种机制的原理、API函数以及使用注意事项最后还会做一个对比总结帮助你在实际开发中选择最合适的方案。一、并发与竞争简介1.1 什么是并发与竞争先给大家举个生活中的例子你们公司有一台打印机所有人都可以使用。现在小李和小王要同时打印文件。小李要打印的内容我叫小李 电话123456 工号16小王要打印的内容我叫小王 电话678910 工号20如果打印机不做任何处理两个人同时发送打印任务会发生什么呢可能会出现这样的情况我叫小王 电话123456 工号20看到了吗小王的电话号码变成小李的了这就是典型的竞争问题——多个用户同时访问同一个共享资源打印机导致数据错乱。在Linux系统中也是一样的。Linux是个多任务操作系统会存在多个任务同时访问同一片内存区域的情况这些任务可能会相互覆盖这段内存中的数据造成内存数据混乱。这就是我们常说的并发与竞争问题。1.2 并发产生的原因Linux系统中并发产生的原因比较复杂总结起来主要有以下几个序号并发原因说明①多线程并发访问Linux是多任务线程系统多线程访问是最基本的原因②抢占式并发访问从2.6版本内核开始Linux内核支持抢占调度程序可以在任意时刻抢占正在运行的线程③中断程序并发访问学过STM32的同学都知道硬件中断的优先级很高可以打断正在执行的线程④SMP多核核间并发访问现在ARM架构的多核SOC很常见多核CPU之间存在核间并发访问 提示很多Linux驱动初学者往往不注意并发问题在驱动程序中埋下了隐患。这类问题通常又很不容易查找导致驱动调试难度加大、费时费力。所以我们一定要在编写驱动的时候就考虑到并发与竞争而不是等驱动都写完了再去处理。1.3 保护的内容是什么前面一直说要防止并发访问共享资源那么问题来了我们到底在保护什么答案是数据某个线程的局部变量不需要保护因为只有它自己能访问我们要保护的是多个线程都会访问的共享数据比如全局变量设备结构体共享的硬件寄存器找到要保护的数据才是重点也是难点。因为驱动程序各不相同数据也千变万化。一般像全局变量、设备结构体这些肯定是要保护的至于其他的数据就要根据实际的驱动程序而定了。二、原子操作2.1 原子操作简介首先来看最简单的一种并发处理方式——原子操作。什么是原子操作呢原子操作就是指不能再进一步分割的操作一般用于变量或者位操作。可能有人会说不就是给变量赋个值吗这有什么难的哎还真没那么简单。我们来看一个例子假设现在要对无符号整型变量a赋值为3C语言很简单a 3;但是C语言要先编译成汇编指令。在ARM架构下不支持直接对内存进行读写操作需要借助寄存器来完成。上面这一行C语言可能会被编译成3条汇编指令ldr r0, 0X30000000 /* 变量a的地址 */ ldr r1, 3 /* 要写入的值 */ str r1, [r0] /* 将3写入变量a */看到了吗简简单单的一句C语言编译后变成了3条汇编指令。那么问题就来了假设线程A要把a设为10线程B要把a设为20理想的执行流程是这样的线程Aldr → ldr → str → 完成a10 线程B ldr → ldr → str → 完成a20但实际上的执行流程可能是这样的线程Aldr → ldr → 被打断 线程B ldr → ldr → str → 完成a20 线程A str → 完成a10不对最终结果是线程A把a设成了10而不是我们期望的20这就是一个最简单的并发与竞争的例子。要解决这个问题就要保证这3条汇编指令作为一个整体运行也就是作为一个原子存在。这就是原子操作的由来。Linux内核提供了两组原子操作API函数一组是对整型变量进行操作的一组是对位进行操作的2.2 原子整形操作API函数Linux内核定义了一个叫做atomic_t的结构体来完成整型数据的原子操作typedef struct { int counter; } atomic_t;定义和初始化原子变量atomic_t a; // 定义原子变量a atomic_t b ATOMIC_INIT(0); // 定义原子变量b并赋初值为0常用API函数函数描述ATOMIC_INIT(int i)定义原子变量的时候对其初始化int atomic_read(atomic_t *v)读取v的值并且返回void atomic_set(atomic_t *v, int i)向v写入i值void atomic_add(int i, atomic_t *v)给v加上i值void atomic_sub(int i, atomic_t *v)从v减去i值void atomic_inc(atomic_t *v)给v加1自增void atomic_dec(atomic_t *v)从v减1自减int atomic_dec_return(atomic_t *v)从v减1并且返回v的值int atomic_inc_return(atomic_t *v)给v加1并且返回v的值int atomic_sub_and_test(int i, atomic_t *v)从v减i如果结果为0就返回真否则返回假int atomic_dec_and_test(atomic_t *v)从v减1如果结果为0就返回真否则返回假int atomic_inc_and_test(atomic_t *v)给v加1如果结果为0就返回真否则返回假int atomic_add_negative(int i, atomic_t *v)给v加i如果结果为负就返回真否则返回假使用示例atomic_t v ATOMIC_INIT(0); // 定义并初始化原子变量v0 atomic_set(v, 10); // 设置v10 atomic_read(v); // 读取v的值肯定是10 atomic_inc(v); // v的值加1v11 64位原子操作如果使用64位的SOC就要用到64位的原子变量atomic64_t对应的API函数只是把atomic_前缀换成atomic64_把int换成long long用法是一样的。2.3 原子位操作API函数位操作也是很常用的操作。Linux内核也提供了一系列的原子位操作API函数。和原子整形变量不同原子位操作不需要专门的数据结构它是直接对内存进行操作的。函数描述void set_bit(int nr, void *p)将p地址的第nr位置1void clear_bit(int nr, void *p)将p地址的第nr位清零void change_bit(int nr, void *p)将p地址的第nr位进行翻转int test_bit(int nr, void *p)获取p地址的第nr位的值int test_and_set_bit(int nr, void *p)将p地址的第nr位置1并且返回nr位原来的值int test_and_clear_bit(int nr, void *p)将p地址的第nr位清零并且返回nr位原来的值int test_and_change_bit(int nr, void *p)将p地址的第nr位翻转并且返回nr位原来的值三、自旋锁3.1 自旋锁简介原子操作虽然好用但它只能对整型变量或者位进行保护。在实际的使用环境中临界区怎么可能只有整型变量或位这么简单呢举个最简单的例子设备结构体变量就不是整型变量我们对于结构体中成员变量的操作也要保证原子性。这些工作原子操作都不能胜任这时候就需要用到锁机制了。在Linux内核中最基础的锁就是自旋锁Spinlock。自旋锁的工作原理自旋锁的工作原理很简单当一个线程要访问某个共享资源的时候首先要获取相应的锁锁只能被一个线程持有只要此线程不释放持有的锁其他的线程就不能获取此锁对于自旋锁而言如果锁正在被线程A持有线程B想要获取锁那么线程B就会处于忙循环-旋转-等待状态不会进入休眠而是一直在那里转圈圈等待锁可用再给大家举个生活中的例子 有个公用电话亭一次只能进去一个人打电话。现在电话亭里面有人正在打电话获得了自旋锁。你到了电话亭门口因为里面有人所以你不能进去没有获取自旋锁。这个时候你就站在原地等待可能因为无聊而转圈圈消遣时光反正哪里也不能去要一直等到里面的人打完电话出来释放自旋锁你才能进去打电话获取到自旋锁。这就是自旋这个名字的由来——原地打转等待锁可用。自旋锁的优缺点优点实现简单不需要上下文切换等待时间短的情况下效率很高缺点等待锁的线程会一直处于自旋状态浪费处理器时间降低系统性能所以自旋锁的持有时间不能太长适用于短时期的轻量级加锁Linux内核使用结构体spinlock_t表示自旋锁typedef struct spinlock { union { struct raw_spinlock rlock; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct { u8 __padding[LOCK_PADSIZE]; struct lockdep_map dep_map; }; #endif }; } spinlock_t;3.2 自旋锁API函数基本自旋锁API最基本的自旋锁API函数如下函数描述DEFINE_SPINLOCK(spinlock_t lock)定义并初始化一个自旋锁变量int spin_lock_init(spinlock_t *lock)初始化自旋锁void spin_lock(spinlock_t *lock)获取指定的自旋锁加锁void spin_unlock(spinlock_t *lock)释放指定的自旋锁解锁int spin_trylock(spinlock_t *lock)尝试获取指定的自旋锁如果没有获取到就返回0int spin_is_locked(spinlock_t *lock)检查指定的自旋锁是否被获取⚠️ 重要提醒被自旋锁保护的临界区一定不能调用任何能够引起睡眠和阻塞的API函数否则可能会导致死锁现象的发生为什么呢因为自旋锁会自动禁止抢占。如果线程A在持有锁期间进入了休眠状态那么线程A会自动放弃CPU使用权。线程B开始运行线程B也想要获取锁但是此时锁被A线程持有而且内核抢占还被禁止了线程B无法被调度出去那么线程A就无法运行锁也就无法释放——死锁就发生了中断场景下的自旋锁上面的API函数适用于线程之间的并发访问。那如果中断也要来访问共享资源怎么办呢首先可以肯定的是中断里面可以使用自旋锁。但是在中断里面使用自旋锁的时候在获取锁之前一定要先禁止本地中断也就是本CPU的中断否则可能导致死锁现象的发生。我们来看一下为什么会发生死锁线程Aspin_lock(lock) → 正在执行临界区代码 中断 ← 中断发生打断线程A 中断spin_lock(lock) → 中断也想获取锁但是锁被线程A持有了... 中断一直自旋等待...在这个场景中线程A先运行并且获取到了lock这个锁当线程A运行临界区代码的时候中断发生了中断抢走了CPU使用权中断服务函数也要获取lock这个锁但是这个锁被线程A占有着中断就会一直自旋等待锁有效但是在中断服务函数执行完之前线程A是不可能执行的场面就这么僵持着——死锁发生了最好的解决方法就是获取锁之前关闭本地中断。Linux内核提供了相应的API函数函数描述void spin_lock_irq(spinlock_t *lock)禁止本地中断并获取自旋锁void spin_unlock_irq(spinlock_t *lock)激活本地中断并释放自旋锁void spin_lock_irqsave(spinlock_t *lock, unsigned long flags)保存中断状态禁止本地中断并获取自旋锁void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)将中断状态恢复到以前的状态并且激活本地中断释放自旋锁 使用建议推荐使用spin_lock_irqsave / spin_unlock_irqrestore这一组函数因为它们会保存中断状态在释放锁的时候会恢复中断状态更加安全。使用示例DEFINE_SPINLOCK(lock); // 定义并初始化一个锁 /* 线程A */ void functionA() { unsigned long flags; // 中断状态 spin_lock_irqsave(lock, flags); // 获取锁 /* 临界区 */ spin_unlock_irqrestore(lock, flags); // 释放锁 } /* 中断服务函数 */ void irq_handler() { spin_lock(lock); // 获取锁 /* 临界区 */ spin_unlock(lock); // 释放锁 }下半部BH场景如果要在下半部Bottom Half里面使用自旋锁可以使用下面这组API函数函数描述void spin_lock_bh(spinlock_t *lock)关闭下半部并获取自旋锁void spin_unlock_bh(spinlock_t *lock)打开下半部并释放自旋锁3.3 其他类型的锁在自旋锁的基础上Linux内核还衍生出了其他一些特定场合使用的锁。1读写自旋锁rwlock_t想象这样一个场景有一个学生信息表存放着学生的年龄、家庭住址、班级等信息。此表可以随时被修改和读取。如果我们使用普通的自旋锁对其进行保护每次只能一个读操作或者写操作。但是实际上此表是可以并发读取的——只需要保证在修改的时候没人读取或者在读取的时候没人修改就行了。像这样符合读/写或生产者/消费者模型的数据结构就可以使用读写自旋锁。读写自旋锁的特点一次只能允许一个写操作只能一个线程持有写锁没有写操作的时候允许多个线程持有读锁可以并发读取Linux内核使用rwlock_t结构体表示读写锁操作API分为读锁和写锁两部分读锁操作函数描述DEFINE_RWLOCK(rwlock_t lock)定义并初始化读写锁void rwlock_init(rwlock_t *lock)初始化读写锁void read_lock(rwlock_t *lock)获取读锁void read_unlock(rwlock_t *lock)释放读锁void read_lock_irq(rwlock_t *lock)禁止本地中断并且获取读锁void read_unlock_irq(rwlock_t *lock)打开本地中断并且释放读锁void read_lock_irqsave(rwlock_t *lock, unsigned long flags)保存中断状态禁止本地中断并获取读锁void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags)恢复中断状态激活本地中断释放读锁void read_lock_bh(rwlock_t *lock)关闭下半部并获取读锁void read_unlock_bh(rwlock_t *lock)打开下半部并释放读锁写锁操作函数描述void write_lock(rwlock_t *lock)获取写锁void write_unlock(rwlock_t *lock)释放写锁void write_lock_irq(rwlock_t *lock)禁止本地中断并且获取写锁void write_unlock_irq(rwlock_t *lock)打开本地中断并且释放写锁void write_lock_irqsave(rwlock_t *lock, unsigned long flags)保存中断状态禁止本地中断并获取写锁void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags)恢复中断状态激活本地中断释放写锁void write_lock_bh(rwlock_t *lock)关闭下半部并获取写锁void write_unlock_bh(rwlock_t *lock)打开下半部并释放写锁2顺序锁seqlock_t顺序锁是在读写锁的基础上衍生而来的。使用读写锁的时候读操作和写操作不能同时进行。而使用顺序锁的话可以允许在写的时候进行读操作也就是实现同时读写。顺序锁的特点允许读和写同时进行不允许同时进行并发的写操作如果在读的过程中发生了写操作最好重新进行读取保证数据完整性⚠️ 顺序锁保护的资源不能是指针因为写操作可能会导致指针无效读操作访问野指针可能导致系统崩溃Linux内核使用seqlock_t结构体表示顺序锁typedef struct { struct seqcount seqcount; spinlock_t lock; } seqlock_t;顺序锁写操作API函数描述DEFINE_SEQLOCK(seqlock_t sl)定义并初始化顺序锁void seqlock_init(seqlock_t *sl)初始化顺序锁void write_seqlock(seqlock_t *sl)获取写顺序锁void write_sequnlock(seqlock_t *sl)释放写顺序锁void write_seqlock_irq(seqlock_t *sl)禁止本地中断并且获取写顺序锁void write_sequnlock_irq(seqlock_t *sl)打开本地中断并且释放写顺序锁void write_seqlock_irqsave(seqlock_t *sl, unsigned long flags)保存中断状态禁止本地中断并获取写顺序锁void write_sequnlock_irqrestore(seqlock_t *sl, unsigned long flags)恢复中断状态激活本地中断释放写顺序锁void write_seqlock_bh(seqlock_t *sl)关闭下半部并获取写顺序锁void write_sequnlock_bh(seqlock_t *sl)打开下半部并释放写顺序锁顺序锁读操作API函数描述unsigned read_seqbegin(const seqlock_t *sl)读单元访问共享资源的时候调用此函数返回顺序锁的顺序号unsigned read_seqretry(const seqlock_t *sl, unsigned start)读结束以后调用此函数检查在读的过程中有没有对资源进行写操作如果有就要重读3.4 自旋锁使用注意事项综合前面的内容使用自旋锁的时候要注意以下几点① 锁的持有时间不能太长 因为等待自旋锁的线程处于自旋状态会浪费处理器时间。如果临界区比较大运行时间比较长要选择其他的并发处理方式比如信号量或互斥体。② 临界区内不能调用可能导致休眠的API函数 否则可能导致死锁。因为自旋锁会禁止内核抢占如果持有锁的线程休眠了其他线程就无法运行锁也无法释放。③ 不能递归申请自旋锁 因为一旦通过递归的方式申请一个你正在持有的锁那么你就必须自旋等待锁被释放然而你正处于自旋状态根本没法释放锁。结果就是自己把自己锁死了④ 考虑可移植性 在编写驱动程序的时候必须考虑到驱动的可移植性。因此不管你用的是单核的还是多核的SOC都将其当做多核SOC来编写驱动程序。四、信号量4.1 信号量简介如果你学过FreeRTOS或者UCOS那你对信号量一定不陌生。信号量是同步的一种方式Linux内核也提供了信号量机制常常用于控制对共享资源的访问。先给大家举一个经典的例子 某个停车场有100个停车位这100个停车位大家都可以用。当前停车数量就是一个信号量具体的停车数量就是这个信号量值。当这个值到100的时候说明停车场满了。当有车开出停车场的时候停车数量减一信号量减一此时你就可以把车停进去了停进去以后停车数量加一信号量加一。这就是一个典型的计数型信号量的例子。信号量 vs 自旋锁相比于自旋锁信号量可以使线程进入休眠状态。再给大家举个例子 A与B、C合租了一套房子这个房子只有一个厕所一次只能一个人使用。某一天早上A去上厕所了过了一会B也想用厕所。B一直在厕所门口等着等A出来 → 这相当于自旋锁B告诉A让A出来以后通知他一下然后B继续回房间睡觉 → 这相当于信号量可以看出使用信号量会提高处理器的使用效率毕竟不用一直傻乎乎的在那里自旋等待。但是信号量的开销要比自旋锁大因为信号量使线程进入休眠状态以后会切换线程切换线程就会有开销。信号量的特点① 适用于占用资源比较久的场合 因为信号量可以使等待资源的线程进入休眠状态。② 不能用于中断中 因为信号量会引起休眠而中断是不能休眠的。③ 不适合持有时间短的场景 如果共享资源的持有时间比较短频繁的休眠、切换线程引起的开销要远大于信号量带来的优势这时候用自旋锁更合适。信号量的分类根据信号量值的大小信号量可以分为计数型信号量信号量值大于1允许多个线程同时访问共享资源不能用于互斥访问二值信号量信号量值等于1同一时刻只允许一个线程访问共享资源可以用于互斥访问4.2 信号量API函数Linux内核使用semaphore结构体表示信号量struct semaphore { raw_spinlock_t lock; unsigned int count; struct list_head wait_list; };常用API函数函数描述DEFINE_SEMAPHORE(name)定义一个信号量并且设置信号量的值为1void sema_init(struct semaphore *sem, int val)初始化信号量sem设置信号量值为valvoid down(struct semaphore *sem)获取信号量会导致休眠因此不能在中断中使用int down_trylock(struct semaphore *sem)尝试获取信号量如果能获取到就获取并返回0如果不能就返回非0并且不会进入休眠int down_interruptible(struct semaphore *sem)获取信号量和down类似只是down进入休眠后不能被信号打断而此函数进入休眠后可以被信号打断void up(struct semaphore *sem)释放信号量使用示例struct semaphore sem; // 定义信号量 sema_init(sem, 1); // 初始化信号量值为1二值信号量 down(sem); // 申请信号量获取锁 /* 临界区 */ up(sem); // 释放信号量释放锁五、互斥体5.1 互斥体简介在FreeRTOS和UCOS中也有互斥体的概念。将信号量的值设置为1就可以使用信号量进行互斥访问了。虽然可以通过信号量实现互斥但是Linux提供了一个比信号量更专业的机制来进行互斥——它就是互斥体mutex。互斥访问表示一次只有一个线程可以访问共享资源。Linux内核使用mutex结构体表示互斥体struct mutex { /* 1: unlocked, 0: locked, negative: locked, possible waiters */ atomic_t count; spinlock_t wait_lock; };使用互斥体的注意事项① 不能在中断中使用mutex 因为mutex可以导致休眠中断中只能使用自旋锁。② 临界区可以调用引起阻塞的API函数 和信号量一样mutex保护的临界区可以调用引起阻塞的API函数。③ 必须由持有者释放 因为一次只有一个线程可以持有mutex因此必须由mutex的持有者释放mutex。④ 不能递归上锁和解锁 和自旋锁一样mutex也不能递归上锁和解锁。 建议在编写Linux驱动的时候遇到需要互斥访问的地方建议使用mutex。5.2 互斥体API函数函数描述DEFINE_MUTEX(name)定义并初始化一个mutex变量void mutex_init(mutex *lock)初始化mutexvoid mutex_lock(struct mutex *lock)获取mutex上锁如果获取不到就进入休眠void mutex_unlock(struct mutex *lock)释放mutex解锁int mutex_trylock(struct mutex *lock)尝试获取mutex如果成功就返回1如果失败就返回0int mutex_is_locked(struct mutex *lock)判断mutex是否被获取如果是就返回1否则返回0int mutex_lock_interruptible(struct mutex *lock)使用此函数获取信号量失败进入休眠以后可以被信号打断使用示例struct mutex lock; // 定义一个互斥体 mutex_init(lock); // 初始化互斥体 mutex_lock(lock); // 上锁 /* 临界区 */ mutex_unlock(lock); // 解锁六、总结与对比到这里Linux内核中常用的几种并发与竞争处理机制就都讲完了。下面给大家做一个总结和对比帮助大家在实际开发中选择合适的机制。各种机制对比机制适用场景能否休眠能否用于中断持有时间开销原子操作整型变量或位操作❌ 不能✅ 可以极短极小自旋锁短时间的轻量级加锁❌ 不能✅ 可以需关中断短小信号量长时间持有资源✅ 可以❌ 不能可长可短较大互斥体互斥访问共享资源✅ 可以❌ 不能可长可短较大选型建议如果只是简单的整型变量或位操作 → 选原子操作如果临界区很小持有时间很短 → 选自旋锁如果临界区较大持有时间较长 → 选信号量或互斥体如果需要互斥访问同一时间只能一个线程访问 → 优先选互斥体如果在中断中使用 → 只能选原子操作或自旋锁记得关中断学习建议并发与竞争是Linux驱动开发中非常重要的一个知识点也是初学者容易忽略的地方。建议大家先理解概念搞清楚什么是并发、什么是竞争为什么会产生这些问题再理解原理每种机制是怎么解决并发问题的各自的优缺点是什么最后动手实践写一些简单的驱动代码实际使用这些机制加深理解Linux内核还有很多其他的处理并发和竞争的机制这篇文章主要讲解了常用的原子操作、自旋锁、信号量和互斥体。以后我们在编写Linux驱动的时候会频繁使用到这几种机制希望大家能够深入理解。参考资料《I.MX6U 嵌入式Linux驱动开发指南》——正点原子Linux内核源码