Contents
一、序言
在之前的文章中提到,leveldb所有的记录都是由一个个log文件逐渐转变来的,与存储在磁盘上的log文件对应的就是内存中的memtable。memtable和log文件存储的内容是一致的。
二、SkipList
memtable中的键值对是通过SkipList这种数据结构存储的,SkipList相对于普通的List来说,普通的List查找时间复杂度为O(n),而SkipList可以达到O(log(n)),同时,SkipList相对与红黑树,在多线程上,需要锁住的数据量更小,性能更优。
比如上图就是一个SkipList结构,如果我们需要搜索45这个点,我们首先在最高层(第二层)找到30,然后发现下一个点是57,已经大于45了,于是来到下一层(第一层)从30开始,向后找,就能找到45了,在这个过程中,我们跳过了很多个点。因此这种数据结构得名SkipList。
2.1 SkipList的基本信息
SkipList是memtable中用到的最主要的数据结构。
SkipList的线程安全特点:写操作需要外部的同步,比如信号量这种。读操作需要保证在读的时候SkipList不会被销毁。除此之外,读操作没有任何的内部锁或者同步。
SkipList的基本原则:
(1)分配节点在SkipList销毁之前不会被删除。因为在代码中我们没有任何的删除节点的操作。
(2)一个节点的内容,除了next/prev指针,在这个节点被存储到SkipList之后其他数据都是不变的。只有Insert()会修改list的内容,初始化一个节点并使用release-stores去将这个节点存放到一个或多个list中需要谨慎对待。
2.2 SkipList的成员函数
SkipList中,公共的成员函数只有两个
void Insert(const Key& key);
bool Contains(const Key& key) const;
顾名思义,Insert用来向SkipList中插入key,Contains用来检查SkipList中是否包含某个key。
私有的成员函数如下:
Node* NewNode(const Key& key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;
// Return the earliest node that comes at or after key.
// Return NULL if there is no such node.
//
// If prev is non-NULL, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node* FindLessThan(const Key& key) const;
// Return the last node in the list.
// Return head_ if list is empty.
Node* FindLast() const;
2.3 SkipList的节点Node实现
// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
port::AtomicPointer next_[1];
};
注意到Node实现了两个版本的Next和SetNext,带有NoBarrier_的是多进程不安全的,因为在少数情况下会用到,因此做了额外的实现。另外,可以注意这里只实现了Next()方法,并没有Prev()指向上一个节点,可以知道SkipList并不是一个双向链表
,但是SkipList的迭代器是支持Prev()方法的,所以这是一个值得注意的地方。
Node节点的创建是通过NewNode()方法
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
char* mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
}
NewNode中使用了arena来分配对齐的内存,分配的内存大小是sizeof(Node)+sizeof(port::AtomicPointer) * (height - 1)
。这里可以注意到申请的空间不仅仅是一个Node的大小,还包括sizeof(port::AtomicPointer) * (height – 1),这是因为SkipList是一个多层的结构,所以如果当前Node是3层,那么就需要3个Next指针,在Node的代码中,默认只申请了一个Next指针空间,所以需要额外的多申请(height – 1)个AtomicPointer的空间。
2.4 SkipList的迭代器(Iterator)
SkipList实现了自己的迭代器,迭代器的声明如下:
class Iterator {
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);
// 如果迭代器位于一个有效的节点上,就会返回true.
bool Valid() const;
// 返回当前位置的key.
// REQUIRES: Valid()
const Key& key() const;
// 前进到下一个位置
// REQUIRES: Valid()
void Next();
// 后退到上一个位置
// REQUIRES: Valid()
void Prev();
// 移到第一个key >= target的entry
void Seek(const Key& target);
// 移到这个list的第一个entry
// 如果list非空,迭代器的最终状态是Valid()
void SeekToFirst();
// 移到这个list的最后一个entry
// 如果list非空,迭代器的最终状态是Valid()
void SeekToLast();
private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};
其中Prev()的实现并不是通过prev指针,因为上文中已经介绍了Node只有next指针。Prev()是通过FindLessThan来实现,找到小于指定Node的最后一个值。这里有一个问题,为什么不使用prev指针,难道不会大大的降低时间复杂度吗?
看一下FindLessThan的具体实现。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == NULL || compare_(next->key, key) >= 0) {
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}
其实就是SkipList查找过程的实现。时间复杂度应该是O(log(n)).相比于prev指针的话,在空间占用上是少一点的。具体为什么这样实现还得继续往下看。
SkipList还以同样的方式实现了FindGreaterOrEqual
和FindLast
。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != NULL) prev[level] = x;
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}
FindGreaterOrEqual
的实现还是比较有意思的,在返回比指定key大于或等于的节点的同时,还可以选择性的将该节点的prev指针保存到Node** prev中,返回给用户。其实这也是Iterator中Prev()的实现不采用prev指针的原因,因为有了FindGreaterOrEqual这样的实现,SkipList的内部实现中根本不需要Prev()来将指针向前移动,同时又大大的节约了内存。
下面看一下SkipList的Insert的实现。
void SkipList<Key,Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == NULL || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (NULL), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since NULL sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
首先,Insert方法中没有做任何的多线程同步,所以需要调用者在外部进行写入同步(防止多个线程同步写入)。
插入的过程是先找到需要插入的位置,然后调用RandomHeight()来随机得到当前节点的高度,之后向普通链表一样执行插入操作。
注意到这里有一个多线程读写的问题。也就是当我们改变max_height_的值的时候,并没有改变head_指针。这种情况下有两种可能,一是读到了新的max_height_,但是head_指针还没有更改,这时候指针会立刻下降到下一级,SkipList的多级只是为了提高查找速度,所以在这里并没有其他的副作用;二是正常的情况,不作讨论。
2.Memtable
Memtable的实现主要就是依赖于SkipList,分析了SKipList之后,Memtable的插入,查找操作都是SkipList的插入和查找。
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
Memtable的实现使用了类似于计数器引用的机制,不过这是手动实现的,所以需要用户在使用Memtable时调用Ref()来增加引用计数,引用结束后调用Unref来减少引用计数,当引用计数为0时则销毁自己。
总结
Memtable的设计还是很让人佩服的,至少对我来说是这样。了解到了SkipList这样的数据结构,也了解到内存屏障这种多进程情况下代码执行乱序的问题,还有内存对齐的必要、内存对齐的实现。同时也了解了一些C++编写程序的独特风格(我写的C++一直像Java,反而少了一点C++的美)