正确实现 C++ 自旋锁:TAS、TTAS 与 PAUSE
网上最常见的自旋锁实现在一个微妙但可测的问题上出错了。它们在一个读-改-写(RMW)操作上自旋,每次迭代都强迫缓存行进入 exclusive 状态。在竞争下,这会产生与等待线程数量成正比的缓存一致性流量 —— 恰好与你想要的相反。
错误的做法:Test-and-Set(TAS)
struct tas_lock {
std::atomic<bool> lock_{false};
void lock() {
while (lock_.exchange(true, std::memory_order_acquire))
;
}
void unlock() {
lock_.store(false, std::memory_order_release);
}
};
exchange 是一次读-改-写。在 x86 上它编译为 XCHG,要求缓存行处于 exclusive(modified)状态。当 N 个线程用 exchange 自旋时,每次迭代都会产生 N−1 次缓存行转移 —— 相对于等待者数量的二次级别的一致性流量。
正确的做法:Test-and-Test-and-Set(TTAS)
修复办法是:在一个普通的 load 上自旋(只要求 shared 状态),只有当 load 暗示锁可能可用时,才去尝试 RMW:
struct ttas_lock {
std::atomic<bool> lock_{false};
void lock() {
for (;;) {
// 乐观快路径:立刻尝试抓锁
if (!lock_.exchange(true, std::memory_order_acquire))
return;
// 慢路径:在只读 load 上自旋,直到锁看起来空闲
while (lock_.load(std::memory_order_relaxed))
;
}
}
void unlock() {
lock_.store(false, std::memory_order_release);
}
};
多个线程同时读取同一条缓存行开销很低 —— 它们都以 shared 状态持有。只有当持锁线程调用 unlock() 时,等待者才会争抢 exclusive 状态,然后恰好一个胜出。这把一致性流量从”每次循环 O(N)“降到”每次获取锁 O(N)“。
减轻加载-存储单元压力:PAUSE
SMT(Hyper-Threading)在同一物理核上的逻辑核之间共享执行资源。一个紧凑的 load 循环会饿死同核上的另一个逻辑核。
PAUSE 指令(x86)/ YIELD(ARM)告诉 CPU:“我在自旋” —— 它引入一小段流水线延迟,减轻共享执行单元的压力;在某些微架构上,它还能改进投机流水线中对内存序违反的检测:
void lock() {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire))
return;
while (lock_.load(std::memory_order_relaxed)) {
__builtin_ia32_pause(); // x86
// asm volatile("yield" ::: "memory"); // ARM
}
}
}
基准:4 核 / 8 线程,1 亿次 lock-unlock
| 线程数 | TTAS + PAUSE(ns/op) | TAS(ns/op) |
|---|---|---|
| 1 | 7 | 7 |
| 2 | 28 | 43 |
| 4 | 112 | 205 |
| 6 | 241 | 509 |
| 8 | 442 | 854 |
在 8 个线程时,TAS 几乎慢 2 倍。perf 计数器证实了原因:
| 指标 | TTAS + PAUSE | TAS |
|---|---|---|
| L1 dcache load miss | 27.2% | 47.7% |
| 原子 lock 指令数 | 161M | 552M |
TAS 实现发出的原子指令多 3.4 倍,缓存未命中多 1.7 倍。
完整实现
struct spinlock {
std::atomic<bool> lock_{false};
void lock() noexcept {
for (;;) {
if (!lock_.exchange(true, std::memory_order_acquire))
return;
while (lock_.load(std::memory_order_relaxed))
__builtin_ia32_pause();
}
}
bool try_lock() noexcept {
// 先做 relaxed load:锁被持有时避免一次没必要的 RMW
return !lock_.load(std::memory_order_relaxed)
&& !lock_.exchange(true, std::memory_order_acquire);
}
void unlock() noexcept {
lock_.store(false, std::memory_order_release);
}
};
try_lock 的实现对 while (!m.try_lock()) 这种自旋重要 —— 如果没有开头那一次 relaxed 读,每次 try_lock 都会发射一条 XCHG,又退化成 TAS 循环。
什么时候不该用自旋锁
自旋锁适合那些在几十纳秒内完成、且绝不能阻塞的临界区(内核代码、中断处理器、实时系统)。对持有时间更长的任何场景,std::mutex 才是正确选择:操作系统可以把等待线程切出,而不必烧 CPU。
绝不要把 volatile 当作自旋锁原语来用。它既不提供原子性,也不提供内存序。