1. 概述
最近有些同学在学习 Kafka 时, 问到 Kafka 的日志压缩 (Log Compaction) 问题, 对于 Kafka 的日志压缩有些疑惑, 今天笔者就为大家来剖析一下 Kafka 的日志压缩的相关内容.
2. 内容
2.1 日志压缩是什么?
Kafka 是一个基于 Log 的流处理系统, 一个 Topic 可以有若干个 Partition,Partition 是复制的基本单元, 在一个 Broker 节点上, 一个 Partition 的数据文件可以存储在若干个独立磁盘目录中, 每个 Partition 的日志文件存储的时候又会被分成一个个的 Segment, 默认的 Segment 的大小是 1GB, 有属性 offsets.topic.segment.bytes 来控制. Segment 是日志清理的基本单元, 当前正在使用的 Segment 是不会被清理的, 对于每一个 Partition 的日志, 以 Segment 为单位, 都会被分为两部分, 已清理和未清理的部分. 同时, 未清理的那部分又分为可以清理和不可清理. 日志压缩是 Kafka 的一种机制, 可以提供较为细粒度的记录保留, 而不是基于粗粒度的基于时间的保留.
Kafka 中的每一条数据都包含 Key 和 Value, 数据存储在磁盘上, 一般不会永久保留, 而是在数据达到一定的量或者时间后, 对最早写入的数据进行删除. 日志压缩在默认的删除规则之外提供了另一种删除过时数据 (或者说是保留有价值的数据) 的方式, 就是对于具有相同的 Key, 而数据不同, 值保留最后一条数据, 前面的数据在合适的情况下删除.
2.2 日志压缩的应用场景
日志压缩特性, 就实时计算来说, 可以在异常容灾方面有很好的应用途径. 比如, 我们在 Spark,Flink 中做实时计算时, 需要长期在内存里面维护一些数据, 这些数据可能是通过聚合了一天或者一周的日志得到的, 这些数据一旦由于异常因素 (内存, 网络, 磁盘等) 崩溃了, 从头开始计算需要很长的时间. 一个比较有效可行的方式就是定时将内存里的数据备份到外部存储介质中, 当崩溃出现时, 再从外部存储介质中恢复并继续计算.
使用日志压缩来替代这些外部存储有哪些优势及好处呢? 这里为大家列举并总结了几点:
Kafka 即是数据源又是存储工具, 可以简化技术栈, 降低维护成本
使用外部存储介质的话, 需要将存储的 Key 记录下来, 恢复的时候再使用这些 Key 将数据取回, 实现起来有一定的工程难度和复杂度. 使用 Kafka 的日志压缩特性, 只需要把数据写进 Kafka, 等异常出现恢复任务时再读回到内存就可以了
Kafka 对于磁盘的读写做了大量的优化工作, 比如磁盘顺序读写. 相对于外部存储介质没有索引查询等工作量的负担, 可以实现高性能. 同时, Kafka 的日志压缩机制可以充分利用廉价的磁盘, 不用依赖昂贵的内存来处理, 在性能相似的情况下, 实现非常高的性价比(这个观点仅仅针对于异常处理和容灾的场景来说)
2.3 日志压缩方式的实现细节
当 Topic 中的 cleanup.policy(默认为 delete)设置为 compact 时, Kafka 的后台线程会定时将 Topic 遍历两次, 第一次将每个 Key 的哈希值最后一次出现的 offset 记录下来, 第二次检查每个 offset 对应的 Key 是否在较为后面的日志中出现过, 如果出现了就删除对应的日志.
日志压缩是允许删除的, 这个删除标记将导致删除任何先前带有该 Key 的消息, 但是删除标记的特殊之处在于, 它们将在一段时间后从日志中清理, 以释放空间. 这些需要注意的是, 日志压缩是针对 Key 的, 所以在使用时应注意每个消息的 Key 值不为 NULL.
压缩是在 Kafka 后台通过定时的重新打开 Segment 来完成的, Segment 的压缩细节如下图所示:
日志压缩可以确保的内容, 这里笔者总结了以下几点:
任何保持在日志头部以内的使用者都将看到所写的每条消息, 这些消息将具有顺序偏移量. 可以使用 Topic 的 min.compaction.lag.ms 属性来保证消息在被压缩之前必须经过的最短时间. 也就是说, 它为每个消息在 (未压缩) 头部停留的时间提供了一个下限. 可以使用 Topic 的 max.compaction.lag.ms 属性来保证从编写消息到消息符合压缩条件之间的最大延时
消息始终保持顺序, 压缩永远不会重新排序消息, 只是删除一些而已
消息的偏移量永远不会改变, 它是日志中位置的永久标识符
从日志开始的任何使用者将至少看到所有记录的最终状态, 按记录的顺序写入. 另外, 如果使用者在比 Topic 的 log.cleaner.delete.retention.ms 短的时间内到达日志的头部, 则会看到已删除记录的所有 delete 标记. 保留时间默认是 24 小时.
2.4 日志压缩核心代码实现
日志压缩的核心实现代码大部分的功能在 CleanerThread 中, 核心实现逻辑在 Cleaner 中的 clean 方法中, 实现细节如下:
- /**
- * Clean the given log
- *
- * @param cleanable The log to be cleaned
- *
- * @return The first offset not cleaned and the statistics for this round of cleaning
- */
- private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
- // figure out the timestamp below which it is safe to remove delete tombstones
- // this position is defined to be a configurable time beneath the last modified time of the last clean segment
- val deleteHorizonMs =
- cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
- case None => 0L
- case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
- }
- doClean(cleanable, deleteHorizonMs)
- }
- private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
- info("Beginning cleaning of log %s.".format(cleanable.log.name))
- val log = cleanable.log
- val stats = new CleanerStats()
- // build the offset map
- info("Building offset map for %s...".format(cleanable.log.name))
- val upperBoundOffset = cleanable.firstUncleanableOffset
- buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
- val endOffset = offsetMap.latestOffset + 1
- stats.indexDone()
- // determine the timestamp up to which the log will be cleaned
- // this is the lower of the last active segment and the compaction lag
- val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
- // group the segments and clean the groups
- info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
- val transactionMetadata = new CleanedTransactionMetadata
- val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
- log.config.maxIndexSize, cleanable.firstUncleanableOffset)
- for (group <- groupedSegments)
- cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
- // record buffer utilization
- stats.bufferUtilization = offsetMap.utilization
- stats.allDone()
- (endOffset, stats)
- }
日志压缩通过两次遍历所有的数据来实现, 两次遍历之间交流的通道就是一个, 下面是 OffsetMap 的内容:
- trait OffsetMap {
- def slots: Int
- def put(key: ByteBuffer, offset: Long): Unit
- def get(key: ByteBuffer): Long
- def updateLatestOffset(offset: Long): Unit
- def clear(): Unit
- def size: Int
- def utilization: Double = size.toDouble / slots
- def latestOffset: Long
- }
这基本就是一个普通的 MuTable Map, 在 Kafka 代码中, 它的实现只有一个叫做 SkimpyOffsetMap
2.4.1 PUT 方法
PUT 方法会为每个 Key 生成一份信息, 默认使用 MD5 方法生成一个 Byte, 根据这个信息在 Byte 中哈希的到一个下标, 如果这个下标已经被别的占用, 则线性查找到下个空余的下标为止, 然后对应位置插入该 Key 的 Offset, 实现代码如下:
- /**
- * Associate this offset to the given key.
- * @param key The key
- * @param offset The offset
- */
- override def put(key: ByteBuffer, offset: Long): Unit = {
- require(entries <slots, "Attempt to add a new entry to a full offset map.")
- lookups += 1
- hashInto(key, hash1)
- // probe until we find the first empty slot
- var attempt = 0
- var pos = positionOf(hash1, attempt)
- while(!isEmpty(pos)) {
- bytes.position(pos)
- bytes.get(hash2)
- if(Arrays.equals(hash1, hash2)) {
- // we found an existing entry, overwrite it and return (size does not change)
- bytes.putLong(offset)
- lastOffset = offset
- return
- }
- attempt += 1
- pos = positionOf(hash1, attempt)
- }
- // found an empty slot, update it--size grows by 1
- bytes.position(pos)
- bytes.put(hash1)
- bytes.putLong(offset)
- lastOffset = offset
- entries += 1
- }
2.4.2 GET 方法
GET 方法使用和 PUT 同样的算法获取 Key 的信息, 通过信息获得 Offset 的存储位置, 实现代码如下:
- /**
- * Get the offset associated with this key.
- * @param key The key
- * @return The offset associated with this key or -1 if the key is not found
- */
- override def get(key: ByteBuffer): Long = {
- lookups += 1
- hashInto(key, hash1)
- // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
- var attempt = 0
- var pos = 0
- //we need to guard against attempt integer overflow if the map is full
- //limit attempt to number of slots once positionOf(..) enters linear search mode
- val maxAttempts = slots + hashSize - 4
- do {
- if(attempt>= maxAttempts)
- return -1L
- pos = positionOf(hash1, attempt)
- bytes.position(pos)
- if(isEmpty(pos))
- return -1L
- bytes.get(hash2)
- attempt += 1
- } while(!Arrays.equals(hash1, hash2))
- bytes.getLong()
- }
3. 配置实践注意事项
默认情况下, 启动日志清理器, 若需要启动特定 Topic 的日志清理, 请添加特定的属性. 配置日志清理器, 这里为大家总结了以下几点:
log.cleanup.policy 设置为 compact, 该策略属性是在 Broker 中配置, 它会影响到集群中所有的 Topic.
log.cleaner.min.compaction.lag.ms 这个属性用来防止对更新超过最小消息进行压缩, 如果没有设置, 除最后一个 Segment 之外, 所有 Segment 都有资格进行压缩
log.cleaner.max.compaction.lag.ms 这个可以用来防止低生产速率的日志在无限制的时间内不适合压缩
4. 总结
Kafka 的日志压缩原理并不复杂, 就是定时把所有的日志读取两遍, 写一遍, 而 CPU 的速度超过磁盘完全不是问题, 只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内, 那么它的性能就是可以接受的.
另外, 笔者开源的一款 Kafka 监控关系系统 Kafka-Eagle, 喜欢的同学可以 Star 一下, 进行关注.
Kafka Eagle 源代码地址: https://github.com/smartloli/kafka-eagle
5. 结束语
这篇博客就和大家分享到这里, 如果大家在研究学习的过程当中有什么问题, 可以加群进行讨论或发送邮件给我, 我会尽我所能为您解答, 与君共勉!
另外, 博主出书了《Kafka 并不难学 https://item.jd.com/12455361.html 》和《Hadoop 大数据挖掘从入门到进阶实战 https://item.jd.com/12371763.html 》, 喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习, 在此感谢大家的支持. 关注下面公众号, 根据提示, 可免费获取书籍的教学视频.
来源: https://www.cnblogs.com/smartloli/p/12212541.html