目录
  1. 1. 一、原子操作(Atomic Operations)
    1. 1.1. 1.1 基本原理
    2. 1.2. 1.2 在 Binder 驱动中的应用
  2. 2. 二、自旋锁(Spinlock)
    1. 2.1. 2.1 基本原理
    2. 2.2. 2.2 Binder 驱动中的 Spinlock 分层
    3. 2.3. 2.3 Spinlock 在用户空间的使用
  3. 3. 三、互斥锁(Mutex)
    1. 3.1. 3.1 基本原理
    2. 3.2. 3.2 Binder 驱动中的 Mutex
  4. 4. 四、RCU(Read-Copy-Update)
    1. 4.1. 4.1 基本原理
    2. 4.2. 4.2 Binder 驱动中的 RCU
  5. 5. 五、Futex(Fast Userspace Mutex)
    1. 5.1. 5.1 基本原理
    2. 5.2. 5.2 ART 中 Object.wait/notify 的 Futex 实现
  6. 6. 六、Binder 驱动的锁层级设计
  7. 7. 七、核心面试题
【深入内核篇】经典同步机制

并发和同步是操作系统内核设计的核心问题,也是 Android 系统每一个层次——从 Binder 驱动到 ART 虚拟机——都不可回避的主题。本文以 Android/Linux 为背景,深入分析 spinlock、mutex、RCU、atomic 操作和 futex 这五种经典同步机制,并结合 Binder 驱动和 ART 中的实际应用进行剖析。

一、原子操作(Atomic Operations)

1.1 基本原理

原子操作是最底层的同步原语,由 CPU 指令(如 x86 的 LOCK CMPXCHG,ARM 的 LDREX/STREX)保证操作的不可分割性。

// include/linux/atomic.h 和 arch/arm64/include/asm/atomic.h
// atomic_t 基于 int,atomic64_t 基于 long long

static inline void atomic_add(int i, atomic_t *v)
{
// ARM64 实现(使用 LSE 扩展如果可用)
asm volatile("stadd %w[i], %[v]\n"
: [v] "+Q" (v->counter)
: [i] "r" (i));
}

1.2 在 Binder 驱动中的应用

Binder 驱动中的引用计数大量使用原子操作:

// drivers/android/binder.c
static void binder_inc_node(struct binder_node *node, int strong, int weak,
struct list_head *target_list)
{
if (strong) {
// 原子地增加强引用计数
if (target_list)
atomic_inc(&node->tmp_refs);
else
atomic_inc(&node->internal_strong_refs);
}
if (weak)
atomic_inc(&node->local_weak_refs);
}

static bool binder_dec_node(struct binder_node *node, int strong, int weak)
{
if (strong) {
bool free_node;
// 原子地减少强引用计数,并检查是否为 0
if (atomic_dec_and_test(&node->internal_strong_refs)) {
free_node = true;
}
}
if (weak && atomic_dec_and_test(&node->local_weak_refs)) {
// 弱引用也降为 0,可以释放 binder_node
}
}

使用原子操作避免了对多进程共享的 binder_node 引用计数的锁争用。

二、自旋锁(Spinlock)

2.1 基本原理

Spinlock 是最简单的锁实现。当锁被持有时,其他 CPU 核心在其上”自旋”(不断轮询)直到锁被释放。适用于临界区极短的场景。

// include/linux/spinlock.h
// raw_spin_lock 和 raw_spin_unlock

static inline void raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable(); // 关闭内核抢占
do {
// 原子地尝试获取锁
} while (!arch_spin_trylock(&lock->raw_lock));
}

static inline void raw_spin_unlock(raw_spinlock_t *lock)
{
arch_spin_unlock(&lock->raw_lock);
preempt_enable(); // 重新打开内核抢占
}

2.2 Binder 驱动中的 Spinlock 分层

Binder 驱动使用复杂的锁层级来避免死锁。最外层是 binder_procs_lock(保护全局 binder_procs 列表),内层是进程级别的锁,最内层是 binder_node 级别的锁。

// drivers/android/binder.c
// 锁层级定义
enum {
BINDER_LOOPER_STATE_NEED_RETURN = 0x01,
};

// 全局进程表锁
static DEFINE_MUTEX(binder_procs_lock);

// 每个 binder_proc 有自己的锁
// proc->outer_lock 保护 proc 级别
// proc->inner_lock 保护线程/节点级别

