Linux rt_mutex实时互斥锁优先级继承与pi链rt_mutex是Linux内核专为实时任务设计的互斥锁,其核心机制是优先级继承(Priority Inheritance, PI),用于解决经典的无优先级继承互斥锁导致的优先级反转问题.一、优先级反转问题当低优先级任务持有锁,中优先级任务抢占CPU,高优先级任务等待锁时:c/* 优先级: T_HIGH(99) T_MID(50) T_LOW(1) *//* T_LOW持有锁L *//* T_HIGH试图获取锁L - 被阻塞 *//* T_MID抢占T_LOW - T_LOW无法释放锁L *//* T_HIGH无限期等待 优先级反转 */rt_mutex通过优先级继承解决:当T_HIGH被T_LOW持有的锁阻塞时,T_LOW临时继承T_HIGH的优先级(99),从而不会被T_MID抢占,可以尽快释放锁.二、rt_mutex数据结构cstruct rt_mutex {raw_spinlock_t wait_lock; /* 保护pi_chain的自旋锁 */struct rb_root_cached waiters; /* 红黑树组织等待者,按优先级排序 */struct task_struct *owner; /* 当前所有者, NULL表示未锁定 *//* 低2位用作标志 */};/* rt_mutex的等待者 */struct rt_mutex_waiter {struct rb_node tree_entry; /* 在rt_mutex等待树中的节点 */struct rb_node pi_tree_entry;/* 在pi链(任务pi_waiters)中的节点 */struct task_struct *task; /* 等待任务 */struct rt_mutex *lock; /* 等待的锁 */int prio; /* 等待者的优先级 */};/* 任务结构体中的PI相关字段 */struct task_struct {...struct rt_mutex_waiter *pi_blocked_on; /* 当前任务被哪个waiter阻塞 */struct rb_root_cached pi_waiters; /* 该任务阻塞了哪些waiter(按优先级) */int prio; /* 当前动态优先级(受PI影响) */int normal_prio; /* 正常优先级(不受PI影响) */...};三、pi_chain(优先级继承链)pi链是被阻塞任务构成的树状结构.Task A等待锁L1(被Task B持有),Task B等待锁L2(被Task C持有),构成A-B-C的pi链.c/* pi链的建立过程 */static int task_blocks_on_rt_mutex(struct rt_mutex *lock,struct rt_mutex_waiter *waiter,struct task_struct *task){struct task_struct *owner rt_mutex_owner(lock);struct rt_mutex_waiter *top_waiter;/* 将waiter加入锁的等待红黑树 */rt_mutex_enqueue(lock, waiter);/* 将waiter加入owner的pi_waiters红黑树 */rt_mutex_enqueue_pi(owner, waiter);/* 获取owner的pi_waiters中优先级最高的waiter */top_waiter rt_mutex_top_waiter(lock);/* 递归调整pi链上所有任务的优先级 */rt_mutex_adjust_prio_chain(owner, top_waiter, lock, waiter, task);/* 递归调用: 如果owner也被其他锁阻塞, 继续向上调整 */if (rt_mutex_owner(owner-pi_blocked_on-lock))task_blocks_on_rt_mutex(owner-pi_blocked_on-lock,owner-pi_blocked_on,owner);}四、优先级传播的核心算法rt_mutex_adjust_prio_chain是pi链中最复杂的函数:cstatic int rt_mutex_adjust_prio_chain(struct task_struct *task,struct rt_mutex_waiter *orig_waiter,struct task_struct *top_task){struct rt_mutex *lock;struct rt_mutex_waiter *waiter, *top_waiter;int depth 0;retry:depth;/* 获取任务当前持有的锁 */lock task-pi_blocked_on-lock;/* 计算该锁上的最高优先级等待者 */top_waiter rt_mutex_top_waiter(lock);/* 如果task的优先级已经适配, 无需调整 */if (task-prio min(top_waiter-prio, task-normal_prio))goto out_unlock_pi;/* 调整task的优先级到需要继承的值 */task-prio min(top_waiter-prio, task-normal_prio);/* 如果task也被阻塞,递归向pi链上游调整 */if (rt_mutex_owner(lock)) {struct task_struct *next_owner rt_mutex_owner(lock);/* 递归调用, depth限制避免无限循环 */raw_spin_lock(next_owner-pi_lock);waiter next_owner-pi_blocked_on;if (waiter) {raw_spin_unlock(lock-wait_lock);/* 向链上游推进 */rt_mutex_adjust_prio_chain(next_owner, waiter, top_task);raw_spin_lock(lock-wait_lock);}raw_spin_unlock(next_owner-pi_lock);}out_unlock_pi:return depth;}五、锁获取和释放锁获取:cstatic int rt_mutex_fastlock(struct rt_mutex *lock, int state){if (likely(rt_mutex_cmpxchg_acquire(lock, NULL, current)))return 0; /* 快路径: 无竞争, 直接获取 */return rt_mutex_slowlock(lock, state); /* 慢路径: 进入pi链 */}static noinline int rt_mutex_slowlock(struct rt_mutex *lock, int state){struct rt_mutex_waiter waiter;int ret 0;/* 初始化waiter并加入等待树 */rt_mutex_init_waiter(waiter, false);waiter.task current;raw_spin_lock(lock-wait_lock);/* 加入pi链 */if (unlikely(!rt_mutex_owner(lock))) {/* 锁已释放, 获取锁 */rt_mutex_set_owner(lock, current);raw_spin_unlock(lock-wait_lock);return 0;}/* 加入等待队列并建立pi链 */task_blocks_on_rt_mutex(lock, waiter, current);/* 循环直到锁可用 */for (;;) {set_current_state(state);if (rt_mutex_owner(lock) current) {/* 成功获取 */break;}/* 睡眠 */raw_spin_unlock(lock-wait_lock);schedule();raw_spin_lock(lock-wait_lock);}__set_current_state(TASK_RUNNING);/* 从pi链中移除 */task_unblocks_on_rt_mutex(lock, waiter, current);raw_spin_unlock(lock-wait_lock);return ret;}锁释放:cstatic void __rt_mutex_unlock(struct rt_mutex *lock){struct task_struct *owner rt_mutex_owner(lock);/* 清除所有者 */rt_mutex_set_owner(lock, NULL);/* 如果有等待者, 将锁传递给最高优先级的等待者 */if (!rt_mutex_has_waiters(lock))return;/* 选择优先级最高的等待者 */struct rt_mutex_waiter *top rt_mutex_top_waiter(lock);struct task_struct *next_task top-task;/* 唤醒下一个等待者 */rt_mutex_set_owner(lock, next_task);wake_up_process(next_task);}六、rt_mutex与RCU PI boostingRT内核中RCU也依赖rt_mutex的PI机制解决RCU read-side的优先级反转:c/* RCU read-side critical section中, 如果rcu_gp_task被低优先级任务阻塞 */void rcu_boost(void){struct task_struct *t;struct rb_node *rbp;/* 遍历所有等待RCU GP的任务 */for_each_rcu_boost_task(t) {/* 通过rt_mutex提升任务优先级 */rt_mutex_lock(t-rcu_boost_mutex);/* t的优先级被提升, GP可以完成 */rt_mutex_unlock(t-rcu_boost_mutex);}}七、PI链的复杂性pi链的最坏情况复杂度:- pi链深度限制: 内核通过MAX_RT_PRIO(100)间接限制,但实际pi链可能跨多个锁形成树状结构- pi链调整的O(n)复杂度: 每次锁获取/释放都需要从叶子节点到根节点的优先级传播- 锁嵌套导致pi链交织: Task A持有L1等待L2, Task B持有L2等待L1 - 死锁检测通过pi链的循环检测实现c/* 死锁检测: 在pi链中检查是否存在环 */static int rt_mutex_check_deadlock(struct rt_mutex_waiter *waiter){struct task_struct *task waiter-task;struct rt_mutex *lock waiter-lock;/* 沿着pi链向上遍历 */while (task) {if (task current) {/* 发现自身, 死锁! */return -EDEADLK;}if (!task-pi_blocked_on)break;task rt_mutex_owner(task-pi_blocked_on-lock);}return 0;}rt_mutex的优先级继承机制解决了实时系统中的优先级反转问题,pi链的递归传播保证了系统中所有受影响的锁相关的任务都能及时调整到正确的优先级.但pi链的维护带来了显著的复杂度,这是为实时性付出的必要代价.