ConsumeQueue,IndexFile 需要及时更新, 否则无法及时被消费, 根据消息属性查找消息也会出现较大延迟.
mq 通过开启一个线程 ReputMessageService 来准时转发 commitLog 文件更新事件, 相应的任务处理器根据转发的消息及时更新 ConsumeQueue,IndexFile 文件
DefaultMessageStore#start
ReputMessageService 线程每执行一次任务推送休息 1 毫秒旧继续尝试推送消息到消息消费队列和索引文件.
返回 reputFromOffset 偏移量开始的全部有效数据, 然后循环读取每一条消息.
在 DefaultMessageStore 的构造方法中:
topic: 消息主题名称
queueId: 消息队列 ID
commitLogOffset: 消息物理偏移量
msgSize: 消息长度
tagsCode: 消息过滤 tag hashcode
storeTimestamp: 消息存储时间戳
consumeQueueOffset: 消息队列偏移量
key: 消息索引
success: 是否成功解析道完整的消息
uniqKey: 消息唯一键
sysFlag: 消息系统标记
preparetransactionOffset: 消息预处理事务偏移量
propertiesMap: 消息属性
bitMap: 位图
根据消息主题与队列 ID, 先获取对应的 ConsumeQueue 文件.
最后会调用这个方法, 一次将消息偏移量, 消息长度, tag hashcode 写入到 ByteBuffer 中, 并根据 consumeQueueOffset 计算 ConumeQueue 中的物理地址, 将内容追加到 ConsumeQueue 的内存映射文件中, ConsumeQueue 的刷盘方式固定为异步刷盘模式
如果 messageIndexEnable 设置为 true.
获取或创建 IndexFile 文件并获取所有文件最大的物理偏移量. 如果该消息的物理偏移量小于索引文件中的物理偏移量, 则说明是重复数据, 忽略本次索引构建.
如果消息的唯一键不为空, 则添加到 Hash 索引中, 以便加速根据唯一键检索消息
构建索引键, mq 支持为同一个消息建立多个索引, 多个索引键空格分开.
来源: http://www.bubuko.com/infodetail-3395554.html