int main(int argc, char** argv) {
  // Set up database connection information and open database
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);
if (false == status.ok()) {
  cerr << "Unable to open/create test database './testdb'" << endl;
  cerr << status.ToString() << endl;
  return -1;
}
// Add 256 values to the database
leveldb::WriteOptions writeOptions;
for (unsigned int i = 0; i < 256; ++i) {
  ostringstream keyStream;
  keyStream << "Key" << i;
ostringstream valueStream; valueStream << "Test data value: " << i;
An SSTable provides a persistent, ordered immutable map from keys to values, where both keys and values are arbitrary byte strings. Operations are provided to look up the value associated with a specified key, and to iterate over all key/value pairs in a specified key range. Internally, each SSTable contains a sequence of blocks (typically each block is 64KB in size, but this is configurable). A block index (stored at the end of the SSTable) is used to locate blocks; the index is loaded into memory when the SSTable is opened. A lookup can be performed with a single disk seek: we first find the appropriate block by performing a binary search in the in-memory index, and then reading the appropriate block from disk. Optionally, an SSTable can be completely mapped into memory, which allows us to perform lookups and scans without touching disk.
// 这里有可能别的线程帮助写入了,就是 DONE了,所以下面有个 Return,并且自己是 Head就真的开始写入,不然就是等 while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; }
// May temporarily unlock and wait.
// MakeRoomForWrite 的主要逻辑是将 MemTable 变成 Immutable MemTable, 然后创建一个新的 MemTable
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
// 这里的 last_writer 现在就是 Head
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
  // BuildBatchGroup 会根据队列里面的值尽量的多一次性取出来去 Write
  // 在这里会更新 last_writer,也就是合并写入的最后一个
  WriteBatch* write_batch = BuildBatchGroup(&last_writer);
  WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
  last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. // 注意这一步解锁,很关键,因为接下来的写入可能是一个费时的过程,解锁后,其它线程可以Get,其它线程也可以继续将writer // 插入到writers_里面,但是插入后,因为不是头元素,会等待,所以不会冲突 { mutex_.Unlock(); // 写入log status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; // 根据选项sync,不开就可以会丢数据 if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); } mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we // just added may or may not show up when the DB is re-opened. // So we force the DB into a mode where all future writes fail. RecordBackgroundError(status); } } if (write_batch == tmp_batch_) tmp_batch_->Clear();
int refs; int allowed_seeks; // Seeks allowed until compaction uint64_t number; uint64_t file_size; // File size in bytes InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table };
// List of files per level std::vector<FileMetaData*> files_[config::kNumLevels];
那么为什么要版本管理呢? 在数据不断写入后,MemTable写满了,这时候就会转换为Level 0的一个SSTable,或者Level n的一个文件和Level n + 1的多个文件进行Compaction,会转换成Level n + 1的多个文件。这会使SSTable文件数量改变,文件内容改变,也就是版本信息改变了,所以需要管理版本。
// Write new record to MANIFEST log if (s.ok()) { std::string record; // 在这里写入文件 edit->EncodeTo(&record); s = descriptor_log_->AddRecord(record); if (s.ok()) { s = descriptor_file_->Sync(); } if (!s.ok()) { Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); } }
// 这里更新了 Current 文件,指向对应的 MANIFEST if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); }
Slice key = input->key(); // 当前输出文件的构建是否应该在指定key之前停下,若到达指定key时当前输出文件与level+2层的文件重合的内容数量超出阈值,则新起一个输出文件。 if (compact->compaction->ShouldStopBefore(key) && compact->builder != nullptr) { // 创建一个新的SSTable,写入文件 status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } }
// 这里要判断这个 Key 是不是要处理,用 drop 来标记 bool drop = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; } else { if (!has_current_user_key || user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0) { // 第一次遇见 user key。标记了下当前处理的一些状态 current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; }
// 如果上一个Key的SequenceNumber <= 最小的存活的Snapshot,这个Key就不会被任何线程看到了,可以被丢弃
if (last_sequence_for_key <= compact->smallest_snapshot) {
  // Hidden by a newer entry for same user key
  drop = true;  // (A)
} else if (ikey.type == kTypeDeletion &&
           ikey.sequence <= compact->smallest_snapshot &&
           compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
  // 如果碰到了一个删除操作,并且SequenceNumber <= 最小的Snapshot,
  // 通过IsBaseLevelForKey判断更高Level不会有这个User Key存在,那么这个Key就被丢弃
  drop = true;
}
last_sequence_for_key = ikey.sequence; }
if (!drop) { // Open output file if necessary if (compact->builder == nullptr) { status = OpenCompactionOutputFile(compact); if (!status.ok()) { break; } } if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value());
// 达到文件大小,就写入文件,生成新文件 if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } } }
input->Next(); }
if (status.ok()) { // 创建压缩的结果对象,下解释 status = InstallCompactionResults(compact); } return status; }
// 将变更的记录都放到 edit 里面,然后触发 LogAndApply Status DBImpl::InstallCompactionResults(CompactionState* compact){ mutex_.AssertHeld(); // Add compaction outputs compact->compaction->AddInputDeletions(compact->compaction->edit()); constint level = compact->compaction->level(); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, out.smallest, out.largest); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); }
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest){ mutex_.AssertHeld();
// 创建数据库目录 env_->CreateDir(dbname_); assert(db_lock_ == nullptr); // 文件锁,禁止多进程的操作 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); if (!s.ok()) { return s; }
// 干净的就创建一个单独的 DB,不然就 恢复
if (!env_->FileExists(CurrentFileName(dbname_))) {
  if (options_.create_if_missing) {
    Log(options_.info_log, "Creating DB %s since it was missing.",
        dbname_.c_str());
    s = NewDB();
    if (!s.ok()) {
      return s;
    }
  } else {
    return Status::InvalidArgument(
        dbname_, "does not exist (create_if_missing is false)");
  }
} else {
  if (options_.error_if_exists) {
    return Status::InvalidArgument(dbname_,
                                   "exists (error_if_exists is true)");
  }
}
// 从 Manifest 恢复最后的版本
s = versions_->Recover(save_manifest);
if (!s.ok()) {
  return s;
}
SequenceNumber max_sequence(0);
// 之前的MANIFEST恢复,会得到版本信息,里面包含了之前的log number
// 搜索文件系统里的log,如果这些日志的编号 >= 这个log number,那么这些
// 日志都是关闭时丢失的数据,需要恢复,这里将日志按顺序存储在logs里面
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
  return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
  if (ParseFileName(filenames[i], &number, &type)) {
    expected.erase(number);
    if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
      logs.push_back(number);
  }
}
if (!expected.empty()) {
  char buf[50];
  std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
                static_cast<int>(expected.size()));
  return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
// Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, &max_sequence); if (!s.ok()) { return s; }
// The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. versions_->MarkFileNumberUsed(logs[i]); }
if (versions_->LastSequence() < max_sequence) { versions_->SetLastSequence(max_sequence); }
Status VersionSet::Recover(bool* save_manifest) {
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    void Corruption(size_t bytes, const Status& s) override {
      if (this->status->ok()) *this->status = s;
    }
  };
// 读取CURRENT文件的内容,获取当前使用的MANIFEST文件
// 读取MANIFEST文件,将里面的VersionEdit读取应用到一个builder里
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
if (!s.ok()) {
  return s;
}
if (current.empty() || current[current.size() - 1] != '\n') {
  return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);
std::string dscname = dbname_ + "/" + current; SequentialFile* file; s = env_->NewSequentialFile(dscname, &file); if (!s.ok()) { if (s.IsNotFound()) { return Status::Corruption("CURRENT points to a non-existent file", s.ToString()); } return s; }
if (s.ok()) {
  if (!have_next_file) {
    s = Status::Corruption("no meta-nextfile entry in descriptor");
  } else if (!have_log_number) {
    s = Status::Corruption("no meta-lognumber entry in descriptor");
  } else if (!have_last_sequence) {
    s = Status::Corruption("no last-sequence-number entry in descriptor");
  }
if (!have_prev_log_number) { prev_log_number = 0; }
// See if we can reuse the existing MANIFEST file. if (ReuseManifest(dscname, current)) { // No need to save new manifest } else { *save_manifest = true; } } else { std::string error = s.ToString(); Log(options_->info_log, "Error recovering version set with %d records: %s", read_records, error.c_str()); }