leveldb源代码阅读(四)- table cache的实现

一、前言

缓存在整个计算机体系中,占据着举足轻重的地位,往往被用于提升软件的运行速度。在计算机系统中,最典型的当属CPU高速缓存了,CPU高速缓存是介于CPU寄存器和内存之间,CPU向内存中请求数据时,会先检查CPU高速缓存中是否存在数据,如果不存在,则会将内存中的数据放入高速缓存中,再将高速缓存中的数据读入CPU。这个过程中,缓存之所以能够大大的提升系统速度,是因为程序在运行的时候对内存的访问具有局部性的特点,这种局部性我理解为程序在运行时对某一块的内存请求会非常频繁,而这一块内存在第一次请求之后就会被缓存,所以会大大提升之后的数据读取速度。所以,缓存设计的是否合理有效,在于缓存的命中率高不高。

在leveldb中,为了提升对数据的检索速度,也设计了缓存,对应的代码在db/table_cache.h和db/table_cache.cc中,这个table cache的实现主要借助于Cache类,关于Cache类,是可以用户自定义实现的,但leveldb也有一个内置的Cache的实现,文件是util/cache.cc。主要是LRU(最近最少使用)算法,原理是“如果一个数据最近被使用,那么将来被使用的概率同样也很大”。同样也实现了一个HashTable,leveldb中提供的数据显示在g++ 4.4.3下比built-in的哈希表性能稍高,大概5%左右。

二、概览

2.1 hash table数据结构

哈希表是一个很常见的结构了,存储的是key-value结构,一个key-value对常被称作entry,它最大的特点是查找快。在网上找了一张hash table的图
hash table

首先它是一个长为len的数组,每个数组中的元素都是一个链表。如图所示,john Smith和Sandra Dee都被hash到152这个地方,所以在152这个地方使用链表来存储这两个entry.查找的时候Sandra Dee的时候,先找到152这个地方,在遍历链表,直到找到key是Sandra的entry。

2.2 LRU算法

在leveldb中,LRU算法的实现是使用两个双向环形链表,一个链表(in-use)存储当前正在使用的数据,另一个链表(lru)按照访问时间先后顺序存储缓存数据,每个数据都可以在in-use和lru之间切换。当我们需要使用LRU算法来淘汰数据时,只需要在lru上淘汰排序靠后的数据即可。

2.3 分片LRU缓存

分片LRU缓存很简单,其实就是同时创建多个LRU缓存对象,然后使用hash将特定的缓存数据放置到相应的LRU缓存对象中。这个方式可以避免一个LRU缓存中存储过多的数据。

三、详细分析

3.1 Hash Table

leveldb中实现了HashTable,使用的是最经典的数组+链表的实现。通过hash值定位到数组中的某一处,然后在这一处的链表上遍历查找。

HashTable的长度是哈希算法效率的最大影响因素,如果存在较多的hash冲突,则在链表上遍历查找的时间花费会很长。所以这个长度的选择是很重要的,比如在Java中的HashMap,实现中有一个负载因子0.7,如果数组上已经有值的数量超过总长度的0.7就会对整个哈希表resize。在leveldb中,这个resize的阈值是当前所有Insert进来的元素个数超过了数组的长度length,就会进行resize。这里简单的分析一下最重要的查找和resize过程。

  • 哈希表查找过程
  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];     //这里hash & (length_ - 1)会定位到小于length的位置,ptr就是hash到的位置
    while (*ptr != NULL &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
           //hash到特定位置后,如果当前位置的hash和当前hash不一样,或者key不一样,并且指针也不为空,则继续向下找,直到找到
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }
  • 哈希表resize的过程
   void Resize() {
    uint32_t new_length = 4;        //哈希表的长度从4开始,逐倍增长
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);      //申请新的哈希表的所需的空间
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {    //将旧的哈希表的数据重新计算复制到新的哈希表中
      LRUHandle* h = list_[i];
      while (h != NULL) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_; //删除旧的哈希表
    list_ = new_list;
    length_ = new_length;
  }
};

3.2 LRUCache的实现

这里的LRUCache的私有成员变量包括以下几个

  // Initialized before use.
  size_t capacity_;         //LRUCache的大小。

  // mutex_ protects the following state.
  mutable port::Mutex mutex_;       //互斥变量,用来同步访问
  size_t usage_;                    //LRUCache已经使用的大小

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  // Entries have refs==1 and in_cache==true.
  LRUHandle lru_;                   //LRU链表的头,lru.prev代表新的节点,lru.next代表旧的节点,节点的refs == 1,in_cache == true

  // Dummy head of in-use list.
  // Entries are in use by clients, and have refs >= 2 and in_cache==true.
  LRUHandle in_use_;                //正在使用的节点链表的头,refs >= 2,in_cache == true

  HandleTable table_;               //哈希表,用来在缓存中实现快速查找

LRU的构造函数,直接构造出空的环形链表

LRUCache::LRUCache()
    : usage_(0) {
  // Make empty circular linked lists.
  lru_.next = &lru_;
  lru_.prev = &lru_;
  in_use_.next = &in_use_;
  in_use_.prev = &in_use_;
}

LRU中节点的引用和解引用

//增加引用时,如果在缓存中,则移动到in_use中
void LRUCache::Ref(LRUHandle* e) {
  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}

//解引用时会出现两种情况。1.节点不再需要,使用deleter来删除节点 2.不再被使用,移动到缓存
void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) { // Deallocate.
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e);
  } else if (e->in_cache && e->refs == 1) {  // No longer in use; move to lru_ list.
    LRU_Remove(e);
    LRU_Append(&lru_, e);
  }
}

缓存的插入

Cache::Handle* LRUCache::Insert(
    const Slice& key, uint32_t hash, void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value)) {
  MutexLock l(&mutex_);

  LRUHandle* e = reinterpret_cast<LRUHandle*>(
      malloc(sizeof(LRUHandle)-1 + key.size()));    //这个地方申请内存需要注意,在LRUHandle中,key_data只有1个字节,其实是整个key的开头一个字节,所以申请的空间实际上是包含整个key的
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->in_cache = false;
  e->refs = 1;  // for the returned handle.
  memcpy(e->key_data, key.data(), key.size());

  if (capacity_ > 0) {      //如果capacity大于0,也就是需要进行缓存
    e->refs++;  // for the cache's reference.
    e->in_cache = true;
    LRU_Append(&in_use_, e);
    usage_ += charge;
    FinishErase(table_.Insert(e));
  } // else don't cache.  (Tests use capacity_==0 to turn off caching.)

    //当使用的内存大于容量时,则要移除旧的缓存,直到缓存小于指定的容量
  while (usage_ > capacity_ && lru_.next != &lru_) {
    LRUHandle* old = lru_.next;
    assert(old->refs == 1);
    bool erased = FinishErase(table_.Remove(old->key(), old->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }

  return reinterpret_cast<Cache::Handle*>(e);
}

缓存的查找

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* e = table_.Lookup(key, hash);      //直接借用哈希表完成缓存的快速查找
  if (e != NULL) {
    Ref(e);
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

3.3 分片缓存的实现(ShardedLRUCache)

分片缓存的实现其实就是借助上面的LRUCaChe,只不过同时拥有多个LRUCache,然后特定的缓存数据会被缓存到相应的LRUCache中。

static const int kNumShardBits = 4;                     //可以理解为缓存片数量的容量因子(这里是4个bits,所以共有16个缓存片)
static const int kNumShards = 1 << kNumShardBits;       //一共有多少个缓存片

//通过Shard函数可以将任意一个hash分配到16个缓存片中的任意一个)
static uint32_t Shard(uint32_t hash) {
    return hash >> (32 - kNumShardBits);
}

所有的缓存存储和读取都会通过上面的Shard函数来找到特定的缓存片,从而实现了分片缓存。

五、总结

在这个部分,400行左右的代码就实现了哈希表,LRU缓存,分片LRU缓存。了解到LRU缓存的具体实现方式,特别是LRU中Ref和UnRef的实现非常精炼,很好的解决了一个数据在LRU缓存中被引用时脱离缓存,不使用时进入LRU缓存的功能。同时将LRU缓存简单的封装,就可以实现分片LRU缓存。

leveldb源代码阅读(三)-memtable的实现

一、序言

在之前的文章中提到,leveldb所有的记录都是由一个个log文件逐渐转变来的,与存储在磁盘上的log文件对应的就是内存中的memtable。memtable和log文件存储的内容是一致的。

二、SkipList

memtable中的键值对是通过SkipList这种数据结构存储的,SkipList相对于普通的List来说,普通的List查找时间复杂度为O(n),而SkipList可以达到O(log(n)),同时,SkipList相对与红黑树,在多线程上,需要锁住的数据量更小,性能更优。

SkipList示意图

比如上图就是一个SkipList结构,如果我们需要搜索45这个点,我们首先在最高层(第二层)找到30,然后发现下一个点是57,已经大于45了,于是来到下一层(第一层)从30开始,向后找,就能找到45了,在这个过程中,我们跳过了很多个点。因此这种数据结构得名SkipList。

2.1 SkipList的基本信息

SkipList是memtable中用到的最主要的数据结构。
SkipList的线程安全特点:写操作需要外部的同步,比如信号量这种。读操作需要保证在读的时候SkipList不会被销毁。除此之外,读操作没有任何的内部锁或者同步。
SkipList的基本原则:
(1)分配节点在SkipList销毁之前不会被删除。因为在代码中我们没有任何的删除节点的操作。
(2)一个节点的内容,除了next/prev指针,在这个节点被存储到SkipList之后其他数据都是不变的。只有Insert()会修改list的内容,初始化一个节点并使用release-stores去将这个节点存放到一个或多个list中需要谨慎对待。

2.2 SkipList的成员函数

SkipList中,公共的成员函数只有两个
void Insert(const Key& key);
bool Contains(const Key& key) const;

顾名思义,Insert用来向SkipList中插入key,Contains用来检查SkipList中是否包含某个key。

私有的成员函数如下:

  Node* NewNode(const Key& key, int height);
  int RandomHeight();
  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

  // Return true if key is greater than the data stored in "n"
  bool KeyIsAfterNode(const Key& key, Node* n) const;

  // Return the earliest node that comes at or after key.
  // Return NULL if there is no such node.
  //
  // If prev is non-NULL, fills prev[level] with pointer to previous
  // node at "level" for every level in [0..max_height_-1].
  Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  Node* FindLessThan(const Key& key) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

2.3 SkipList的节点Node实现

// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }

  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
  }

 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  port::AtomicPointer next_[1];
};

注意到Node实现了两个版本的Next和SetNext,带有NoBarrier_的是多进程不安全的,因为在少数情况下会用到,因此做了额外的实现。另外,可以注意这里只实现了Next()方法,并没有Prev()指向上一个节点,可以知道SkipList并不是一个双向链表,但是SkipList的迭代器是支持Prev()方法的,所以这是一个值得注意的地方。

Node节点的创建是通过NewNode()方法

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);
}

