ubuntu上使用chrome进行手机页面调试的方案

虽然大多数时候我们都可以使用chrome的开发者工具,通过模拟手机来调试手机页面,但是对于一些特殊的动作是无法在电脑上做的,比如说手机的多点触控操作,在页面上模拟双指缩放时没法使用鼠标完成,因此需要通过手机来进行真机调试,但是调试的同时,我们希望可以看到调试信息,希望可以实时修改查看。这个时候就可以借助chrome的远程设备使用手机chrome打开页面,在电脑chrome中调试。具体的调试过程在
远程调试 Android 设备使用入门
可以看到。下面只是记录ubuntu下使用该方案遇到的问题。

一.打开远程调试界面

打开chrome的控制台,点击右上角的列表按钮,找到more tool->remote device即可。

二.找不到已连接的手机

这个在windows上一般比较容易,因为基本上各种安全软件都会自动帮助用户连上,在Ubuntu下我们需要使用adb来连接手机。使用adb start-server来开启服务监听手机的连接。如果出现权限错误,可以adb kill-server,再adb start-server。只要手机上弹出连接确认框,即说明已经连上了。

三.Inspect的页面空白

这个问题困扰了我很久,因为我在windows上是没有问题的,后来查了一下,是因为chrome需要翻墙才行,在windows上我使用的是shadowsocks的pac方案,linux上则是自己建立的翻墙规则,导致有遗漏。所以如果出现Inspect页面空白,可以尝试全局翻墙

四.如果在手机上访问开发机器上的服务

这个问题有这种解决方案。
1.如果手机和电脑在同一个局域网内,可以直接访问电脑的ip地址即可。但是http服务监听的地址不能只是localhost,应该是电脑的ip地址或者0.0.0.0
2.使用远程设备的端口转发功能,设置如下图
端口转发设置
成功的界面如下
端口转发设置成功
这样,我在手机上访问localhost:4200,相当于在电脑上访问localhost:4200,在手机上访问localhost:1025,相当于在电脑上访问localhost:80。需要注意的是,端口转发的左侧输入框端口号必须大于1024,这是因为小于等于1024的端口号是被划分出来特殊使用的。

树的可视化以及家谱绘制的算法

一、前言

树的可视化意思就是将一颗树(数据结构的树)用图形展现出来。树的可视化有很多种用途,比如说很常见的组织结构图就是树的可视化的应用。家谱也可以算是一颗树,但是与树的可视化稍微不同的是,会有配偶这个角色,同时一个家谱可能并不是从一个根节点向下(可能存在某个家族向上追溯到某一代,之前的资料已经完全丢失了,那么这个家谱的起源应该是这一代人,而不是具体到某一个人)。

二、难点

在树的可视化过程中,难点就在于如何确定每一个节点的位置(X,Y),Y坐标相对来说是比较好确定的,可以根据它在一颗树中的第几层来确定Y的值;X的值既受到同一层其他的节点位置的影响,同时也受到其子代位置的影响。因为其子代可能会和其兄弟节点的子代交叉。所以这篇文章主要讲解的是每个点的位置的计算。

家谱的绘制和树的可视化也有前言中的一些不同之处,因为也不能完全的将树的可视化算法生搬硬套。

下面我将一步步的讲解整个树的可视化算法的过程,以及如何将这个算法应用到家谱的绘制上,并给出具体的实现代码(因为是网页上有这个需求,所以代码是js写的)。算法的主要提出者是John Q. Walker II,这里有他对这个算法的相关讲解。POSITIONING NODES FOR GENERAL TREES

三、算法介绍

图1:
示意图

1、节点属性定义

var Node = function(key,image,description){
    this.key = key;
    this.parent = null;         //父辈
    this.offspring = null;      //指向最左边的子辈
    this.leftSibling = null;    //左兄弟
    this.rightSibling = null;   //右兄弟
    this.prelim = 0;            //x坐标的预定义
    this.modifier = 0;          //调整的大小
    this.level = 0;
    this.width = 60;            //每个节点的最终宽度,如果存在配偶这个宽度是会发生改变的
    this.height = 80;           
    this.nodeWidth = 60;        //每个节点的基本宽度,不会变
    this.nodeHeight = 80;
    this.x = 0;
    this.y = 0;
    this.spouses = [];
    this.image = image||"assets/post-photo.jpg";
    this.description = description || "";
};

这里首先介绍几个重要的属性:

parent,offspring,leftSibling,rightSibling:分别是当前节点的父节点指针、最左边的孩子节点的指针、左兄弟、右兄弟。以上图为例,M点的parent是N,offspring是H,leftSibling是G,rightSibling是null。

prelim:当前点的X的预定义位置。如同在难点中所说,X的位置受到很多因素的影响,所以需要多次计算。prelim只是预定义的位置,不是最终的位置。

modifier:调整值,不过这个调整值并不是说当前点的X还要调整多少,而是当前点的后代的X还要调整的大小。

level:当前点所处的层级。

width、height、nodeWidth、nodeHeight:这些参数用来表示节点的宽和高,width和nodeWidth的区别在于,nodeWidth是画出来的节点的宽度,是一个基本属性,不会改变,width则表示这个点最终会占用的宽度。在树的可视化算法中,只需要nodeWidth和nodeHeight,width和height是专门为家谱的绘制增加的。

x:当前点的X,如何确定X的大小呢?以M为例,M.X = M.prelim + N.modifier + O.modifier。

y:当前点的y,以M为例: M.y = M.level * (每层的间隔 + M.height);

spouses:这个是专门为家谱的绘制准备的。表示当前点的配偶,一个点的配偶可以有多个,所以使用数组。

2、算法的大概流程

算法大概可以划分为:

  • 1.倒序遍历整颗树,初次确定每个点的prelim和modifier。
  • 2.从最底层逐级向上检查是否存在交叉,如果存在交叉则调整(这一步和我参考的算法不同,它是从上向下检查,但是我在使用中发现,如果存在子树的子树交叉的情况,那么在高层做了调整之后,这里再做一次调整就可能出现再次重叠的情况,所以改用从底层向上检查)。
    (勘误:这个地方不对,应该还是从上向下遍历,这样从上方调整后,即使下方还有重叠,也可以再次检测并调整)
  • 3.最后确定根节点的位置

3、算法详细介绍

这里我们先定义几个常量:

SiblingSeparation = 4,   //兄弟节点之间的间隔
SubtreeSeparation = 6,   //子树之间的间隔
width = nodeWidth = 2   //节点的宽度

还是以图1所示的树为例,这里先做一遍倒序遍历,初步确定每一个节点的位置:

A:因为A的左节点是null,所以

A.prelim = 0;
A.modifier = 0;

B:

B.prelim = 0;
B.modifier = 0;

C:因为C有左节点B,所以C要在B的基础上做向右的偏移,偏移量为B的prelim加上B的宽度加上间隔大小(这个地方的处理和John Q. Walker II的算法不同,是为了家谱的显示做的更改)。所以

C.prelim = B.prelim + B.width + SiblingSeparation = 0 + 2 + 4 = 6;
C.modifier = 0;

D:因为D是B、C的父节点,同时是A的右兄弟,所以D的位置由A确定后,为了保证D应该在B和C中间,应该对B、C做调整,但是我们并不会再回头对B、C做处理,而是计算D的modifier,以此来表示D的后代需要调整的位置。并且D.modifer应该等于D.prelim – (B.prelim + C.prelim)/2。

D.prelim = A.prelim + A.width +  SiblingSeparation= 0 + 2 + 4 = 6;
D.modifier = D.prelim - (B.prelim + C.prelim) / 2 = 6 - (0 + 6) / 2 = 3;

E:因为E没有左兄弟,所以直接通过A、D来确定位置即可

E.prelim = (A.prelim + D.prelim) / 2 = (0 + 6) / 2  = 3;
E.modifier = 0;

F:

F.prelim = E.prelim + E.width + SiblingSeparation = 3 + 2 + 4 = 9;
F.modifier = 0;

G:

G.prelim = 0;
G.modifier = 0;

H:

H.prelim = 0;
H.modifier = 0;

I:

I.prelim = H.prelim + H.width + SiblingSeparation = 0 + 2 + 4 = 6;
I.modifier = 0;

J:

J.prelim = I.prelim + I.width + SiblingSeparation = 6 + 2 + 4 = 12;
J.modifier = 0;

K:

