Contents
一、序言
在之前的文章中提到,leveldb所有的记录都是由一个个log文件逐渐转变来的,与存储在磁盘上的log文件对应的就是内存中的memtable。memtable和log文件存储的内容是一致的。
二、SkipList
memtable中的键值对是通过SkipList这种数据结构存储的,SkipList相对于普通的List来说,普通的List查找时间复杂度为O(n),而SkipList可以达到O(log(n)),同时,SkipList相对与红黑树,在多线程上,需要锁住的数据量更小,性能更优。

比如上图就是一个SkipList结构,如果我们需要搜索45这个点,我们首先在最高层(第二层)找到30,然后发现下一个点是57,已经大于45了,于是来到下一层(第一层)从30开始,向后找,就能找到45了,在这个过程中,我们跳过了很多个点。因此这种数据结构得名SkipList。
2.1 SkipList的基本信息
SkipList是memtable中用到的最主要的数据结构。
SkipList的线程安全特点:写操作需要外部的同步,比如信号量这种。读操作需要保证在读的时候SkipList不会被销毁。除此之外,读操作没有任何的内部锁或者同步。
SkipList的基本原则:
(1)分配节点在SkipList销毁之前不会被删除。因为在代码中我们没有任何的删除节点的操作。
(2)一个节点的内容,除了next/prev指针,在这个节点被存储到SkipList之后其他数据都是不变的。只有Insert()会修改list的内容,初始化一个节点并使用release-stores去将这个节点存放到一个或多个list中需要谨慎对待。
2.2 SkipList的成员函数
SkipList中,公共的成员函数只有两个
void Insert(const Key& key);
bool Contains(const Key& key) const;
顾名思义,Insert用来向SkipList中插入key,Contains用来检查SkipList中是否包含某个key。
私有的成员函数如下:
  Node* NewNode(const Key& key, int height);
  int RandomHeight();
  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
  // Return true if key is greater than the data stored in "n"
  bool KeyIsAfterNode(const Key& key, Node* n) const;
  // Return the earliest node that comes at or after key.
  // Return NULL if there is no such node.
  //
  // If prev is non-NULL, fills prev[level] with pointer to previous
  // node at "level" for every level in [0..max_height_-1].
  Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  Node* FindLessThan(const Key& key) const;
  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;
2.3 SkipList的节点Node实现
// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }
  Key const key;
  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
  }
  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
  }
 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  port::AtomicPointer next_[1];
};
注意到Node实现了两个版本的Next和SetNext,带有NoBarrier_的是多进程不安全的,因为在少数情况下会用到,因此做了额外的实现。另外,可以注意这里只实现了Next()方法,并没有Prev()指向上一个节点,可以知道SkipList并不是一个双向链表,但是SkipList的迭代器是支持Prev()方法的,所以这是一个值得注意的地方。
Node节点的创建是通过NewNode()方法
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);
}
NewNode中使用了arena来分配对齐的内存,分配的内存大小是sizeof(Node)+sizeof(port::AtomicPointer) * (height - 1)。这里可以注意到申请的空间不仅仅是一个Node的大小,还包括sizeof(port::AtomicPointer) * (height – 1),这是因为SkipList是一个多层的结构,所以如果当前Node是3层,那么就需要3个Next指针,在Node的代码中,默认只申请了一个Next指针空间,所以需要额外的多申请(height – 1)个AtomicPointer的空间。
2.4 SkipList的迭代器(Iterator)
SkipList实现了自己的迭代器,迭代器的声明如下:
class Iterator {
   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const SkipList* list);
    // 如果迭代器位于一个有效的节点上,就会返回true.
    bool Valid() const;
    // 返回当前位置的key.
    // REQUIRES: Valid()
    const Key& key() const;
    // 前进到下一个位置
    // REQUIRES: Valid()
    void Next();
    // 后退到上一个位置
    // REQUIRES: Valid()
    void Prev();
    // 移到第一个key >= target的entry
    void Seek(const Key& target);
    // 移到这个list的第一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToFirst();
    // 移到这个list的最后一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToLast();
   private:
    const SkipList* list_; 
    Node* node_;
    // Intentionally copyable
  };
其中Prev()的实现并不是通过prev指针,因为上文中已经介绍了Node只有next指针。Prev()是通过FindLessThan来实现,找到小于指定Node的最后一个值。这里有一个问题,为什么不使用prev指针,难道不会大大的降低时间复杂度吗?看一下FindLessThan的具体实现。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    assert(x == head_ || compare_(x->key, key) < 0);
    Node* next = x->Next(level);
    if (next == NULL || compare_(next->key, key) >= 0) {
      if (level == 0) {
        return x;
      } else {
        // Switch to next list
        level--;
      }
    } else {
      x = next;
    }
  }
}
其实就是SkipList查找过程的实现。时间复杂度应该是O(log(n)).相比于prev指针的话,在空间占用上是少一点的。具体为什么这样实现还得继续往下看。
SkipList还以同样的方式实现了FindGreaterOrEqual和FindLast。
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != NULL) prev[level] = x;
      if (level == 0) {
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}
FindGreaterOrEqual的实现还是比较有意思的,在返回比指定key大于或等于的节点的同时,还可以选择性的将该节点的prev指针保存到Node** prev中,返回给用户。其实这也是Iterator中Prev()的实现不采用prev指针的原因,因为有了FindGreaterOrEqual这样的实现,SkipList的内部实现中根本不需要Prev()来将指针向前移动,同时又大大的节约了内存。
下面看一下SkipList的Insert的实现。
void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);
  // Our data structure does not allow duplicate insertion
  assert(x == NULL || !Equal(key, x->key));
  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (NULL), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since NULL sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }
  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}
首先,Insert方法中没有做任何的多线程同步,所以需要调用者在外部进行写入同步(防止多个线程同步写入)。
插入的过程是先找到需要插入的位置,然后调用RandomHeight()来随机得到当前节点的高度,之后向普通链表一样执行插入操作。
注意到这里有一个多线程读写的问题。也就是当我们改变max_height_的值的时候,并没有改变head_指针。这种情况下有两种可能,一是读到了新的max_height_,但是head_指针还没有更改,这时候指针会立刻下降到下一级,SkipList的多级只是为了提高查找速度,所以在这里并没有其他的副作用;二是正常的情况,不作讨论。
2.Memtable
Memtable的实现主要就是依赖于SkipList,分析了SKipList之后,Memtable的插入,查找操作都是SkipList的插入和查找。
  // Increase reference count.
  void Ref() { ++refs_; }
  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }
Memtable的实现使用了类似于计数器引用的机制,不过这是手动实现的,所以需要用户在使用Memtable时调用Ref()来增加引用计数,引用结束后调用Unref来减少引用计数,当引用计数为0时则销毁自己。
总结
Memtable的设计还是很让人佩服的,至少对我来说是这样。了解到了SkipList这样的数据结构,也了解到内存屏障这种多进程情况下代码执行乱序的问题,还有内存对齐的必要、内存对齐的实现。同时也了解了一些C++编写程序的独特风格(我写的C++一直像Java,反而少了一点C++的美)