NewNode中使用了arena来分配对齐的内存,分配的内存大小是sizeof(Node)+sizeof(port::AtomicPointer) * (height - 1)。这里可以注意到申请的空间不仅仅是一个Node的大小,还包括sizeof(port::AtomicPointer) * (height – 1),这是因为SkipList是一个多层的结构,所以如果当前Node是3层,那么就需要3个Next指针,在Node的代码中,默认只申请了一个Next指针空间,所以需要额外的多申请(height – 1)个AtomicPointer的空间。

2.4 SkipList的迭代器(Iterator)

SkipList实现了自己的迭代器,迭代器的声明如下:

class Iterator {
   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const SkipList* list);

    // 如果迭代器位于一个有效的节点上,就会返回true.
    bool Valid() const;

    // 返回当前位置的key.
    // REQUIRES: Valid()
    const Key& key() const;

    // 前进到下一个位置
    // REQUIRES: Valid()
    void Next();

    // 后退到上一个位置
    // REQUIRES: Valid()
    void Prev();

    // 移到第一个key >= target的entry
    void Seek(const Key& target);

    // 移到这个list的第一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToFirst();

    // 移到这个list的最后一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToLast();

   private:
    const SkipList* list_; 
    Node* node_;
    // Intentionally copyable
  };

其中Prev()的实现并不是通过prev指针,因为上文中已经介绍了Node只有next指针。Prev()是通过FindLessThan来实现,找到小于指定Node的最后一个值。这里有一个问题,为什么不使用prev指针,难道不会大大的降低时间复杂度吗?看一下FindLessThan的具体实现。

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    assert(x == head_ || compare_(x->key, key) < 0);
    Node* next = x->Next(level);
    if (next == NULL || compare_(next->key, key) >= 0) {
      if (level == 0) {
        return x;
      } else {
        // Switch to next list
        level--;
      }
    } else {
      x = next;
    }
  }
}

其实就是SkipList查找过程的实现。时间复杂度应该是O(log(n)).相比于prev指针的话,在空间占用上是少一点的。具体为什么这样实现还得继续往下看。

SkipList还以同样的方式实现了FindGreaterOrEqualFindLast

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != NULL) prev[level] = x;
      if (level == 0) {
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}

FindGreaterOrEqual的实现还是比较有意思的,在返回比指定key大于或等于的节点的同时,还可以选择性的将该节点的prev指针保存到Node** prev中,返回给用户。其实这也是Iterator中Prev()的实现不采用prev指针的原因,因为有了FindGreaterOrEqual这样的实现,SkipList的内部实现中根本不需要Prev()来将指针向前移动,同时又大大的节约了内存。

下面看一下SkipList的Insert的实现。

void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == NULL || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (NULL), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since NULL sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}

首先,Insert方法中没有做任何的多线程同步,所以需要调用者在外部进行写入同步(防止多个线程同步写入)。

插入的过程是先找到需要插入的位置,然后调用RandomHeight()来随机得到当前节点的高度,之后向普通链表一样执行插入操作。

注意到这里有一个多线程读写的问题。也就是当我们改变max_height_的值的时候,并没有改变head_指针。这种情况下有两种可能,一是读到了新的max_height_,但是head_指针还没有更改,这时候指针会立刻下降到下一级,SkipList的多级只是为了提高查找速度,所以在这里并没有其他的副作用;二是正常的情况,不作讨论。

2.Memtable

Memtable的实现主要就是依赖于SkipList,分析了SKipList之后,Memtable的插入,查找操作都是SkipList的插入和查找。

  // Increase reference count.
  void Ref() { ++refs_; }

  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

Memtable的实现使用了类似于计数器引用的机制,不过这是手动实现的,所以需要用户在使用Memtable时调用Ref()来增加引用计数,引用结束后调用Unref来减少引用计数,当引用计数为0时则销毁自己。

总结

Memtable的设计还是很让人佩服的,至少对我来说是这样。了解到了SkipList这样的数据结构,也了解到内存屏障这种多进程情况下代码执行乱序的问题,还有内存对齐的必要、内存对齐的实现。同时也了解了一些C++编写程序的独特风格(我写的C++一直像Java,反而少了一点C++的美)

leveldb源码阅读(二)—— Varint和Arena的实现

一、Varint

Varint是在leveldb中广泛使用的一种变长的整数类型,Varint其实和unicode的实现非常相似,并且是little-endian。如果当前字节的最高位是1,则表示后面的字节也属于这个整数,如果最高位是0,则表示该整数结束了。所以,一个无符号4个字节的整型数,使用Varint可能以1,2,3,4,5个字节来表示。

比如130:

无符号整型表示起来为:00000000 00000000 00000000 10000010
使用Varint表示为:10000010 00000001             #这里注意Varint是little-endian

因为牺牲了每个字节的最高位作为标志位,所以遇到大的数可能需要5个字节。

某个无符号整型:10000001 00000001 00000001 00000001
使用Varint表示:10000001 10000010 10000100 10001000 00001000

但是平均下来,Varint是绝对会节省大量的存储空间的。

那么leveldb中Varint是如何编码和解码的呢?

char* EncodeVarint32(char* dst, uint32_t v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  static const int B = 128; //10000000
  if (v < (1<<7)) {  v < 2^7
    *(ptr++) = v;
  } else if (v < (1<<14)) { v < 2^14
    *(ptr++) = v | B;
    *(ptr++) = v>>7;
  } else if (v < (1<<21)) { //v < 2^21
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  }
  return reinterpret_cast<char*>(ptr);
}

上面这一段对uint32_t的变量进行Varint32编码的例子,编码的结果保存在dst里,返回的ptr是dst的最后一个字节的指针。使用if-else对1、2、3、4、5个字节的情况分别做了处理。主要就是移位和或运算符的使用。跟着代码走一遍就能理解。

const char* GetVarint32PtrFallback(const char* p,
                                   const char* limit,
                                   uint32_t* value) {
  uint32_t result = 0;
  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
    uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
    p++;
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
    }
  }
  return NULL;
}
inline const char* GetVarint32Ptr(const char* p,
                                  const char* limit,
                                  uint32_t* value) {
  if (p < limit) {
    uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
    if ((result & 128) == 0) {
      //如果低8位是0xxxxxxx,little-endian,也就是小于128
      *value = result;    //直接赋值
      return p + 1;
    }
  }
  return GetVarint32PtrFallback(p, limit, value);
}

上面GetVarint32Ptr是解码Varint32,当值大于128时,则调用GetVarint32PtrFallback。在则调用GetVarint32PtrFallback中,利用循环一个字节一个字节的取出,最终将value指针指向结果result。

二、Arena

Arena是leveldb中管理内存分配的类。所有的内存分配都通过Arena申请,可以根据申请的内存大小使用不同的内存分配策略,也可以避免过多的内存碎片问题,并且在内存释放时统一使用Arena来释放,方便管理。

Arena类的实现并不复杂。首先看一下成员变量。

  // Allocation state
  char* alloc_ptr_;                 //指向当前块中剩余的内存起点
  size_t alloc_bytes_remaining_;    //当前块中剩余的内存

  // Array of new[] allocated memory blocks
  std::vector<char*> blocks_;       //用来保存所有new出来的char数组,释放内存时也会使用到

  // Total memory usage of the arena.
  port::AtomicPointer memory_usage_;    //通过Arena申请的内存

然后就是public的成员函数:

// Return a pointer to a newly allocated memory block of "bytes" bytes.
  char* Allocate(size_t bytes);

  // Allocate memory with the normal alignment guarantees provided by malloc
  char* AllocateAligned(size_t bytes);

  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
  }

功能上很简单,Allocate用来申请指定大小的内存,AllocateAligned用来申请保证内存对齐的内存空间,MemoryUsage用来获取内存使用情况。

我对内存对齐的理解:现在假定内存对齐的最小单位是8个字节,那么如果你只申请14个字节,内存对齐会多给你分配2个字节凑成16个字节。内存对齐的好处就是访问速度的提升,在CPU一次读取8个字节的情况下。在没有内存对齐的情况下,你要读取第6~12个字节,就需要读取两次,第一次0~7个字节,通过移位取出6~7,再读取第8~15个字节,取出8~12。这样就需要读取两次。而如果在内存对齐的情况下,你需要读取的6~12个字节应该被放在8~15个字节中。只需读取一次即可。

私有的成员函数有:

char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);

这两个函数提供了具体的内存分配的实现。

具体分析这5个成员函数。

1.Allocate(size_t bytes)

inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}

Allocate是一个内联函数。内联函数的好处在于编译时会直接在调用处展开,对于程序执行速度有一定提升,但是也会导致程序大小变大。
assert(bytes > 0);是一个断言语句,断言语句可以在编译时由编译器去除,但是在调试的时候可以帮助程序员发现问题所在。这里用来保证没有出现0个字节分配的情况。
后面的判断语句用来判断当前块的剩余空间是否够分配,如果够则分配。如果不够则交给AllocateFallback()处理。返回的是分配的空间起点。

2.AllocateAligned(size_t bytes)

char* Arena::AllocateAligned(size_t bytes) {
  const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
  assert((align & (align-1)) == 0);   // Pointer size should be a power of 2
  size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
  size_t slop = (current_mod == 0 ? 0 : align - current_mod);
  size_t needed = bytes + slop;
  char* result;
  if (needed <= alloc_bytes_remaining_) {
    result = alloc_ptr_ + slop;
    alloc_ptr_ += needed;
    alloc_bytes_remaining_ -= needed;
  } else {
    // AllocateFallback always returned aligned memory
    result = AllocateFallback(bytes);
  }
  assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
  return result;
}

AllocateAligned(size_t bytes)用来做内存对齐的分配。
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;用来获取分配的最小单位,如果指针大小大于8,则取指针大小,否则使用8作为最小的对齐单位。
assert((align & (align-1)) == 0);这是一个很有意思的判断,可以保证只有2的n次方才能满足。

size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);current_mod是余数,比如当前指针地址是15,align是8,那么余数为7,这个余数就是alloc_prt_&(align-1)的值。
size_t slop = (current_mod == 0 ? 0 : align - current_mod);因为当前指针可能不是align的整数倍,slop即代表当前指针需要偏移的大小,来保证是align的整数倍。
size_t needed = bytes + slop; needed代表需要分配的空间。

之后的判断则和Allocate()函数做法相同了。需要注意的是返回的result指针是align的整数倍,这样就能保证以最少的内存读取次数来获取想要的数据。

3.AllocateFallback(size_t bytes)

char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlockSize / 4) {
    //如果需要分配的bytes大于块大小的1/4,则单独在新的块中分配,以此避免浪费过多的剩下的空间(这样能保证浪费的空间小与kBlockSize / 4)
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // 重新开辟新的块空间进行分配,上一个块剩下的空间都浪费了。
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}

上面两个成员函数在当前块分配空间不足时,都将分配任务交给AllocateFallback处理。

4.AllocateNewBlock(size_t block_bytes)。