K.prelim = J.prelim + J.width + SiblingSeparation = 12 + 2 + 4 = 18;
K.modifier = 0;

L:

L.prelim = K.prelim + k.width + SiblingSeparation = 18 + 2 + 4 = 24;
L.modifier = 0;

M:

M.prelim = G.prelim + G.width + SiblingSeparation = 6;
M.modifer = M.prelim - (H.prelim + L.prelim) / 2 = -6;

N:

N.prelim = F.prelim + F.width + SiblingSeparation = 9 + 2 + 4 = 15;
N.modifier = N.prelim - (G.prelim + M.prelim) / 2 =  12;

走到这里我们就不再向上走了,因为根节点的位置肯定是有E和N来最终确定。所以这个时候应该来调整整颗树,检查是否存在节点位置重叠的情况。检查是否重叠是一个递归,所以使用递归检查会降低很多复杂度。但是这里我们走一遍过程,并非是严格按照代码执行过程。

我们先检查最底层,也就是这颗树的第4层:
检查C和H是否有重叠(当然是最右和最左做对比了):
C.X = D.modifier + E.modifier + C.prelim = 3 + 0 + 6 = 9;
H.X = M.modifier + N.modifier + H.prelim = -6 + 12 + 0 = 6;

所以H在C的左边3处,显然是重叠了的。所以需要将H所在的子树全部向右移offset = SubtreeSeparation - (H.X-C.X) = 9;我们一直向上找,直到C和H的祖先是兄弟时,也就是E和N,修改N.prelim、N.modifier就代表将N和其子树全体右移。

N.prelim = N.prelim + offset = 15 + 9 = 24;
N.modifier = N.modifier + offset = 12 + 9 = 21;

这时候E和N之间距离变得更大了,但是E和F的距离与F和N的距离是不同的,为了让树显示的好看一点

F.prelim = F.prelim + offset / 2 = 9 + 9 / 2 = 13.5;

F的子树也应该调整一下(虽然这里并没有)

F.modifier = F.modifier + offset / 2 = 0 + 9 / 2 = 4.5;

我们再往上看,发现已经没有重叠的了。

这时候确定根节点O的位置

O.prelim = (E.prelim + N.prelim) / 2 = (3 + 24) / 2 = 13.5;
O.modifier = 0;

四、如何将树的可视化算法应该到家谱的绘制上。

在之前已经介绍过家谱和树的结构有稍许不同,所以我们尽量将家谱做一些处理,使其变成标准的树结构。

  • 1.配偶问题。
    将某个节点的配偶不当成一个节点看待,而是这个节点的一部分,所以我们只需在有配偶的时候改变节点的width就可以了。
  • 2.不只一个根节点的问题
    既然有不只一个根节点存在,那么我们就自己定义一个虚拟的节点作为根节点,将第一层所有的点的parent都指向这个根节点即可。这样就是一个非常标准的树结构了。

五、源代码

源代码放在了github上,地址为https://github.com/joyme123/candraw

leveldb源代码阅读(三)-memtable的实现

一、序言

在之前的文章中提到,leveldb所有的记录都是由一个个log文件逐渐转变来的,与存储在磁盘上的log文件对应的就是内存中的memtable。memtable和log文件存储的内容是一致的。

二、SkipList

memtable中的键值对是通过SkipList这种数据结构存储的,SkipList相对于普通的List来说,普通的List查找时间复杂度为O(n),而SkipList可以达到O(log(n)),同时,SkipList相对与红黑树,在多线程上,需要锁住的数据量更小,性能更优。

SkipList示意图

比如上图就是一个SkipList结构,如果我们需要搜索45这个点,我们首先在最高层(第二层)找到30,然后发现下一个点是57,已经大于45了,于是来到下一层(第一层)从30开始,向后找,就能找到45了,在这个过程中,我们跳过了很多个点。因此这种数据结构得名SkipList。

2.1 SkipList的基本信息

SkipList是memtable中用到的最主要的数据结构。
SkipList的线程安全特点:写操作需要外部的同步,比如信号量这种。读操作需要保证在读的时候SkipList不会被销毁。除此之外,读操作没有任何的内部锁或者同步。
SkipList的基本原则:
(1)分配节点在SkipList销毁之前不会被删除。因为在代码中我们没有任何的删除节点的操作。
(2)一个节点的内容,除了next/prev指针,在这个节点被存储到SkipList之后其他数据都是不变的。只有Insert()会修改list的内容,初始化一个节点并使用release-stores去将这个节点存放到一个或多个list中需要谨慎对待。

2.2 SkipList的成员函数

SkipList中,公共的成员函数只有两个
void Insert(const Key& key);
bool Contains(const Key& key) const;

顾名思义,Insert用来向SkipList中插入key,Contains用来检查SkipList中是否包含某个key。

私有的成员函数如下:

  Node* NewNode(const Key& key, int height);
  int RandomHeight();
  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

  // Return true if key is greater than the data stored in "n"
  bool KeyIsAfterNode(const Key& key, Node* n) const;

  // Return the earliest node that comes at or after key.
  // Return NULL if there is no such node.
  //
  // If prev is non-NULL, fills prev[level] with pointer to previous
  // node at "level" for every level in [0..max_height_-1].
  Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  Node* FindLessThan(const Key& key) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

2.3 SkipList的节点Node实现

// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }

  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
  }

 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  port::AtomicPointer next_[1];
};

注意到Node实现了两个版本的Next和SetNext,带有NoBarrier_的是多进程不安全的,因为在少数情况下会用到,因此做了额外的实现。另外,可以注意这里只实现了Next()方法,并没有Prev()指向上一个节点,可以知道SkipList并不是一个双向链表,但是SkipList的迭代器是支持Prev()方法的,所以这是一个值得注意的地方。

Node节点的创建是通过NewNode()方法

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);
}

NewNode中使用了arena来分配对齐的内存,分配的内存大小是sizeof(Node)+sizeof(port::AtomicPointer) * (height - 1)。这里可以注意到申请的空间不仅仅是一个Node的大小,还包括sizeof(port::AtomicPointer) * (height – 1),这是因为SkipList是一个多层的结构,所以如果当前Node是3层,那么就需要3个Next指针,在Node的代码中,默认只申请了一个Next指针空间,所以需要额外的多申请(height – 1)个AtomicPointer的空间。

2.4 SkipList的迭代器(Iterator)

SkipList实现了自己的迭代器,迭代器的声明如下:

class Iterator {
   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const SkipList* list);

    // 如果迭代器位于一个有效的节点上,就会返回true.
    bool Valid() const;

    // 返回当前位置的key.
    // REQUIRES: Valid()
    const Key& key() const;

    // 前进到下一个位置
    // REQUIRES: Valid()
    void Next();

    // 后退到上一个位置
    // REQUIRES: Valid()
    void Prev();

    // 移到第一个key >= target的entry
    void Seek(const Key& target);

    // 移到这个list的第一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToFirst();

    // 移到这个list的最后一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToLast();

   private:
    const SkipList* list_; 
    Node* node_;
    // Intentionally copyable
  };

其中Prev()的实现并不是通过prev指针,因为上文中已经介绍了Node只有next指针。Prev()是通过FindLessThan来实现,找到小于指定Node的最后一个值。这里有一个问题,为什么不使用prev指针,难道不会大大的降低时间复杂度吗?看一下FindLessThan的具体实现。

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    assert(x == head_ || compare_(x->key, key) < 0);
    Node* next = x->Next(level);
    if (next == NULL || compare_(next->key, key) >= 0) {
      if (level == 0) {
        return x;
      } else {
        // Switch to next list
        level--;
      }
    } else {
      x = next;
    }
  }
}

其实就是SkipList查找过程的实现。时间复杂度应该是O(log(n)).相比于prev指针的话,在空间占用上是少一点的。具体为什么这样实现还得继续往下看。

SkipList还以同样的方式实现了FindGreaterOrEqualFindLast

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != NULL) prev[level] = x;
      if (level == 0) {
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}

FindGreaterOrEqual的实现还是比较有意思的,在返回比指定key大于或等于的节点的同时,还可以选择性的将该节点的prev指针保存到Node** prev中,返回给用户。其实这也是Iterator中Prev()的实现不采用prev指针的原因,因为有了FindGreaterOrEqual这样的实现,SkipList的内部实现中根本不需要Prev()来将指针向前移动,同时又大大的节约了内存。

下面看一下SkipList的Insert的实现。

void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == NULL || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (NULL), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since NULL sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}

