Take a look at Level source code

最近看了一点leveldb源码,了解一下实现机制。

之前也看过leveldb相关介绍以及别人的分析blog,已经有了一定了解。leveldb如其名,按照层级来组织数据,数据从内存到磁盘一层一层迁移。在内存中是通过skiplist来管理数据,而磁盘上则是一种名为SSTable(Sorted Strings Table)的结构来存储数据的。

DB::Get实现

这个头文件include/leveldb/db.h定义了DB抽象类,Get接口也在其中,具体实现在db/db_impl.cc文件中。
下面引用的代码因为篇幅会删除一些代码行,完整代码参考源文件。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
...

// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}

if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
...
return s;
}

上面是Get的实现函数,省略了一些代码。Get主要的查询过程在中间if-else语句分支中。在查询之前mutex_.Unlock();进行了解锁,是因为数据是只追加不删除的,可以同时读写。数据删除会转换成一条标记key-deleted的数据追加到库中。

SequenceNumber snapshot为数据序号,每一条数据都有序号,后追加的序号比之前的序号要大,相同key的数据,序号大的要排在前面,参见db/dbformat.cc InternalKeyComparator::Compare函数。

第一个分支mem指向一个MemTable,MemTable只有Add和Get两个接口来操作数据,底层实现为skiplist,这个mem指向可修改的MemTable。

第二个分支imm指向一个不可修改的MemTable,imm是mem达到一定条件后转换来的,具体的逻辑在db/db_impl.cc DBImpl::MakeRoomForWrite函数中。

前面2个分支都是在内存中进行查询,没找到就只能到磁盘上查询。最后一个分支current指向当前的Version,Version包含数据文件的元信息。

最后根据情况调用MaybeScheduleCompaction函数,在后台对数据进行Compact,将内存的迁到磁盘,对磁盘上的数据进行合并等。

Version::Get实现。这个函数就是上一节最后一个if分支调用的函数,也是查询磁盘数据的入口。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
...
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in an smaller level, later levels are irrelevant.
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;

// Get the list of files to search in this level
FileMetaData* const* files = &files_[level][0];
if (level == 0) {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to oldest.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (tmp.empty()) continue;

std::sort(tmp.begin(), tmp.end(), NewestFirst);
...
} else {
// Binary search to find earliest index whose largest key >= ikey.
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
if (index >= num_files) {
...
} else {
tmp2 = files[index];
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// All of "tmp2" is past any data for user_key
...
} else {
files = &tmp2;
num_files = 1;
}
}
}

for (uint32_t i = 0; i < num_files; ++i) {
...

FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;

Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue);
...
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}

上面的函数主要是对每个level上的数据从低到高进行查询,比较新的数据放在低的level。

主for循环所有level,先根据key查找符合要求的文件,由于Sstable是排序数据,每个文件都有key的范围,所以可以查找包含了查询key的文件即可。level-0和其他的level处理方式不太一样,level-0是直接遍历,而其他level调用FindFile进行查询。

找到符合要求的文件之后,进入后一个for循环,通过vset_->table_cache_->Get查找所有的文件。

上面提到的FindFile使用internal_key即带序号的查询key在一层的文件中进行二分查找,找到离查询key最近且文件largest-key比查询key大的文件,如果key存在库的这一层中,那应该会落在这个文件。

TableCache::Get比较简单,先调用了FindTable找到对应的Table对象,然后调用Table对象的InternalGet函数。下面说FindTable函数和Table::InternalGet函数。

FindTable函数

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));

*handle = cache_->Lookup(key);

if (*handle == NULL) {
std::string fname = TableFileName(dbname_, file_number);
RandomAccessFile* file = NULL;
...
if (s.ok()) {
s = Table::Open(*options_, file, file_size, &table);
}

if (!s.ok()) {
...
} else {
TableAndFile* tf = new TableAndFile;
tf->file = file;
tf->table = table;
*handle = cache_->Insert(key, tf, 1, &DeleteEntry);
}
}
return s;
}

cache_->Lookup(key)先在cache中查找,cache_指向一个LRU的cache,缓存的内容为打开的文件对象和Table对象的指针,最后一个else语句块里cache_->Insert把要缓存的内容插入了缓存。

若缓存中没有要找的Table则调用Table::Open打开文件载入Table对象,然后插入缓存。

Table::Open代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Status Table::Open(const Options& options,
RandomAccessFile* file,
uint64_t size,
Table** table) {
...
char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
...
if (s.ok()) {
...
s = ReadBlock(file, opt, footer.index_handle(), &contents);
if (s.ok()) {
index_block = new Block(contents);
}
}

if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
...
*table = new Table(rep);
(*table)->ReadMeta(footer);
} else {
...
}
return s;
}

Table::Open把index-block的内容读出来缓存起来,如果有meta数据或filter数据,也会读出来并缓存。options.block_cache这个指针如果指向一个cache对象,后面在读入新的block的时候也会把block缓存起来。

Table::InternalGet

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Status Table::InternalGet(const ReadOptions& options, const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
if (filter != NULL &&
handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*saver)(arg, block_iter->key(), block_iter->value());
}
s = block_iter->status();
delete block_iter;
}
}
...
return s;
}

Table::InternalGet先在index-block中找到距离key最近的block,block也有key范围,查找过程和查找文件类似也是通过二分查找找到最近的block,但是index-block并不是所有block的索引,所以还需要进一步到block附近进行查找。

如果找到key附近的block,就对block进行查找。先结合filter判断key是否不在,如不在直接返回NotFound。然后读index对应的block,进行二次查找。iter->Seek(k)具体可以参考table/block.cc Block::Iter::Seek函数,函数并没有进行相等比较,只能定位范围。由于iter->Seek(k)只能定位到key附近,所以需要调用(*saver)(arg, block_iter->key(), block_iter->value()),saver对应上文提到的db/version_set.cc SaveValue函数,代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
if (!ParseInternalKey(ikey, &parsed_key)) {
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
}
}
}
}

数据文件的编码格式比较复杂,就不写了,可以参考源文件或网络。

以上就是Get的过程,流程还是比较长的。网上的测试结果表明leveldb的写性能高于读,跟它的磁盘查找关系很大,对于需要频繁随机读的应用还是要仔细考虑一下性能问题。打开block-cache可能会提高读性能,相应的就需要消耗内存,把文件放到ssd也是一个优化方案,没有具体实践,不知效果如何。

欢迎指出本文的错误,也欢迎分享相关内容!

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  • © 2015-2024 RivenZoo
  • Powered by Hexo Theme Ayer
  • PV: UV:

请我喝杯咖啡吧~

支付宝
微信