char* Arena::AllocateNewBlock(size_t block_bytes) {
  char* result = new char[block_bytes];
  blocks_.push_back(result);
  memory_usage_.NoBarrier_Store(
      reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
  return result;
}

使用new char来申请一个新的块,并将块的起始指针存入blocks_中。更新memory_usage的大小。

5.MemoryUsage()

“`
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
“`
获取memory_usage_中存储的使用空间数据。

三、Memeory Barrier(内存屏障)

memory_usage_的类型是AtomicPointer,这是一个原子类型,针对不同平台有不同的实现。可以用来线程安全的存储和获取值。

AtomicPointer类实现了几个函数:

NoBarrier_Load();
NoBarrier_Store(void* v)
Acquire_Load();
Release_Store(void* v)

这里面涉及到一个Memory Barrier(内存屏障,Memory fence)的概念。

引用一个wikipedia的例子:

Processor #1:

while (f 0);
// Memory fence required here
print x;

Processor #2:

x = 42;
// Memory fence required here
f = 1;

一个两核心的CPU按照上述例子执行。可能会出现很多种情况:
虽然我们希望的是打印“42”。但是如果核心#2存储操作乱序执行(out-of-order),就有可能f在x之前被更新了。这样打印语句就可能打印出“0”。同样的,核心#1读取操作可能也乱序执行,x就有可能在f被判断之前读取了,打印语句可能打印出一个无法预期的值。对于大多数程序来说,上面的这些情况都是不可以接受的。使用memory barrier可以避免out-of-order的问题。

简而言之就是,在多CPU共享内存空间的情况下,两条语句的执行顺序已经无法保证了。需要memory barrier来保证执行顺序的正确。

当然,单CPU是不会出现这个问题的。

现在再来解释上面的4个函数的作用。

NoBarrier_Load();           //不使用内存屏障的读取
NoBarrier_Store(void* v)    //不使用内存屏障的存储
Acquire_Load();             //使用内存屏障的读取
Release_Store(void* v)      //使用内存屏障的存储

leveldb源码阅读(一)-log读取和写入

leveldb源码阅读(一)-log读取和写入

标签(空格分隔): leveldb log


一、 引言

leveldb中,所有对数据库的操作都会记录到log中,当log文件达到预定义的大小后,就会被转换成数据表(sstable),所以log文件的生成和读取算是leveldb核心部分的第一步。
这篇文章涉及到的leveldb的源代码文件有:

db/log_format.h
db/log_reader.h
db/log_reader.cc
db/log_writer.h
db/log_writer.cc
leveldb/slice.h
leveldb/status.h

二、日志存储格式——log_format.h

在之前的文章中,了解到log是由一个个block组成,每个block是32KB的大小。

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

每个block中存储的是一条条的记录,如果一个block的末尾最后剩下小于等于6bytes的空间,就会使用一个trailer填充而不是用新的记录填充,这个trailer都是由0字节组成。一条记录包括一个crc32c生成的校验码checksum、记录的长度length、记录的类型type、数据。

如果一条记录跨越了多个block,就会被分段,分段类型有3种——First、Middle、Last

关于这部分的实现是在log_format.h中

enum RecordType {
  //0是保留字,用来表示预分配的文件
  kZeroType = 0,

  kFullType = 1,

  // 一条记录被分段的3种类型
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
//定义了最大的记录类型值
static const int kMaxRecordType = kLastType;
//定义了最大的`block`大小
static const int kBlockSize = 32768;

// 记录头是  checksum (4 bytes), length (2 bytes), type (1 byte).总共7bytes
static const int kHeaderSize = 4 + 2 + 1;

三、日志的写入

在log_writer代码中,实现了Writer类,最主要的就是一个AddRecord(const Slice& slice)

Writer类有两个构造函数,对于只传了一个参数——WritableFile指针的构造函数,使用了explicit显示构造,这样可以防止隐式转换。另外将Writer(const Writer&)void operator=(const Writer&)放在private域,防止拷贝赋值。

Writer有三个成员变量 WritableFile* dest_用来保存可写的文件,int block_offset_用来保存当前块的偏移量, uint32_t type_crc_[kMaxRecordType + 1]用来保存5种记录类型的crc32c计算结果。这里会提前计算好保存进这个数组来减少存储时计算的负载。

接下来就是最重要的AddRecord(const Slice& slice)了。Slice是leveldb中用来保存key或者value的值的,包含一个指针,一个长度。

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();   //取出数据的指针
  size_t left = slice.size();       //取出数据的长度

  // 如果有必要的话,对记录分段,并保存.
  // 注意如果slice是空的,我们也会迭代一次,保存一个长度为0的记录
  Status s;     //用来保存这次操作的结果
  bool begin = true;    //是否是一条记录的开始
  do {
    //计算出当前块剩下的空间
    const int leftover = kBlockSize - block_offset_;
    //使用assert进行断言,这边是一定会>=0的,assert这里只是为了帮助调试程序,实际使用时在编译时通过参数去除assert语句
    assert(leftover >= 0);

    //如果剩下的空间已经不足一条记录的header了
    if (leftover < kHeaderSize) {
      // 切换到新的块
      if (leftover > 0) {
        // 如果剩下的空间>0,需要使用\x00填充
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      //新的块偏移量是0
      block_offset_ = 0;
    }

    // 不变的道理:我们绝对不会让剩下的空间<kHeaderSize.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    //当前还可用的空间
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;

    //记录片段的大小,取可用的空间大小和记录的大小的最小值
    const size_t fragment_length = (left < avail) ? left : avail;

    //当前的记录类型
    RecordType type;

    //如果记录的大小和片段的长度是一样的,表示到达了记录的末尾为true,否则false
    const bool end = (left == fragment_length);

    //记录类型的判断
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    //写入到物理存储中,
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;     //数据指针右移一个片段长度
    left -= fragment_length;    //数据的长度减去一个片段的长度
    begin = false;              //不再是开始了
  } while (s.ok() && left > 0); //如果没有出错,并且数据还剩就再循环

  return s;
}

AddRecord(const Slice& slice)调用了一个私有成员函数EmitPhysicalRecord(RecordType t, const char* ptr, size_t n),看一看这个将记录写入持久化存储是如何实现的。

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // n必须可以用2KB表示,因为一条记录的length是uint16大小的,little-endian
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // 对记录的header进行格式化
  char buf[kHeaderSize];
  //取低位字节
  buf[4] = static_cast<char>(n & 0xff);
  //取高位字节
  buf[5] = static_cast<char>(n >> 8);
  //一位的记录类型
  buf[6] = static_cast<char>(t);

  // 计算记录类型的crc值和有效载荷
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);                 // Adjust for storage
  //填充4位的校验值
  EncodeFixed32(buf, crc);

  // 将记录的header写入
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    //将记录的数据写入
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      //flush一下
      s = dest_->Flush();
    }
  }

  //块的偏移量右移header和数据的长度和
  block_offset_ += kHeaderSize + n;
  return s;
}

上面的代码分析其实就是log部分逻辑上的实现,将一条记录格式化成设计好的格式。这里有一个疑问就是dest_->Append(Slice& slice)dest_->Flush()是如何工作的。所以继续追踪下去,到include/leveldb/env.h中:

// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
 public:
  WritableFile() { }
  virtual ~WritableFile();

  virtual Status Append(const Slice& data) = 0;
  virtual Status Close() = 0;
  virtual Status Flush() = 0;
  virtual Status Sync() = 0;

 private:
  // No copying allowed
  WritableFile(const WritableFile&);
  void operator=(const WritableFile&);
};

这部分是WritableFile的抽象部分,Append、Close、Flush、Sync都是纯虚函数,具体实现必须由用户自己去做。并且实现时必须提供缓冲区,这样调用者可以append小的数据段到文件中。

