leveldb 的数据存储采用 LSM 的思想,将随机写入变为顺序写入,记录写入操作日志,一旦日志被以追加写的形式写入硬盘,就返回写入成功,由后台线程将写入日志作用于原有的磁盘文件生成新的磁盘数据. Leveldb 在内存中维护一个数据结构 memtable, 采用 skiplist 来实现,保存当前写入的数据,当数据达到一定规模后变为不可写的内存表 immutable table. 新的写入操作会写入新的 memtable,而 immutable table 会被后台线程写入到数据文件. Leveldb 的数据文件是按层存放的,默认配置的最高层级是 7,即 level0,level1,...,level7. 内存中的 immutable 总是写入 level0, 除 level0 之外的各个层 leveli 的所有数据文件的 key 范围都是互相不相交的. 当满足一定条件时,leveli 的数据文件会和 leveli+1 的数据文件进行 merge, 产生新的 leveli+1 层级的文件, 这个磁盘文件的 merge 过程和 immutable 的 dump 过程叫做 Compaction,在 leveldb 中是由一个单独的后台线程来完成的.
进行 Compaction 操作的条件如下:
1. 产生了新的 immutable table 需要写入数据文件
2. 某个 level 的数据规模过大
3. 某个文件被无效查询的次数过多(在文件 i 中查询 key, 没有找到 key, 这次查询称为文件 i 的无效查询)
4. 手动 compaction
满足以上条件会启动 Compaction 过程,接下来分析详细的 Compaction 过程.
Leveldb 进行 Compaction 的入口函数是 db 文件夹下 db_impl.cc 文件中的 DBImpl::MaybeScheduleCompaction,该函数在每次 leveldb 进行读写操作时都有可能被调用. 源码内容如下:
void DBImpl: :MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL && manual_compaction_ == NULL && !versions_ - >NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
env_ - >Schedule( & DBImpl: :BGWork, this); //新建后台任务并进行调度
}
}
首先调用 db 文件夹下 version_set.h 中的 NeedsCompaction() 判断是否需要启动 Compact 任务. 源码内容如下:
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version * v = current_;
return (v - >compaction_score_ >= 1) || (v - >file_to_compact_ != NULL);
}
version_set.cc 中 compaction_score_ 的计算如下:
void VersionSet: :Finalize(Version * v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
for (int level = 0; level < config: :kNumLevels - 1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v - >files_[level].size() / static_cast < double > (config: :kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v - >files_[level]);
score = static_cast < double > (level_bytes) / MaxBytesForLevel(level);
}
if (score > best_score) {
best_level = level;
best_score = score;
}
}
v - >compaction_level_ = best_level;
v - >compaction_score_ = best_score;
}
注意,这里同时预计算了进行 compaction 的最佳 level.
确认需要启动 compaction 之后,调用 util 文件夹下 env_posix.cc 文件中的 PosixEnv::Schedule 函数启动 Compact 过程.
void PosixEnv: :Schedule(void( *
function)(void * ), void * arg) {
PthreadCall("lock", pthread_mutex_lock( & mu_));
// Start background thread if necessary
if (!started_bgthread_) {
started_bgthread_ = true;
PthreadCall("create thread", pthread_create( & bgthread_, NULL, &PosixEnv: :BGThreadWrapper, this));
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal( & bgsignal_));
}
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().
function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock( & mu_));
}
如果没有后台线程,则创建后台线程,否则新建一个后台执行任务 BGItem 压入后台线程任务队列,然后调用 PosixEnv::BGThreadWrapper 唤醒后台线程:
static void * BGThreadWrapper(void * arg) {
reinterpret_cast < PosixEnv * >(arg) - >BGThread();
return NULL;
}
BGThreadWrapper 调用 PosixEnv::BGThread, 不断地从后台任务队列中拿到任务,然后执行任务
void PosixEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
回到 DBImpl::MaybeScheduleCompaction,方便理解起见这里再重复一遍源码
void DBImpl: :MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == NULL && manual_compaction_ == NULL && !versions_ - >NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
env_ - >Schedule( & DBImpl: :BGWork, this); //新建后台任务并进行调度
}
}
之前分析了 env_->Schedule 进行的调度过程,现在来分析实际进行后台任务的 DBImpl::BGWork.DBImpl::BGWork 在 db 文件夹下 db_impl.cc 文件中.
void DBImpl: :BGWork(void * db) {
reinterpret_cast < DBImpl * >(db) - >BackgroundCall();
}
DBImpl::BGWork 调用 DBImpl::BackgroundCall(),合并完成后可能导致有的 level 的文件数过多,因此会再次调用 MaybeScheduleCompaction() 判断是否需要继续进行合并.
void DBImpl: :BackgroundCall() {
MutexLock l( & mutex_);
assert(bg_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
BackgroundCompaction();
}
bg_compaction_scheduled_ = false;
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
bg_cv_.SignalAll();
}
DBImpl::BackgroundCall() 调用 BackgroundCompaction(),在 BackgroundCompaction() 中分别完成三种不同的 Compaction:对 Memtable 进行合并, trivial Compaction(直接将文件移动到下一层)以及一般的合并,调用 DoCompactionWork() 实现.
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
if (imm_ != NULL) {
CompactMemTable();//1,对Memtable进行合并
return;
}
Compaction* c;
bool is_manual = (manual_compaction_ != NULL);//manual_compaction默认为NULL,则is_manual默认为false
InternalKey manual_end;
if (is_manual) { //取得手动compaction对象
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} else { //取得自动compaction对象
c = versions_->PickCompaction();
}
Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {//2,IsTrivialMove 返回 True,trivial Compaction,则直接将文件移入 level + 1 层即可
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
} else { //3,一般的合并
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact); //进行compaction
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact);
c->ReleaseInputs(); // input的文件引用计数减少1
DeleteObsoleteFiles(); //删除无用文件
}
delete c;
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
"Compaction error: %s", status.ToString().c_str());
}
if (is_manual) {
ManualCompaction* m = manual_compaction_; //标记手动compaction任务完成
if (!status.ok()) {
m->done = true;
}
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
m->tmp_storage = manual_end;
m->begin = &m->tmp_storage;
}
manual_compaction_ = NULL;
}
}
首行 mutex_.AssertHeld(),Mutex 的 AssertHeld 函数实现默认为空,在很多函数的实现内有调用,其作用如下:
As you have observed it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally abort if it doesn't. This would be equivalent to the normal asserts we use for variables but applied on mutexes.
I think the reason it is not implemented yet is we don't have an equivalent light weight function to assert whether a thread holds a lock in pthread_mutex_t used in the default implementation. Some platforms which has that capability could fill this implementation as part of porting process. Searching online I did find some implementation for this function in the windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of a thread id variable to indicate which thread(s) currently holds the mutex, but it will have to be carefully implemented given the race conditions that can arise.
Memtable 的合并
Compaction 首先检查 imm_,及时将已写满的 memtable 写入磁盘 sstable 文件,对 Memtable 的合并,调用 DBImpl::CompactMemTable() 完成:
void DBImpl: :CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != NULL); //imm_不能为空
VersionEdit edit;
Version * base = versions_ - >current();
base - >Ref();
Status s = WriteLevel0Table(imm_, &edit, base); //将Memtable转化为.sst文件,写入level0 sst table,并写入到edit中
base - >Unref();
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_ - >LogAndApply( & edit, &mutex_); //应用edit中记录的变化,来生成新的版本
}
if (s.ok()) {
// Commit to the new state
imm_ - >Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}
其中 CompactMemTable() 主要调用了两个函数:WriteLevel0Table() 和 versions_->LogAndApply()
CompactMemTable() 首先调用 WriteLevel0Table(),源码内容如下:
Status DBImpl: :WriteLevel0Table(MemTable * mem, VersionEdit * edit, Version * base) {
mutex_.AssertHeld();
FileMetaData meta;
meta.number = versions_ - >NewFileNumber(); //获取新生成的.sst文件的编号
pending_outputs_.insert(meta.number);
Iterator * iter = mem - >NewIterator(); //用于遍历Memtable中的数据
Status s; {
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); //创建.sst文件,并将其相关信息记录在meta中
mutex_.Lock();
}
delete iter; //iter用完之后一定要删除
pending_outputs_.erase(meta.number);
int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != NULL) {
level = base - >PickLevelForMemTableOutput(min_user_key, max_user_key); //为合并的输出文件选择合适的level
}
edit - >AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); //将生成的.sst文件加入到该level
}
return s;
}
WriteLevel0Table() 首先调用 BuildTable() 将 Immutable Memtable 中所有的数据写入到一个. sst 文件中,并将. sst 文件的信息(文件编号,Key 值范围,文件大小)记录到变量 meta 中. 由于 Memtable 是基于 Skiplist 的,是一个有序表,因此在写入. sst 文件时,Key 值也是从小到大来排列的. 可以发现,将 Memtable 中的数据转换为 SSTable 时,是将所有记录都写入 SSTable 的,要删除的记录也一样. 删除操作会在更高 level 的 Compaction 中完成. 因此 level 0 中可能会存在 Key 值相同的记录. 2. 然后调用 PickLevelForMemTableOutput() 为 Memtable 合并的输出文件选择合适的 level,并调用 edit->AddFile() 将生成的. sst 文件加入到该 level 中.
然后 CompactMemTable() 调用 db 文件夹下 version_set.cc 文件中的 versions_->LogAndApply() 基于当前版本和更改 edit 来得到一个新版本.
Trivial Compaction
由之前的分析可知,is_manual 默认为 false,会调用 PickCompaction() 来选出要进行合并的 level 和相应的输入文件. 当 c->IsTrivialMove() 满足时,则直接将文件移动到下一 level.
c = versions_ - >PickCompaction();
Status status;
if (c == NULL) {
// Nothing to do
} else if (!is_manual && c - >IsTrivialMove()) {
// Move file to next level
assert(c - >num_input_files(0) == 1);
FileMetaData * f = c - >input(0, 0);
c - >edit() - >DeleteFile(c - >level(), f - >number); //将文件从该层删除
c - >edit() - >AddFile(c - >level() + 1, f - >number, f - >file_size, //将该文件加入到下一level
f - >smallest, f - >largest);
status = versions_ - >LogAndApply(c - >edit(), &mutex_); //应用更改,创建新的Version
}
首先调用 db 文件夹下 version_set.cc 文件中的 VersionSet::PickCompaction() 为接下来的 Compaction 操作准备输入数据, 由之前对 Compaction 的数据结构分析可知,Compaction 操作有两种触发方式:某一 level 的文件数太多和某一文件的查找次数超过允许值,在进行合并时,将优先考虑文件数过多的情况.
Compaction * VersionSet: :PickCompaction() {
Compaction * c;
int level;
const bool size_compaction = (current_ - >compaction_score_ >= 1); //文件数过多
const bool seek_compaction = (current_ - >file_to_compact_ != NULL); //某一文件的查找次数太多
if (size_compaction) { //文件数太多优先考虑
level = current_ - >compaction_level_; //要进行Compaction的level
c = new Compaction(level);
//每一层有一个compact_pointer,用于记录compaction key,这样可以进行循环compaction
for (size_t i = 0; i < current_ - >files_[level].size(); i++) { //从待合并的level中选择合适的文件完成合并操作
FileMetaData * f = current_ - >files_[level][i]; //level层中的第i个文件
if (compact_pointer_[level].empty() || //compact_pointer_中记录的是下次合并的起始Key值,为空时都可以进行合并
icmp_.Compare(f - >largest.Encode(), compact_pointer_[level]) > 0) { //或者f的最大Key值大于起始值
c - >inputs_[0].push_back(f); //则该文件可以参与合并,将其加入到level输入文件中
break;
}
}
if (c - >inputs_[0].empty()) { //若level输入为空,则将level的第一个文件加入到输入中
c - >inputs_[0].push_back(current_ - >files_[level][0]);
}
} else if (seek_compaction) { //然后考虑查找次数过多的情况
level = current_ - >file_to_compact_level_;
c = new Compaction(level);
c - >inputs_[0].push_back(current_ - >file_to_compact_); //将待合并的文件作为level层的输入
} else {
return NULL;
}
c - >input_version_ = current_;
c - >input_version_ - >Ref();
//level 0中的Key值是可以重复的,因此Key值范围可能相互覆盖,把所有重叠都找出来,一起做compaction
if (level == 0) {
InternalKey smallest,
largest;
GetRange(c - >inputs_[0], &smallest, &largest); //待合并的level层的文件的Key值范围
current_ - >GetOverlappingInputs(0, &smallest, &largest, &c - >inputs_[0]);
assert(!c - >inputs_[0].empty());
}
SetupOtherInputs(c); //获取待合并的level+1层的输入
return c;
}
然后判断是否为 trivial Compaction, 当为 trivial Compaction 时,只需要简单的将 level 层的文件移动到 level +1 层即可
bool Compaction: :IsTrivialMove() const {
return (num_input_files(0) == 1 && //level层只有1个文件
num_input_files(1) == 0 && //level+1层没有文件
TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes); //level+2层文件总大小不超过最大覆盖范围,否则会导致后面的merge需要很大的开销
}
最终完成完成 Compaction 操作
c - >edit() - >DeleteFile(c - >level(), f - >number);
c - >edit() - >AddFile(c - >level() + 1, f - >number, f - >file_size, f - >smallest, f - >largest);
status = versions_ - >LogAndApply(c - >edit(), &mutex_);
一般的合并
一般的合并调用 DBImpl::DoCompactionWork() 完成,compact 是调用 VersionSet::PickCompacttion() 得到的,与之前的 trivial Compaction 相同. 不同 level 之间,可能存在 Key 值相同的记录,但是记录的 seq 不同. 由之前的分析可知,最新的数据存放在较低的 level 中,其对应的 seq 也一定比 level+1 中的记录的 seq 要大,因此当出现相同 Key 值的记录时,只需要记录第一条记录,后面的都可以丢弃. level 0 中也可能存在 Key 值相同的数据,其后面的 seq 也不同. 数据越新,其对应的 seq 越大,且记录在 level 0 中的记录是按照 user_key 递增,seq 递减的方式存储的,则相同 user_key 对应的记录是聚集在一起的,且按照 seq 递减的方式存放的. 在更高层的 Compaction 时,只需要处理第一条出现的 user_key 相同的记录即可,后面的相同 user_key 的记录都可以丢弃. 因此合并后的 level +1 层的文件中不会存在 Key 值相同的记录. 删除记录的操作也会在此时完成,删除数据的记录会被丢弃,而不会被写入到更高 level 的文件中.
Status DBImpl: :DoCompactionWork(CompactionState * compact) {
if (snapshots_.empty()) {
compact - >smallest_snapshot = versions_ - >LastSequence();
} else {
compact - >smallest_snapshot = snapshots_.oldest() - >number_;
}
mutex_.Unlock();
//生成iterator:遍历要compaction的数据
Iterator * input = versions_ - >MakeInputIterator(compact - >compaction); //用于遍历待合并的每一个文件
input - >SeekToFirst();
Status status;
ParsedInternalKey ikey;
std: :string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input - >Valid() && !shutting_down_.Acquire_Load();) {
if (has_imm_.NoBarrier_Load() != NULL) { //immutable memtable的优先级最高
mutex_.Lock();
if (imm_ != NULL) { //当imm_非空时,合并Memtable
CompactMemTable();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
}
Slice key = input - >key();
if (compact - >compaction - >ShouldStopBefore(key) && //是否需要停止Compaction,中途输出compaction的结果,避免compaction结果和level N+2 files有过多的重叠
compact - >builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key || //获取当前的user_key和sequence
user_comparator() - >Compare(ikey.user_key, Slice(current_user_key)) != 0) { //可能存在Key值相同但seq不同的记录
// 此时是这个Key第一次出现
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber; //则将其seq设为最大值,表示第一次出现
}
if (last_sequence_for_key <= compact - >smallest_snapshot) { //表示key已经出现过,否则seq应为KMaxSequenceNumber
drop = true; // (A) //之前已经存在Key值相同的记录,丢弃
} else if (ikey.type == kTypeDeletion && //要删除该记录
ikey.sequence <= compact - >smallest_snapshot && //记录的序号比数据库之前的最小序号还小
compact - >compaction - >IsBaseLevelForKey(ikey.user_key)) { //高的level中没有数据
drop = true; //此时要丢弃该记录
}
last_sequence_for_key = ikey.sequence; //上次出现的记录对应的sequence,用于判断后面出现相同Key值的情况
}
if (!drop) { //如果不需要丢弃该记录
if (compact - >builder == NULL) {
status = OpenCompactionOutputFile(compact); //若需要,则创建一个.sst文件,用于存放合并后的数据
}
if (compact - >builder - >NumEntries() == 0) {
compact - >current_output() - >smallest.DecodeFrom(key);
}
compact - >current_output() - >largest.DecodeFrom(key);
compact - >builder - >Add(key, input - >value()); //将记录写入.sst文件
if (compact - >builder - >FileSize() >= compact - >compaction - >MaxOutputFileSize()) { //当.sst文件超过最大值时
status = FinishCompactionOutputFile(compact, input); //完成Compaction输出文件
}
}
input - >Next(); //处理下一个文件
}
if (status.ok() && compact - >builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input - >status();
}
delete input;
input = NULL;
//更新compaction的一些统计数据
CompactionStats stats;
stats.micros = env_ - >NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact - >compaction - >num_input_files(which); i++) {
stats.bytes_read += compact - >compaction - >input(which, i) - >file_size;
}
}
for (size_t i = 0; i < compact - >outputs.size(); i++) {
stats.bytes_written += compact - >outputs[i].file_size;
}
mutex_.Lock();
stats_[compact - >compaction - >level() + 1].Add(stats);
if (status.ok()) {
status = InstallCompactionResults(compact); //完成合并
}
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet: :LevelSummaryStorage tmp;
Log(options_.info_log, "compacted to: %s", versions_ - >LevelSummary( & tmp));
return status;
}
首先将可以留下的记录写入到. sst 文件中,并将相关信息保存在变量 compact 中,然后调用 InstallCompactionResults() 将所做的改动加入到 VersionEdit 中,再调用 LogAndApply() 来得到新的版本.
LogAndApply()
在上面三种不同的 Compaction 操作中,最终当对当前版本的更改 VersionEdit 全部完成后,都会调用 VersionSet::LogAndApply() 来应用更改,创建新版本. edit 中保存了 level 和 level+1 层要删除和增加的文件.
Status VersionSet: :LogAndApply(VersionEdit * edit, port: :Mutex * mu) {
Version * v = new Version(this); //创建一个新Version
{
Builder builder(this, current_); //基于当前Version创建一个builder变量
builder.Apply(edit); //将edit中记录的要增加,删除的文件加入到builder类中
builder.SaveTo(v); //然后将edit中的记录保存到新创建的Version中,这样就得到了一个新的版本
}
Finalize(v); //根据各层文件数来判断是否还需要进行Compaction
std: :string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) { //只会在第一次调用时进入
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); //创建一个新的Manifest文件
edit - >SetNextFile(next_file_number_);
s = env_ - >NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log: :Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_); //快照,系统开始时完整记录数据库的所有信息
}
} {
mu - >Unlock();
if (s.ok()) {
std: :string record;
edit - >EncodeTo( & record);
s = descriptor_log_ - >AddRecord(record); //将数据库的变化记录到Manifest文件中
if (s.ok()) {
s = descriptor_file_ - >Sync();
}
}
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu - >Lock();
}
if (s.ok()) {
AppendVersion(v); //将新得到的Version插入到所有Version形成的双向链表的尾部
log_number_ = edit - >log_number_;
prev_log_number_ = edit - >prev_log_number_;
}
}
return s;
}
为了重启之后能恢复数据库之前的状态,就需要将数据库的历史变化信息记录下来,这些信息都是记录在 Manifest 文件中的. 为了节省空间和时间,leveldb 采用的是在系统开始完整的所有数据库的信息(WriteSnapShot()),以后则只记录数据库的变化,即 VersionEdit 中的信息(descriptor_log_->AddRecord()). 恢复时,只需要根据 Manifest 中的信息就可以一步步的恢复到上次的状态.
VersionSet::LogAndApply 首先创建一个新的 Version,然后调用 builder.Apply(edit) 将 edit 中所有要删除,增加的文件编号记录下来,其源码如下:
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit * edit) {
// 更新每一层下次合并的起始Key值
for (size_t i = 0; i < edit - >compact_pointers_.size(); i++) {
const int level = edit - >compact_pointers_[i].first;
vset_ - >compact_pointer_[level] = edit - >compact_pointers_[i].second.Encode().ToString();
}
//将所有要删除的文件加入到levels_[level].deleted_files变量中
const VersionEdit: :DeletedFileSet & del = edit - >deleted_files_;
for (VersionEdit: :DeletedFileSet: :const_iterator iter = del.begin(); iter != del.end(); ++iter) {
const int level = iter - >first;
const uint64_t number = iter - >second;
levels_[level].deleted_files.insert(number);
}
// 将所有新增加的文件加入到levels_[level].added_files中
for (size_t i = 0; i < edit - >new_files_.size(); i++) {
const int level = edit - >new_files_[i].first;
FileMetaData * f = new FileMetaData(edit - >new_files_[i].second);
f - >refs = 1;
f - >allowed_seeks = (f - >file_size / 16384);
if (f - >allowed_seeks < 100) f - >allowed_seeks = 100;
levels_[level].deleted_files.erase(f - >number);
levels_[level].added_files - >insert(f);
}
}
然后 VersionSet::LogAndApply 再调用 builder.SaveTo(v) 将更改保存到新的 Version 中,其源码如下:
void SaveTo(Version * v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_ - >icmp_;
for (int level = 0; level < config: :kNumLevels; level++) {
const std: :vector < FileMetaData * >&base_files = base_ - >files_[level]; //当前Version中原有的各个level的.sst文件
std: :vector < FileMetaData * >::const_iterator base_iter = base_files.begin();
std: :vector < FileMetaData * >::const_iterator base_end = base_files.end();
const FileSet * added = levels_[level].added_files; //对应level新增加的文件
v - >files_[level].reserve(base_files.size() + added - >size());
for (FileSet: :const_iterator added_iter = added - >begin(); added_iter != added - >end(); ++added_iter) {
// 将原有文件中编号比added小的加入到新的Version
for (std: :vector < FileMetaData * >::const_iterator bpos = std: :upper_bound(base_iter, base_end, *added_iter, cmp); base_iter != bpos; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter); //再将新增的文件依次加入到新的Version
}
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter); //再将原有文件中剩余的部分加入到新的Version
}
}
}
bpos = std: :upper_bound(base_iter, base_end, *added_iter, cmp); // 返回 base_iter 到 base_end 之间,第一个大于 * added_iter 的 iter. 假设原有文件的编号为 1,3,4,6,8,新增文件的编号为 2,5,7,则第一次循环时,bpos 为 3 对应的迭代器,因此 base_iter 只遍历一个元素,即将编号 1 加入到新的 Version 中. 总体对新增文件来说,就是首先加入 base 中编号比它小的,然后再将其加入,然后再继续比那里下一个新增文件,因此最终得到的文件编号顺序是 1,2,3,4,5,6,7,8,即每一层的. sst 文件都是按照编号从小到大排列的. 这样就得到了新的 Version 的每一层的所有文件.
参考文献:
.http://blog.csdn.net/u012658346/article/details/45787233
.http://blog.csdn.net/u012658346/article/details/45788939
.http://blog.csdn.net/joeyon1985/article/details/47154249
.http://www.blogjava.net/sandy/archive/2012/03/15/leveldb6.html
.http://www.pandademo.com/2016/04/compaction-of-sstable-leveldb-part-1-source-dissect-9/
来源: https://www.cnblogs.com/xueqiuqiu/p/8302545.html