consumeQueue 如何知道最大的 offset, 或者说启动一个 broker 后, 从哪个地方开始写入新的数据?
consumeQueue 文件固定 20 字节存储一个 entry, 新建一个 consumeQueue 文件, 只有写入 entry 的地方有值, 其他地方是 0 字节.
1. 调用 fileChannel.map 函数, 可以指定文件大小, 同时文件空白的地方以 0 字节填充
- this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
- this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
2. broker 启动时检查 consumeQueue 文件
- // org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
- // org.apache.rocketmq.store.ConsumeQueue#recover
- public void recover() {
- final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
- if (!mappedFiles.isEmpty()) {
- int index = mappedFiles.size() - 3;
- if (index <0)
- index = 0;
- int mappedFileSizeLogics = this.mappedFileSize;
- MappedFile mappedFile = mappedFiles.get(index);
- ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
- // 文件名即当前文件开始的物理偏移量
- long processOffset = mappedFile.getFileFromOffset();
- // 当前文件的 entry 的总字节数
- long mappedFileOffset = 0;
- long maxExtAddr = 1;
- while (true) {
- for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
- long offset = byteBuffer.getLong();
- int size = byteBuffer.getInt();
- long tagsCode = byteBuffer.getLong();
- if (offset>= 0 && size> 0) {
- // 发现一个 entry
- mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
- this.maxPhysicOffset = offset + size;
- if (isExtAddr(tagsCode)) {
- maxExtAddr = tagsCode;
- }
- } else {
- log.info("recover current consume queue file over," + mappedFile.getFileName() + " "
- + offset + "" + size +" " + tagsCode);
- break;
- }
- }
- if (mappedFileOffset == mappedFileSizeLogics) {
- index++;
- if (index>= mappedFiles.size()) {
- log.info("recover last consume queue file over, last mapped file"
- + mappedFile.getFileName());
- break;
- } else {
- mappedFile = mappedFiles.get(index);
- byteBuffer = mappedFile.sliceByteBuffer();
- processOffset = mappedFile.getFileFromOffset();
- mappedFileOffset = 0;
- log.info("recover next consume queue file," + mappedFile.getFileName());
- }
- } else {
- log.info("recover current consume queue queue over" + mappedFile.getFileName() + " "
- + (processOffset + mappedFileOffset));
- break;
- }
- }
- // 计算总的 entry 的位移
- processOffset += mappedFileOffset;
- this.mappedFileQueue.setFlushedWhere(processOffset);
- // 后续写入的开始点
- this.mappedFileQueue.setCommittedWhere(processOffset);
- this.mappedFileQueue.truncateDirtyFiles(processOffset);
- if (isExtReadEnable()) {
- this.consumeQueueExt.recover();
- log.info("Truncate consume queue extend file by max {}", maxExtAddr);
- this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
- }
- }
- }
来源: http://www.bubuko.com/infodetail-3384542.html