leveldb源码阅读(一)-log读取和写入

leveldb源码阅读(一)-log读取和写入

标签(空格分隔): leveldb log


一、 引言

leveldb中,所有对数据库的操作都会记录到log中,当log文件达到预定义的大小后,就会被转换成数据表(sstable),所以log文件的生成和读取算是leveldb核心部分的第一步。
这篇文章涉及到的leveldb的源代码文件有:

db/log_format.h
db/log_reader.h
db/log_reader.cc
db/log_writer.h
db/log_writer.cc
leveldb/slice.h
leveldb/status.h

二、日志存储格式——log_format.h

在之前的文章中,了解到log是由一个个block组成,每个block是32KB的大小。

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

每个block中存储的是一条条的记录,如果一个block的末尾最后剩下小于等于6bytes的空间,就会使用一个trailer填充而不是用新的记录填充,这个trailer都是由0字节组成。一条记录包括一个crc32c生成的校验码checksum、记录的长度length、记录的类型type、数据。

如果一条记录跨越了多个block,就会被分段,分段类型有3种——First、Middle、Last

关于这部分的实现是在log_format.h中

enum RecordType {
  //0是保留字,用来表示预分配的文件
  kZeroType = 0,

  kFullType = 1,

  // 一条记录被分段的3种类型
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
//定义了最大的记录类型值
static const int kMaxRecordType = kLastType;
//定义了最大的`block`大小
static const int kBlockSize = 32768;

// 记录头是  checksum (4 bytes), length (2 bytes), type (1 byte).总共7bytes
static const int kHeaderSize = 4 + 2 + 1;

三、日志的写入

在log_writer代码中,实现了Writer类,最主要的就是一个AddRecord(const Slice& slice)

Writer类有两个构造函数,对于只传了一个参数——WritableFile指针的构造函数,使用了explicit显示构造,这样可以防止隐式转换。另外将Writer(const Writer&)void operator=(const Writer&)放在private域,防止拷贝赋值。

Writer有三个成员变量 WritableFile* dest_用来保存可写的文件,int block_offset_用来保存当前块的偏移量, uint32_t type_crc_[kMaxRecordType + 1]用来保存5种记录类型的crc32c计算结果。这里会提前计算好保存进这个数组来减少存储时计算的负载。

接下来就是最重要的AddRecord(const Slice& slice)了。Slice是leveldb中用来保存key或者value的值的,包含一个指针,一个长度。

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();   //取出数据的指针
  size_t left = slice.size();       //取出数据的长度

  // 如果有必要的话,对记录分段,并保存.
  // 注意如果slice是空的,我们也会迭代一次,保存一个长度为0的记录
  Status s;     //用来保存这次操作的结果
  bool begin = true;    //是否是一条记录的开始
  do {
    //计算出当前块剩下的空间
    const int leftover = kBlockSize - block_offset_;
    //使用assert进行断言,这边是一定会>=0的,assert这里只是为了帮助调试程序,实际使用时在编译时通过参数去除assert语句
    assert(leftover >= 0);

    //如果剩下的空间已经不足一条记录的header了
    if (leftover < kHeaderSize) {
      // 切换到新的块
      if (leftover > 0) {
        // 如果剩下的空间>0,需要使用\x00填充
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      //新的块偏移量是0
      block_offset_ = 0;
    }

    // 不变的道理:我们绝对不会让剩下的空间<kHeaderSize.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    //当前还可用的空间
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;

    //记录片段的大小,取可用的空间大小和记录的大小的最小值
    const size_t fragment_length = (left < avail) ? left : avail;

    //当前的记录类型
    RecordType type;

    //如果记录的大小和片段的长度是一样的,表示到达了记录的末尾为true,否则false
    const bool end = (left == fragment_length);

    //记录类型的判断
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    //写入到物理存储中,
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;     //数据指针右移一个片段长度
    left -= fragment_length;    //数据的长度减去一个片段的长度
    begin = false;              //不再是开始了
  } while (s.ok() && left > 0); //如果没有出错,并且数据还剩就再循环

  return s;
}

AddRecord(const Slice& slice)调用了一个私有成员函数EmitPhysicalRecord(RecordType t, const char* ptr, size_t n),看一看这个将记录写入持久化存储是如何实现的。

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // n必须可以用2KB表示,因为一条记录的length是uint16大小的,little-endian
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // 对记录的header进行格式化
  char buf[kHeaderSize];
  //取低位字节
  buf[4] = static_cast<char>(n & 0xff);
  //取高位字节
  buf[5] = static_cast<char>(n >> 8);
  //一位的记录类型
  buf[6] = static_cast<char>(t);

