class AtomicLock
{
std::atomic_flag flag;
public:
void lock() {
while (flag.test_and_set(std::memory_order_acquire))
flag.wait(true, std::memory_order_acquire);
}
void unlock() {
flag.clear(std::memory_order_release);
flag.notify_one();
}
};
class ThreadAtomicLock
{
static std::atomic_long id_generator;
std::atomic<long> owner_thread = -1;
static long get_id() {
thread_local auto id = id_generator++;
return id;
}
public:
void lock() {
long expected = -1;
while (!owner_thread.compare_exchange_weak(expected, get_id(), std::memory_order_acquire)) {
owner_thread.wait(expected, std::memory_order_acquire);
expected = -1;
}
}
void unlock() {
long expected = get_id();
assert (
owner_thread.compare_exchange_strong(expected, -1, std::memory_order_release) &&
"unlock of unowned lock"
);
owner_thread.notify_one();
}
};
std::atomic_long ThreadAtomicLock::id_generator = 0;
void testAtomicLock() {
std::vector<std::thread> threads;
std::atomic<bool> flag = false;
// AtomicLock lock;
ThreadAtomicLock lock;
for (int i = 0; i < 100; i++) {
threads.emplace_back([&] {
while (!flag.load(std::memory_order_acquire));
lock.lock();
std::cout << "Hello";
std::cout << " from thread ";
std::cout << std::this_thread::get_id() << " ";
int n = 10;
while (n--) std::cout << "=";
std::cout << "\n";
lock.unlock();
});
}
flag.store(true, std::memory_order_release);
for (auto& t : threads) t.join();
}
提供的AtomicLock
类可以工作,但我意识到这将允许任何线程解锁,即使它不拥有该锁。
该类ThreadAtomicLock
是为了确保只有所有者线程才能解锁而进行的尝试。
虽然它工作正常,但我怀疑这是否是实现锁所需的全部。这种实现正确吗?可以放宽一些限制吗?
我可以在解锁功能中
compare_exchange_strong
替换吗?compare_exchange_weak
这里的 ID 生成机制为每个线程存储一个额外的 ID,并执行原子操作为每个线程生成一个 ID。虽然微不足道,但我很好奇是否有更好的实现方法。
可以在锁定函数中的调用
memory_order_relaxed
中使用wait
,因为此时我们不需要进一步同步?
(1) 在我看来是正确的。C++20
.wait
并公开库互斥.notify_one/all
实现利用的功能(如 Linuxfutex
),以获得操作系统辅助的睡眠/唤醒。一些实现在使用 进行系统调用之前会旋转重试几次.wait
。您缺少的主要内容是
unlock
当没有其他线程等待时避免进行任何系统调用的方案。在无争用的情况下,这就是现代 x86 上锁定/解锁在最佳情况下可能需要 40 个周期(无争用且 L1d 缓存命中锁本身和关键部分中访问的数据。例如,如果同一线程反复获取和释放同一锁)与无争用涉及缓存未命中的情况需要几百个周期与系统调用需要几千个时钟周期之间的区别(尤其是现在任何系统调用都需要进行 Spectre 缓解)。Glibc 的 pthread mutex 函数是开源的,因此您可以查看并了解它们如何让线程在进入睡眠状态之前将锁编码到锁整数中。https ://codebrowser.dev/glibc/glibc/nptl/pthread_mutex_lock.c.html#211看起来像一个相关的评论,但我不知道是否有一个好的设计文档或指南可以解释它。(或任何其他实现的指南。)
(2)
compare_exchange_weak
可能会因中断或错误共享而意外失败。您需要这样做,compare_exchange_strong
因为您没有在重试循环中使用它。(3) 由于您为每个线程生成一次线程 ID,因此您不需要将其设置为
long
。int
几乎肯定没问题。虽然我猜想随着线程被销毁和重新创建,32 位计数器可能会在很长时间后溢出,而不会出现线程数量过大的现象。所以
long
还不够大,可能最好用来uintptr_t
在基本上所有 64 位系统上获取 64 位计数器,包括 Windows x64,其中long
是 32 位类型。在 32 位系统上使用 32 位计数器似乎比较合理,这是一个很好的权衡,可以将锁定字保持在只有一个指针宽度的宽度,基本上所有现代系统都原生支持原子 RMW。(大多数也支持 2 指针宽度的原子 RMW,但不是全部。)
再次查看 Glibc 的 pthreads 实现,我之前链接的相同函数用于
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
从线程本地存储中读取线程 ID。(可能比普通thread_local
变量少一个间接级别?我猜 TID 可能存储在线程信息块中,因此在 x86-64 GNU/Linux 上,可能可以使用寻址模式[fs: 0x40]
或类似的从段基数的小偏移量来读取。如果该线程 ID 来自操作系统,它已经是一个唯一的 16 位或 32 位整数,保证在当前运行的线程中是唯一的。(4) 是的,
wait(relaxed)
这里没问题。当您可以直接开始读取通知您的线程生成的数据,而无需先对其写入的变量执行原子操作时,更严格的命令很有用。有关的:
.wait()
(futex
)之前进行旋转重试,则旋转为只读。以及 x86pause
(_mm_pause()
)如何适应这一点。(C++ 没有可移植的方式来做到这一点,不像 Rust 有
std::hint::spin_loop()
;请参阅是否有类似 Rust 的 core::hint::spin_loop for C++?了解一些可移植库。)