// binder_inner_proc_lock(proc) - 获取进程内层锁
// 调用时需要确保已持有 outer_lock 或明确表明只取 inner_lock
static void binder_inner_proc_lock(struct binder_proc *proc)
{
mutex_lock(&proc->inner_lock);
}

2.3 Spinlock 在用户空间的使用

Android 的 pthread 实现(bionic libc)中,pthread_mutex_t 的快速路径使用原子操作,慢速路径才进入 futex。pthread_spinlock_t 则是纯粹的用户空间 spinlock。

三、互斥锁(Mutex)

3.1 基本原理

Mutex 允许锁持有者睡眠(与 spinlock 不同),适用于临界区较长的场景。在 Linux 内核中通过 mutex 结构实现:

// include/linux/mutex.h
struct mutex {
atomic_long_t owner; // 锁持有者的 task_struct 指针
raw_spinlock_t wait_lock;
struct list_head wait_list; // 等待队列
};

// 加锁(可能会睡眠)
void mutex_lock(struct mutex *lock);

// 尝试加锁,不睡眠
int mutex_trylock(struct mutex *lock);

// 解锁
void mutex_unlock(struct mutex *lock);

3.2 Binder 驱动中的 Mutex

// drivers/android/binder.c

// 获取节点的锁(mutex)
static int binder_node_lock(struct binder_node *node, bool internal)
{
if (internal)
mutex_lock(&node->internal_lock);
else
mutex_lock(&node->lock);
return 0;
}

// 在 binder_transaction 中
static void binder_transaction(...)
{
// 获取目标进程锁
binder_proc_lock(proc);
binder_inner_proc_lock(target_proc);

// ... 操作 binder_node 和 binder_ref ...

binder_inner_proc_unlock(target_proc);
binder_proc_unlock(proc);
}

四、RCU(Read-Copy-Update)

4.1 基本原理

RCU 是一种为”读写比例极大”的场景优化的同步机制。基本思想是:读者无锁访问数据(无需获取任何锁),写者先复制数据、修改副本、然后原子地更新指针。

// include/linux/rcupdate.h

// 读者侧
rcu_read_lock();
// 访问受 RCU 保护的数据(无锁,只需禁用抢占)
struct my_data *p = rcu_dereference(global_ptr);
// ... 使用 p ...
rcu_read_unlock();

// 写者侧
struct my_data *new_data = kmalloc(...);
*new_data = *old_data;
new_data->field = new_value;
// 原子地替换指针
rcu_assign_pointer(global_ptr, new_data);
// 等待所有读者完成
synchronize_rcu();
// 释放旧数据
kfree(old_data);

4.2 Binder 驱动中的 RCU

Binder 驱动在全局 binder_procs 哈希表的查找中使用 RCU。查找 binder_proc 是高频只读操作,而添加/删除 binder_proc 相对很少:

// drivers/android/binder.c
// 查找进程——无需锁!
struct binder_proc *binder_get_proc_rcu(struct binder_proc *proc)
{
// 使用 RCU 保护的哈希表查找
struct binder_proc *p;
hash_for_each_possible_rcu(binder_procs, p, proc_node, pid);
return p;
}

五、Futex(Fast Userspace Mutex)

5.1 基本原理

Futex 是 Linux 提供的最重要的用户空间同步系统调用。它在无竞争时完全在用户空间运行(原子操作),只在有竞争时才陷入内核:

// include/uapi/linux/futex.h
// futex 系统调用原型
long sys_futex(void __user *uaddr, int op, u32 val,
struct __kernel_timespec __user *utime,
void __user *uaddr2, u32 val3);

// 常用操作
#define FUTEX_WAIT 0 // 在 uaddr 处等待(如果 *uaddr == val,则睡眠)
#define FUTEX_WAKE 1 // 唤醒等待在 uaddr 上的最多 val 个线程
#define FUTEX_REQUEUE 3 // 将等待者从 uaddr 移到 uaddr2

5.2 ART 中 Object.wait/notify 的 Futex 实现

ART 虚拟机使用 futex 来实现 Java 的 Object.wait() / Object.notify()

// art/runtime/base/mutex.cc
// Mutex 的底层实现使用了 futex

void Mutex::Lock(Thread* self) {
if (state_and_contenders_.fetch_or(1, std::memory_order_acquire) == 0) {
return; // 无竞争:快速路径,纯原子操作
}
// 有竞争:进入慢速路径,使用 futex
LockSlowPath(self);
}

