一、Varint
Varint是在leveldb中广泛使用的一种变长的整数类型,Varint其实和unicode的实现非常相似,并且是little-endian。如果当前字节的最高位是1,则表示后面的字节也属于这个整数,如果最高位是0,则表示该整数结束了。所以,一个无符号4个字节的整型数,使用Varint可能以1,2,3,4,5个字节来表示。
比如130:
无符号整型表示起来为:00000000 00000000 00000000 10000010
使用Varint表示为:10000010 00000001 #这里注意Varint是little-endian
因为牺牲了每个字节的最高位作为标志位,所以遇到大的数可能需要5个字节。
某个无符号整型:10000001 00000001 00000001 00000001
使用Varint表示:10000001 10000010 10000100 10001000 00001000
但是平均下来,Varint是绝对会节省大量的存储空间的。
那么leveldb中Varint是如何编码和解码的呢?
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128; //10000000
if (v < (1<<7)) { v < 2^7
*(ptr++) = v;
} else if (v < (1<<14)) { v < 2^14
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)) { //v < 2^21
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}
上面这一段对uint32_t的变量进行Varint32编码的例子,编码的结果保存在dst里,返回的ptr是dst的最后一个字节的指针。使用if-else对1、2、3、4、5个字节的情况分别做了处理。主要就是移位和或运算符的使用。跟着代码走一遍就能理解。
const char* GetVarint32PtrFallback(const char* p,
const char* limit,
uint32_t* value) {
uint32_t result = 0;
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
p++;
if (byte & 128) {
// More bytes are present
result |= ((byte & 127) << shift);
} else {
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
return NULL;
}
inline const char* GetVarint32Ptr(const char* p,
const char* limit,
uint32_t* value) {
if (p < limit) {
uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
if ((result & 128) == 0) {
//如果低8位是0xxxxxxx,little-endian,也就是小于128
*value = result; //直接赋值
return p + 1;
}
}
return GetVarint32PtrFallback(p, limit, value);
}
上面GetVarint32Ptr是解码Varint32,当值大于128时,则调用GetVarint32PtrFallback。在则调用GetVarint32PtrFallback中,利用循环一个字节一个字节的取出,最终将value指针指向结果result。
二、Arena
Arena是leveldb中管理内存分配的类。所有的内存分配都通过Arena申请,可以根据申请的内存大小使用不同的内存分配策略,也可以避免过多的内存碎片问题,并且在内存释放时统一使用Arena来释放,方便管理。
Arena类的实现并不复杂。首先看一下成员变量。
// Allocation state
char* alloc_ptr_; //指向当前块中剩余的内存起点
size_t alloc_bytes_remaining_; //当前块中剩余的内存
// Array of new[] allocated memory blocks
std::vector<char*> blocks_; //用来保存所有new出来的char数组,释放内存时也会使用到
// Total memory usage of the arena.
port::AtomicPointer memory_usage_; //通过Arena申请的内存
然后就是public的成员函数:
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);
// Allocate memory with the normal alignment guarantees provided by malloc
char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
功能上很简单,Allocate用来申请指定大小的内存,AllocateAligned用来申请保证内存对齐的内存空间,MemoryUsage用来获取内存使用情况。
我对内存对齐的理解:现在假定内存对齐的最小单位是8个字节,那么如果你只申请14个字节,内存对齐会多给你分配2个字节凑成16个字节。内存对齐的好处就是访问速度的提升,在CPU一次读取8个字节的情况下。在没有内存对齐的情况下,你要读取第6~12个字节,就需要读取两次,第一次0~7个字节,通过移位取出6~7,再读取第8~15个字节,取出8~12。这样就需要读取两次。而如果在内存对齐的情况下,你需要读取的6~12个字节应该被放在8~15个字节中。只需读取一次即可。
私有的成员函数有:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
这两个函数提供了具体的内存分配的实现。
具体分析这5个成员函数。
1.Allocate(size_t bytes)
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}
Allocate是一个内联函数。内联函数的好处在于编译时会直接在调用处展开,对于程序执行速度有一定提升,但是也会导致程序大小变大。
assert(bytes > 0);
是一个断言语句,断言语句可以在编译时由编译器去除,但是在调试的时候可以帮助程序员发现问题所在。这里用来保证没有出现0个字节分配的情况。
后面的判断语句用来判断当前块的剩余空间是否够分配,如果够则分配。如果不够则交给AllocateFallback()处理。返回的是分配的空间起点。
2.AllocateAligned(size_t bytes)
char* Arena::AllocateAligned(size_t bytes) {
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
}
assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
return result;
}
AllocateAligned(size_t bytes)用来做内存对齐的分配。
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
用来获取分配的最小单位,如果指针大小大于8,则取指针大小,否则使用8作为最小的对齐单位。
assert((align & (align-1)) == 0);
这是一个很有意思的判断,可以保证只有2的n次方才能满足。
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
current_mod是余数,比如当前指针地址是15,align是8,那么余数为7,这个余数就是alloc_prt_&(align-1)的值。
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
因为当前指针可能不是align的整数倍,slop即代表当前指针需要偏移的大小,来保证是align的整数倍。
size_t needed = bytes + slop;
needed代表需要分配的空间。
之后的判断则和Allocate()函数做法相同了。需要注意的是返回的result指针是align的整数倍,这样就能保证以最少的内存读取次数来获取想要的数据。
3.AllocateFallback(size_t bytes)
char* Arena::AllocateFallback(size_t bytes) {
if (bytes > kBlockSize / 4) {
//如果需要分配的bytes大于块大小的1/4,则单独在新的块中分配,以此避免浪费过多的剩下的空间(这样能保证浪费的空间小与kBlockSize / 4)
char* result = AllocateNewBlock(bytes);
return result;
}
// 重新开辟新的块空间进行分配,上一个块剩下的空间都浪费了。
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
上面两个成员函数在当前块分配空间不足时,都将分配任务交给AllocateFallback处理。
4.AllocateNewBlock(size_t block_bytes)。
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_.push_back(result);
memory_usage_.NoBarrier_Store(
reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
return result;
}
使用new char来申请一个新的块,并将块的起始指针存入blocks_中。更新memory_usage的大小。
5.MemoryUsage()
“`
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
“`
获取memory_usage_中存储的使用空间数据。
三、Memeory Barrier(内存屏障)
memory_usage_的类型是AtomicPointer,这是一个原子类型,针对不同平台有不同的实现。可以用来线程安全的存储和获取值。
AtomicPointer类实现了几个函数:
NoBarrier_Load();
NoBarrier_Store(void* v)
Acquire_Load();
Release_Store(void* v)
这里面涉及到一个Memory Barrier(内存屏障,Memory fence)的概念。
引用一个wikipedia的例子:
Processor #1:
while (f 0);
// Memory fence required here
print x;Processor #2:
x = 42;
// Memory fence required here
f = 1;
一个两核心的CPU按照上述例子执行。可能会出现很多种情况:
虽然我们希望的是打印“42”。但是如果核心#2存储操作乱序执行(out-of-order),就有可能f在x之前被更新了。这样打印语句就可能打印出“0”。同样的,核心#1读取操作可能也乱序执行,x就有可能在f被判断之前读取了,打印语句可能打印出一个无法预期的值。对于大多数程序来说,上面的这些情况都是不可以接受的。使用memory barrier可以避免out-of-order的问题。
简而言之就是,在多CPU共享内存空间的情况下,两条语句的执行顺序已经无法保证了。需要memory barrier来保证执行顺序的正确。
当然,单CPU是不会出现这个问题的。
现在再来解释上面的4个函数的作用。
NoBarrier_Load(); //不使用内存屏障的读取
NoBarrier_Store(void* v) //不使用内存屏障的存储
Acquire_Load(); //使用内存屏障的读取
Release_Store(void* v) //使用内存屏障的存储