首先,Insert方法中没有做任何的多线程同步,所以需要调用者在外部进行写入同步(防止多个线程同步写入)。

插入的过程是先找到需要插入的位置,然后调用RandomHeight()来随机得到当前节点的高度,之后向普通链表一样执行插入操作。

注意到这里有一个多线程读写的问题。也就是当我们改变max_height_的值的时候,并没有改变head_指针。这种情况下有两种可能,一是读到了新的max_height_,但是head_指针还没有更改,这时候指针会立刻下降到下一级,SkipList的多级只是为了提高查找速度,所以在这里并没有其他的副作用;二是正常的情况,不作讨论。

2.Memtable

Memtable的实现主要就是依赖于SkipList,分析了SKipList之后,Memtable的插入,查找操作都是SkipList的插入和查找。

  // Increase reference count.
  void Ref() { ++refs_; }

  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

Memtable的实现使用了类似于计数器引用的机制,不过这是手动实现的,所以需要用户在使用Memtable时调用Ref()来增加引用计数,引用结束后调用Unref来减少引用计数,当引用计数为0时则销毁自己。

总结

Memtable的设计还是很让人佩服的,至少对我来说是这样。了解到了SkipList这样的数据结构,也了解到内存屏障这种多进程情况下代码执行乱序的问题,还有内存对齐的必要、内存对齐的实现。同时也了解了一些C++编写程序的独特风格(我写的C++一直像Java,反而少了一点C++的美)

leveldb源码阅读(二)—— Varint和Arena的实现

一、Varint

Varint是在leveldb中广泛使用的一种变长的整数类型,Varint其实和unicode的实现非常相似,并且是little-endian。如果当前字节的最高位是1,则表示后面的字节也属于这个整数,如果最高位是0,则表示该整数结束了。所以,一个无符号4个字节的整型数,使用Varint可能以1,2,3,4,5个字节来表示。

比如130:

无符号整型表示起来为:00000000 00000000 00000000 10000010
使用Varint表示为:10000010 00000001             #这里注意Varint是little-endian

因为牺牲了每个字节的最高位作为标志位,所以遇到大的数可能需要5个字节。

某个无符号整型:10000001 00000001 00000001 00000001
使用Varint表示:10000001 10000010 10000100 10001000 00001000

但是平均下来,Varint是绝对会节省大量的存储空间的。

那么leveldb中Varint是如何编码和解码的呢?

char* EncodeVarint32(char* dst, uint32_t v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  static const int B = 128; //10000000
  if (v < (1<<7)) {  v < 2^7
    *(ptr++) = v;
  } else if (v < (1<<14)) { v < 2^14
    *(ptr++) = v | B;
    *(ptr++) = v>>7;
  } else if (v < (1<<21)) { //v < 2^21
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  }
  return reinterpret_cast<char*>(ptr);
}

上面这一段对uint32_t的变量进行Varint32编码的例子,编码的结果保存在dst里,返回的ptr是dst的最后一个字节的指针。使用if-else对1、2、3、4、5个字节的情况分别做了处理。主要就是移位和或运算符的使用。跟着代码走一遍就能理解。

const char* GetVarint32PtrFallback(const char* p,
                                   const char* limit,
                                   uint32_t* value) {
  uint32_t result = 0;
  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
    uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
    p++;
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
    }
  }
  return NULL;
}
inline const char* GetVarint32Ptr(const char* p,
                                  const char* limit,
                                  uint32_t* value) {
  if (p < limit) {
    uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
    if ((result & 128) == 0) {
      //如果低8位是0xxxxxxx,little-endian,也就是小于128
      *value = result;    //直接赋值
      return p + 1;
    }
  }
  return GetVarint32PtrFallback(p, limit, value);
}

上面GetVarint32Ptr是解码Varint32,当值大于128时,则调用GetVarint32PtrFallback。在则调用GetVarint32PtrFallback中,利用循环一个字节一个字节的取出,最终将value指针指向结果result。

二、Arena

Arena是leveldb中管理内存分配的类。所有的内存分配都通过Arena申请,可以根据申请的内存大小使用不同的内存分配策略,也可以避免过多的内存碎片问题,并且在内存释放时统一使用Arena来释放,方便管理。

Arena类的实现并不复杂。首先看一下成员变量。

  // Allocation state
  char* alloc_ptr_;                 //指向当前块中剩余的内存起点
  size_t alloc_bytes_remaining_;    //当前块中剩余的内存

  // Array of new[] allocated memory blocks
  std::vector<char*> blocks_;       //用来保存所有new出来的char数组,释放内存时也会使用到

  // Total memory usage of the arena.
  port::AtomicPointer memory_usage_;    //通过Arena申请的内存

然后就是public的成员函数:

// Return a pointer to a newly allocated memory block of "bytes" bytes.
  char* Allocate(size_t bytes);

  // Allocate memory with the normal alignment guarantees provided by malloc
  char* AllocateAligned(size_t bytes);

  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
  }

功能上很简单,Allocate用来申请指定大小的内存,AllocateAligned用来申请保证内存对齐的内存空间,MemoryUsage用来获取内存使用情况。

我对内存对齐的理解:现在假定内存对齐的最小单位是8个字节,那么如果你只申请14个字节,内存对齐会多给你分配2个字节凑成16个字节。内存对齐的好处就是访问速度的提升,在CPU一次读取8个字节的情况下。在没有内存对齐的情况下,你要读取第6~12个字节,就需要读取两次,第一次0~7个字节,通过移位取出6~7,再读取第8~15个字节,取出8~12。这样就需要读取两次。而如果在内存对齐的情况下,你需要读取的6~12个字节应该被放在8~15个字节中。只需读取一次即可。

私有的成员函数有:

char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);

这两个函数提供了具体的内存分配的实现。

具体分析这5个成员函数。

1.Allocate(size_t bytes)

inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}

Allocate是一个内联函数。内联函数的好处在于编译时会直接在调用处展开,对于程序执行速度有一定提升,但是也会导致程序大小变大。
assert(bytes > 0);是一个断言语句,断言语句可以在编译时由编译器去除,但是在调试的时候可以帮助程序员发现问题所在。这里用来保证没有出现0个字节分配的情况。
后面的判断语句用来判断当前块的剩余空间是否够分配,如果够则分配。如果不够则交给AllocateFallback()处理。返回的是分配的空间起点。

2.AllocateAligned(size_t bytes)

char* Arena::AllocateAligned(size_t bytes) {
  const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
  assert((align & (align-1)) == 0);   // Pointer size should be a power of 2
  size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
  size_t slop = (current_mod == 0 ? 0 : align - current_mod);
  size_t needed = bytes + slop;
  char* result;
  if (needed <= alloc_bytes_remaining_) {
    result = alloc_ptr_ + slop;
    alloc_ptr_ += needed;
    alloc_bytes_remaining_ -= needed;
  } else {
    // AllocateFallback always returned aligned memory
    result = AllocateFallback(bytes);
  }
  assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
  return result;
}

AllocateAligned(size_t bytes)用来做内存对齐的分配。
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;用来获取分配的最小单位,如果指针大小大于8,则取指针大小,否则使用8作为最小的对齐单位。
assert((align & (align-1)) == 0);这是一个很有意思的判断,可以保证只有2的n次方才能满足。

size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);current_mod是余数,比如当前指针地址是15,align是8,那么余数为7,这个余数就是alloc_prt_&(align-1)的值。
size_t slop = (current_mod == 0 ? 0 : align - current_mod);因为当前指针可能不是align的整数倍,slop即代表当前指针需要偏移的大小,来保证是align的整数倍。
size_t needed = bytes + slop; needed代表需要分配的空间。

之后的判断则和Allocate()函数做法相同了。需要注意的是返回的result指针是align的整数倍,这样就能保证以最少的内存读取次数来获取想要的数据。

3.AllocateFallback(size_t bytes)

char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlockSize / 4) {
    //如果需要分配的bytes大于块大小的1/4,则单独在新的块中分配,以此避免浪费过多的剩下的空间(这样能保证浪费的空间小与kBlockSize / 4)
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // 重新开辟新的块空间进行分配,上一个块剩下的空间都浪费了。
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}

上面两个成员函数在当前块分配空间不足时,都将分配任务交给AllocateFallback处理。

4.AllocateNewBlock(size_t block_bytes)。