void Mutex::LockSlowPath(Thread* self) {
do {
// 尝试 CAS 获取锁
if (state_and_contenders_.compare_exchange_weak(...)) {
return;
}
// 失败则调用 futex(FUTEX_WAIT) 进入睡眠
futex(state_and_contenders_.Address(), FUTEX_WAIT_PRIVATE, ...);
} while (true);
}

void Mutex::Unlock(Thread* self) {
if (state_and_contenders_.fetch_and(~1, std::memory_order_release) == 0) {
return; // 无等待者
}
// 有等待者:唤醒
futex(state_and_contenders_.Address(), FUTEX_WAKE_PRIVATE, 1);
}

ART 的 Monitor(对应 Java synchronized 关键字的实现)同样依赖 futex。Monitor::MonitorEnter 在无竞争时走快速路径(CAS),有竞争时通过 futex 睡眠等待;Monitor::Notify 通过 futex wake 唤醒等待者:

// art/runtime/monitor.cc
void Monitor::Wait(Thread* self, int64_t ms, int32_t ns, ...) {
// 通过 Mutex 实现,底层使用 futex(FUTEX_WAIT)
}

void Monitor::Notify(Thread* self) {
// 从 wait_set_ 中取出一个线程,通过 Mutex 的 unlock 唤醒
// 底层使用 futex(FUTEX_WAKE)
}

六、Binder 驱动的锁层级设计

Binder 驱动中有一个严格的锁获取顺序,防止死锁:

binder_procs_lock      (全局互斥锁)
└→ proc->outer_lock (进程级互斥锁)
└→ proc->inner_lock (进程内部互斥锁)
└→ node->lock (binder_node 互斥锁)
└→ node->internal_lock

所有代码路径必须按这个顺序获取锁。Binder 驱动在开发初期曾面临严重的死锁问题,后续通过严格的 lockdep 注解和代码审查解决了这些问题。

// drivers/android/binder.c
// lockdep 注解示例
static void binder_inner_proc_lock(struct binder_proc *proc)
{
mutex_lock_nested(&proc->inner_lock, BINDER_LOOPER_STATE_NEED_RETURN);
}

七、核心面试题

Q1:spinlock 和 mutex 的核心区别是什么?什么场景用哪个?

核心区别:spinlock 等待时 CPU 忙等(不释放 CPU),mutex 等待时线程进入睡眠(释放 CPU)。因此:spinlock 适用于临界区极小(微秒级)且不能睡眠的场景(如中断处理程序),mutex 适用于临界区可能较长、允许睡眠的场景。关键判断:如果临界区中有可能睡眠的操作(如内存分配 GFP_KERNEL、copy_from_user),必须用 mutex 而不能用 spinlock(否则可能死锁)。

Q2:ART 为什么选择基于 futex 实现锁而不是仅依赖 pthread_mutex_t?

ART 需要更精细地控制锁行为:(1) 性能——pthread_mutex 的函数调用开销在某些 ARM 芯片上较大;ART 通过在用户空间用原子变量 CAS 可以避免大量情况下的系统调用。(2) 线程状态管理——ART 需要知道线程是否在等待锁(用于线程 suspend、GC safepoint 等),这需要与运行时深度集成。(3) Monitor 膨胀(thin lock → fat lock)需要精确控制底层 futex 操作,普通 pthread_mutex 无法支持。

Q3:RCU 中的 synchronize_rcu 是如何知道所有读者都完成的?为什么说 RCU 读者是无锁的?

RCU 利用 Linux 内核的抢占调度:rcu_read_lock() 实际只做 preempt_disable(),禁止内核抢占;rcu_read_unlock()preempt_enable()synchronize_rcu() 等待所有 CPU 至少经历一次上下文切换(即一个 grace period),这保证了所有在 rcu_read_lock/unlock 区间内的读者都已离开临界区。读者无锁是因为它仅禁用了抢占,没有获取任何锁——这意味着读者零争用开销,非常适合 Binder 驱动中 binder_procs 的查找这种高频只读场景。

AOSP 核心路径参考:

  • drivers/android/binder.c — Binder 驱动锁层级
  • art/runtime/base/mutex.cc — ART Mutex(基于 futex)
  • art/runtime/monitor.cc — ART Monitor(synchronized 实现)
  • include/linux/spinlock.h — Linux spinlock
  • include/linux/mutex.h — Linux mutex
  • include/linux/rcupdate.h — Linux RCU
  • include/uapi/linux/futex.h — Futex 系统调用接口
打赏
  • 微信
  • 支付宝

评论