  // 计算记录类型的crc值和有效载荷
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);                 // Adjust for storage
  //填充4位的校验值
  EncodeFixed32(buf, crc);

  // 将记录的header写入
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    //将记录的数据写入
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      //flush一下
      s = dest_->Flush();
    }
  }

  //块的偏移量右移header和数据的长度和
  block_offset_ += kHeaderSize + n;
  return s;
}

上面的代码分析其实就是log部分逻辑上的实现,将一条记录格式化成设计好的格式。这里有一个疑问就是dest_->Append(Slice& slice)dest_->Flush()是如何工作的。所以继续追踪下去,到include/leveldb/env.h中:

// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
 public:
  WritableFile() { }
  virtual ~WritableFile();

  virtual Status Append(const Slice& data) = 0;
  virtual Status Close() = 0;
  virtual Status Flush() = 0;
  virtual Status Sync() = 0;

 private:
  // No copying allowed
  WritableFile(const WritableFile&);
  void operator=(const WritableFile&);
};

这部分是WritableFile的抽象部分,Append、Close、Flush、Sync都是纯虚函数,具体实现必须由用户自己去做。并且实现时必须提供缓冲区,这样调用者可以append小的数据段到文件中。

实际上leveldb中也有默认(posix)的实现。在util/env/env_posix.cc中。目前只看Append和Flush具体实现:

  virtual Status Append(const Slice& data) {
    size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
    if (r != data.size()) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  virtual Status Flush() {
    if (fflush_unlocked(file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

fwrite_unlocked和fflush_unlocked都是define的fwrite和fflush方法。

所以这里的设计是很好的案例。通过将WritableFile抽象化,提供给用户自己来实现,从而很轻松的就可以在多平台间移植。

四、日志的读取

在log_reader中,实现了Reader类,用来读取日志文件。
Reader的构造函数定义如下:

Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

file是代表要读取的文件,reporter是用来报告日志文件中发现错误的,checksum是当前文件的校验码,initial_offset是当前文件中开始读取的物理位置。

Reader类中私有的成员变量和函数需要注意的有:

  SequentialFile* const file_;      //要读取的文件
  Reporter* const reporter_;        //报告错误的对象
  bool const checksum_;             //校验值
  char* const backing_store_;       //备份存储
  Slice buffer_;                    //缓冲
  bool eof_;   // Last Read() indicated EOF by returning < kBlockSize

  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;         //最后一个Record的偏移量
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;       //

  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;   //第一个记录的初始偏移量

  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  bool resyncing_;      //在一次查找后我们再同步则是True。特别的,kMiddleType和kLastType会被跳过   

  // Extend record types with the following special values
  enum {
    kEof = kMaxRecordType + 1,
    // 无效的记录值,可能有以下情况
    // * 具有无效的CRC值(checksum不正确)(ReadPhysicalRecord reports a drop)
    // * 长度为0的记录 (No drop is reported)
    // * The record is below constructor's initial_offset (No drop is reported)
    kBadRecord = kMaxRecordType + 2
  };

  bool SkipToInitialBlock();        //跳过直到initial_offset的位置,成功则返回true
  unsigned int ReadPhysicalRecord(Slice* result);   //读取Record,返回记录类型,或者kEof、kBadRecord

  void ReportCorruption(uint64_t bytes, const char* reason);    //汇报丢弃的字节,以及原因
  void ReportDrop(uint64_t bytes, const Status& reason);        //汇报丢弃的字节,以及原因

这些成员函数中着重看一下ReadPhysicalRecord

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}

Reader类中公共的成员函数需要注意的有:
bool ReadRecord(Slice* record, std::string* scratch);
uint64_t LastRecordOffset();

ReadRecord用来读取下一个要读取的Record,如果读取成功,返回true,存储在record中,如果读到了结尾没有一个完整的Record,则存储在scratch中

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {    //跳到初始的偏移位置
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据