char* Arena::AllocateNewBlock(size_t block_bytes) {
  char* result = new char[block_bytes];
  blocks_.push_back(result);
  memory_usage_.NoBarrier_Store(
      reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
  return result;
}

使用new char来申请一个新的块,并将块的起始指针存入blocks_中。更新memory_usage的大小。

5.MemoryUsage()

“`
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
“`
获取memory_usage_中存储的使用空间数据。

三、Memeory Barrier(内存屏障)

memory_usage_的类型是AtomicPointer,这是一个原子类型,针对不同平台有不同的实现。可以用来线程安全的存储和获取值。

AtomicPointer类实现了几个函数:

NoBarrier_Load();
NoBarrier_Store(void* v)
Acquire_Load();
Release_Store(void* v)

这里面涉及到一个Memory Barrier(内存屏障,Memory fence)的概念。

引用一个wikipedia的例子:

Processor #1:

while (f 0);
// Memory fence required here
print x;

Processor #2:

x = 42;
// Memory fence required here
f = 1;

一个两核心的CPU按照上述例子执行。可能会出现很多种情况:
虽然我们希望的是打印“42”。但是如果核心#2存储操作乱序执行(out-of-order),就有可能f在x之前被更新了。这样打印语句就可能打印出“0”。同样的,核心#1读取操作可能也乱序执行,x就有可能在f被判断之前读取了,打印语句可能打印出一个无法预期的值。对于大多数程序来说,上面的这些情况都是不可以接受的。使用memory barrier可以避免out-of-order的问题。

简而言之就是,在多CPU共享内存空间的情况下,两条语句的执行顺序已经无法保证了。需要memory barrier来保证执行顺序的正确。

当然,单CPU是不会出现这个问题的。

现在再来解释上面的4个函数的作用。

NoBarrier_Load();           //不使用内存屏障的读取
NoBarrier_Store(void* v)    //不使用内存屏障的存储
Acquire_Load();             //使用内存屏障的读取
Release_Store(void* v)      //使用内存屏障的存储

leveldb源码阅读(一)-log读取和写入

leveldb源码阅读(一)-log读取和写入

标签(空格分隔): leveldb log


一、 引言

leveldb中,所有对数据库的操作都会记录到log中,当log文件达到预定义的大小后,就会被转换成数据表(sstable),所以log文件的生成和读取算是leveldb核心部分的第一步。
这篇文章涉及到的leveldb的源代码文件有:

db/log_format.h
db/log_reader.h
db/log_reader.cc
db/log_writer.h
db/log_writer.cc
leveldb/slice.h
leveldb/status.h

二、日志存储格式——log_format.h

在之前的文章中,了解到log是由一个个block组成,每个block是32KB的大小。

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

每个block中存储的是一条条的记录,如果一个block的末尾最后剩下小于等于6bytes的空间,就会使用一个trailer填充而不是用新的记录填充,这个trailer都是由0字节组成。一条记录包括一个crc32c生成的校验码checksum、记录的长度length、记录的类型type、数据。

如果一条记录跨越了多个block,就会被分段,分段类型有3种——First、Middle、Last

关于这部分的实现是在log_format.h中

enum RecordType {
  //0是保留字,用来表示预分配的文件
  kZeroType = 0,

  kFullType = 1,

  // 一条记录被分段的3种类型
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
//定义了最大的记录类型值
static const int kMaxRecordType = kLastType;
//定义了最大的`block`大小
static const int kBlockSize = 32768;

// 记录头是  checksum (4 bytes), length (2 bytes), type (1 byte).总共7bytes
static const int kHeaderSize = 4 + 2 + 1;

三、日志的写入

在log_writer代码中,实现了Writer类,最主要的就是一个AddRecord(const Slice& slice)

Writer类有两个构造函数,对于只传了一个参数——WritableFile指针的构造函数,使用了explicit显示构造,这样可以防止隐式转换。另外将Writer(const Writer&)void operator=(const Writer&)放在private域,防止拷贝赋值。

Writer有三个成员变量 WritableFile* dest_用来保存可写的文件,int block_offset_用来保存当前块的偏移量, uint32_t type_crc_[kMaxRecordType + 1]用来保存5种记录类型的crc32c计算结果。这里会提前计算好保存进这个数组来减少存储时计算的负载。

接下来就是最重要的AddRecord(const Slice& slice)了。Slice是leveldb中用来保存key或者value的值的,包含一个指针,一个长度。

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();   //取出数据的指针
  size_t left = slice.size();       //取出数据的长度

  // 如果有必要的话,对记录分段,并保存.
  // 注意如果slice是空的,我们也会迭代一次,保存一个长度为0的记录
  Status s;     //用来保存这次操作的结果
  bool begin = true;    //是否是一条记录的开始
  do {
    //计算出当前块剩下的空间
    const int leftover = kBlockSize - block_offset_;
    //使用assert进行断言,这边是一定会>=0的,assert这里只是为了帮助调试程序,实际使用时在编译时通过参数去除assert语句
    assert(leftover >= 0);

    //如果剩下的空间已经不足一条记录的header了
    if (leftover < kHeaderSize) {
      // 切换到新的块
      if (leftover > 0) {
        // 如果剩下的空间>0,需要使用\x00填充
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      //新的块偏移量是0
      block_offset_ = 0;
    }

    // 不变的道理:我们绝对不会让剩下的空间<kHeaderSize.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    //当前还可用的空间
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;

    //记录片段的大小,取可用的空间大小和记录的大小的最小值
    const size_t fragment_length = (left < avail) ? left : avail;

    //当前的记录类型
    RecordType type;

    //如果记录的大小和片段的长度是一样的,表示到达了记录的末尾为true,否则false
    const bool end = (left == fragment_length);

    //记录类型的判断
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    //写入到物理存储中,
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;     //数据指针右移一个片段长度
    left -= fragment_length;    //数据的长度减去一个片段的长度
    begin = false;              //不再是开始了
  } while (s.ok() && left > 0); //如果没有出错,并且数据还剩就再循环

  return s;
}

AddRecord(const Slice& slice)调用了一个私有成员函数EmitPhysicalRecord(RecordType t, const char* ptr, size_t n),看一看这个将记录写入持久化存储是如何实现的。

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // n必须可以用2KB表示,因为一条记录的length是uint16大小的,little-endian
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // 对记录的header进行格式化
  char buf[kHeaderSize];
  //取低位字节
  buf[4] = static_cast<char>(n & 0xff);
  //取高位字节
  buf[5] = static_cast<char>(n >> 8);
  //一位的记录类型
  buf[6] = static_cast<char>(t);

  // 计算记录类型的crc值和有效载荷
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);                 // Adjust for storage
  //填充4位的校验值
  EncodeFixed32(buf, crc);

  // 将记录的header写入
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    //将记录的数据写入
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      //flush一下
      s = dest_->Flush();
    }
  }

  //块的偏移量右移header和数据的长度和
  block_offset_ += kHeaderSize + n;
  return s;
}

上面的代码分析其实就是log部分逻辑上的实现,将一条记录格式化成设计好的格式。这里有一个疑问就是dest_->Append(Slice& slice)dest_->Flush()是如何工作的。所以继续追踪下去,到include/leveldb/env.h中:

// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
 public:
  WritableFile() { }
  virtual ~WritableFile();

  virtual Status Append(const Slice& data) = 0;
  virtual Status Close() = 0;
  virtual Status Flush() = 0;
  virtual Status Sync() = 0;

 private:
  // No copying allowed
  WritableFile(const WritableFile&);
  void operator=(const WritableFile&);
};

这部分是WritableFile的抽象部分,Append、Close、Flush、Sync都是纯虚函数,具体实现必须由用户自己去做。并且实现时必须提供缓冲区,这样调用者可以append小的数据段到文件中。

