概述
在开发过程中, 我们经常会遇到并发问题, 解决并发问题通常的方法是加锁保护, 比如常用的 spinlock,mutex 或者 rwlock, 当然也可以采用无锁编程, 对实现要求就比较高了. 对于任何一个共享变量, 只要有读写并发, 就需要加锁保护, 而读写并发通常就会面临一个基本问题, 写阻塞读, 或则写优先级比较低, 就会出现写饿死的现象. 这些加锁的方法可以归类为悲观锁方法, 今天介绍一种乐观锁机制来控制并发, 每个线程通过线程局部变量缓存共享变量的副本, 读不加锁, 读的时候如果感知到共享变量发生变化, 再利用共享变量的最新值填充本地缓存; 对于写操作, 则需要加锁, 通知所有线程局部变量发生变化. 所以, 简单来说, 就是读不加锁, 读写不冲突, 只有写写冲突. 这个实现逻辑来源于 Rocksdb 的线程局部缓存实现, 下面详细介绍 Rocksdb 的线程局部缓存 ThreadLocalPtr 的原理.
线程局部存储 (TLS)
简单介绍下线程局部变量, 线程局部变量就是每个线程有自己独立的副本, 各个线程对其修改相互不影响, 虽然变量名相同, 但存储空间并没有关系. 一般在 Linux 下, 我们可以通过以下三个函数来实现线程局部存储创建, 存取功能.
- int pthread_key_create(pthread_key_t *key, void (*destr_function) (void*)),
- int pthread_setspecific(pthread_key_t key, const void *pointer) ,
- void * pthread_getspecific(pthread_key_t key)
ThreadLocalPtr 类
有时候, 我们并不想要各个线程独立的变量, 我们仍然需要一个全局变量, 线程局部变量只是作为全局变量的缓存, 用以缓解并发. 在 RocksDB 中 ThreadLocalPtr 这个类就是来干这个事情的. ThreadLocalPtr 类包含三个内部类, ThreadLocalPtr::StaticMeta,ThreadLocalPtr::ThreadData 和 ThreadLocalPtr::Entry. 其中 StaticMeta 是一个单例, 管理所有的 ThreadLocalPtr 对象, 我们可以简单认为一个 ThreadLocalPtr 对象, 就是一个线程局部存储 (ThreadLocalStorage). 但实际上, 全局我们只定义了一个线程局部变量, 从 StaticMeta 构造函数可见一斑. 那么全局需要多个线程局部缓存怎么办, 实际上是在局部存储空间做文章, 线程局部变量实际存储的是 ThreadData 对象的指针, 而 ThreadData 里面包含一个数组, 每个 ThreadLocalPtr 对象有一个独立的 id, 在其中占有一个独立空间. 获取某个变量局部缓存时, 传入分配的 id 即可, 每个 Entry 中 ptr 指针就是对应变量的指针.
- ThreadLocalPtr::StaticMeta::StaticMeta() : next_instance_id_(0), head_(this) {
- if (pthread_key_create(&pthread_key_, &OnThreadExit) != 0) {
- abort();
- }
- ......
- }
- void* ThreadLocalPtr::StaticMeta::Get(uint32_t id) const {
- auto* tls = GetThreadLocal();
- return tls->entries[id].ptr.load(std::memory_order_acquire);
- }
- struct Entry {
- Entry() : ptr(nullptr) {}
- Entry(const Entry& e) : ptr(e.ptr.load(std::memory_order_relaxed)) {}
- std::atomic<void*> ptr;
- };
整体结构如下: 每个线程有一个线程局部变量 ThreadData, 里面包含了一组 ThreadLocalPtr 的指针, 对应的是多个变量, 同时 ThreadData 之间相互通过指针串联起来, 这个非常重要, 因为执行写操作时, 写线程需要修改所有 thread 的局部缓存值来通知共享变量发生变化了.
- ---------------------------------------------------
- | | instance 1 | instance 2 | instnace 3 |
- ---------------------------------------------------
- | thread 1 | void* | void* | void* | <- ThreadData
- ---------------------------------------------------
- | thread 2 | void* | void* | void* | <- ThreadData
- ---------------------------------------------------
- | thread 3 | void* | void* | void* | <- ThreadData
- struct ThreadData {
- explicit ThreadData(ThreadLocalPtr::StaticMeta* _inst)
- : entries(), inst(_inst) {}
- std::vector<Entry> entries;
- ThreadData* next;
- ThreadData* prev;
- ThreadLocalPtr::StaticMeta* inst;
- };
读写无并发冲突
现在说到最核心的问题, 我们如何实现利用 TLS 来实现本地局部缓存, 做到读不上锁, 读写无并发冲突. 读, 写逻辑和并发控制主要通过 ThreadLocalPtr 中通过 3 个关键接口 Swap,CompareAndSwap 和 Scrape 实现. 对于 ThreadLocalPtr<Type*> 变量来说, 在具体的线程局部存储中, 会保存 3 中不同类型的值:
1). 正常的 Type* 类型指针;
2). 一个 Type * 类型的 Dummy 变量, 记为 InUse;
3). nullptr 值, 记为 obsolote;
读线程通过 Swap 接口来获取变量内容, 写线程则通过 Scrape 接口, 遍历并重置所有 ThreadData 为 (obsolote)nullptr, 达到通知其他线程局部缓存失效的目的. 下次读线程再读取时, 发现获取的指针为 nullptr, 就需要重新构造局部缓存.
- // 获取某个 id 对应的局部缓存内容, 每个 ThreadLocalPtr 对象有单独一个 id, 通过单例 StaticMeta 对象管理.
- void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) {
- // 获取本地局部缓存
- auto* tls = GetThreadLocal();
- return tls->entries[id].ptr.exchange(ptr, std::memory_order_acquire);
- }
- bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr,
- void*& expected) {
- // 获取本地局部缓存
- auto* tls = GetThreadLocal();
- return tls->entries[id].ptr.compare_exchange_strong(
- expected, ptr, std::memory_order_release, std::memory_order_relaxed);
- }
- // 将所有管理的对象指针设置为 nullptr, 将过期的指针返回, 供上层释放,
- // 下次进行从局部线程栈获取时, 发现内容为 nullptr, 则重新申请对象.
- void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, std::vector<void*>* ptrs, void* const replacement) {
- MutexLock l(Mutex());
- for (ThreadData* t = head_.next; t != &head_; t = t->next) {
- if (id <t->entries.size()) {
- void* ptr =
- t->entries[id].ptr.exchange(replacement, std::memory_order_acquire);
- if (ptr != nullptr) {
- // 搜集各个线程缓存, 进行解引用, 必要时释放内存
- ptrs->push_back(ptr);
- }
- }
- }
- }
- // 初始化, 或者被替换为 nullptr 后, 说明缓存对象已经过期, 需要重新申请.
- ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() {
申请线程局部的 ThreadData 对象, 通过 StaticMeta 对象管理成一个双向链表, 每个 instance 对象管理一组线程局部对象.
- if (UNLIKELY(tls_ == nullptr)) {
- auto* inst = Instance();
- tls_ = new ThreadData(inst);
- {
- // Register it in the global chain, needs to be done before thread exit
- // handler registration
- MutexLock l(Mutex());
- inst->AddThreadData(tls_);
- }
- return tls_;
- }
- }
读操作包括两部分, Get 和 Release, 这里面除了从 TLS 中获取缓存, 还涉及到一个释放旧对象内存的问题. Get 时, 利用 InUse 对象替换 TLS 对象, Release 时再将 TLS 对象替换回去, 读写没有并发的场景比较简单, 如下图, 其中 TLS Object 代表本地线程局部缓存, GlobalObject 是全局共享变量, 对所有线程可见.
下面我们再看看读写有并发的场景, 读线程读到 TLS object 后, 写线程修改了全局对象, 并且遍历对所有的 TLS object 进行修改, 设置 nullptr. 在此之后, 读线程进行 Release 时, compareAndSwap 失败, 感知到使用的 object 已经过期, 执行解引用, 必要时释放内存. 当下次再次 Get object 时, 发现 TLS object 为 nullptr, 就会使用当前最新的 object, 并在使用完成后, Release 阶段将 object 填回到 TLS.
应用场景
从前面的分析来看, TLS 作为 cache, 仍然需要一个全局变量, 全局变量保持最新值, 而 TLS 则可能存在滞后, 这就要求我们的使用场景不要求读写要实时严格一致, 或者能容忍多版本. 全局变量和局部缓存有交互, 交互逻辑是, 全局变量变化后, 局部线程要能及时感知到, 但不需要实时. 允许读写并发, 即允许读的时候, 使用旧值读, 待下次读的时候, 再获取到新值. Rocksdb 中的 superversion 管理则符合这种使用场景, swich/flush/compaction 会产生新的 superversion, 读写数据时, 则需要读 supversion. 往往读写等前台操作相对于 switch/flush/compaction 更频繁, 所以读 superversion 比写 superversion 比例更高, 而且允许系统中同时存留多个 superversion.
每个线程可以拿 superversion 进行读写, 若此时并发有 flush/compaction 产生, 会导致 superversion 发生变化, 只要后续再次读取 superversion 时, 能获取到最新即可. 细节上来说, 扩展到应用场景, 一般在读场景下, 我们需要获取 snapshot, 并借助 superversion 信息来确认这次读取要读哪些物理介质 (mem,imm,L0,L1...LN).
1). 获取 snapshot 后, 拿 superversion 之前, 其它线程做了 flush/compaction 导致 superversion 变化
这种情况下, 可以拿到最新的 superversion.
2). 获取 snapshot 后, 拿 superversion 之后, 其它线程做了 flush/compaction 导致 superversion 变化
这种情况下, 虽然 superversion 比较旧, 但是依然包含了所有 snapshot 需要的数据. 那么为什么需要及时获取最新的 superversion, 这里主要是为了回收废弃的 sst 文件和 memtable, 提高内存和存储空间利用率.
总结
RocksDB 的线程局部缓存是一个很不错的实现, 用户使用局部缓存可以大大降低读写并发冲突, 尤其在读远大于写的场景下, 整个缓存维护代价也比较低, 只有写操作时才需要锁保护. 只要系统中允许共享变量的多版本存在, 并且不要求实时保证一致, 那么线程局部缓存是提升并发性能的一个不错的选择.
来源: https://www.cnblogs.com/cchust/p/11562949.html