HLog 的作用:
HBase 写入数据时会同时写入到 WAL 和 Memstore 中,其中 Memstore 是位于内存中的 store,类似于写缓存,当 Memstore 的大小超过限定的阈值时会触发 flush 行为,将内存中的数据刷写到磁盘做持久化。其中的 wal 也称为 hlog,作用类似于 mysql 中的 binlog,记录了客户端的每次 update 动作,只有当 wal 写入成功之后,本次写事务才会返回。
我们知道内存中的数据是易失的,当 regionserver 宕机时,HMaster 会切割按 region 切割宕机 regionserver 上的 hlog,并将它分发到 region 被迁移到的新 regionserver 上以恢复该在 memstore 中还未来得及刷盘的数据。相应地如果 hlog 里面的记录已经完成了 flush,则该 hlog 会被 regionServer 移动到. oldlog 目录下,由 HMaster 上的定时线程 LogCleaner 周期性地扫描该目录,删除掉不再使用的 hlog。
此外 hlog 还有一个意义就是用于 hbase 的 replication,hbase 的 replication 是通过将主集群的 hlog 推送到备集群,然后在备集群上 reply 来实现的。这个推送过程是异步完成的,因此会存在. oldlog 目录下的 hlog 还未被 replication 推送完成的情况,此时 HMaster 会将这些未推送完成的 hlog 记录在 zk 上。方便在清除. oldlog 目录时跳过有 zk 指向的 hlog 文件。
WALFactory 类:
先从 WALFactory 开始分析,HRegionServer 中管理着一个 WALFactory 变量,定义的格式如下:
- protected volatile WALFactory walFactory
下面分析一下 walFactory 在 regionserver 中的应用姿势,首先在 RegionServer 中维护着一个与 Master 之间的心跳逻辑,这段代码在 RegionServer 的主循环 run() 里,如下所示:
- while(keepLooping()) {
- RegionServerStartupResponse w = repartForDuty();
- if (w == null) {
- this.sleeper.sleep();
- } else {
- handleReportForDutyResponse(w);
- break;
- }
- }
reportForDuty 是 RegionServer 向 master 上报注册信息,master 会回应一个 key/value 格式的信息给 regionserver,以标识 regionserver 本次 register 成功。
regionserver 收到 master 的回应消息后,开始调用 handleReportForDutyResponse,这个函数的主要逻辑列在下面:
- try{
- 根据master的返回值处理hostname逻辑;
- ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
- this.walFactory = setupWALAndReplication(); //启动walFactory
- 初始化metricsRegionServer;
- startServiceThreads(); //启动各路服务线程;
- startHeapMemoryManager(); //启动内存管理了;
- synchronized (online) {
- online.set(true);
- online.notifyAll();
- }
- }
setupWALAndReplication 创建并返回了 WALFactory 类,setupWALAndRepliaction 中的几个主要步骤摘录如下:
- private WALFactory setupWALAndReplication() throws IOException {
- final Path oldLogDir = new Paht(……..) //获取old WAL日志的路径
- final String logName=DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
- Path lodger = new Path(rootDir, logName);
- //如果设置了replication相关,初始化replication manager
- createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
- listeners添加,添加了WALActionsListener;
- final List<WALActionsListener> listeners = new ArrayList<>();
- return new WALFactory(conf, listeners, serverName.toString);
- }
WALFactory 的构造函数中除了设置超时时间等之外,还初始化了一个 DefaultWALProvider 类型的变量,几乎所有与 wal 文件操作相关的方法都定义在这个接口类中。它里面分别定义了 Reader&Writer,用于对 WAL 文件的读写。WALFactory 中提供了两个接口 createReader&createWriter,实际也是初始化了 DefaultWALProvider 中的这两个类。Reader&Writer 实现了对 hlog 文件的读写。
DefaultWALProvider 中还包括了一个 FSHLog 类型的成员变量,FSHLog 管理了将 WAL 持久化的线程模型。下面详细分析 FSHLog 的实现。
FSHLog 与 HLog 写入模型:
首先从 hbase 的写入路径入手分析,前面分析过客户端的 put 操作在服务端最终调用的是 doMiniBatchMutation。数据在被成功写入到 memstore 之后,会收集此次写入动作的 table name、region info 等信息并构造成一个 HlogKey 结构的对象记为 walkey,并将当前写入的数据作为 walEdit,然后将 walkey 和 walEdit 共同组装成一个 entry 之后将之 append 到内存中一个 ringbuffer 类型的缓冲区中。返回值 txid 用于标识本次写事务在缓冲区的写入序号。
- if (walEdit.size() > 0) {
- walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), now, m.getClusterIds(),
- currentNonceGroup, currentNonce);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
- walEdit, getSequenceId(), true, null);
- walEdit = new WALEdit(isInReplay);
- walKey = null;
- }
待所有的锁释放之后,再将 buffer 中的数据刷写到磁盘,最后将版本号向前推进,提交本次写事务,其中刷写磁盘的代码如下:
- if(taxied != 0) {
- syncOrDefer(txid, durabiliby);
- }
syncOrDefer 会根据客户端设置的持久化级别选择是否将日志数据落盘,其中 client 可以选择的 wal 持久化等级划分为如下四个等级:
SKIP_WAL:数据只写 memstore,不写 Hlog,此时写入性能最高,但是数据有丢失风险;
ASYNC_WAL:异步将数据写入 HLog 文件中;
SYNC_WAL:同步将数据写入日志文件,但是此时数据只是被写入文件系统缓存,并没有真正落盘;
FSYNC_WAL:同步将数据写入日志文件并强制落盘,可以保证数据不丢失,但是性能最差;
客户端可以通过如:put.setDurability(Durability.SYNC_WAL) 设置 WAL 持久化级别,不设置时默认是 SYNC_WAL。syncOrDefer 通过调用 this.wal.sync(txid) 将数据落盘,
经过前面的分析,可以看到 hlog 的写入最终是调用了 FSHLog 向外暴露的 append()&sync() 两个方法来实现的,可见 FSHLog 中实现了 hlog 的写线程模型。因此想要分析写线程模型的实现,分析的入口就在上面两个方法,在具体分析方法细节前,先看看构造 FSHLog 时都初始化了哪些变量:
- this.appendExecutor = Executors.
- newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
- final int preallocatedEventCount =
- this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
- this.disruptor =
- new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
- this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
- this.disruptor.getRingBuffer().next();
- this.ringBufferEventHandler =
- new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
- maxHandlersCount);
- this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
- this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
- this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
- this.disruptor.start();
其中最关键的就是变量 disruptor,它是一个 Disruptor<RingBufferTruck> 类型的成员变量,Distuptor 是 LMAX 开发的一个高性能无锁队列,本质还是个生产者 - 消费者模型,它采用一个环形数组结构,称为 RingBuffer 来复用内存,同时 Buffer 上的读写序列号经过优化可以避免伪共享,多线程并发访问该序列号时通过 CPU 级别的 CAS 自旋来获得,以此实现了 lock free。这样只要 buffer 中有事件就会被递交到消费者线程池去处理。
回头看 disruptor 构造函数中的各参数的含义,首先第一个参数指定了通过 Disruptor 交换的事件类型,这里定义为 RingBufferTruck 类型,参数 EVENT_FACTORY 指代事件工厂,用于 Disruptor 通过该工厂在 RingBuffer 中预创建 Event 实例。参数 preallocatedEventCount 指定了 ringBuffer 的大小。ProducerType 是数据生产方式,客户端写入数据时,调用 this.wal.append() 方法实际上就是以生产者的身份将数据写入到 RingBuffer 中。append 关键代码如下:
- public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
- final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
- final List<Cell> memstoreCells) throws IOException {
- FSWALEntry entry = null;
- long sequence = this.disruptor.getRingBuffer().next();
- try {
- RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); //用key和edits构造一个对象entry
- truck.loadPayload(entry, scope.detach()); //将上面构造的对象包装为RingBufferTruck事件并添加到Ring Buffer
- } finally {
- this.disruptor.getRingBuffer().publish(sequence);
- }
- return sequence;
- }
消费者线程由 appendExecutor 指定,这里用到的是 newSingleThreadExecutor 定义的单线程线程。BlockingWaitStrategy() 指定了 consumer 的等待策略。appendExecutor 并不处理具体的 event,而是从 Ringbuffer 中接收之后将它转交给后端的 ringBufferEventHandler 来处理,因为 appendExecutor 中不包含事件处理逻辑,所以非常轻量,只需一个线程就可以处理生产端高并发的请求。
真正的事件处理在 ringBufferEventHandler 中完成,如下面定义,hbase 中默认是 5 个 handler:
- this.ringBufferEventHandler = new
- RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count",5), maxHandersCount);
从 RingBufferEventHandler 起分析 event 的处理逻辑,每个 RingBufferEventHandler 中定义了两组主要的线程数组,如下所示:
- class RingBufferEventHandler implements EventHandler < RingBufferTruck > ,
- LifecycleAware {
- private final SyncRunner[] syncRunners;
- private final SyncFuture[] syncFutures;
- //其它变量
- }
每接收到 RingBufferTruck 事件,RingBufferEventHandler 便会调用 onEvent 对该事件进行处理,主要的处理逻辑代码列在了下面:
- public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
- throws Exception {
- try {
- if (truck.hasSyncFuturePayload()) {
- this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
- if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
- } else if (truck.hasFSWALEntryPayload()) {
- TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
- try {
- append(truck.unloadFSWALEntryPayload());
- } catch (Exception e) {
- 。。。。
- }
- } else {
- return;
- }
- int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length;
- try {
- this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount);
- } catch (Exception e) {
- cleanupOutstandingSyncsOnException(sequence, e);
- throw e;
- }
- attainSafePoint(sequence);
- this.syncFuturesCount = 0;
- } catch (Throwable t) {
- LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
- }
- }
RingBufferTruck 中封装可能封装了两种不同类型的对象,分别是 WALEntry 或者 SyncFuture,消费线程的执行方法 onEvent() 中对上述对象的处理也不同,如果是 WALEntry,则调用 append 方法,使用 writer 将输入的 WALEntry 经 protobuf 序列化后写入 hadoop 文件。如果是 SyncFuture,则把该对象放入 RingBufferEventHandler 自身维护的 SyncFutures[] 数组中。
然后,onEvent 从 syncRunners[] 中取出一个线程,并调用它的 offer 方法,offer 中将该 EventHandler 中的所有 syncFuture 添加到 SyncRunner 自身维护的阻塞队列中,在 syncRunner 线程的 run 方法里等到写满一批 syncFuture 之后,会调用 writer.sync() 将数据落盘,待数据成功刷到磁盘后,释放 syncFuture,并将其中的 scope 置位。之所以如此设计,是因为对比客户端的屡次 append 操作,刷盘是相对比较耗时的,以此采用写文件缓存并结合异步刷盘的方式平衡对 client 端 API 的友好和客户端写吞吐。run 方法简化后的主干代码如下:
- public void run() {
- long currentSequence;
- while (!isInterrupted()) {
- int syncCount = 0;
- SyncFuture takeSyncFuture;
- try {
- while (true) {
- takeSyncFuture = this.syncFutures.take();
- currentSequence = this.sequence;
- long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
- long currentHighestSyncedSequence = highestSyncedSequence.get();
- if (currentSequence < currentHighestSyncedSequence) {
- syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
- continue;
- }
- break;
- }
- try {
- writer.sync();
- currentSequence = updateHighestSyncedSequence(currentSequence);
- } catch (IOException e) {
- 。。。。。
- } catch (Exception e) {
- 。。。。。
- } finally {
- takeSyncFuture.setSpan(scope.detach());
- syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t);
- syncCount += releaseSyncFutures(currentSequence, t);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- LOG.warn("UNEXPECTED, continuing", t);
- }
- }
- }
- }
需要注意的是 writer.sync() 的预处理,其取出当前已处理的最大 sequence 与本次待处理 syncFuture 中的 sequence 相对比,sequence 是按照事务提交的顺序递增赋值的,事务 append 到缓存的顺序也是与 sequence 的赠序一致,如果当前 sequence 小于最大已提交 sequence,则表明 hlog 中已写入相应记录,因此调用 releaseSyncFuture() 释放 syncFuture。
还有一个问题,syncFuture 中 scope 的置位是什么时候来查的呢,答案就是 FSHLog 向外暴露的 this.wal.sync(txid) 方法,客户端写操作调用 sync 后会阻塞等待数据刷盘成功,sync 中调用 syncFuture 的 get 后阻塞在文件系统的同步操作上,当文件系统将数据落盘完成之后,get 方法返回,并将 syncFuture 中置位的 scope 返回给客户端。客户端工作线程被唤醒,返回继续写入 memstore,完成一次写操作。
- private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
- try {
- syncFuture.get();
- return syncFuture.getSpan();
- } catch (InterruptedException ie) {
- 。。。。
- } catch (ExecutionException e) {
- 。。。。
- }
- }
总而言之,WAL 的写入模型是一个多消费者单生产者模型,生产者调用的方法 append(),将包装好的 WALEdit 写入到线程安全的消息队列 RingBuffer,同时只有一个消费者从这个队列中拉取数据并调用 sync() 方法把数据异步刷写到磁盘,单消费者保证了 WAL 日志并发写入时日志的全局顺序唯一,同时采用无锁队列 Disruptor RingBuffer 保证了写入端(生产者)的高吞吐低延时。
LogRoller:
LogRoller 是个定期执行的线程。每个 RegionServer 中都有一个 LogRoller 线程,线程执行的周期由 hbase.regionserver.logroll.period 给出,默认时间是 1hr。也就是说每过一个小时会产生一个新的 hlog 文件,hlog 的文件名由 regionserver 名称+hlog 形成时的时间戳构成。
LogRoller 的 run 方法中的主要流程如下面列出:
- rollLock.lock();
- try {
- for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
- final WAL wal = entry.getKey();
- final byte [][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
- walNeedsRoll.put(wal, Boolean.FALSE);
- if (regionsToFlush != null) {
- for (byte [] r : regionsToFlush) scheduleFlush(r);
- }
- }
- } finally {
- try {
- rollLog.set(false);
- } finally {
- rollLock.unlock();
- }
- }
在第四行中调用了 HLog 的 rollWriter,rollWriter 中会打开一个新的 hdfs 文件供 log 写入,并将 old 的 hlog 文件关闭。
来源: http://www.bubuko.com/infodetail-1978683.html