实际上leveldb中也有默认(posix)的实现。在util/env/env_posix.cc中。目前只看Append和Flush具体实现:

  virtual Status Append(const Slice& data) {
    size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
    if (r != data.size()) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  virtual Status Flush() {
    if (fflush_unlocked(file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

fwrite_unlocked和fflush_unlocked都是define的fwrite和fflush方法。

所以这里的设计是很好的案例。通过将WritableFile抽象化,提供给用户自己来实现,从而很轻松的就可以在多平台间移植。

四、日志的读取

在log_reader中,实现了Reader类,用来读取日志文件。
Reader的构造函数定义如下:

Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

file是代表要读取的文件,reporter是用来报告日志文件中发现错误的,checksum是当前文件的校验码,initial_offset是当前文件中开始读取的物理位置。

Reader类中私有的成员变量和函数需要注意的有:

  SequentialFile* const file_;      //要读取的文件
  Reporter* const reporter_;        //报告错误的对象
  bool const checksum_;             //校验值
  char* const backing_store_;       //备份存储
  Slice buffer_;                    //缓冲
  bool eof_;   // Last Read() indicated EOF by returning < kBlockSize

  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;         //最后一个Record的偏移量
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;       //

  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;   //第一个记录的初始偏移量

  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  bool resyncing_;      //在一次查找后我们再同步则是True。特别的,kMiddleType和kLastType会被跳过   

  // Extend record types with the following special values
  enum {
    kEof = kMaxRecordType + 1,
    // 无效的记录值,可能有以下情况
    // * 具有无效的CRC值(checksum不正确)(ReadPhysicalRecord reports a drop)
    // * 长度为0的记录 (No drop is reported)
    // * The record is below constructor's initial_offset (No drop is reported)
    kBadRecord = kMaxRecordType + 2
  };

  bool SkipToInitialBlock();        //跳过直到initial_offset的位置,成功则返回true
  unsigned int ReadPhysicalRecord(Slice* result);   //读取Record,返回记录类型,或者kEof、kBadRecord

  void ReportCorruption(uint64_t bytes, const char* reason);    //汇报丢弃的字节,以及原因
  void ReportDrop(uint64_t bytes, const Status& reason);        //汇报丢弃的字节,以及原因

这些成员函数中着重看一下ReadPhysicalRecord

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}

Reader类中公共的成员函数需要注意的有:
bool ReadRecord(Slice* record, std::string* scratch);
uint64_t LastRecordOffset();

ReadRecord用来读取下一个要读取的Record,如果读取成功,返回true,存储在record中,如果读到了结尾没有一个完整的Record,则存储在scratch中

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {    //跳到初始的偏移位置
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

leveldb文档翻译(3)-leveldb的日志格式和表格式

日志文件的格式

日志文件的内容是一个32KB大小的块的序列。唯一可能的意外是日志文件结尾可能包含一个不完整的块。

每一个块由以下记录组成

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

(注:crc32c是一种检验算法)

一条记录绝不会在块的最后6个字节内开始(因为剩下的6个字节已经不够大,checksum,length,type总共需要7个字节)。任何留下的字节形成trailer(拖车?),这部分必须完全是0字节,读取的时候必须跳过。

注:如果当前块恰好有7个字节留下来,一个新的非0长度的记录被增加,写入必须用一个FIRST记录(包含0字节的用户数据,FIRST用来表示这是一条记录的开始部分)去填充块的最后7个字节,之后将所有的用户数据写在后来的块中。

更多的类型可能在以后会加入进来。一些读取程序可能会跳过它们不认识的记录类型,其他读取程序可能会报告有些数据被跳过了。

FULL == 1
FIRST == 2
MIDDLE == 3
LAST == 4

FULL记录包含一个完整的用户记录的内容

FIRST,MIDDLE,LAST是用来表示用户记录被分成多个片段(尤其是块的边界)。FIRST是用户记录的第一个片段,LAST是用户记录的最后一个片段,MIDDLE是所有用户数据的内部片段。

例如:有以下的用户数据

A: length 1000
B: length 97270
C: length 8000

A将会在第一块中存储为FULL记录。

B将会划分成3段:第一段占用第一块剩下的部分,第二段占用整个第二块,第三段占用第三块的前面部分。这将会在第三块中留下6个字节的空间,被留作空白当成trailer。

C将会在第四块中存储为一个FULL记录。

这种记录格式的优点

1.我们不需要任何的同步的启发式算法-直接跳到下一个块的边界并且扫描。如果有一个错误发生,跳到下一块。作为一个边界效益,当一个日志文件的内容的一部分被作为一个记录嵌入到另外一个日志文件中,我们不会变的困惑。

2.在近似边界上分割(比如mapreduce)是简单的:找到下一个块的边界,跳过记录直到我们遇到一个FULL或FIRST记录。

3.对于大的记录我们不需要额外的缓冲区。

对比这个记录格式的缺点

1.没有对微小的记录的包装。这可能通过添加新的记录类型来修复,因此它只是当前实现的短处,不是这种格式的必须。

2.没有压缩。同样的,这也可以通过添加新的记录格式来修复。

表文件的格式

<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer]        (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>

这个文件包含内部的指针。每一个指针都被称作BlockHandle,包含下面的信息:

offset:   varint64
size:     varint64

参考varints,里面有varint64格式的解释。

1.键值对序列是有序存储的,被分成多个数据块。这些数据块从文件开头一个接着一个。每一个数据块被block_builder.cc中的代码格式化,之后有选择的进行压缩。

2.在数据块之后我们存储一束元块。支持的元块类型在下面描述了。更多的元块类型可能在以后会加进来。每一个元块类型也是用block_builder.cc格式化,然后之后被有选择的压缩。

3.一个“元索引”块。它包含每一个其他的元块的入口,入口的键是这个元块的名字,值是一个BlockHandle指向元块。

4.一个“索引”块。这个块包含每个数据块的入口,键是一个大于等于当前数据块的最后一个键的字符串,在连续的数据块的第一个键之前。值是这个数据块的BlockHandle。

5.在文件的最后面是一个填充满长度的脚部,包含了元索引块和索引块的BlockHandle,还有一个魔术数字。

    metaindex_handle: char[p];     // Block handle for metaindex
    index_handle:     char[q];     // Block handle for index
    padding:          char[40-p-q];// zeroed bytes to make fixed length
                                   // (40==2*BlockHandle::kMaxEncodedLength)
    magic:            fixed64;     // == 0xdb4775248b80fb57 (little-endian)

“filter”元块

在数据库打开时如果 FilterPolicy被指定,每个表都会有一个filter块。“元索引”块包含一个入口,将filter.<N>指向了 BlockHandle,对于filter块,<N>是filter 策略的
Name() 方法返回的。

filter块存储了一系列的filters,filter i包含了FilterPolicy::CreateFilter()在所有存储在文件偏移落入到范围[ ibase … (i+1)base-1 ]的块中键的输出。

当前,”base”是2KB,对这个例子来说,如果块X和Y开始在范围 [ 0KB .. 2KB-1 ],所有的在X和Y之间的键将会通过调用FilterPolicy::CreateFilter()被转换成一个filter,得到的filter将会被存储成当前filter块的第一个filter。

filter block是以下格式的:

[filter 0]
[filter 1]
[filter 2]
...
[filter N-1]

[offset of filter 0]                  : 4 bytes
[offset of filter 1]                  : 4 bytes
[offset of filter 2]                  : 4 bytes
...
[offset of filter N-1]                : 4 bytes

[offset of beginning of offset array] : 4 bytes
lg(base)                              : 1 byte

filter块的最后的偏移数组允许高效地mapping一个数据块偏移到相应的filter。

“统计”元块

这个元块包含一束统计。键是统计项的名字。值包含着统计内容。

TODO(预览):记录了以下的统计

data size
index size
key size (uncompressed)
value size (uncompressed)
number of entries
number of data blocks

leveldb文档翻译(2)-leveldb的实现

这部分是来自于leveldb的impl.md文档,详细介绍了leveldb实现方面的设计。

1、文件

leveldb其实是Bigtable的简单的实现。Bigtable论文地址:http://research.google.com/archive/bigtable.html。当然,文件的组织方法和bigtable有一些不同之处,下面会逐个介绍。

1.1 日志文件

一个日志文件(*.log)存储一系列的最近的更新。每一个更新都会添加到当前日志文件的最后。当所有日志文件达到了预定义的大小(默认是4M),它会转换成一个有序的表(参见下面介绍),一个新的日志文件被创建提供给之后的更新。

当前日志文件的副本是保存在一个内存结构中(称为memtable)。每一次读都会向这个memtable中查询,所以读操作其实还是映射到所有记录下来的更新中。

2.有序表

一个有序表(*.ldb)存储了一系列的由键排序的键值对。每个键值对要么键对应着值,要么是键对应着一个删除标记。(删除标记保存在旧的排序表中存储着过时的值)。

排序表的集合是由一系列的等级(level)组织的。排序表如果是由一个日志文件生成,就会被置为一个特别的younglevel(也被称为level-0)。当young文件的数量超过了某个特定的阈值(当前是4),所有的young level文件和所有键范围重叠的level-1文件被合并在一起产生一系列的新的level-1文件(我们每2MB的数据创建一个新的level-1文件)

属于young level的文件可能自己相互包含重叠的键,但属于其他等级的文件有唯一的不重叠的键范围。level-L(L >= 1),当level-L所有的文件大小超过了(10^L)MB(例如,level-1是10MB,level2是100MB),在level-L中取一个文件,和所有在level-(L+1)中与之重叠的文件,合并成level-(L+1)的新的文件集合。这些合并使得新的更新逐渐从young-level的文件转移到最大的等级,这个过程只需要批量的读写操作即可完成(注:最小的时间复杂度)

2.1 Manifest

一个MANIFEST文件中个列举了由各个level组成的有序表的集合,包含了相应的键范围,以及其他重要的元数据。任何时候数据库被打开,一个新的MANIFEST文件(文件名中有一个新的数字代表)被创建。MANIFEST文件和log文件的格式相同,任何发生的改变都会被添加到这个文件中(比如文件添加或者移除)。

2.2 Current

CURRENT是一个简单的文本文件,包含了最新的MANIFEST文件的名字。

2.3 信息日志(Info logs)

信息消息被打印到LOG和LOG.old文件中

2.4 其他

其他文件用来记录一些杂项。(LOCK,*.dbtmp)

3.Level 0

当一个日志文件增长到一个特定的值(默认1MB):
创建一个新的memtable和log文件,并将未来的更新放在这里。
在后台的操作:
将上一个memtable的内容写入到sstable
丢弃上一个memtable
删除旧的日志文件和旧的memtable
将新的sstable添加到young(level 0) level。

4.压实(不是对数据进行压缩,只是将数据整合到一起,并且在这个过程中会去除重复键以及删除的键)

当level L超出了文件大小的限制,我们在另外一个后台线程中压实它。这个压实操作选择level L中的一个文件以及来自下一个level L+1的所有与之重叠的文件。注意:如果一个level-L文件只和一个level-(L+1)文件部分重叠,这个文件的所有部分都被作为压实的输入,然后在压实之后被丢弃。例外:因为level-0是特别的(level-0的文件可能彼此重叠),我们特殊对待从level-0到level-1的压实:当这些level-0文件彼此重叠时,一个level-0的压实会选择多个level-0的文件。

一个压实操作合并选择的文件的内容,产生一系列的level-(L+1)文件。在当前的输出文件超过目标文件大小(2MB)时,我们切换到一个产生的新的level-(L+1)文件。在当前输出文件的键范围增长到足够大,以致于和超过10个level-(L+2)的文件产生重叠时,我们同样切换到产生的新的输出文件。最后一条准则保证了之后的一次level-(L+1)文件压实不会从level-(L+2)中选择太多的数据。

旧的文件被丢弃,新的文件被加入到服务状态。

特定level的压实通过键空间的压实。详细来说,对于每个level L,我们记住最后一次level L的压实的结尾的键。下一次level L的压实将会选择在这个键之后的第一个文件(如果没有这个文件,就从键空间的起点环绕。

压实会丢弃覆盖的值,如果没有更大的数字level包含一个范围重叠当前键的文件,也会丢弃删除标记。

4.1 时间复杂度

level-0的压实将会从level-0中读取最多4个1MB的文件,在最坏情况下读取所有的level-1文件(10MB)。这样一来,我们将会读取14MB,写入14MB。

对于其他等级的压实,我们将会从level L中选择一个2MB的文件。最坏的情况下,将会从level L+1中重叠大概12个文件(10个文件是因为level-(L+1)是level-L的10倍大,其他在两边的两个文件是因为level L的文件范围通常和level-(L+1)的文件范围是不对齐的)。压实因此会读取26MB,写入26MB。假设磁盘IO速度为100MB/s,最差的压实大概消耗0.5s

如果我们减慢后台写入,只取全速100MB/s的10%,一次压实可能耗时5s。如果用户的写入速度为10MB/s,我们可能创建许多level-0的文件(大概5*10MB总共50MB)。这将明显的增加读取消耗,因为每次读取合并了更多的文件增加了负载。

解决方案1:为了减轻这个问题,我们可能希望当level-0文件数很大的时候,增加日志切换的阈值。虽然缺点是这个阈值越大,需要越多的内存来保存相应的memtable。

解决方案2:我们可能希望当level-0的文件数变多时,人为增加写入速度。

解决方案3:我们致力于降低大量文件合并的成本。也许大多数level-0文件在缓存中都有未压缩的块,我们仅仅需要担心O(N)复杂度的合并迭代。

4.2 文件的数量

除了总是生成2MB的文件,我们也可以为更高的等级生成更大的文件,以此减少文件的总数,虽然代价是更多突发的压缩。当然,我们也可以将文件集合分片放在不同的文件夹中。

在2011年2月4日,在ex3文件系统上做了一次测试,该测试显示,在很多文件的目录中做100K次文件打开的平均用时如下:

Files in directory Microseconds to open a file
1000 9
10000 10
100000 16

因此,在现代文件系统中,也许连分片也是不需要的。

5.恢复

  • 读取CURRENT去找到最后一次提交的MANIFEST文件名
  • 读取该MANIFEST文件
  • 清除过期的文件
  • 我们可以在这里打开所有的sstables,但是懒加载可能会更好。
  • 将log块转换成新的level-0 sstable
  • 开始将恢复的序列导向到新的日志文件的新的写入流。

6.文件的垃圾回收

每次恢复的最后以及每次压缩的最后DeleteObsoleteFiles()会被调用。它会找出数据库中所有的文件的名字。然后删除所有的不是当前日志文件的日志文件。再删除所有的不涉及到某些level和不是一个活跃的压缩输出的表文件。

leveldb文档翻译(1)-leveldb概览

这是leveldb源代码包中提供的 readme.md,index.md 的部分翻译文档,不是逐字逐句翻译,只是把自己感兴趣的部分翻译出来了,作为看代码之前的准备工作。

1.公共接口

include文件夹下面的是公共接口,所有使用者应该从这里的头文件直接调用。

  • include/db.h:数据库的主接口:从这里开始
  • include/options.h:控制整个数据的行为,也控制数据库的读写,可以看做是配置文件。
  • include/comparator.h:用户自定义的比较函数的抽象。如果想要使用基于字节的方法对键大小进行比较,可以使用默认的比较器,用户也可以自己实现比较器来按自己的想法对存储进行排序。
  • include/iterator.h:遍历数据的接口。可以从DB对象中获取这样一个迭代器。
  • include/write_batch.h:对数据库原子的进行多个更新操作的接口。
  • include/slice.h:一个简单的模块,用来维护一个指针和长度,来表示一个字节数组。
  • include/status.h:用来汇报成功或是其他各种各样的错误。
  • include/env.h:操作系统环境的抽象。这个接口的posix实现是在util/env_posix.cc中
  • include/table.h,include/table_builder.h:低层次的模块,绝大多数客户端不会使用这个接口。

2.leveldb基本使用

leveldb提供持久化的键值存储。键和值是任意的字节数组。键值对的存储是按照键的顺序存储的,并且这个顺序是按照用户自定义的排序函数来的。

2.1 打开一个数组库

leveldb存储是使用文件系统中的文件夹。所有数据库内容都会存储在这个文件夹里。下面是一个打开数据库的例子,并且如果数据库不存在会自动创建:

#include <cassert>
#include "leveldb/db.h"

leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
assert(status.ok());

如果希望在数据库已经存在时报错,在leveldb::DB::Open前调用:

options.error_if_exists = true;

2.2 状态

上面的代码中使用了leveldb::Status类型,leveldb中大多数函数都会返回这个值。你可以检查这个值是不是ok,或者打印出相关的错误。

leveldb::Status s = ....;
if (!s.ok()) cerr << s.ToString() << endl;

2.3 关闭数据库

如果数据库操作结束后,删除数据库对象即可关闭

.... open the db as described above ...
.... do something with db ....
delete db;

2.4 读写

数据库提供了Put、Delete、Get方法来修改/查询数据库。例如,下面的代码用来将key1的值移动到key2中。

std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);

2.5 原子更新

上面的例子中,如果在Put完key2的值,Delete key1的值前,进程崩溃了,同一个值就会被存储在多个key下。使用WriteBatch类可以原子的执行多个更新操作:

#include "leveldb/write_batch.h"
....
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) {
  leveldb::WriteBatch batch;
  batch.Delete(key1);
  batch.Put(key2, value);
  s = db->Write(leveldb::WriteOptions(), &batch);
}

WriteBatch存储了一系列的对数据库的编辑操作,这些操作会被顺序执行。注意到我们先执行了Delete,再执行Put,这样当key1和key2相同时,我们不会错误的删除所有的值。
WirteBatch不仅仅有原子操作的特点,还可以用来加速大量的更新操作。原理是将大量的单独的变化放入同一个批操作中。

2.6 同步写入

默认情况下,对leveldb的写入都是异步的:它在将进程中的写操作提交给操作系统后返回。数据从操作系统内存转移到持久化存储这个过程是异步的(也就是和这里不会等持久化之后才返回)。使用同步写入,可以让某些特定的写入等到数据被写入持久化存储后才返回。(在Posix系统中,这个通过调用 fsync(....)fdatasync(....)msync(...., MS_SYNC) 来实现。

leveldb::WriteOptions write_options;
write_options.sync = true;
db->Put(write_options, ....);

异步写入通常情况下是千百倍的快于同步写入。异步写入的缺点是如果机器崩溃可能会造成最后的一些更新丢失。注意仅仅是写进程的崩溃不会造成任何数据丢失,即使没有使用同步写。因为写进程只负责将写操作提交给操作系统,提交完就意味着写入已经结束(即使这个时候没有真正的写入到持久化存储中)。

异步写大多数时候都能被安全使用。比如,当载入大量数据到数据库中,如果出现崩溃你可以重新开始这个载入操作。也可以使用混合写入模式,也就是每多少个异步写之后使用一次同步写。如果崩溃了,可以从上一次同步写开始(同步写会产生一个标记,所以可以从上一次同步写开始)。

WriteBatch对于异步写提供了多个选择。多个更新操作也被放在同一个批操作中,然后使用一个同步写被一起执行(write_options.sync设置为true)。同步写产生的额外性能消耗被均摊在所有的写操作上。

2.7 并发

数据库在同一时间只能由一个进程打开。leveldb使用了操作系统的锁来防止被错误使用。在一个单进程中,同一个leveldb::DB对象可以被多个并发线程安全的共享。不同的线程之间在对同一个数据库对象写入或者遍历或者调用Get时,不需要额外的同步操作(leveldb的实现会自动做这个必要的同步操作。然而其他对象(比如Iterator和WriteBatch)可能需要额外的同步。如果两个线程共享了这样的对象,它们必须使用它们自己的锁协议来保护对象的使用。更多的细节可以在公共头文件中看到。

2.8 迭代器(Iteration)

下面的例子展示了如何打印一个数据库中所有的键值对。

leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  cout << it->key().ToString() << ": "  << it->value().ToString() << endl;
}
assert(it->status().ok());  // Check for any errors found during the scan
delete it;

下面的例子展示了如何显示一个特定范围的数据
[start,limit):

for (it->Seek(start);
   it->Valid() && it->key().ToString() < limit;
   it->Next()) {
  ....
}

你也可以倒序遍历所有的数据(倒序比正序可能要慢)

for (it->SeekToLast(); it->Valid(); it->Prev()) {
  ....
}

2.9 快照(Snapshots)

快照提供了键值存储所有状态一致的只读视图。ReadOptions::snapshot可能是non-NULL的值,来表示一个读操作应该执行在特定的DB状态版本上。如果 ReadOptions::snapshot是NULL,读操作可能执行在当前状态的隐式快照上。

快照的创建:DB::GetSnapshot()

leveldb::ReadOptions options;
options.snapshot = db->GetSnapshot();
.... apply some updates to db ....
leveldb::Iterator* iter = db->NewIterator(options);
.... read using iter to view the state when the snapshot was created ....
delete iter;
db->ReleaseSnapshot(options.snapshot);

当一个快照不需要的时候,使用DB::ReleaseSnapshot接口来释放。

2.10 片(Slice)

*Slice不知道翻译成什么*

it->key()it->value()的返回值都调用了leveldb::Slice类型的实例。Slice是一个简单的结构:包含字节数组的长度和指针。返回一个Slice比返回std::string是一个更轻量划算的选择,因为我们不需要复制潜在的很大的键和值(返回的是指针和长度)。额外的,leveldb的方法不返回null-terminated(\0结尾)的 C-style 字符串,因为leveldb的键和值是允许存储'\0'字节的。

Slice和C++字符串以及C-style字符串之间可以互相转换。

leveldb::Slice s1 = "hello";

std::string str("world");
leveldb::Slice s2 = str;
std::string str = s1.ToString();
assert(str == std::string("hello"));

使用Slice时要小心,因为Slice取决于调用者来保证Slice中存储的数据的生命周期。下面就是一段有问题的代码的示例:

leveldb::Slice slice;
if (....) {
  std::string str = ....;
  slice = str;
}
Use(slice);

当执行到if外面的时候,str被销毁,slice中的存储也就消失了(注:这是因为slice中存储的是指针和长度)。

2.11 比较器(Comparators)

前面的例子使用的都是默认的排序函数来对键做排序,是对字节做字典排序的。当打开数据库的时候,可以使用自定义的比较器来排序。例如,假设每个数据的键都由两个数字组成,我们应该使用第一个数字来排序,使用第二个数字来打破联系。首先,定义一个合适的leveldb::Comparator的子类。

class TwoPartComparator : public leveldb::Comparator {
 public:
  // Three-way comparison function:
  //   if a < b: negative result
  //   if a > b: positive result
  //   else: zero result
  int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const {
    int a1, a2, b1, b2;
    ParseKey(a, &a1, &a2);
    ParseKey(b, &b1, &b2);
    if (a1 < b1) return -1;
    if (a1 > b1) return +1;
    if (a2 < b2) return -1;
    if (a2 > b2) return +1;
    return 0;
  }

  // Ignore the following methods for now:
  const char* Name() const { return "TwoPartComparator"; }
  void FindShortestSeparator(std::string*, const leveldb::Slice&) const {}
  void FindShortSuccessor(std::string*) const {}
};

现在使用这个比较器来打开数据库:

TwoPartComparator cmp;
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.comparator = &cmp;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
....

2.11.1 向后兼容

比较器的Name()方法的返回值在数据库被创建时就已经联系起来了,在之后的每次数据库打开时都会被检查。如果name改变了,leveldb::DB::Open调用时会失败。因此,当且仅当新的键格式和比较函数和已经存在的数据库不兼容时,才会改变name的值,这时不得不丢弃所有已经存在的数据。

当然你也可以通过一些提前的计划来逐步的拓展你的键格式。例如,你可以存储一个版本号在每一个键的末尾(一个字节就够用了)。当你想要改变到新的键格式(例如添加第三个部分到键上)
– (a) 保持同样的比较器名字
– (b) 增加新的键的版本号
– (c) 改变比较函数,使用键的版本号来决定如何解释这些键

2.12 性能

性能可以通过改变配置文件来调节
include/leveldb/options.h

2.13 块大小

leveldb将相邻的键合成一组存储在同一个块中,这样的块是从持久化存储和内存间转移的最小单元。默认的块大小是大约4096个未压缩的字节。如果应用程序总是在数据库中做大量的扫描,可以考虑适当增加这个值的大小。应用程序做大量的很小的值的点读取,可以考虑使用小一点的块大小(如果这些设置确实提升了性能)。如果块大小小于1kilobyte,或者大于几个megabytes是没有太多好处的。注意对于更大的块大小使用压缩会更有效率。

2.14 压缩

每一个数据块在写入持久化存储前会单独的被压缩,压缩是默认的,因为默认的压缩函数是非常快的。对于不可压缩的数据会自动禁用。在极少数的例子中,程序可能会希望完全禁用压缩,仅仅当性能测试显示性能提升了才推荐这么做。

leveldb::Options options;
options.compression = leveldb::kNoCompression;
.... leveldb::DB::Open(options, name, ....) .....

2.15 缓存

数据库的内容被存储在文件系统的文件集合中。每一个文件存储了一系列的压缩的数据块。如果options.cache是non-NULL的,经常使用的未压缩的数据块内容将会被缓存。

#include "leveldb/cache.h"

leveldb::Options options;
options.cache = leveldb::NewLRUCache(100 * 1048576);  // 100MB cache
leveldb::DB* db;
leveldb::DB::Open(options, name, &db);
.... use the db ....
delete db
delete options.cache;

注意缓存保存了未压缩的数据,因此根据程序级的数据量来设置缓存的大小,使用压缩的数据不会有任何的减少。(压缩的数据块的存储留给操作系统缓冲区缓存,或者客户端的其他自定义环境变量来实现)

当执行大量的读取操作时,应用程序最好禁用缓存,这样大量的数据读取不会导致缓存内容大量被改变。per-iterator选项用来实现这个:

leveldb::ReadOptions options;
options.fill_cache = false;
leveldb::Iterator* it = db->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  ....
}

2.15.1 键层(Key Layout)

注意硬盘中转移和缓存的单元是一个数据块。相邻的键(根据数据库排序顺序)将被放在同一个数据块中。因此应用程序通过将所有的键依次相邻的排放,将不常访问的值内容(通过key来索引)放在另外一个区域可以提升程序性能。

举例来说,假设我们正在在leveldb上实现一个简单的文件系统,下面的入口类型我们可能希望存储。

filename -> permission-bits, length, list of file_block_ids
file_block_id -> data

我们可能想要设置filename的键前缀为一个字母(‘/’),file_block_id的键前缀为另一个字母(‘0’),因此扫描这些元数据不会强制我们取出和缓存大量的文件内容。

2.16 过滤器

因为leveldb的数据在磁盘上有结构地存放,一个简单的Get()可能调用多个从磁盘的读取。选项 FilterPolicy机制可以被用来减少磁盘读取的次数。

leveldb::Options options;
options.filter_policy = NewBloomFilterPolicy(10);
leveldb::DB* db;
leveldb::DB::Open(options, "/tmp/testdb", &db);
.... use the database ....
delete db;
delete options.filter_policy;

上面的代码将一个基于过滤策略的布隆过滤器(Bloom filter)和数据库联系起来。布隆过滤器的基础过滤依赖于保存每个键的一些位的数据在内存中(在这个例子中每个键有10个bit)。这个过滤器将会减少Get()带来的不必要的磁盘读取的次数大概为100。增加保存的每个键的bit数会更明显的减少读取次数,但是这是在更多的内存消耗的代价上得来的(注:典型的以空间换时间)。我们建议应用程序工作时还剩很多的内存,并且会做大量的随机读取时,设置一个过滤策略。

(注:布隆过滤器是一个针对于大数据量的去重算法,它会通过多个hash将一个字节数组映射到一个很大的bit向量中的k个位置,通过检查一个字节数组hash后的k个位置对应的bit向量是否都为1,如果不是则这个字节数组没有出现过,否则出现过(会有少量的误报)。使用在这里,是为了在读取前检查这个键是否存在,如果不存在则避免了以此全数据库查询,从而大大的提升的性能)

如果你使用一个自定义的比较器,你应该保证你正在使用的过滤策略是和你比较器兼容的。例如,想象一个比较器比较键时忽略了末尾的空间,NewBloomFilterPolicy不能和这样的比较器一起使用。相反的,应用程序需要提供一个自定义的过滤策略同样忽略这些末尾的空间。比如:

class CustomFilterPolicy : public leveldb::FilterPolicy {
 private:
  FilterPolicy* builtin_policy_;

 public:
  CustomFilterPolicy() : builtin_policy_(NewBloomFilterPolicy(10)) {}
  ~CustomFilterPolicy() { delete builtin_policy_; }

  const char* Name() const { return "IgnoreTrailingSpacesFilter"; }

  void CreateFilter(const Slice* keys, int n, std::string* dst) const {
    // Use builtin bloom filter code after removing trailing spaces
    std::vector<Slice> trimmed(n);
    for (int i = 0; i < n; i++) {
      trimmed[i] = RemoveTrailingSpaces(keys[i]);
    }
    return builtin_policy_->CreateFilter(&trimmed[i], n, dst);
  }
};

有一些应用程序可能提供其他过滤策略不使用布隆过滤器,而是使用其他的机制来处理键的集合。细节看leveldb/filter_policy.h

2.17 校验(Checksums)

leveldb会校验所有存储在文件系统中的数据。这里有两个不相关的控制部分提供积极的校验:

ReadOptions::verify_checksums可被设置为true,强制校验从文件系统一次读取的所有数据。默认情况是不会做这个验证的。

Options::paranoid_checks在打开数据库之前可被设置为true,来使得数据库一旦检查到内部错误立马报错。这取决于数据库的哪一部分出错,当数据库被打开时还是其他后来的操作时,这个错误会被警告。默认情况下,偏执检查是关闭的,因此即使有时候持久化存储已经出错了,数据库依然可以被使用。

如果一个数据库出错了(可能它在偏执检查开启时不能打开), leveldb::RepairDB方法会尽可能的修复数据库的数据。

2.18 近似大小

GetApproximateSizes方法可以用来通过一个或多个键范围来获取文件系统已用的空间的近似字节数

leveldb::Range ranges[2];
ranges[0] = leveldb::Range("a", "c");
ranges[1] = leveldb::Range("x", "z");
uint64_t sizes[2];
leveldb::Status s = db->GetApproximateSizes(ranges, 2, sizes);

前面的代码调用将会设置sizes[0]为键范围在[a..c)之间的数据使用的文件系统存储空间的大小,设置sizes[1]为键范围在[x..z)之间的数据使用的文件系统存储空间的大小。

2.19 环境

所有的文件操作(以及其他的系统调用)都在leveldb:Env中实现了。复杂的客户端可能希望提供它们自己的Env的实现来获得更好的控制。例如,一个应用程序可能会在文件IO中引入人为延迟,限制leveldb对系统其他活动的影响。

class SlowEnv : public leveldb::Env {
  .... implementation of the Env interface ....
};

SlowEnv env;
leveldb::Options options;
options.env = &env;
Status s = leveldb::DB::Open(options, ....);

2.20 移植(Porting)

leveldb可能通过提供被leveldb/port/port.h导出的类型/方法/函数/的平台相关的实现,来移植到新的平台。详细内容参考leveldb/port/port_example.h

另外的,新的平台可能需要新的默认leveldb::Env的实现。详细内容参考leveldb/util/env_posix.h

2.21 其他信息

其他信息可以参考其他的文档文件
1. impl.md
2. table_format.md
3. log_format.md

基于Jenkins的自动部署方案

一、前言

在一家小公司工作,负责开发一个项目,前台用的angular2,后台是php和c++。每天下班之前都要将当天的代码部署的线上的开发服务器上,常常也需要更新到生产服务器(项目并没有正式运行,但是老板要用项目去拉投资)。手动部署代码总有一种浪费时间的感觉(因为下班时间到了啊!!!)。使用Jenkins+shell脚本完成自动部署就可以避免这种情况了。感谢赖同学的指导

二、流程介绍

公司内网搭建了git服务器,线上的开发用的服务器部署在阿里云,但是这台服务器的配置实在是太低了,完全没法胜任在线编译部署的需求。所以我要做的是将代码提交到git服务器后,使用Jenkins从git的dev分支拉取代码,构建部署计划(手动或者使用钩子都行)。

三、Jenkins的使用

第一步:部署Jenkins,这一步很简单,直接从Jenkins的官网上wget Jenkins的war包,然后java -jar jenkins.war即可。可以配合supervisor来实现自动启动和重启。

第二步:访问http://ip:8080进行配置,第一次配置的密码是java -jar jenkins.war时,在控制台上输出的密文。然后新建项目,然后选择构建一个多配置项目

第二步:进入到一个配置界面,填一下项目名称,其他的都不用管,直接来到源代码管理。我用的是Git服务器,所以选择git,然后填入仓库地址,配置项目仓库地址可以是HTTP协议,也可以是SSH协议,然后记得在Credentials字段填入认证信息,HTTP协议可以是用户名和密码,SSH的话可以是秘钥。选择的是dev分支。之后在填写构建信息。构建信息构建命令主要就是执行一个脚本

第三步:编写自动部署的脚本,主要就是编译,然后发送到远程服务器,这里用了scp来传输文件,非常好用。

脚本很简单,具体如下:

#!/bin/bash
## 自动部署脚本

cd homepage/mobile
cnpm install
ng build -prod -aot -env=prod
scp -r dist/* root@120.26.103.174:/alidata/www/m_family

注意这里的scp传输文件到远程服务器的前提是,远程服务器上.ssh文件夹下的authorized_keys中有Jenkins所在服务器的公钥,这样才能不需要密码通过认证。