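Reading the leveldb source (1): log reading and writing
Tags (space-separated): leveldb log
I. Introduction
In leveldb, every update to the database is first appended to a log file. Once the log file reaches a predefined size, it is converted into a sorted table (sstable), so writing and reading the log is the natural first step into the core of leveldb.
The leveldb source files covered in this article are: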
db/log_format.h
db/log_reader.h
db/log_reader.cc
db/log_writer.h
db/log_writer.cc
leveldb/slice.h
leveldb/status.h
II. Log storage format: log_format.h
As covered in an earlier article, a log is made up of a sequence of blocks, and each block is 32KB in size.
block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]
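Each block stores a sequence of records. If 6 bytes or fewer remain at the end of a block (less than the 7-byte record header), that space is filled with a trailer instead of a new record; the trailer consists entirely of zero bytes. A record is made up of a crc32c checksum, the record length, the record type, and the data.
A record that fits entirely inside one block has type Full. A record that spans several blocks is split into fragments, and there are three fragment types: First, Middle, and Last.
This part is implemented in log_format.h: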
enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,
  kFullType = 1,
  // The three fragment types of a split record
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
// The largest record type value
static const int kMaxRecordType = kLastType;
// The size of a block in bytes (32KB)
static const int kBlockSize = 32768;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte): 7 bytes in total
static const int kHeaderSize = 4 + 2 + 1;
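To make the header layout concrete, here is a minimal sketch that packs and unpacks the length and type fields of the 7-byte header. The helper names EncodeLengthAndType, DecodeLength and DecodeType are hypothetical and do not exist in leveldb; the four checksum bytes are deliberately left untouched.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Mirrors the constant in log_format.h.
static const int kHeaderSize = 4 + 2 + 1;

// Hypothetical helper: fills bytes 4..6 of a record header.
// Bytes 0..3 (the checksum) are not written here.
inline void EncodeLengthAndType(char* header, size_t length, uint8_t type) {
  assert(length <= 0xffff);                      // must fit in a uint16
  header[4] = static_cast<char>(length & 0xff);  // low byte first (little-endian)
  header[5] = static_cast<char>(length >> 8);    // high byte second
  header[6] = static_cast<char>(type);
}

// Hypothetical helper: reads the little-endian length back.
inline uint32_t DecodeLength(const char* header) {
  const uint32_t lo = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t hi = static_cast<uint32_t>(header[5]) & 0xff;
  return lo | (hi << 8);
}

// Hypothetical helper: reads the record type byte.
inline uint8_t DecodeType(const char* header) {
  return static_cast<uint8_t>(header[6]);
}
```

Since length is a uint16, a single physical record can carry at most 65535 bytes of payload, which is more than a 32KB block can ever hold.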
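III. Writing the log
The log_writer code implements the Writer class, whose most important method is AddRecord(const Slice& slice).
Writer has two constructors. The one that takes a single argument, a WritableFile pointer, is declared explicit, which prevents implicit conversions. In addition, Writer(const Writer&) and void operator=(const Writer&) are placed in the private section to forbid copying and assignment.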
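Writer has three member variables: WritableFile* dest_ holds the file being written, int block_offset_ holds the current offset within the block, and uint32_t type_crc_[kMaxRecordType + 1] holds the crc32c value of each of the 5 record types. These values are computed once up front and stored in the array, which reduces the checksum work needed each time a record is written.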
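The benefit of the precomputed table can be sketched as follows. This is not leveldb's crc32c implementation: Crc32cExtend below is a slow bitwise stand-in written for illustration, and InitTypeCrc is a hypothetical name for the initialization step. The point is that extending the stored CRC of the type byte with the payload gives the same result as checksumming type and payload together.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Bitwise CRC32C (Castagnoli, reflected polynomial 0x82F63B78).
// A slow illustrative stand-in, not leveldb's table-driven code.
inline uint32_t Crc32cExtend(uint32_t init_crc, const char* data, size_t n) {
  uint32_t crc = init_crc ^ 0xffffffffu;  // recover the internal state
  for (size_t i = 0; i < n; i++) {
    crc ^= static_cast<uint8_t>(data[i]);
    for (int bit = 0; bit < 8; bit++) {
      crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
  }
  return crc ^ 0xffffffffu;
}

inline uint32_t Crc32cValue(const char* data, size_t n) {
  return Crc32cExtend(0, data, n);
}

static const int kMaxRecordType = 4;

// Hypothetical helper: checksum each one-byte record type once.
inline void InitTypeCrc(uint32_t* type_crc) {
  for (int i = 0; i <= kMaxRecordType; i++) {
    const char t = static_cast<char>(i);
    type_crc[i] = Crc32cValue(&t, 1);
  }
}

// Hypothetical helper: CRC over (type byte + payload) using the table.
inline uint32_t RecordCrc(const uint32_t* type_crc, int type,
                          const char* payload, size_t n) {
  assert(type >= 0 && type <= kMaxRecordType);
  return Crc32cExtend(type_crc[type], payload, n);
}
```

With this table the writer never has to copy the type byte in front of the payload or checksum it again; it only extends a stored value.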
Next comes the most important part, AddRecord(const Slice& slice). Slice is the type leveldb uses to refer to a key or a value; it holds a pointer and a length.
Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();  // pointer to the data
  size_t left = slice.size();      // number of bytes still to write
  // Fragment the record if necessary and emit it.
  // Note that if slice is empty, we still iterate once and emit a
  // single zero-length record.
  Status s;           // result of this operation
  bool begin = true;  // are we at the start of the record?
  do {
    // Space remaining in the current block
    const int leftover = kBlockSize - block_offset_;
    // This is always >= 0. The assert is only a debugging aid; release
    // builds usually compile it out (for example with NDEBUG).
    assert(leftover >= 0);
    // Not enough room left for even a record header
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the remaining bytes with \x00
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      // The offset within the new block is 0
      block_offset_ = 0;
    }
    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
    // Payload space still available in this block
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    // Fragment size: the smaller of the available space and the remaining data
    const size_t fragment_length = (left < avail) ? left : avail;
    // Type of this physical record
    RecordType type;
    // True if this fragment consumes all remaining data, i.e. it ends the record
    const bool end = (left == fragment_length);
    // Pick the record type
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }
    // Write the fragment to the file
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;   // advance the data pointer by one fragment
    left -= fragment_length;  // one fragment less to write
    begin = false;            // no longer at the start
  } while (s.ok() && left > 0);  // loop while there is no error and data remains
  return s;
}
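The fragmentation rule is easier to follow with concrete numbers. The sketch below replays the loop of AddRecord without any I/O and returns the sequence of fragments it would emit. PlanFragments and the Fragment struct are hypothetical names introduced only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

static const int kBlockSize = 32768;
static const int kHeaderSize = 4 + 2 + 1;

enum RecordType {
  kZeroType = 0, kFullType = 1, kFirstType = 2, kMiddleType = 3, kLastType = 4
};

// Hypothetical type: one planned physical record.
struct Fragment {
  RecordType type;
  size_t length;
};

// Hypothetical helper: same decisions as AddRecord, but nothing is written.
inline std::vector<Fragment> PlanFragments(int block_offset, size_t left) {
  std::vector<Fragment> plan;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      block_offset = 0;  // the real writer zero-fills the trailer first
    }
    const size_t avail = kBlockSize - block_offset - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;
    const bool end = (left == fragment_length);
    Fragment f;
    if (begin && end) {
      f.type = kFullType;
    } else if (begin) {
      f.type = kFirstType;
    } else if (end) {
      f.type = kLastType;
    } else {
      f.type = kMiddleType;
    }
    f.length = fragment_length;
    plan.push_back(f);
    block_offset += kHeaderSize + static_cast<int>(fragment_length);
    left -= fragment_length;
    begin = false;
  } while (left > 0);
  return plan;
}
```

For example, a 70000-byte record written at the start of a block is planned as First (32761 bytes), Middle (32761 bytes) and Last (4478 bytes), because each block can carry at most 32768 - 7 = 32761 payload bytes.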
AddRecord(const Slice& slice) calls a private member function, EmitPhysicalRecord(RecordType t, const char* ptr, size_t n). Let's see how it writes a record to persistent storage.
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // n must fit in two bytes: the length field is a little-endian uint16
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);
  // Format the record header
  char buf[kHeaderSize];
  // Low byte of the length
  buf[4] = static_cast<char>(n & 0xff);
  // High byte of the length
  buf[5] = static_cast<char>(n >> 8);
  // One byte for the record type
  buf[6] = static_cast<char>(t);
  // Compute the crc of the record type and the payload
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  // Store the 4-byte checksum at the start of the header
  EncodeFixed32(buf, crc);
  // Write the header
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    // Write the payload
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      // Flush the buffered data
      s = dest_->Flush();
    }
  }
  // Advance the block offset by the size of the header plus the payload
  block_offset_ += kHeaderSize + n;
  return s;
}
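The crc32c::Mask step deserves a short illustration. As far as I recall from util/crc32c.h, the stored value is a rotated copy of the CRC plus a constant, so that computing a CRC over data that already embeds CRCs is less problematic. The sketch below reproduces that transformation and a little-endian 32-bit encoding from memory; treat the constant and the function bodies as assumptions to be checked against the real source.

```cpp
#include <cassert>
#include <cstdint>

// Assumed to match leveldb's util/crc32c.h; verify before relying on it.
static const uint32_t kMaskDelta = 0xa282ead8u;

// Rotate right by 15 bits and add a constant before storing.
inline uint32_t Mask(uint32_t crc) {
  return ((crc >> 15) | (crc << 17)) + kMaskDelta;
}

// Inverse of Mask: subtract the constant and rotate back.
inline uint32_t Unmask(uint32_t masked_crc) {
  const uint32_t rot = masked_crc - kMaskDelta;
  return (rot >> 17) | (rot << 15);
}

// Illustrative little-endian encoders, standing in for
// EncodeFixed32 / DecodeFixed32.
inline void PutFixed32(char* buf, uint32_t value) {
  buf[0] = static_cast<char>(value & 0xff);
  buf[1] = static_cast<char>((value >> 8) & 0xff);
  buf[2] = static_cast<char>((value >> 16) & 0xff);
  buf[3] = static_cast<char>((value >> 24) & 0xff);
}

inline uint32_t GetFixed32(const char* buf) {
  return (static_cast<uint32_t>(static_cast<unsigned char>(buf[0]))) |
         (static_cast<uint32_t>(static_cast<unsigned char>(buf[1])) << 8) |
         (static_cast<uint32_t>(static_cast<unsigned char>(buf[2])) << 16) |
         (static_cast<uint32_t>(static_cast<unsigned char>(buf[3])) << 24);
}
```

The writer stores Mask(crc) in the first four header bytes; the reader applies Unmask to the decoded value before comparing it with the CRC it computes itself.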
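The code above is the logical half of the log: it formats a record into the layout described earlier. That leaves one question: how do dest_->Append(const Slice& data) and dest_->Flush() actually work? Following the trail leads to include/leveldb/env.h: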
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
 public:
  WritableFile() { }
  virtual ~WritableFile();
  virtual Status Append(const Slice& data) = 0;
  virtual Status Close() = 0;
  virtual Status Flush() = 0;
  virtual Status Sync() = 0;
 private:
  // No copying allowed
  WritableFile(const WritableFile&);
  void operator=(const WritableFile&);
};
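This is the abstract WritableFile interface. Append, Close, Flush and Sync are all pure virtual, so a concrete implementation has to be supplied. The implementation must also provide buffering, because callers may append small fragments to the file at a time.
leveldb ships a default (posix) implementation in util/env_posix.cc. For now, only look at how Append and Flush are implemented: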
virtual Status Append(const Slice& data) {
  size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
  if (r != data.size()) {
    return IOError(filename_, errno);
  }
  return Status::OK();
}
virtual Status Flush() {
  if (fflush_unlocked(file_) != 0) {
    return IOError(filename_, errno);
  }
  return Status::OK();
}
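fwrite_unlocked and fflush_unlocked are the non-locking variants of fwrite and fflush; on platforms that do not provide them, leveldb's port layer #defines them to plain fwrite and fflush.
This is a good example of design: because WritableFile is an abstract interface that users can implement themselves, the code is easy to port across platforms.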
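To illustrate why the abstraction helps, here is a minimal sketch of an in-memory implementation. It does not use leveldb's headers: SimpleStatus, SimpleWritableFile, StringWritableFile and WriteGreeting are simplified, hypothetical stand-ins for Status, WritableFile, a user-supplied file class and a caller such as log::Writer.

```cpp
#include <cassert>
#include <string>

// Simplified stand-in for leveldb::Status (assumption, not the real class).
struct SimpleStatus {
  bool ok_flag;
  bool ok() const { return ok_flag; }
  static SimpleStatus OK() { SimpleStatus s; s.ok_flag = true; return s; }
};

// Simplified stand-in for the WritableFile interface.
class SimpleWritableFile {
 public:
  virtual ~SimpleWritableFile() {}
  virtual SimpleStatus Append(const std::string& data) = 0;
  virtual SimpleStatus Flush() = 0;
};

// Hypothetical implementation that "writes" into a std::string.
// Append only buffers; Flush moves the buffer into the visible contents.
class StringWritableFile : public SimpleWritableFile {
 public:
  virtual SimpleStatus Append(const std::string& data) {
    buffer_.append(data);
    return SimpleStatus::OK();
  }
  virtual SimpleStatus Flush() {
    contents_.append(buffer_);
    buffer_.clear();
    return SimpleStatus::OK();
  }
  const std::string& contents() const { return contents_; }

 private:
  std::string buffer_;
  std::string contents_;
};

// A caller written only against the interface, as log::Writer is.
inline SimpleStatus WriteGreeting(SimpleWritableFile* dest) {
  assert(dest != NULL);
  SimpleStatus s = dest->Append("hello, ");
  if (s.ok()) s = dest->Append("log");
  if (s.ok()) s = dest->Flush();
  return s;
}
```

The caller never learns whether the bytes end up in a posix file, a memory buffer, or something else, which is what makes both porting and testing straightforward.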
IV. Reading the log
log_reader implements the Reader class, which reads log files.
The Reader constructor is declared as follows:
Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset);
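file is the file to read. reporter is the object that gets notified when corruption is found in the log file. checksum is a boolean flag that controls whether checksums are verified (it is not the checksum value itself). initial_offset is the physical position in the file at which reading starts.
The private member variables and functions of Reader worth noting are: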
SequentialFile* const file_;  // the file being read
Reporter* const reporter_;    // the object errors are reported to
bool const checksum_;         // whether to verify checksums
char* const backing_store_;   // memory that backs buffer_
Slice buffer_;                // unconsumed data from the last block read
bool eof_;  // Last Read() indicated EOF by returning < kBlockSize
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
// Offset of the first location past the end of buffer_.
uint64_t end_of_buffer_offset_;
// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;
// True if we are resynchronizing after a seek (initial_offset_ > 0). In
// particular, a run of kMiddleType and kLastType records can be silently
// skipped in this mode
bool resyncing_;
// Extend record types with the following special values
enum {
  kEof = kMaxRecordType + 1,
  // Returned for an invalid physical record, which can happen when:
  // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
  // * The record is a 0-length record (No drop is reported)
  // * The record is below constructor's initial_offset (No drop is reported)
  kBadRecord = kMaxRecordType + 2
};
bool SkipToInitialBlock();  // skip ahead to the block containing initial_offset; returns true on success
unsigned int ReadPhysicalRecord(Slice* result);  // read one physical record; returns its type, or kEof / kBadRecord
void ReportCorruption(uint64_t bytes, const char* reason);  // report dropped bytes and the reason
void ReportDrop(uint64_t bytes, const Status& reason);      // report dropped bytes and the reason
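The body of SkipToInitialBlock is not shown in this article, so the following is only a sketch of how such a skip could be computed: round initial_offset down to a block boundary, and move on to the next block if the offset would fall inside a possible trailer. The helper name FirstBlockToRead is hypothetical, and the threshold of kBlockSize - 6 is how I remember the upstream code, so treat it as an assumption.

```cpp
#include <cassert>
#include <cstdint>

static const int kBlockSize = 32768;

// Hypothetical helper: file offset of the first block the reader
// needs to load for a given initial_offset.
inline uint64_t FirstBlockToRead(uint64_t initial_offset) {
  const uint64_t offset_in_block = initial_offset % kBlockSize;
  uint64_t block_start = initial_offset - offset_in_block;
  assert(block_start % kBlockSize == 0);
  // Assumed threshold: no record can start this close to the end of
  // a block, so the search begins at the following block instead.
  if (offset_in_block > static_cast<uint64_t>(kBlockSize - 6)) {
    block_start += kBlockSize;
  }
  return block_start;
}
```

For instance, an initial_offset of 40000 maps to the block starting at 32768, while an initial_offset of 32765 lies in the trailer region of the first block and also maps to 32768.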
Of these member functions, ReadPhysicalRecord deserves a closer look:
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }
    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }
    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }
    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }
    buffer_.remove_prefix(kHeaderSize + length);
    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }
    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}
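The way ReadPhysicalRecord treats a record that does not fit in the buffer is worth isolating: the same symptom is a corruption in the middle of the file but a harmless truncated tail at the end of it. The sketch below captures just that decision; the Outcome enum and ClassifyBuffer are hypothetical names, and the CRC and initial_offset checks are intentionally left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

static const int kHeaderSize = 4 + 2 + 1;

// Hypothetical outcomes for a buffer that starts at a record header.
enum Outcome {
  kRecordOk,        // header and payload are fully present
  kShortHeader,     // fewer than kHeaderSize bytes: refill, or EOF
  kBadLength,       // payload overruns the block mid-file: corruption
  kTruncatedAtEof   // payload overruns at end of file: writer died mid-write
};

// Hypothetical helper mirroring the length check in ReadPhysicalRecord.
inline Outcome ClassifyBuffer(const char* buf, size_t size, bool eof) {
  if (size < static_cast<size_t>(kHeaderSize)) {
    return kShortHeader;
  }
  const uint32_t lo = static_cast<uint32_t>(buf[4]) & 0xff;
  const uint32_t hi = static_cast<uint32_t>(buf[5]) & 0xff;
  const uint32_t length = lo | (hi << 8);
  if (kHeaderSize + length > size) {
    // Same bytes, different verdict depending on where we are in the file.
    return eof ? kTruncatedAtEof : kBadLength;
  }
  return kRecordOk;
}
```

Treating a truncated tail as EOF rather than corruption means a crash in the middle of a write only loses the record that was being written.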
The public member functions of Reader worth noting are:
bool ReadRecord(Slice* record, std::string* scratch);
uint64_t LastRecordOffset();
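ReadRecord reads the next logical record. On success it returns true and stores the result in record; it returns false once the end of the input is reached. scratch is temporary storage: when a record was written as several fragments, the fragments are assembled in scratch and record refers to it, so the contents of record are only valid until the next mutating operation on the reader or on scratch.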
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {  // jump to the block containing the initial offset
      return false;
    }
  }
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;
  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);
    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }
    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;
      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;
      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;
      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;
      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;
      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;
      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
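Stripped of offsets, resyncing and error reporting, the switch in ReadRecord is a small state machine that glues First, Middle and Last fragments back together. The sketch below shows only that core over an in-memory list of physical records; Reassemble and PhysicalRecord are hypothetical names, and orphaned or unfinished fragments are silently dropped here instead of being reported.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

enum RecordType {
  kZeroType = 0, kFullType = 1, kFirstType = 2, kMiddleType = 3, kLastType = 4
};

// Hypothetical type: a physical record as (type, payload).
typedef std::pair<RecordType, std::string> PhysicalRecord;

// Hypothetical helper: rebuild logical records from physical ones.
inline std::vector<std::string> Reassemble(
    const std::vector<PhysicalRecord>& input) {
  std::vector<std::string> out;
  std::string scratch;                // collects fragments, like *scratch
  bool in_fragmented_record = false;
  for (size_t i = 0; i < input.size(); i++) {
    const std::string& payload = input[i].second;
    switch (input[i].first) {
      case kFullType:                 // a complete record in one piece
        scratch.clear();
        in_fragmented_record = false;
        out.push_back(payload);
        break;
      case kFirstType:                // start collecting
        scratch = payload;
        in_fragmented_record = true;
        break;
      case kMiddleType:               // continue only if a First was seen
        if (in_fragmented_record) scratch += payload;
        break;
      case kLastType:                 // finish the record if one is open
        if (in_fragmented_record) {
          scratch += payload;
          out.push_back(scratch);
          scratch.clear();
          in_fragmented_record = false;
        }
        break;
      default:                        // anything else aborts the open record
        scratch.clear();
        in_fragmented_record = false;
        break;
    }
  }
  // A record still open here never got its Last fragment; it is dropped,
  // just as ReadRecord discards scratch when it hits kEof.
  return out;
}
```

A First fragment without a matching Last at the end of the input yields nothing, which matches the kEof case above: an incomplete logical record is ignored rather than treated as corruption.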