实际上leveldb中也有默认(posix)的实现。在util/env/env_posix.cc中。目前只看Append和Flush具体实现:

  virtual Status Append(const Slice& data) {
    size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
    if (r != data.size()) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  virtual Status Flush() {
    if (fflush_unlocked(file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

fwrite_unlocked和fflush_unlocked都是define的fwrite和fflush方法。

所以这里的设计是很好的案例。通过将WritableFile抽象化,提供给用户自己来实现,从而很轻松的就可以在多平台间移植。

四、日志的读取

在log_reader中,实现了Reader类,用来读取日志文件。
Reader的构造函数定义如下:

Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

file是代表要读取的文件,reporter是用来报告日志文件中发现错误的,checksum是当前文件的校验码,initial_offset是当前文件中开始读取的物理位置。

Reader类中私有的成员变量和函数需要注意的有:

  SequentialFile* const file_;      //要读取的文件
  Reporter* const reporter_;        //报告错误的对象
  bool const checksum_;             //校验值
  char* const backing_store_;       //备份存储
  Slice buffer_;                    //缓冲
  bool eof_;   // Last Read() indicated EOF by returning < kBlockSize

  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;         //最后一个Record的偏移量
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;       //

  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;   //第一个记录的初始偏移量

  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  bool resyncing_;      //在一次查找后我们再同步则是True。特别的,kMiddleType和kLastType会被跳过   

  // Extend record types with the following special values
  enum {
    kEof = kMaxRecordType + 1,
    // 无效的记录值,可能有以下情况
    // * 具有无效的CRC值(checksum不正确)(ReadPhysicalRecord reports a drop)
    // * 长度为0的记录 (No drop is reported)
    // * The record is below constructor's initial_offset (No drop is reported)
    kBadRecord = kMaxRecordType + 2
  };

  bool SkipToInitialBlock();        //跳过直到initial_offset的位置,成功则返回true
  unsigned int ReadPhysicalRecord(Slice* result);   //读取Record,返回记录类型,或者kEof、kBadRecord

  void ReportCorruption(uint64_t bytes, const char* reason);    //汇报丢弃的字节,以及原因
  void ReportDrop(uint64_t bytes, const Status& reason);        //汇报丢弃的字节,以及原因

这些成员函数中着重看一下ReadPhysicalRecord

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}

Reader类中公共的成员函数需要注意的有:
bool ReadRecord(Slice* record, std::string* scratch);
uint64_t LastRecordOffset();

ReadRecord用来读取下一个要读取的Record,如果读取成功,返回true,存储在record中,如果读到了结尾没有一个完整的Record,则存储在scratch中

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {    //跳到初始的偏移位置
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

leveldb文档翻译(3)-leveldb的日志格式和表格式

日志文件的格式

日志文件的内容是一个32KB大小的块的序列。唯一可能的意外是日志文件结尾可能包含一个不完整的块。

每一个块由以下记录组成

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

(注:crc32c是一种检验算法)

一条记录绝不会在块的最后6个字节内开始(因为剩下的6个字节已经不够大,checksum,length,type总共需要7个字节)。任何留下的字节形成trailer(拖车?),这部分必须完全是0字节,读取的时候必须跳过。

注:如果当前块恰好有7个字节留下来,一个新的非0长度的记录被增加,写入必须用一个FIRST记录(包含0字节的用户数据,FIRST用来表示这是一条记录的开始部分)去填充块的最后7个字节,之后将所有的用户数据写在后来的块中。

更多的类型可能在以后会加入进来。一些读取程序可能会跳过它们不认识的记录类型,其他读取程序可能会报告有些数据被跳过了。

FULL == 1
FIRST == 2
MIDDLE == 3
LAST == 4

FULL记录包含一个完整的用户记录的内容

FIRST,MIDDLE,LAST是用来表示用户记录被分成多个片段(尤其是块的边界)。FIRST是用户记录的第一个片段,LAST是用户记录的最后一个片段,MIDDLE是所有用户数据的内部片段。

例如:有以下的用户数据

A: length 1000
B: length 97270
C: length 8000

A将会在第一块中存储为FULL记录。

B将会划分成3段:第一段占用第一块剩下的部分,第二段占用整个第二块,第三段占用第三块的前面部分。这将会在第三块中留下6个字节的空间,被留作空白当成trailer。

C将会在第四块中存储为一个FULL记录。

这种记录格式的优点

1.我们不需要任何的同步的启发式算法-直接跳到下一个块的边界并且扫描。如果有一个错误发生,跳到下一块。作为一个边界效益,当一个日志文件的内容的一部分被作为一个记录嵌入到另外一个日志文件中,我们不会变的困惑。

2.在近似边界上分割(比如mapreduce)是简单的:找到下一个块的边界,跳过记录直到我们遇到一个FULL或FIRST记录。

3.对于大的记录我们不需要额外的缓冲区。

对比这个记录格式的缺点

1.没有对微小的记录的包装。这可能通过添加新的记录类型来修复,因此它只是当前实现的短处,不是这种格式的必须。

2.没有压缩。同样的,这也可以通过添加新的记录格式来修复。

表文件的格式

<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer]        (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>

这个文件包含内部的指针。每一个指针都被称作BlockHandle,包含下面的信息:

offset:   varint64
size:     varint64

参考varints,里面有varint64格式的解释。

1.键值对序列是有序存储的,被分成多个数据块。这些数据块从文件开头一个接着一个。每一个数据块被block_builder.cc中的代码格式化,之后有选择的进行压缩。

2.在数据块之后我们存储一束元块。支持的元块类型在下面描述了。更多的元块类型可能在以后会加进来。每一个元块类型也是用block_builder.cc格式化,然后之后被有选择的压缩。

3.一个“元索引”块。它包含每一个其他的元块的入口,入口的键是这个元块的名字,值是一个BlockHandle指向元块。

4.一个“索引”块。这个块包含每个数据块的入口,键是一个大于等于当前数据块的最后一个键的字符串,在连续的数据块的第一个键之前。值是这个数据块的BlockHandle。

5.在文件的最后面是一个填充满长度的脚部,包含了元索引块和索引块的BlockHandle,还有一个魔术数字。

    metaindex_handle: char[p];     // Block handle for metaindex
    index_handle:     char[q];     // Block handle for index
    padding:          char[40-p-q];// zeroed bytes to make fixed length
                                   // (40==2*BlockHandle::kMaxEncodedLength)
    magic:            fixed64;     // == 0xdb4775248b80fb57 (little-endian)

“filter”元块

在数据库打开时如果 FilterPolicy被指定,每个表都会有一个filter块。“元索引”块包含一个入口,将filter.<N>指向了 BlockHandle,对于filter块,<N>是filter 策略的
Name() 方法返回的。

filter块存储了一系列的filters,filter i包含了FilterPolicy::CreateFilter()在所有存储在文件偏移落入到范围[ ibase … (i+1)base-1 ]的块中键的输出。

当前,”base”是2KB,对这个例子来说,如果块X和Y开始在范围 [ 0KB .. 2KB-1 ],所有的在X和Y之间的键将会通过调用FilterPolicy::CreateFilter()被转换成一个filter,得到的filter将会被存储成当前filter块的第一个filter。

filter block是以下格式的:

[filter 0]
[filter 1]
[filter 2]
...
[filter N-1]

[offset of filter 0]                  : 4 bytes
[offset of filter 1]                  : 4 bytes
[offset of filter 2]                  : 4 bytes
...
[offset of filter N-1]                : 4 bytes

[offset of beginning of offset array] : 4 bytes
lg(base)                              : 1 byte

filter块的最后的偏移数组允许高效地mapping一个数据块偏移到相应的filter。

“统计”元块

这个元块包含一束统计。键是统计项的名字。值包含着统计内容。

TODO(预览):记录了以下的统计

data size
index size
key size (uncompressed)
value size (uncompressed)
number of entries
number of data blocks

leveldb文档翻译(2)-leveldb的实现

这部分是来自于leveldb的impl.md文档,详细介绍了leveldb实现方面的设计。

1、文件

leveldb其实是Bigtable的简单的实现。Bigtable论文地址:http://research.google.com/archive/bigtable.html。当然,文件的组织方法和bigtable有一些不同之处,下面会逐个介绍。

1.1 日志文件

一个日志文件(*.log)存储一系列的最近的更新。每一个更新都会添加到当前日志文件的最后。当所有日志文件达到了预定义的大小(默认是4M),它会转换成一个有序的表(参见下面介绍),一个新的日志文件被创建提供给之后的更新。

当前日志文件的副本是保存在一个内存结构中(称为memtable)。每一次读都会向这个memtable中查询,所以读操作其实还是映射到所有记录下来的更新中。

2.有序表

一个有序表(*.ldb)存储了一系列的由键排序的键值对。每个键值对要么键对应着值,要么是键对应着一个删除标记。(删除标记保存在旧的排序表中存储着过时的值)。

排序表的集合是由一系列的等级(level)组织的。排序表如果是由一个日志文件生成,就会被置为一个特别的younglevel(也被称为level-0)。当young文件的数量超过了某个特定的阈值(当前是4),所有的young level文件和所有键范围重叠的level-1文件被合并在一起产生一系列的新的level-1文件(我们每2MB的数据创建一个新的level-1文件)

属于young level的文件可能自己相互包含重叠的键,但属于其他等级的文件有唯一的不重叠的键范围。level-L(L >= 1),当level-L所有的文件大小超过了(10^L)MB(例如,level-1是10MB,level2是100MB),在level-L中取一个文件,和所有在level-(L+1)中与之重叠的文件,合并成level-(L+1)的新的文件集合。这些合并使得新的更新逐渐从young-level的文件转移到最大的等级,这个过程只需要批量的读写操作即可完成(注:最小的时间复杂度)

2.1 Manifest

一个MANIFEST文件中个列举了由各个level组成的有序表的集合,包含了相应的键范围,以及其他重要的元数据。任何时候数据库被打开,一个新的MANIFEST文件(文件名中有一个新的数字代表)被创建。MANIFEST文件和log文件的格式相同,任何发生的改变都会被添加到这个文件中(比如文件添加或者移除)。

2.2 Current

CURRENT是一个简单的文本文件,包含了最新的MANIFEST文件的名字。

2.3 信息日志(Info logs)

信息消息被打印到LOG和LOG.old文件中

2.4 其他

其他文件用来记录一些杂项。(LOCK,*.dbtmp)

3.Level 0

当一个日志文件增长到一个特定的值(默认1MB):
创建一个新的memtable和log文件,并将未来的更新放在这里。
在后台的操作:
将上一个memtable的内容写入到sstable
丢弃上一个memtable
删除旧的日志文件和旧的memtable
将新的sstable添加到young(level 0) level。

4.压实(不是对数据进行压缩,只是将数据整合到一起,并且在这个过程中会去除重复键以及删除的键)

当level L超出了文件大小的限制,我们在另外一个后台线程中压实它。这个压实操作选择level L中的一个文件以及来自下一个level L+1的所有与之重叠的文件。注意:如果一个level-L文件只和一个level-(L+1)文件部分重叠,这个文件的所有部分都被作为压实的输入,然后在压实之后被丢弃。例外:因为level-0是特别的(level-0的文件可能彼此重叠),我们特殊对待从level-0到level-1的压实:当这些level-0文件彼此重叠时,一个level-0的压实会选择多个level-0的文件。

一个压实操作合并选择的文件的内容,产生一系列的level-(L+1)文件。在当前的输出文件超过目标文件大小(2MB)时,我们切换到一个产生的新的level-(L+1)文件。在当前输出文件的键范围增长到足够大,以致于和超过10个level-(L+2)的文件产生重叠时,我们同样切换到产生的新的输出文件。最后一条准则保证了之后的一次level-(L+1)文件压实不会从level-(L+2)中选择太多的数据。

旧的文件被丢弃,新的文件被加入到服务状态。

特定level的压实通过键空间的压实。详细来说,对于每个level L,我们记住最后一次level L的压实的结尾的键。下一次level L的压实将会选择在这个键之后的第一个文件(如果没有这个文件,就从键空间的起点环绕。

压实会丢弃覆盖的值,如果没有更大的数字level包含一个范围重叠当前键的文件,也会丢弃删除标记。

4.1 时间复杂度

level-0的压实将会从level-0中读取最多4个1MB的文件,在最坏情况下读取所有的level-1文件(10MB)。这样一来,我们将会读取14MB,写入14MB。

对于其他等级的压实,我们将会从level L中选择一个2MB的文件。最坏的情况下,将会从level L+1中重叠大概12个文件(10个文件是因为level-(L+1)是level-L的10倍大,其他在两边的两个文件是因为level L的文件范围通常和level-(L+1)的文件范围是不对齐的)。压实因此会读取26MB,写入26MB。假设磁盘IO速度为100MB/s,最差的压实大概消耗0.5s

如果我们减慢后台写入,只取全速100MB/s的10%,一次压实可能耗时5s。如果用户的写入速度为10MB/s,我们可能创建许多level-0的文件(大概5*10MB总共50MB)。这将明显的增加读取消耗,因为每次读取合并了更多的文件增加了负载。

解决方案1:为了减轻这个问题,我们可能希望当level-0文件数很大的时候,增加日志切换的阈值。虽然缺点是这个阈值越大,需要越多的内存来保存相应的memtable。

解决方案2:我们可能希望当level-0的文件数变多时,人为增加写入速度。

解决方案3:我们致力于降低大量文件合并的成本。也许大多数level-0文件在缓存中都有未压缩的块,我们仅仅需要担心O(N)复杂度的合并迭代。

4.2 文件的数量

除了总是生成2MB的文件,我们也可以为更高的等级生成更大的文件,以此减少文件的总数,虽然代价是更多突发的压缩。当然,我们也可以将文件集合分片放在不同的文件夹中。

在2011年2月4日,在ex3文件系统上做了一次测试,该测试显示,在很多文件的目录中做100K次文件打开的平均用时如下:

Files in directory Microseconds to open a file
1000 9
10000 10
100000 16

因此,在现代文件系统中,也许连分片也是不需要的。

5.恢复

  • 读取CURRENT去找到最后一次提交的MANIFEST文件名
  • 读取该MANIFEST文件
  • 清除过期的文件
  • 我们可以在这里打开所有的sstables,但是懒加载可能会更好。
  • 将log块转换成新的level-0 sstable
  • 开始将恢复的序列导向到新的日志文件的新的写入流。

6.文件的垃圾回收

每次恢复的最后以及每次压缩的最后DeleteObsoleteFiles()会被调用。它会找出数据库中所有的文件的名字。然后删除所有的不是当前日志文件的日志文件。再删除所有的不涉及到某些level和不是一个活跃的压缩输出的表文件。

leveldb文档翻译(1)-leveldb概览

这是leveldb源代码包中提供的 readme.md,index.md 的部分翻译文档,不是逐字逐句翻译,只是把自己感兴趣的部分翻译出来了,作为看代码之前的准备工作。

1.公共接口

include文件夹下面的是公共接口,所有使用者应该从这里的头文件直接调用。

  • include/db.h:数据库的主接口:从这里开始
  • include/options.h:控制整个数据的行为,也控制数据库的读写,可以看做是配置文件。
  • include/comparator.h:用户自定义的比较函数的抽象。如果想要使用基于字节的方法对键大小进行比较,可以使用默认的比较器,用户也可以自己实现比较器来按自己的想法对存储进行排序。
  • include/iterator.h:遍历数据的接口。可以从DB对象中获取这样一个迭代器。
  • include/write_batch.h:对数据库原子的进行多个更新操作的接口。
  • include/slice.h:一个简单的模块,用来维护一个指针和长度,来表示一个字节数组。
  • include/status.h:用来汇报成功或是其他各种各样的错误。
  • include/env.h:操作系统环境的抽象。这个接口的posix实现是在util/env_posix.cc中
  • include/table.h,include/table_builder.h:低层次的模块,绝大多数客户端不会使用这个接口。

2.leveldb基本使用

leveldb提供持久化的键值存储。键和值是任意的字节数组。键值对的存储是按照键的顺序存储的,并且这个顺序是按照用户自定义的排序函数来的。

2.1 打开一个数组库

leveldb存储是使用文件系统中的文件夹。所有数据库内容都会存储在这个文件夹里。下面是一个打开数据库的例子,并且如果数据库不存在会自动创建:

#include <cassert>
#include "leveldb/db.h"

leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
assert(status.ok());

如果希望在数据库已经存在时报错,在leveldb::DB::Open前调用:

options.error_if_exists = true;

2.2 状态

上面的代码中使用了leveldb::Status类型,leveldb中大多数函数都会返回这个值。你可以检查这个值是不是ok,或者打印出相关的错误。

leveldb::Status s = ....;
if (!s.ok()) cerr << s.ToString() << endl;

2.3 关闭数据库

如果数据库操作结束后,删除数据库对象即可关闭

.... open the db as described above ...
.... do something with db ....
delete db;

2.4 读写

数据库提供了Put、Delete、Get方法来修改/查询数据库。例如,下面的代码用来将key1的值移动到key2中。

std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);

2.5 原子更新

上面的例子中,如果在Put完key2的值,Delete key1的值前,进程崩溃了,同一个值就会被存储在多个key下。使用WriteBatch类可以原子的执行多个更新操作:

#include "leveldb/write_batch.h"
....
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) {
  leveldb::WriteBatch batch;
  batch.Delete(key1);
  batch.Put(key2, value);
  s = db->Write(leveldb::WriteOptions(), &batch);
}

WriteBatch存储了一系列的对数据库的编辑操作,这些操作会被顺序执行。注意到我们先执行了Delete,再执行Put,这样当key1和key2相同时,我们不会错误的删除所有的值。
WirteBatch不仅仅有原子操作的特点,还可以用来加速大量的更新操作。原理是将大量的单独的变化放入同一个批操作中。

2.6 同步写入

默认情况下,对leveldb的写入都是异步的:它在将进程中的写操作提交给操作系统后返回。数据从操作系统内存转移到持久化存储这个过程是异步的(也就是和这里不会等持久化之后才返回)。使用同步写入,可以让某些特定的写入等到数据被写入持久化存储后才返回。(在Posix系统中,这个通过调用 fsync(....)fdatasync(....)msync(...., MS_SYNC) 来实现。

leveldb::WriteOptions write_options;
write_options.sync = true;
db->Put(write_options, ....);

异步写入通常情况下是千百倍的快于同步写入。异步写入的缺点是如果机器崩溃可能会造成最后的一些更新丢失。注意仅仅是写进程的崩溃不会造成任何数据丢失,即使没有使用同步写。因为写进程只负责将写操作提交给操作系统,提交完就意味着写入已经结束(即使这个时候没有真正的写入到持久化存储中)。

异步写大多数时候都能被安全使用。比如,当载入大量数据到数据库中,如果出现崩溃你可以重新开始这个载入操作。也可以使用混合写入模式,也就是每多少个异步写之后使用一次同步写。如果崩溃了,可以从上一次同步写开始(同步写会产生一个标记,所以可以从上一次同步写开始)。

WriteBatch对于异步写提供了多个选择。多个更新操作也被放在同一个批操作中,然后使用一个同步写被一起执行(write_options.sync设置为true)。同步写产生的额外性能消耗被均摊在所有的写操作上。

2.7 并发

数据库在同一时间只能由一个进程打开。leveldb使用了操作系统的锁来防止被错误使用。在一个单进程中,同一个leveldb::DB对象可以被多个并发线程安全的共享。不同的线程之间在对同一个数据库对象写入或者遍历或者调用Get时,不需要额外的同步操作(leveldb的实现会自动做这个必要的同步操作。然而其他对象(比如Iterator和WriteBatch)可能需要额外的同步。如果两个线程共享了这样的对象,它们必须使用它们自己的锁协议来保护对象的使用。更多的细节可以在公共头文件中看到。

2.8 迭代器(Iteration)

下面的例子展示了如何打印一个数据库中所有的键值对。

leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  cout << it->key().ToString() << ": "  << it->value().ToString() << endl;
}
assert(it->status().ok());  // Check for any errors found during the scan
delete it;

下面的例子展示了如何显示一个特定范围的数据
[start,limit):

for (it->Seek(start);
   it->Valid() && it->key().ToString() < limit;
   it->Next()) {
  ....
}

你也可以倒序遍历所有的数据(倒序比正序可能要慢)

for (it->SeekToLast(); it->Valid(); it->Prev()) {
  ....
}

2.9 快照(Snapshots)

快照提供了键值存储所有状态一致的只读视图。ReadOptions::snapshot可能是non-NULL的值,来表示一个读操作应该执行在特定的DB状态版本上。如果 ReadOptions::snapshot是NULL,读操作可能执行在当前状态的隐式快照上。

快照的创建:DB::GetSnapshot()

leveldb::ReadOptions options;
options.snapshot = db->GetSnapshot();
.... apply some updates to db ....
leveldb::Iterator* iter = db->NewIterator(options);
.... read using iter to view the state when the snapshot was created ....
delete iter;
db->ReleaseSnapshot(options.snapshot);

当一个快照不需要的时候,使用DB::ReleaseSnapshot接口来释放。

2.10 片(Slice)

*Slice不知道翻译成什么*

it->key()it->value()的返回值都调用了leveldb::Slice类型的实例。Slice是一个简单的结构:包含字节数组的长度和指针。返回一个Slice比返回std::string是一个更轻量划算的选择,因为我们不需要复制潜在的很大的键和值(返回的是指针和长度)。额外的,leveldb的方法不返回null-terminated(\0结尾)的 C-style 字符串,因为leveldb的键和值是允许存储'\0'字节的。

Slice和C++字符串以及C-style字符串之间可以互相转换。

leveldb::Slice s1 = "hello";

std::string str("world");
leveldb::Slice s2 = str;
std::string str = s1.ToString();
assert(str == std::string("hello"));

使用Slice时要小心,因为Slice取决于调用者来保证Slice中存储的数据的生命周期。下面就是一段有问题的代码的示例:

leveldb::Slice slice;
if (....) {
  std::string str = ....;
  slice = str;
}
Use(slice);

当执行到if外面的时候,str被销毁,slice中的存储也就消失了(注:这是因为slice中存储的是指针和长度)。

2.11 比较器(Comparators)

前面的例子使用的都是默认的排序函数来对键做排序,是对字节做字典排序的。当打开数据库的时候,可以使用自定义的比较器来排序。例如,假设每个数据的键都由两个数字组成,我们应该使用第一个数字来排序,使用第二个数字来打破联系。首先,定义一个合适的leveldb::Comparator的子类。

class TwoPartComparator : public leveldb::Comparator {
 public:
  // Three-way comparison function:
  //   if a < b: negative result
  //   if a > b: positive result
  //   else: zero result
  int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
    int a1, a2, b1, b2;
    ParseKey(a, &a1, &a2);
    ParseKey(b, &b1, &b2);
    if (a1 < b1) return -1;
    if (a1 > b1) return +1;
    if (a2 < b2) return -1;
    if (a2 > b2) return +1;
    return 0;
  }

  // Ignore the following methods for now:
  const char* Name() const { return "TwoPartComparator"; }
  void FindShortestSeparator(std::string*, const leveldb::Slice&) const {}
  void FindShortSuccessor(std::string*) const {}
};

现在使用这个比较器来打开数据库:

TwoPartComparator cmp;
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.comparator = &cmp;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
....

2.11.1 向后兼容

比较器的Name()方法的返回值在数据库被创建时就已经联系起来了,在之后的每次数据库打开时都会被检查。如果name改变了,leveldb::DB::Open调用时会失败。因此,当且仅当新的键格式和比较函数和已经存在的数据库不兼容时,才会改变name的值,这时不得不丢弃所有已经存在的数据。

当然你也可以通过一些提前的计划来逐步的拓展你的键格式。例如,你可以存储一个版本号在每一个键的末尾(一个字节就够用了)。当你想要改变到新的键格式(例如添加第三个部分到键上)
– (a) 保持同样的比较器名字
– (b) 增加新的键的版本号
– (c) 改变比较函数,使用键的版本号来决定如何解释这些键

2.12 性能

性能可以通过改变配置文件来调节
include/leveldb/options.h

2.13 块大小

leveldb将相邻的键合成一组存储在同一个块中,这样的块是从持久化存储和内存间转移的最小单元。默认的块大小是大约4096个未压缩的字节。如果应用程序总是在数据库中做大量的扫描,可以考虑适当增加这个值的大小。应用程序做大量的很小的值的点读取,可以考虑使用小一点的块大小(如果这些设置确实提升了性能)。如果块大小小于1kilobyte,或者大于几个megabytes是没有太多好处的。注意对于更大的块大小使用压缩会更有效率。

2.14 压缩

每一个数据块在写入持久化存储前会单独的被压缩,压缩是默认的,因为默认的压缩函数是非常快的。对于不可压缩的数据会自动禁用。在极少数的例子中,程序可能会希望完全禁用压缩,仅仅当性能测试显示性能提升了才推荐这么做。

leveldb::Options options;
options.compression = leveldb::kNoCompression;
.... leveldb::DB::Open(options, name, ....) .....

2.15 缓存

数据库的内容被存储在文件系统的文件集合中。每一个文件存储了一系列的压缩的数据块。如果options.cache是non-NULL的,经常使用的未压缩的数据块内容将会被缓存。

#include "leveldb/cache.h"

leveldb::Options options;
options.cache = leveldb::NewLRUCache(100 * 1048576);  // 100MB cache
leveldb::DB* db;
leveldb::DB::Open(options, name, &db);
.... use the db ....
delete db
delete options.cache;

注意缓存保存了未压缩的数据,因此根据程序级的数据量来设置缓存的大小,使用压缩的数据不会有任何的减少。(压缩的数据块的存储留给操作系统缓冲区缓存,或者客户端的其他自定义环境变量来实现)

当执行大量的读取操作时,应用程序最好禁用缓存,这样大量的数据读取不会导致缓存内容大量被改变。per-iterator选项用来实现这个:

leveldb::ReadOptions options;
options.fill_cache = false;
leveldb::Iterator* it = db->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  ....
}

2.15.1 键层(Key Layout)

注意硬盘中转移和缓存的单元是一个数据块。相邻的键(根据数据库排序顺序)将被放在同一个数据块中。因此应用程序通过将所有的键依次相邻的排放,将不常访问的值内容(通过key来索引)放在另外一个区域可以提升程序性能。

举例来说,假设我们正在在leveldb上实现一个简单的文件系统,下面的入口类型我们可能希望存储。

filename -> permission-bits, length, list of file_block_ids
file_block_id -> data

我们可能想要设置filename的键前缀为一个字母(‘/’),file_block_id的键前缀为另一个字母(‘0’),因此扫描这些元数据不会强制我们取出和缓存大量的文件内容。

2.16 过滤器

因为leveldb的数据在磁盘上有结构地存放,一个简单的Get()可能调用多个从磁盘的读取。选项 FilterPolicy机制可以被用来减少磁盘读取的次数。

leveldb::Options options;
options.filter_policy = NewBloomFilterPolicy(10);
leveldb::DB* db;
leveldb::DB::Open(options, "/tmp/testdb", &db);
.... use the database ....
delete db;
delete options.filter_policy;

上面的代码将一个基于过滤策略的布隆过滤器(Bloom filter)和数据库联系起来。布隆过滤器的基础过滤依赖于保存每个键的一些位的数据在内存中(在这个例子中每个键有10个bit)。这个过滤器将会减少Get()带来的不必要的磁盘读取的次数大概为100。增加保存的每个键的bit数会更明显的减少读取次数,但是这是在更多的内存消耗的代价上得来的(注:典型的以空间换时间)。我们建议应用程序工作时还剩很多的内存,并且会做大量的随机读取时,设置一个过滤策略。

(注:布隆过滤器是一个针对于大数据量的去重算法,它会通过多个hash将一个字节数组映射到一个很大的bit向量中的k个位置,通过检查一个字节数组hash后的k个位置对应的bit向量是否都为1,如果不是则这个字节数组没有出现过,否则出现过(会有少量的误报)。使用在这里,是为了在读取前检查这个键是否存在,如果不存在则避免了以此全数据库查询,从而大大的提升的性能)

如果你使用一个自定义的比较器,你应该保证你正在使用的过滤策略是和你比较器兼容的。例如,想象一个比较器比较键时忽略了末尾的空间,NewBloomFilterPolicy不能和这样的比较器一起使用。相反的,应用程序需要提供一个自定义的过滤策略同样忽略这些末尾的空间。比如:

class CustomFilterPolicy : public leveldb::FilterPolicy {
 private:
  FilterPolicy* builtin_policy_;

 public:
  CustomFilterPolicy() : builtin_policy_(NewBloomFilterPolicy(10)) {}
  ~CustomFilterPolicy() { delete builtin_policy_; }

  const char* Name() const { return "IgnoreTrailingSpacesFilter"; }

  void CreateFilter(const Slice* keys, int n, std::string* dst) const {
    // Use builtin bloom filter code after removing trailing spaces
    std::vector<Slice> trimmed(n);
    for (int i = 0; i < n; i++) {
      trimmed[i] = RemoveTrailingSpaces(keys[i]);
    }
    return builtin_policy_->CreateFilter(&trimmed[i], n, dst);
  }
};

有一些应用程序可能提供其他过滤策略不使用布隆过滤器,而是使用其他的机制来处理键的集合。细节看leveldb/filter_policy.h

2.17 校验(Checksums)

leveldb会校验所有存储在文件系统中的数据。这里有两个不相关的控制部分提供积极的校验:

ReadOptions::verify_checksums可被设置为true,强制校验从文件系统一次读取的所有数据。默认情况是不会做这个验证的。

Options::paranoid_checks在打开数据库之前可被设置为true,来使得数据库一旦检查到内部错误立马报错。这取决于数据库的哪一部分出错,当数据库被打开时还是其他后来的操作时,这个错误会被警告。默认情况下,偏执检查是关闭的,因此即使有时候持久化存储已经出错了,数据库依然可以被使用。

如果一个数据库出错了(可能它在偏执检查开启时不能打开), leveldb::RepairDB方法会尽可能的修复数据库的数据。

2.18 近似大小

GetApproximateSizes方法可以用来通过一个或多个键范围来获取文件系统已用的空间的近似字节数

leveldb::Range ranges[2];
ranges[0] = leveldb::Range("a", "c");
ranges[1] = leveldb::Range("x", "z");
uint64_t sizes[2];
leveldb::Status s = db->GetApproximateSizes(ranges, 2, sizes);

前面的代码调用将会设置sizes[0]为键范围在[a..c)之间的数据使用的文件系统存储空间的大小,设置sizes[1]为键范围在[x..z)之间的数据使用的文件系统存储空间的大小。

2.19 环境

所有的文件操作(以及其他的系统调用)都在leveldb:Env中实现了。复杂的客户端可能希望提供它们自己的Env的实现来获得更好的控制。例如,一个应用程序可能会在文件IO中引入人为延迟,限制leveldb对系统其他活动的影响。

class SlowEnv : public leveldb::Env {
  .... implementation of the Env interface ....
};

SlowEnv env;
leveldb::Options options;
options.env = &env;
Status s = leveldb::DB::Open(options, ....);

2.20 移植(Porting)

leveldb可能通过提供被leveldb/port/port.h导出的类型/方法/函数/的平台相关的实现,来移植到新的平台。详细内容参考leveldb/port/port_example.h

另外的,新的平台可能需要新的默认leveldb::Env的实现。详细内容参考leveldb/util/env_posix.h

2.21 其他信息

其他信息可以参考其他的文档文件
1. impl.md
2. table_format.md
3. log_format.md

c++11实现的跨平台定时器

一、引言

在一个比较大的系统当中,很多地方都需要定时功能,比如某些存在内存中的数据,需要定时的做持久化(实时持久化可能带来很大的性能问题),如果这个系统要求跨平台。那么实现一个跨平台的定时器就很有必要了。
c++11的标准中,实现了多线程,头文件为<thread>,也有了系统时间相关的库,头文件为<chrono>,这使得使用c++11实现跨平台的定时器有了可能。在这之前,多线程一般都是使用操作系统相关的库。

二、 定时器的基本功能

在定时器的实现之前,需要事先计划好一个定时器的基本功能。大概的功能如下:
1.添加一个定时事件
2.删除一个指定的定时事件
3.定时事件支持只执行一次或者重复执行
4.定时器启动指令
这个部分的内容在Linux 下定时器的实现方式分析中有很详细的介绍

三、 数据结构知识预备

在实现定时器中,需要有一个容器去存储所有的定时事件,比如说最容易想到的数组,如果我们用数组保存定时事件,那么在找出当前需要执行的定时事件的伪代码就是

    for(i = 0 ; i < events_array_size; ++i){
        if(events[i].time <= current_time){
            exec_in_child_thread(events);
        }
    }
Null

这里就是遍历所有的事件,将已经到了执行时间的事件放到子线程中去执行。

在这里有一个可以优化效率问题:注意到每一次查询是否有事件到了执行时间,都会遍历整个数组,也就是O(n)的复杂度,并且查询周期也是非常短的,所以会有非常大的性能问题。因此,我们可以将这里的数组变成有序的,每一次只需查询第一个事件,如果还没到执行时间,那么之后的事件就不用遍历了。这样平均下来就是O(1)的时间复杂度。考虑到我们在插入,删除时始终要保证数组的有序,可以采用最小堆。也就是堆顶始终是最先到执行时间的一项。

四、c++11多线程的知识预备

4.1 join()还是detach()

c++11中创建一个线程很容易

#include <iostream>
#include <thread>

void function_1(){
    std::cout << "hello world" << std::endl;
    while(true){}
}

int main(){
    std::thread t1(function_1);
    t1.join();      //加入运行

    std::cout << "hahah" << std::endl;

    return 0;
}
Null

这段代码就是创建了一个t1的线程,来执行function_1函数。注意到我们这里使用了t1.join(),字面上的意思是将子线程加入的主线程来执行,这样导致的结果是主线程会在t1.join()这里停止,直到t1执行结束后再往下执行。在这里function_1中存在死循环,所以hahah永远不会被输出。而如果使用t1.detach()的话,即将子线程从主线程分离出去,那么主线程就会一直向下执行,在这里就会执行到return 0,然后主线程销毁,detach出去的子线程可能就没有任何显示的机会了(查过资料说detach出去子线程的行为的undefined的)。

在这个定时器的实现中,当然不能使用join(),因为会阻塞程序的正常运行。

4.2 如何使得定时器的使用是线程安全的。

在一个程序中,可能很多地方会使用到定时器去触发某个动作,但是为了效率,肯定不会创建多个定时器实例,因此定时器的实现应该是单例的。因为需要单例,因此定时器的创建,定时器中定时事件的添加和删除都是线程安全的(因为同一时间可能会有多个线程操作定时器)。

4.2.1 线程安全的单例模式

单例模式的实现方法有很多种,因为能力问题,不对这一点多做叙述。可以看这一篇单例模式(Singleton)及其C++实现

我这里使用的是使用static来创建局部可见的全局变量

static Timer* getInstance(std::chrono::milliseconds tick){
    static Timer timer(tick);
    return &timer;
}
Null

4.2.2 使用锁保证定时事件的添加和删除是线程安全的。

在往定时器中添加和删除定时事件时,就是在做最小堆的添加和删除。这个中间线程不安全的现象为:以添加为例,往最小堆数组末尾加入一个数据,然后再逐层向上调整。如果在这个调整的过程中,另外一个线程执行了删除操作,两个调整过程就可能同时发生,造成不可预知的错误,使得最小堆并不有序。

所以添加和删除操作应该是互斥的,只有其中一个操作执行结束,另一个操作才可以开始。c++中可以使用信号量mutex和锁locker轻松的完成互斥的需求。示例代码如下:

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <fstream>

class LofFile{
    public:
        LofFile(){
            f.open("log.txt");
        }

        void shared_print(std::string id,int value){
            std::lock(m_mutex,m_mutex2);
            std::lock_guard<std::mutex> locker(m_mutex,std::adopt_lock);
            std::lock_guard<std::mutex> locker2(m_mutex2,std::adopt_lock);
            std::cout << "from" << id << ":" << value << std::endl;
        }

        void shared_print2(std::string id,int value){
            std::lock(m_mutex,m_mutex2);
            std::lock_guard<std::mutex> locker2(m_mutex2,std::adopt_lock);      //lock_guard是为了防止语句执行中出现异常,锁不被释放。这样做可以保证在退出代码块后解锁
            std::lock_guard<std::mutex> locker(m_mutex,std::adopt_lock);
            std::cout << "from" << id << ":" << value << std::endl;
        }

    protected:
    private:
        std::mutex m_mutex;     //使用信号量来解决资源竞争
        std::mutex m_mutex2;
        std::ofstream f;

};

void function_1(LofFile& log){
    for(int i = 0; i > -100; i--){
        log.shared_print("From t1:",i);
    }
};

int main(){
    LofFile log;
    std::thread t1(function_1,std::ref(log));//线程开始运行
    for(int i = 0; i < 100; i++){
        log.shared_print2("From main",i);
    }

    t1.join();
    return 0;
}
Null

五、总结与实现

分析到这里,一个定时器的基本要素已经具备了。
1.用来存储定时事件,并且添加和删除操作都是线程安全的最小堆
2.可以多线程实现定时事件的执行从而不阻塞主线程。

下面贴出所有的实现代码:

5.1 最小堆的实现。

SortedHeap.hpp

/**
 * 排序堆的实现
 * 堆使用的是完全二叉树的结构,使用数组(数组从零开始)保存,那么最后一个非叶子节点为(n - 1) / 2
 * 堆的构建一直都是在有序的基础上的,那么每次调整只需比较i和(i - 1) / 2的元素,依次上推
 * 支持任意类的排序
 * 当前还不支持多线程环境下的使用
 * author:jiangpengfie
 * date:2017-05-09
 */
#ifndef SORTEDHEAP_H
#define SORTEDHEAP_H
#include <iostream>
#include <vector>
#include <functional>
#include <memory>
#include <mutex>
#include <condition_variable>
#include "src/core/util/util.h"


template<class T>
class SortedHeap{
    private:
        struct HeapNode{
            unsigned int id;
            T obj;
            HeapNode(unsigned int id,T t):obj(t){
                this->id = id;
            }
        };
        std::vector<HeapNode> heap;
        unsigned int autoIncrementId;
        std::function<bool(T& ,T&)> cmp;    //比较函数,实现选择构造最大堆还是最小堆
        std::mutex mu1;                
        std::mutex mu2;                

        /**
         * 插入节点后调整堆中不符合的节点
         */
        void adjustAfterInsert();

        /**
         * pop出堆顶元素后调整堆中不符合的节点
         */
        void adjustAfterPopTop();

        /**
         * 删除节点后调整堆中不符合的节点
         * @param i 删除的节点id
         */
        void adjustAfterDelete(int id);         

        void swap(HeapNode& t1,HeapNode& t2);

        void deleteNodeByPos(const unsigned int pos);
    public:
        /**
         * 构造函数
         * @param cmp 用来比较
         */
        SortedHeap(std::function<bool(T&,T&)> cmp);
        /**
         * 插入节点
         * @param node 插入的节点
         */
        unsigned int insertNode(T& node);
        /**
         * 删除节点,时间复杂度为O(n)
         * @param id  要删除的节点id
         */
        void deleteNode(unsigned int id);

        /**
         * pop最小的节点
         * @return T* 返回的最顶部的节点指针
         */
        std::unique_ptr<T> popTopNode();

        /**
         * 获取最顶部的节点
         * @return T 最顶部的节点指针
         */
        std::unique_ptr<T> getTopNode();

        /**
         * 删除顶部的节点
         *
         */
        void deleteTopNode();
};

template<typename T>
SortedHeap<T>::SortedHeap(std::function<bool(T&,T&)> cmp){
    this->cmp = cmp;
    this->autoIncrementId = 0;
}

template<typename T>
void SortedHeap<T>::swap(HeapNode& t1,HeapNode& t2){
    HeapNode tmp = t1;
    t1 = t2;
    t2 = tmp;
}

template<typename T>
void SortedHeap<T>::adjustAfterInsert(){
    int last = this->heap.size() - 1;
    int flag = true;
    //从插入的节点位置开始向上调整
    while(last > 0 && flag){
        if(this->cmp(this->heap[last].obj,this->heap[(last - 1) / 2].obj)){
            this->swap(this->heap[(last - 1) / 2],this->heap[last]);
        }else{
            //不需要调整了
            flag = false;
        }
        last = (last - 1) / 2;
    }
}

template<typename T>
void SortedHeap<T>::adjustAfterDelete(int pos){
    //从pos位置开始向下调整
    int last = this->heap.size() - 1;
    if(last == 0)
        return;     //最后一个不需要调整
    bool flag = true;   //标记是否需要调整
    while(pos <= (last - 1) / 2 && flag){
        //一直调整到最后一个非叶子结点
        int topNum = 0;     //记录最小的结点编号

          //(pos + 1) * 2 - 1是左孩子,pos是父
        if(this->cmp(this->heap[(pos + 1) * 2 - 1].obj,this->heap[pos].obj)){
            topNum = (pos + 1) * 2 - 1;
        }else{
            topNum = pos;
        }

        if((pos + 1) * 2 <= last){
            //如果存在右结点
            if(this->cmp(this->heap[(pos + 1) * 2].obj,this->heap[topNum].obj)){
                topNum = (pos + 1) * 2;
            }
        }

        //看看topNum是不是自己
        if(pos == topNum){
            //是自己就不用调整了
            flag = false;
        }else{
            //交换
            this->swap(this->heap[pos],this->heap[topNum]);
        }
        pos = topNum;
    }
}


template<typename T>
void SortedHeap<T>::deleteNodeByPos(const unsigned int pos){
    unsigned int last = this->heap.size() - 1;
    if(pos > last){
        return;
    }
    std::lock(mu1,mu2);             //上锁
    std::lock_guard<std::mutex> locker1(mu1,std::adopt_lock);
    std::lock_guard<std::mutex> locker2(mu2,std::adopt_lock);
    //与最后一个交换
    swap(this->heap[pos],this->heap[last]);
    //删除最后一个
    this->heap.pop_back();      

    this->adjustAfterDelete(pos);
}



template<typename T>
unsigned int SortedHeap<T>::insertNode(T& node){
    HeapNode hNode(this->autoIncrementId++,node);
    std::lock(mu1,mu2);             //上锁
    std::lock_guard<std::mutex> locker1(mu1,std::adopt_lock);
    std::lock_guard<std::mutex> locker2(mu2,std::adopt_lock);
    this->heap.push_back(hNode);     //先将node放在最后一位
    if(this->heap.size() != 1){
        //如果大小不等于1,则在新增节点后调整
        this->adjustAfterInsert();
    }
    return this->autoIncrementId - 1;
}


template<typename T>
void SortedHeap<T>::deleteNode(unsigned int id){
    for(unsigned int i = 0; i < this->heap.size(); i++){
        if(heap[i].id == id){
            //找到了id
            this->deleteNodeByPos(i);
            break;
        }
    }


}

template<typename T>
std::unique_ptr<T> SortedHeap<T>::popTopNode(){
    if(this->heap.size() != 0){
        std::unique_ptr<T> top(new T(this->heap[0].obj));
        this->deleteNodeByPos(0);
        return top;
    }else{
        std::unique_ptr<T> p = nullptr;
        return p;
    }
}

template<typename T>
std::unique_ptr<T> SortedHeap<T>::getTopNode(){
    if(this->heap.size() != 0){
        std::unique_ptr<T> top(new T(this->heap[0].obj));
        return top;
    }else{
        std::unique_ptr<T> p = nullptr;
        return p;
    }
}

template<typename T>
void SortedHeap<T>::deleteTopNode(){
   if(this->heap.size() != 0){
        this->deleteNodeByPos(0);
    }
}

#endif



Null

5.2 定时器的实现

Timer.h

/**
 * 定时器的实现
 * 支持int setTimer(T interval,function action):设置一个定时器,指定间隔interval和回调函数action,返回定时器id
 * 支持void deleteTimer(int timerId):删除一个定时器
 * 数据结构:最小堆模型,按照定时器触发的时间排序
 * author:jiangpengfei
 * date:2017-05-09
 */
#ifndef TIMER_H
#define TIMER_H
#include <iostream>
#include <chrono>
#include <functional>
#include <thread>
#include <memory>
#include "SortedHeap.hpp"

class Timer{
    private:
        std::chrono::milliseconds tick;
        double timeline;     //当前时间线,long double的字节数为12
        bool isStart;        //标志当前定时器的启动状态
        struct SchedulerEvent{
          unsigned int id;                   //定时事件的唯一标示id
          double interval;                   //事件的触发间隔,在重复事件中会用到这个属性
          double deadline;                   //定时事件的触发时间
          std::function<void()> action;      //触发的事件
          bool isRepeat;                     //是否是重复执行事件
          SchedulerEvent( double interval, double timeline,std::function<void()> action,bool isRepeat){
              this->interval = interval;
              this->deadline = interval + timeline;
              this->action = action;
              this->isRepeat = isRepeat;
          }
        };

        SortedHeap<SchedulerEvent> eventQueue;

        /**
         * 执行到达期限的定时器
         */
        void loopForExecute();

        //私有的构造函数
        Timer(std::chrono::milliseconds tick):eventQueue(
            [](SchedulerEvent& a,SchedulerEvent& b){
                return a.deadline < b.deadline;
            }
        ){
            this->timeline = 0;
            this->tick = tick;
            this->isStart = false;
        }

    public:

        //单例模式
        static Timer* getInstance(std::chrono::milliseconds tick){
            static Timer timer(tick);
            return &timer;
        }

        /**
         * 设置定时器
         * @param interval 定时间隔
         * @param action 定时执行的动作
         * @param isRepeat 是否重复执行,默认不重复执行
         * @return unsigned int 定时器的id,可以根据这个id执行删除操作
         */
        unsigned int addEvent(double interval,std::function<void()> action,bool isRepeat = false);

        /**
         * 删除定时器
         * @param timerId 定时器id
         *
         */
        void deleteEvent(unsigned int timerId);

        /**
         * 同步执行启动定时器
         */
         void syncStart();

         /**
         * 异步执行启动定时器
         */
         void asyncStart();

};


#endif
Null

Timer.cpp

#include "src/core/util/Timer.h"

unsigned int Timer::addEvent(double interval,std::function<void()> action,bool isRepeat){
    SchedulerEvent event(interval,this->timeline,action,isRepeat);
    return this->eventQueue.insertNode(event);
}

void Timer::deleteEvent(unsigned int timerId){
    this->eventQueue.deleteNode(timerId);
}

void Timer::loopForExecute(){
    std::unique_ptr<SchedulerEvent> top = this->eventQueue.getTopNode();
    while(top != nullptr && top->deadline <= this->timeline){
        //如果已经到了执行的时间,新开一个子线程执行任务
        std::thread t(top->action);
        t.detach();    //子线程分离

        if(top->isRepeat){
            //如果是重复事件,则重新添加
            this->addEvent(top->interval,top->action,top->isRepeat);
        }

        //从堆中删除
        this->eventQueue.deleteTopNode();
        top = this->eventQueue.getTopNode();
    }
    //执行一次后等待一个周期
    std::this_thread::sleep_for(this->tick);
    //周期增1
    this->timeline++;
}

void Timer::asyncStart(){
    if(!this->isStart){
        std::thread daemon_thread(&Timer::syncStart,this);
        daemon_thread.detach();     //从当前主线程分离
    }
}

void Timer::syncStart(){
    if(!this->isStart){
        while(1)
            this->loopForExecute();
    }
}
Null

测试执行的代码

#include <iostream>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <string>
#include <functional>
#include <thread>
#include <memory>
#include <fstream>
#include "src/core/util/Timer.h"

void myprint(std::string msg){
    std::ofstream of("timer.txt", std::ios::app);
    std::thread::id this_id = std::this_thread::get_id();
    auto t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    of << "From Thread " << this_id << "at time " << std::put_time(std::localtime(&t), "%Y-%m-%d %H.%M.%S") << ":" << msg << std::endl;
}

int main(){
    std::chrono::milliseconds tick(2000);       //1000毫秒作为一个周期
    Timer* timer = Timer::getInstance(tick);
    std::function<void()> f1 = std::bind(myprint,"第一个加入,10tick后执行");
    std::function<void()> f2 = std::bind(myprint,"第二个加入,被删除不执行");
    std::function<void()> f3 = std::bind(myprint,"第三个加入,每5tick重复执行");
    std::function<void()> f4 = std::bind(myprint,"第四个加入,5tick后执行");
    std::function<void()> f5 = std::bind(myprint,"第五个加入,5tick后执行");
    std::function<void()> f6 = std::bind(myprint,"第六个加入,5tick后执行");
    std::function<void()> f7 = std::bind(myprint,"第七个加入,5tick后执行");
    std::function<void()> f8 = std::bind(myprint,"第八个加入,5tick后执行");
    std::function<void()> f9 = std::bind(myprint,"第九个加入,5tick后执行");
    std::function<void()> f10 = std::bind(myprint,"第十个加入,5tick后执行");
    std::function<void()> f11 = std::bind(myprint,"第十一个加入,15tick后执行");
    std::function<void()> f12 = std::bind(myprint,"第十二个在执行后加入,20tick+5s后执行");

    timer->addEvent(10,f1);
    int id = timer->addEvent(11,f2);
    timer->addEvent(5,f3,true);
    timer->addEvent(5,f4);
    timer->addEvent(5,f5);
    timer->addEvent(5,f6);   
    timer->addEvent(5,f7);
    timer->addEvent(5,f8);
    timer->addEvent(5,f9);
    timer->addEvent(5,f10);
    timer->addEvent(15,f11);

    timer->deleteEvent(id);

    myprint("线程开始启动,每tick是2秒");

    //异步执行,程序退出后计时器也会终止,因此在下面使用while循环保证程序不会退出
    timer->asyncStart();
    //timer->syncStart();


    //休眠5秒钟
    std::this_thread::sleep_for(std::chrono::seconds(5));   
    //应该在大概20*tick+5秒后执行,
    //TODO 执行后加入的定时器不对
    timer->addEvent(20,f12);

    getchar();

    return 0;
}
Null

c++中的中文字符分割

在utf-8编码的前提下,一个字符可能占用的空间为1~4个char,由于这种不确定的字符长度导致在utf-8编码的string中,无法直接使用substr进行字符串分割。

解决的方案的关键在于利用utf-8编码的特点,utf-8的第一个字符的前缀连续为1的个数代表这个”字“占用的字符数量,当只有一个字符时,第一个字符的第一位为0。具体的说明可以参考这篇文章。http://www.ruanyifeng.com/blog/2007/10/ascii_unicode_and_utf-8.html

这里主要说明怎么利用代码解决这个问题

int getUtf8CharLength(char c){
    int len = 0;
    if(c > 0){              //利用计算机中数字的存储特点,第一位为1为负,第一位为0为正,很容易判断
        return len + 1;     //第一位为0时为特殊情况,需要加1
    }
    while(c < 0){
        len++;
        c = c << 1;
    }
    return len;
}

std::string substrWithChinese(std::string str,unsigned int start,unsigned int length){
    unsigned int i = 0;             //标识前进的几个“字符”
    unsigned int cursor = 0;        //标识前进了几个“字”
    unsigned int save = 0;          //保存的字符标识
    unsigned int len = str.length();
    char* c = new char[len + 1];
    while(str[i] != '\0'&&length > 0){
        unsigned int l = getUtf8CharLength(str[i]); //获取字符长度
        if(cursor >= start){
            unsigned int m = l;
            while(m--){
                c[save++] = str[i++];
            }
            length--;
        }else{
            i+=l;
        }
        cursor++;
    }
    c[i] = '\0';
    return std::string(c);
}

c++和Java的对象内存异同

##前言

好久都没有写博客了。经过一段时间的实习,收获上感觉并不是很大,还好自己也有看一些东西。因为毕业设计做的是c++方面的编程,算是初探c++的内容。这里简单记录自己对c++和java的对象内存区别,谈谈c++为什么性能要比java高。想到哪里就记到哪里,因为很多东西自己也要查证才能确定。

##1.栈和堆

栈和堆在数据结构上是两种不同的结构,栈的特点是先进后出,而堆则可以看做是一大堆数据的集合。在c++和Java的内存中,也有堆和栈的概念。
首先理解一下,内存中栈中的每一个元素都被称为栈帧,每个栈帧中存储一个方法(function)中的用到的变量内存。程序执行时,就是一个栈中的每一帧被读取执行的过程。堆则是主要用来存储对象。那么在c++和java中到底有什么不同呢?

“`c++
object* processObject(int param){
object obj;
obj.param = param;
return &obj;
}

int main(){
object* obj = processObject(10);
obj->do_something(); //1
}

<pre><code><br />1处是会出现一个空指针错误的。而如果这是一段java的代码则完全没有问题。

在c++和java中,一个栈帧被执行完,这部分内存就直接被释放了。而在c++中,所有通过声明产生的对象都是在栈上开辟内存空间,所以函数执行完后这个对象也就被释放了,那么返回出来就是一个空指针了。只有通过new生成的对象才会在堆中申请空间,因此通过new出来的对象都需要在不用时手动释放内存,不然就会内存泄露。而在java中,对象是在堆中生成的(JDK7中的字符串是在常量池中,int等字面值在限定范围内也会在常量池中申请内存),所有方法内的对象都是一个指向堆中相应对象的指针(注意是指针而不是引用,很多人认为java中向方法传递参数是引用传递,但其实是值传递,只不过这个值是指向对象的指针)。而在java中不需要手动释放内存则是因为Java拥有GC机制(Garbage collect,垃圾回收)。这个在之后再谈。

##2 java的强弱引用和c++的智能指针

java的强弱引用和c++的智能指针都是在希望可以更好的进行内存管理的前提下出现的,java的强弱引用可以帮助GC机制进行粒度更细的内存回收,而c++的智能指针则是让没有GC机制的c++有了一定能力的自动释放内存的能力。
###a. java的强弱引用
java的引用共有四种,分别为强引用(Strong Reference)、软引用(Soft Reference)、弱引用(Weak Reference)、虚引用(Phantom Reference)。
之所以定义出这么多引用,是希望在GC发生时,可以更灵活的进行对象的销毁。

强引用就是通过new出来的对象,强引用表示的都是必需的对象,这是无论如何都不会被回收的。

而软引用则表示有用但非必需的对象。当系统内存不够将要发生内存溢出异常的时候,会将软引用的对象列入回收范围,进行二次回收。只有这次回收仍然内存不足才会抛出内存溢出异常。JDK1.2之后提供了SoftReference类来实现这个。

弱引用也用来表示非必需的对象,不同的是,这个引用并不能帮助对象躲过任何的GC,也就是说无论如何,发生GC时这个对象都会被回收,弱引用的唯一作用就是用来取得一个对象实例。JDK1.2之后提供了WeakReference类来实现这个。

综合上述,虚引用的存在也比较清晰了。它不仅不能帮助对象躲过GC,甚至不能取得对象的实例。唯一存在的用处就是当其引用的对象被GC回收时会收到一个系统的通知。JDK1.2之后使用PhantomReference类来实现。

###b. C++的智能指针
相比于Java的GC机制以及为GC服务的各种引用,C++的智能指针则稍显简单。但是这个看似简单的c++的智能指针,对于c++来说意义也许并不是那么低。在我的理解中,c++之所以实用,因为相对于c,c++有着丰富高效的STL库和被大多数开发者接受的OOP。对于一个巨大的项目来说,OOP往往可以更好的帮助系统模块化,降低耦合,而STL库则是开发者进行高效工作的基础。相对于Java,单单从c++没有GC机制,就意味着它有着更高的性能,另外Java的虚拟机也是影响性能的一个方面。可也是因为c++没有GC机制,对于一个多人合作的巨大的项目来说,内存泄露则是面临的首要问题。举个很低端的例子
“`c++
class Example{
Mysql* getMysqlConnect(){
Mysql* mysql = new Mysql;
return mysql;
}
};

一个项目中所有的mysql对象指针都通过上面的getMysqlConnect()获取,看似没有问题,但是没有人敢保证一个项目中所有使用这个方法的人都会在使用完mysql对象后,手动将其释放。因此这里需要一个shared_ptr或者unique_ptr去包装一下指针,这样调用函数的人就不需要在外部释放对象内存,也就避免了内存泄露的问题。当然在使用shared_ptr时也一定要注意,因为可能会出现循环引用的问题,循环引用则会导致对象内存一直不被释放。
当然了,上面的例子只是为了说明这个例子而列举出来的,事实上可以直接返回对象而不是指针。
c++
class Example{
Mysql getMysqlConnect(){
Mysql mysql;
return mysql;
}
};

这里我一开始以为是会将对象复制一遍返回出来,但事实上在c++11中有了移动语义,即这种在拷贝语义和移动语义中,会优先使用移动语义来将mysql对象从方法中移出来,而不是拷贝mysql对象返回,并把方法中的对象删除。
##3 c++如何尽量避免内存泄露

后天再写啦