xref: /aosp_15_r20/external/leveldb/db/db_impl.cc (revision 9507f98c5f32dee4b5f9e4a38cd499f3ff5c4490)
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/db_impl.h"
6 
7 #include <algorithm>
8 #include <atomic>
9 #include <cstdint>
10 #include <cstdio>
11 #include <set>
12 #include <string>
13 #include <vector>
14 
15 #include "db/builder.h"
16 #include "db/db_iter.h"
17 #include "db/dbformat.h"
18 #include "db/filename.h"
19 #include "db/log_reader.h"
20 #include "db/log_writer.h"
21 #include "db/memtable.h"
22 #include "db/table_cache.h"
23 #include "db/version_set.h"
24 #include "db/write_batch_internal.h"
25 #include "leveldb/db.h"
26 #include "leveldb/env.h"
27 #include "leveldb/status.h"
28 #include "leveldb/table.h"
29 #include "leveldb/table_builder.h"
30 #include "port/port.h"
31 #include "table/block.h"
32 #include "table/merger.h"
33 #include "table/two_level_iterator.h"
34 #include "util/coding.h"
35 #include "util/logging.h"
36 #include "util/mutexlock.h"
37 
38 namespace leveldb {
39 
40 const int kNumNonTableCacheFiles = 10;
41 
42 // Information kept for every waiting writer
43 struct DBImpl::Writer {
44   explicit Writer(port::Mutex* mu)
45       : batch(nullptr), sync(false), done(false), cv(mu) {}
46 
47   Status status;
48   WriteBatch* batch;
49   bool sync;
50   bool done;
51   port::CondVar cv;
52 };
53 
54 struct DBImpl::CompactionState {
55   // Files produced by compaction
56   struct Output {
57     uint64_t number;
58     uint64_t file_size;
59     InternalKey smallest, largest;
60   };
61 
62   Output* current_output() { return &outputs[outputs.size() - 1]; }
63 
64   explicit CompactionState(Compaction* c)
65       : compaction(c),
66         smallest_snapshot(0),
67         outfile(nullptr),
68         builder(nullptr),
69         total_bytes(0) {}
70 
71   Compaction* const compaction;
72 
73   // Sequence numbers < smallest_snapshot are not significant since we
74   // will never have to service a snapshot below smallest_snapshot.
75   // Therefore if we have seen a sequence number S <= smallest_snapshot,
76   // we can drop all entries for the same key with sequence numbers < S.
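     // For example, with smallest_snapshot == 100, once a put for key K at
     // sequence 95 has been emitted, older entries for K (sequences 90, 80,
     // ...) can no longer be observed by any reader and may be dropped.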
77   SequenceNumber smallest_snapshot;
78 
79   std::vector<Output> outputs;
80 
81   // State kept for output being generated
82   WritableFile* outfile;
83   TableBuilder* builder;
84 
85   uint64_t total_bytes;
86 };
87 
88 // Fix user-supplied options to be reasonable
89 template <class T, class V>
90 static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
91   if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
92   if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
93 }
94 Options SanitizeOptions(const std::string& dbname,
95                         const InternalKeyComparator* icmp,
96                         const InternalFilterPolicy* ipolicy,
97                         const Options& src) {
98   Options result = src;
99   result.comparator = icmp;
100   result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
101   ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
102   ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
103   ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
104   ClipToRange(&result.block_size, 1 << 10, 4 << 20);
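      // With the constants above this clamps max_open_files to [74, 50000],
      // write_buffer_size to [64 KB, 1 GB], max_file_size to [1 MB, 1 GB],
      // and block_size to [1 KB, 4 MB].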
105   if (result.info_log == nullptr) {
106     // Open a log file in the same directory as the db
107     src.env->CreateDir(dbname);  // In case it does not exist
108     src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
109     Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
110     if (!s.ok()) {
111       // No place suitable for logging
112       result.info_log = nullptr;
113     }
114   }
115   if (result.block_cache == nullptr) {
116     result.block_cache = NewLRUCache(8 << 20);
117   }
118   return result;
119 }
120 
121 static int TableCacheSize(const Options& sanitized_options) {
122   // Reserve ten files or so for other uses and give the rest to TableCache.
123   return sanitized_options.max_open_files - kNumNonTableCacheFiles;
124 }
125 
126 DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
127     : env_(raw_options.env),
128       internal_comparator_(raw_options.comparator),
129       internal_filter_policy_(raw_options.filter_policy),
130       options_(SanitizeOptions(dbname, &internal_comparator_,
131                                &internal_filter_policy_, raw_options)),
132       owns_info_log_(options_.info_log != raw_options.info_log),
133       owns_cache_(options_.block_cache != raw_options.block_cache),
134       dbname_(dbname),
135       table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
136       db_lock_(nullptr),
137       shutting_down_(false),
138       background_work_finished_signal_(&mutex_),
139       mem_(nullptr),
140       imm_(nullptr),
141       has_imm_(false),
142       logfile_(nullptr),
143       logfile_number_(0),
144       log_(nullptr),
145       seed_(0),
146       tmp_batch_(new WriteBatch),
147       background_compaction_scheduled_(false),
148       manual_compaction_(nullptr),
149       versions_(new VersionSet(dbname_, &options_, table_cache_,
150                                &internal_comparator_)) {}
151 
152 DBImpl::~DBImpl() {
153   // Wait for background work to finish.
154   mutex_.Lock();
155   shutting_down_.store(true, std::memory_order_release);
156   while (background_compaction_scheduled_) {
157     background_work_finished_signal_.Wait();
158   }
159   mutex_.Unlock();
160 
161   if (db_lock_ != nullptr) {
162     env_->UnlockFile(db_lock_);
163   }
164 
165   delete versions_;
166   if (mem_ != nullptr) mem_->Unref();
167   if (imm_ != nullptr) imm_->Unref();
168   delete tmp_batch_;
169   delete log_;
170   delete logfile_;
171   delete table_cache_;
172 
173   if (owns_info_log_) {
174     delete options_.info_log;
175   }
176   if (owns_cache_) {
177     delete options_.block_cache;
178   }
179 }
180 
181 Status DBImpl::NewDB() {
182   VersionEdit new_db;
183   new_db.SetComparatorName(user_comparator()->Name());
184   new_db.SetLogNumber(0);
185   new_db.SetNextFile(2);
186   new_db.SetLastSequence(0);
187 
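      // File number 1 is used for this initial MANIFEST, so the
      // SetNextFile(2) call above leaves 2 as the first number available to
      // the new DB.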
188   const std::string manifest = DescriptorFileName(dbname_, 1);
189   WritableFile* file;
190   Status s = env_->NewWritableFile(manifest, &file);
191   if (!s.ok()) {
192     return s;
193   }
194   {
195     log::Writer log(file);
196     std::string record;
197     new_db.EncodeTo(&record);
198     s = log.AddRecord(record);
199     if (s.ok()) {
200       s = file->Sync();
201     }
202     if (s.ok()) {
203       s = file->Close();
204     }
205   }
206   delete file;
207   if (s.ok()) {
208     // Make "CURRENT" file that points to the new manifest file.
209     s = SetCurrentFile(env_, dbname_, 1);
210   } else {
211     env_->RemoveFile(manifest);
212   }
213   return s;
214 }
215 
216 void DBImpl::MaybeIgnoreError(Status* s) const {
217   if (s->ok() || options_.paranoid_checks) {
218     // No change needed
219   } else {
220     Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
221     *s = Status::OK();
222   }
223 }
224 
225 void DBImpl::RemoveObsoleteFiles() {
226   mutex_.AssertHeld();
227 
228   if (!bg_error_.ok()) {
229     // After a background error, we don't know whether a new version may
230     // or may not have been committed, so we cannot safely garbage collect.
231     return;
232   }
233 
234   // Make a set of all of the live files
235   std::set<uint64_t> live = pending_outputs_;
236   versions_->AddLiveFiles(&live);
237 
238   std::vector<std::string> filenames;
239   env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
240   uint64_t number;
241   FileType type;
242   std::vector<std::string> files_to_delete;
243   for (std::string& filename : filenames) {
244     if (ParseFileName(filename, &number, &type)) {
245       bool keep = true;
246       switch (type) {
247         case kLogFile:
248           keep = ((number >= versions_->LogNumber()) ||
249                   (number == versions_->PrevLogNumber()));
250           break;
251         case kDescriptorFile:
252           // Keep my manifest file, and any newer incarnations'
253           // (in case there is a race that allows other incarnations)
254           keep = (number >= versions_->ManifestFileNumber());
255           break;
256         case kTableFile:
257           keep = (live.find(number) != live.end());
258           break;
259         case kTempFile:
260           // Any temp files that are currently being written to must
261           // be recorded in pending_outputs_, which is inserted into "live"
262           keep = (live.find(number) != live.end());
263           break;
264         case kCurrentFile:
265         case kDBLockFile:
266         case kInfoLogFile:
267           keep = true;
268           break;
269       }
270 
271       if (!keep) {
272         files_to_delete.push_back(std::move(filename));
273         if (type == kTableFile) {
274           table_cache_->Evict(number);
275         }
276         Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
277             static_cast<unsigned long long>(number));
278       }
279     }
280   }
281 
282   // While deleting all files, unblock other threads. All files being deleted
283   // have unique names which will not collide with newly created files and
284   // are therefore safe to delete while allowing other threads to proceed.
285   mutex_.Unlock();
286   for (const std::string& filename : files_to_delete) {
287     env_->RemoveFile(dbname_ + "/" + filename);
288   }
289   mutex_.Lock();
290 }
291 
292 Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
293   mutex_.AssertHeld();
294 
295   // Ignore error from CreateDir since the creation of the DB is
296   // committed only when the descriptor is created, and this directory
297   // may already exist from a previous failed creation attempt.
298   env_->CreateDir(dbname_);
299   assert(db_lock_ == nullptr);
300   Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
301   if (!s.ok()) {
302     return s;
303   }
304 
305   if (!env_->FileExists(CurrentFileName(dbname_))) {
306     if (options_.create_if_missing) {
307       Log(options_.info_log, "Creating DB %s since it was missing.",
308           dbname_.c_str());
309       s = NewDB();
310       if (!s.ok()) {
311         return s;
312       }
313     } else {
314       return Status::InvalidArgument(
315           dbname_, "does not exist (create_if_missing is false)");
316     }
317   } else {
318     if (options_.error_if_exists) {
319       return Status::InvalidArgument(dbname_,
320                                      "exists (error_if_exists is true)");
321     }
322   }
323 
324   s = versions_->Recover(save_manifest);
325   if (!s.ok()) {
326     return s;
327   }
328   SequenceNumber max_sequence(0);
329 
330   // Recover from all newer log files than the ones named in the
331   // descriptor (new log files may have been added by the previous
332   // incarnation without registering them in the descriptor).
333   //
334   // Note that PrevLogNumber() is no longer used, but we pay
335   // attention to it in case we are recovering a database
336   // produced by an older version of leveldb.
337   const uint64_t min_log = versions_->LogNumber();
338   const uint64_t prev_log = versions_->PrevLogNumber();
339   std::vector<std::string> filenames;
340   s = env_->GetChildren(dbname_, &filenames);
341   if (!s.ok()) {
342     return s;
343   }
344   std::set<uint64_t> expected;
345   versions_->AddLiveFiles(&expected);
346   uint64_t number;
347   FileType type;
348   std::vector<uint64_t> logs;
349   for (size_t i = 0; i < filenames.size(); i++) {
350     if (ParseFileName(filenames[i], &number, &type)) {
351       expected.erase(number);
352       if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
353         logs.push_back(number);
354     }
355   }
356   if (!expected.empty()) {
357     char buf[50];
358     std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
359                   static_cast<int>(expected.size()));
360     return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
361   }
362 
363   // Recover in the order in which the logs were generated
364   std::sort(logs.begin(), logs.end());
365   for (size_t i = 0; i < logs.size(); i++) {
366     s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
367                        &max_sequence);
368     if (!s.ok()) {
369       return s;
370     }
371 
372     // The previous incarnation may not have written any MANIFEST
373     // records after allocating this log number.  So we manually
374     // update the file number allocation counter in VersionSet.
375     versions_->MarkFileNumberUsed(logs[i]);
376   }
377 
378   if (versions_->LastSequence() < max_sequence) {
379     versions_->SetLastSequence(max_sequence);
380   }
381 
382   return Status::OK();
383 }
384 
385 Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
386                               bool* save_manifest, VersionEdit* edit,
387                               SequenceNumber* max_sequence) {
388   struct LogReporter : public log::Reader::Reporter {
389     Env* env;
390     Logger* info_log;
391     const char* fname;
392     Status* status;  // null if options_.paranoid_checks==false
393     void Corruption(size_t bytes, const Status& s) override {
394       Log(info_log, "%s%s: dropping %d bytes; %s",
395           (this->status == nullptr ? "(ignoring error) " : ""), fname,
396           static_cast<int>(bytes), s.ToString().c_str());
397       if (this->status != nullptr && this->status->ok()) *this->status = s;
398     }
399   };
400 
401   mutex_.AssertHeld();
402 
403   // Open the log file
404   std::string fname = LogFileName(dbname_, log_number);
405   SequentialFile* file;
406   Status status = env_->NewSequentialFile(fname, &file);
407   if (!status.ok()) {
408     MaybeIgnoreError(&status);
409     return status;
410   }
411 
412   // Create the log reader.
413   LogReporter reporter;
414   reporter.env = env_;
415   reporter.info_log = options_.info_log;
416   reporter.fname = fname.c_str();
417   reporter.status = (options_.paranoid_checks ? &status : nullptr);
418   // We intentionally make log::Reader do checksumming even if
419   // paranoid_checks==false so that corruptions cause entire commits
420   // to be skipped instead of propagating bad information (like overly
421   // large sequence numbers).
422   log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
423   Log(options_.info_log, "Recovering log #%llu",
424       (unsigned long long)log_number);
425 
426   // Read all the records and add to a memtable
427   std::string scratch;
428   Slice record;
429   WriteBatch batch;
430   int compactions = 0;
431   MemTable* mem = nullptr;
432   while (reader.ReadRecord(&record, &scratch) && status.ok()) {
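        // A serialized WriteBatch starts with a 12-byte header: an 8-byte
        // sequence number followed by a 4-byte entry count.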
433     if (record.size() < 12) {
434       reporter.Corruption(record.size(),
435                           Status::Corruption("log record too small"));
436       continue;
437     }
438     WriteBatchInternal::SetContents(&batch, record);
439 
440     if (mem == nullptr) {
441       mem = new MemTable(internal_comparator_);
442       mem->Ref();
443     }
444     status = WriteBatchInternal::InsertInto(&batch, mem);
445     MaybeIgnoreError(&status);
446     if (!status.ok()) {
447       break;
448     }
449     const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
450                                     WriteBatchInternal::Count(&batch) - 1;
451     if (last_seq > *max_sequence) {
452       *max_sequence = last_seq;
453     }
454 
455     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
456       compactions++;
457       *save_manifest = true;
458       status = WriteLevel0Table(mem, edit, nullptr);
459       mem->Unref();
460       mem = nullptr;
461       if (!status.ok()) {
462         // Reflect errors immediately so that conditions like full
463         // file-systems cause the DB::Open() to fail.
464         break;
465       }
466     }
467   }
468 
469   delete file;
470 
471   // See if we should keep reusing the last log file.
472   if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
473     assert(logfile_ == nullptr);
474     assert(log_ == nullptr);
475     assert(mem_ == nullptr);
476     uint64_t lfile_size;
477     if (env_->GetFileSize(fname, &lfile_size).ok() &&
478         env_->NewAppendableFile(fname, &logfile_).ok()) {
479       Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
480       log_ = new log::Writer(logfile_, lfile_size);
481       logfile_number_ = log_number;
482       if (mem != nullptr) {
483         mem_ = mem;
484         mem = nullptr;
485       } else {
486         // mem can be nullptr if lognum exists but was empty.
487         mem_ = new MemTable(internal_comparator_);
488         mem_->Ref();
489       }
490     }
491   }
492 
493   if (mem != nullptr) {
494     // mem did not get reused; compact it.
495     if (status.ok()) {
496       *save_manifest = true;
497       status = WriteLevel0Table(mem, edit, nullptr);
498     }
499     mem->Unref();
500   }
501 
502   return status;
503 }
504 
505 Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
506                                 Version* base) {
507   mutex_.AssertHeld();
508   const uint64_t start_micros = env_->NowMicros();
509   FileMetaData meta;
510   meta.number = versions_->NewFileNumber();
511   pending_outputs_.insert(meta.number);
512   Iterator* iter = mem->NewIterator();
513   Log(options_.info_log, "Level-0 table #%llu: started",
514       (unsigned long long)meta.number);
515 
516   Status s;
517   {
518     mutex_.Unlock();
519     s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
520     mutex_.Lock();
521   }
522 
523   Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
524       (unsigned long long)meta.number, (unsigned long long)meta.file_size,
525       s.ToString().c_str());
526   delete iter;
527   pending_outputs_.erase(meta.number);
528 
529   // Note that if file_size is zero, the file has been deleted and
530   // should not be added to the manifest.
531   int level = 0;
532   if (s.ok() && meta.file_size > 0) {
533     const Slice min_user_key = meta.smallest.user_key();
534     const Slice max_user_key = meta.largest.user_key();
535     if (base != nullptr) {
536       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
537     }
538     edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
539                   meta.largest);
540   }
541 
542   CompactionStats stats;
543   stats.micros = env_->NowMicros() - start_micros;
544   stats.bytes_written = meta.file_size;
545   stats_[level].Add(stats);
546   return s;
547 }
548 
549 void DBImpl::CompactMemTable() {
550   mutex_.AssertHeld();
551   assert(imm_ != nullptr);
552 
553   // Save the contents of the memtable as a new Table
554   VersionEdit edit;
555   Version* base = versions_->current();
556   base->Ref();
557   Status s = WriteLevel0Table(imm_, &edit, base);
558   base->Unref();
559 
560   if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
561     s = Status::IOError("Deleting DB during memtable compaction");
562   }
563 
564   // Replace immutable memtable with the generated Table
565   if (s.ok()) {
566     edit.SetPrevLogNumber(0);
567     edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
568     s = versions_->LogAndApply(&edit, &mutex_);
569   }
570 
571   if (s.ok()) {
572     // Commit to the new state
573     imm_->Unref();
574     imm_ = nullptr;
575     has_imm_.store(false, std::memory_order_release);
576     RemoveObsoleteFiles();
577   } else {
578     RecordBackgroundError(s);
579   }
580 }
581 
582 void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
583   int max_level_with_files = 1;
584   {
585     MutexLock l(&mutex_);
586     Version* base = versions_->current();
587     for (int level = 1; level < config::kNumLevels; level++) {
588       if (base->OverlapInLevel(level, begin, end)) {
589         max_level_with_files = level;
590       }
591     }
592   }
593   TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
594   for (int level = 0; level < max_level_with_files; level++) {
595     TEST_CompactRange(level, begin, end);
596   }
597 }
598 
599 void DBImpl::TEST_CompactRange(int level, const Slice* begin,
600                                const Slice* end) {
601   assert(level >= 0);
602   assert(level + 1 < config::kNumLevels);
603 
604   InternalKey begin_storage, end_storage;
605 
606   ManualCompaction manual;
607   manual.level = level;
608   manual.done = false;
609   if (begin == nullptr) {
610     manual.begin = nullptr;
611   } else {
612     begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
613     manual.begin = &begin_storage;
614   }
615   if (end == nullptr) {
616     manual.end = nullptr;
617   } else {
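        // Sequence number 0 and value type 0 sort after every real entry for
        // this user key, so the compacted range covers all entries at *end.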
618     end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
619     manual.end = &end_storage;
620   }
621 
622   MutexLock l(&mutex_);
623   while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
624          bg_error_.ok()) {
625     if (manual_compaction_ == nullptr) {  // Idle
626       manual_compaction_ = &manual;
627       MaybeScheduleCompaction();
628     } else {  // Running either my compaction or another compaction.
629       background_work_finished_signal_.Wait();
630     }
631   }
632   if (manual_compaction_ == &manual) {
633     // Cancel my manual compaction since we aborted early for some reason.
634     manual_compaction_ = nullptr;
635   }
636 }
637 
638 Status DBImpl::TEST_CompactMemTable() {
639   // nullptr batch means just wait for earlier writes to be done
640   Status s = Write(WriteOptions(), nullptr);
641   if (s.ok()) {
642     // Wait until the compaction completes
643     MutexLock l(&mutex_);
644     while (imm_ != nullptr && bg_error_.ok()) {
645       background_work_finished_signal_.Wait();
646     }
647     if (imm_ != nullptr) {
648       s = bg_error_;
649     }
650   }
651   return s;
652 }
653 
654 void DBImpl::RecordBackgroundError(const Status& s) {
655   mutex_.AssertHeld();
656   if (bg_error_.ok()) {
657     bg_error_ = s;
658     background_work_finished_signal_.SignalAll();
659   }
660 }
661 
662 void DBImpl::MaybeScheduleCompaction() {
663   mutex_.AssertHeld();
664   if (background_compaction_scheduled_) {
665     // Already scheduled
666   } else if (shutting_down_.load(std::memory_order_acquire)) {
667     // DB is being deleted; no more background compactions
668   } else if (!bg_error_.ok()) {
669     // Already got an error; no more changes
670   } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
671              !versions_->NeedsCompaction()) {
672     // No work to be done
673   } else {
674     background_compaction_scheduled_ = true;
675     env_->Schedule(&DBImpl::BGWork, this);
676   }
677 }
678 
679 void DBImpl::BGWork(void* db) {
680   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
681 }
682 
683 void DBImpl::BackgroundCall() {
684   MutexLock l(&mutex_);
685   assert(background_compaction_scheduled_);
686   if (shutting_down_.load(std::memory_order_acquire)) {
687     // No more background work when shutting down.
688   } else if (!bg_error_.ok()) {
689     // No more background work after a background error.
690   } else {
691     BackgroundCompaction();
692   }
693 
694   background_compaction_scheduled_ = false;
695 
696   // Previous compaction may have produced too many files in a level,
697   // so reschedule another compaction if needed.
698   MaybeScheduleCompaction();
699   background_work_finished_signal_.SignalAll();
700 }
701 
702 void DBImpl::BackgroundCompaction() {
703   mutex_.AssertHeld();
704 
705   if (imm_ != nullptr) {
706     CompactMemTable();
707     return;
708   }
709 
710   Compaction* c;
711   bool is_manual = (manual_compaction_ != nullptr);
712   InternalKey manual_end;
713   if (is_manual) {
714     ManualCompaction* m = manual_compaction_;
715     c = versions_->CompactRange(m->level, m->begin, m->end);
716     m->done = (c == nullptr);
717     if (c != nullptr) {
718       manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
719     }
720     Log(options_.info_log,
721         "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
722         m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
723         (m->end ? m->end->DebugString().c_str() : "(end)"),
724         (m->done ? "(end)" : manual_end.DebugString().c_str()));
725   } else {
726     c = versions_->PickCompaction();
727   }
728 
729   Status status;
730   if (c == nullptr) {
731     // Nothing to do
732   } else if (!is_manual && c->IsTrivialMove()) {
733     // Move file to next level
734     assert(c->num_input_files(0) == 1);
735     FileMetaData* f = c->input(0, 0);
736     c->edit()->RemoveFile(c->level(), f->number);
737     c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
738                        f->largest);
739     status = versions_->LogAndApply(c->edit(), &mutex_);
740     if (!status.ok()) {
741       RecordBackgroundError(status);
742     }
743     VersionSet::LevelSummaryStorage tmp;
744     Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
745         static_cast<unsigned long long>(f->number), c->level() + 1,
746         static_cast<unsigned long long>(f->file_size),
747         status.ToString().c_str(), versions_->LevelSummary(&tmp));
748   } else {
749     CompactionState* compact = new CompactionState(c);
750     status = DoCompactionWork(compact);
751     if (!status.ok()) {
752       RecordBackgroundError(status);
753     }
754     CleanupCompaction(compact);
755     c->ReleaseInputs();
756     RemoveObsoleteFiles();
757   }
758   delete c;
759 
760   if (status.ok()) {
761     // Done
762   } else if (shutting_down_.load(std::memory_order_acquire)) {
763     // Ignore compaction errors found during shutting down
764   } else {
765     Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
766   }
767 
768   if (is_manual) {
769     ManualCompaction* m = manual_compaction_;
770     if (!status.ok()) {
771       m->done = true;
772     }
773     if (!m->done) {
774       // We only compacted part of the requested range.  Update *m
775       // to the range that is left to be compacted.
776       m->tmp_storage = manual_end;
777       m->begin = &m->tmp_storage;
778     }
779     manual_compaction_ = nullptr;
780   }
781 }
782 
783 void DBImpl::CleanupCompaction(CompactionState* compact) {
784   mutex_.AssertHeld();
785   if (compact->builder != nullptr) {
786     // May happen if we get a shutdown call in the middle of compaction
787     compact->builder->Abandon();
788     delete compact->builder;
789   } else {
790     assert(compact->outfile == nullptr);
791   }
792   delete compact->outfile;
793   for (size_t i = 0; i < compact->outputs.size(); i++) {
794     const CompactionState::Output& out = compact->outputs[i];
795     pending_outputs_.erase(out.number);
796   }
797   delete compact;
798 }
799 
800 Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
801   assert(compact != nullptr);
802   assert(compact->builder == nullptr);
803   uint64_t file_number;
804   {
805     mutex_.Lock();
806     file_number = versions_->NewFileNumber();
807     pending_outputs_.insert(file_number);
808     CompactionState::Output out;
809     out.number = file_number;
810     out.smallest.Clear();
811     out.largest.Clear();
812     compact->outputs.push_back(out);
813     mutex_.Unlock();
814   }
815 
816   // Make the output file
817   std::string fname = TableFileName(dbname_, file_number);
818   Status s = env_->NewWritableFile(fname, &compact->outfile);
819   if (s.ok()) {
820     compact->builder = new TableBuilder(options_, compact->outfile);
821   }
822   return s;
823 }
824 
825 Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
826                                           Iterator* input) {
827   assert(compact != nullptr);
828   assert(compact->outfile != nullptr);
829   assert(compact->builder != nullptr);
830 
831   const uint64_t output_number = compact->current_output()->number;
832   assert(output_number != 0);
833 
834   // Check for iterator errors
835   Status s = input->status();
836   const uint64_t current_entries = compact->builder->NumEntries();
837   if (s.ok()) {
838     s = compact->builder->Finish();
839   } else {
840     compact->builder->Abandon();
841   }
842   const uint64_t current_bytes = compact->builder->FileSize();
843   compact->current_output()->file_size = current_bytes;
844   compact->total_bytes += current_bytes;
845   delete compact->builder;
846   compact->builder = nullptr;
847 
848   // Finish and check for file errors
849   if (s.ok()) {
850     s = compact->outfile->Sync();
851   }
852   if (s.ok()) {
853     s = compact->outfile->Close();
854   }
855   delete compact->outfile;
856   compact->outfile = nullptr;
857 
858   if (s.ok() && current_entries > 0) {
859     // Verify that the table is usable
860     Iterator* iter =
861         table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
862     s = iter->status();
863     delete iter;
864     if (s.ok()) {
865       Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
866           (unsigned long long)output_number, compact->compaction->level(),
867           (unsigned long long)current_entries,
868           (unsigned long long)current_bytes);
869     }
870   }
871   return s;
872 }
873 
874 Status DBImpl::InstallCompactionResults(CompactionState* compact) {
875   mutex_.AssertHeld();
876   Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
877       compact->compaction->num_input_files(0), compact->compaction->level(),
878       compact->compaction->num_input_files(1), compact->compaction->level() + 1,
879       static_cast<long long>(compact->total_bytes));
880 
881   // Add compaction outputs
882   compact->compaction->AddInputDeletions(compact->compaction->edit());
883   const int level = compact->compaction->level();
884   for (size_t i = 0; i < compact->outputs.size(); i++) {
885     const CompactionState::Output& out = compact->outputs[i];
886     compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
887                                          out.smallest, out.largest);
888   }
889   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
890 }
891 
892 Status DBImpl::DoCompactionWork(CompactionState* compact) {
893   const uint64_t start_micros = env_->NowMicros();
894   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
895 
896   Log(options_.info_log, "Compacting %d@%d + %d@%d files",
897       compact->compaction->num_input_files(0), compact->compaction->level(),
898       compact->compaction->num_input_files(1),
899       compact->compaction->level() + 1);
900 
901   assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
902   assert(compact->builder == nullptr);
903   assert(compact->outfile == nullptr);
904   if (snapshots_.empty()) {
905     compact->smallest_snapshot = versions_->LastSequence();
906   } else {
907     compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
908   }
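      // With no live snapshots, smallest_snapshot is the current last
      // sequence, so only the newest entry for each user key needs to survive.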
909 
910   Iterator* input = versions_->MakeInputIterator(compact->compaction);
911 
912   // Release mutex while we're actually doing the compaction work
913   mutex_.Unlock();
914 
915   input->SeekToFirst();
916   Status status;
917   ParsedInternalKey ikey;
918   std::string current_user_key;
919   bool has_current_user_key = false;
920   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
921   while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
922     // Prioritize immutable compaction work
923     if (has_imm_.load(std::memory_order_relaxed)) {
924       const uint64_t imm_start = env_->NowMicros();
925       mutex_.Lock();
926       if (imm_ != nullptr) {
927         CompactMemTable();
928         // Wake up MakeRoomForWrite() if necessary.
929         background_work_finished_signal_.SignalAll();
930       }
931       mutex_.Unlock();
932       imm_micros += (env_->NowMicros() - imm_start);
933     }
934 
935     Slice key = input->key();
936     if (compact->compaction->ShouldStopBefore(key) &&
937         compact->builder != nullptr) {
938       status = FinishCompactionOutputFile(compact, input);
939       if (!status.ok()) {
940         break;
941       }
942     }
943 
944     // Handle key/value, add to state, etc.
945     bool drop = false;
946     if (!ParseInternalKey(key, &ikey)) {
947       // Do not hide error keys
948       current_user_key.clear();
949       has_current_user_key = false;
950       last_sequence_for_key = kMaxSequenceNumber;
951     } else {
952       if (!has_current_user_key ||
953           user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
954               0) {
955         // First occurrence of this user key
956         current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
957         has_current_user_key = true;
958         last_sequence_for_key = kMaxSequenceNumber;
959       }
960 
961       if (last_sequence_for_key <= compact->smallest_snapshot) {
962         // Hidden by a newer entry for the same user key
963         drop = true;  // (A)
964       } else if (ikey.type == kTypeDeletion &&
965                  ikey.sequence <= compact->smallest_snapshot &&
966                  compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
967         // For this user key:
968         // (1) there is no data in higher levels
969         // (2) data in lower levels will have larger sequence numbers
970         // (3) data in layers that are being compacted here and have
971         //     smaller sequence numbers will be dropped in the next
972         //     few iterations of this loop (by rule (A) above).
973         // Therefore this deletion marker is obsolete and can be dropped.
974         drop = true;
975       }
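          // Example with smallest_snapshot == 100 and puts for one key at
          // sequences 120, 90 and 80: 120 and 90 are kept, while 80 is
          // dropped by rule (A), since the put at 90 already serves every
          // snapshot at or below 100.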
976 
977       last_sequence_for_key = ikey.sequence;
978     }
979 #if 0
980     Log(options_.info_log,
981         "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
982         "%d smallest_snapshot: %d",
983         ikey.user_key.ToString().c_str(),
984         (int)ikey.sequence, ikey.type, kTypeValue, drop,
985         compact->compaction->IsBaseLevelForKey(ikey.user_key),
986         (int)last_sequence_for_key, (int)compact->smallest_snapshot);
987 #endif
988 
989     if (!drop) {
990       // Open output file if necessary
991       if (compact->builder == nullptr) {
992         status = OpenCompactionOutputFile(compact);
993         if (!status.ok()) {
994           break;
995         }
996       }
997       if (compact->builder->NumEntries() == 0) {
998         compact->current_output()->smallest.DecodeFrom(key);
999       }
1000       compact->current_output()->largest.DecodeFrom(key);
1001       compact->builder->Add(key, input->value());
1002 
1003       // Close output file if it is big enough
1004       if (compact->builder->FileSize() >=
1005           compact->compaction->MaxOutputFileSize()) {
1006         status = FinishCompactionOutputFile(compact, input);
1007         if (!status.ok()) {
1008           break;
1009         }
1010       }
1011     }
1012 
1013     input->Next();
1014   }
1015 
1016   if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
1017     status = Status::IOError("Deleting DB during compaction");
1018   }
1019   if (status.ok() && compact->builder != nullptr) {
1020     status = FinishCompactionOutputFile(compact, input);
1021   }
1022   if (status.ok()) {
1023     status = input->status();
1024   }
1025   delete input;
1026   input = nullptr;
1027 
1028   CompactionStats stats;
1029   stats.micros = env_->NowMicros() - start_micros - imm_micros;
1030   for (int which = 0; which < 2; which++) {
1031     for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
1032       stats.bytes_read += compact->compaction->input(which, i)->file_size;
1033     }
1034   }
1035   for (size_t i = 0; i < compact->outputs.size(); i++) {
1036     stats.bytes_written += compact->outputs[i].file_size;
1037   }
1038 
1039   mutex_.Lock();
1040   stats_[compact->compaction->level() + 1].Add(stats);
1041 
1042   if (status.ok()) {
1043     status = InstallCompactionResults(compact);
1044   }
1045   if (!status.ok()) {
1046     RecordBackgroundError(status);
1047   }
1048   VersionSet::LevelSummaryStorage tmp;
1049   Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
1050   return status;
1051 }
1052 
1053 namespace {
1054 
1055 struct IterState {
1056   port::Mutex* const mu;
1057   Version* const version GUARDED_BY(mu);
1058   MemTable* const mem GUARDED_BY(mu);
1059   MemTable* const imm GUARDED_BY(mu);
1060 
1061   IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
1062       : mu(mutex), version(version), mem(mem), imm(imm) {}
1063 };
1064 
1065 static void CleanupIteratorState(void* arg1, void* arg2) {
1066   IterState* state = reinterpret_cast<IterState*>(arg1);
1067   state->mu->Lock();
1068   state->mem->Unref();
1069   if (state->imm != nullptr) state->imm->Unref();
1070   state->version->Unref();
1071   state->mu->Unlock();
1072   delete state;
1073 }
1074 
1075 }  // anonymous namespace
1076 
1077 Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
1078                                       SequenceNumber* latest_snapshot,
1079                                       uint32_t* seed) {
1080   mutex_.Lock();
1081   *latest_snapshot = versions_->LastSequence();
1082 
1083   // Collect together all needed child iterators
1084   std::vector<Iterator*> list;
1085   list.push_back(mem_->NewIterator());
1086   mem_->Ref();
1087   if (imm_ != nullptr) {
1088     list.push_back(imm_->NewIterator());
1089     imm_->Ref();
1090   }
1091   versions_->current()->AddIterators(options, &list);
1092   Iterator* internal_iter =
1093       NewMergingIterator(&internal_comparator_, &list[0], list.size());
1094   versions_->current()->Ref();
1095 
1096   IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
1097   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1098 
1099   *seed = ++seed_;
1100   mutex_.Unlock();
1101   return internal_iter;
1102 }
1103 
1104 Iterator* DBImpl::TEST_NewInternalIterator() {
1105   SequenceNumber ignored;
1106   uint32_t ignored_seed;
1107   return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1108 }
1109 
1110 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1111   MutexLock l(&mutex_);
1112   return versions_->MaxNextLevelOverlappingBytes();
1113 }
1114 
1115 Status DBImpl::Get(const ReadOptions& options, const Slice& key,
1116                    std::string* value) {
1117   Status s;
1118   MutexLock l(&mutex_);
1119   SequenceNumber snapshot;
1120   if (options.snapshot != nullptr) {
1121     snapshot =
1122         static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
1123   } else {
1124     snapshot = versions_->LastSequence();
1125   }
1126 
1127   MemTable* mem = mem_;
1128   MemTable* imm = imm_;
1129   Version* current = versions_->current();
1130   mem->Ref();
1131   if (imm != nullptr) imm->Ref();
1132   current->Ref();
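       // The references taken above keep mem, imm and the current version
       // alive while the mutex is released below.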
1133 
1134   bool have_stat_update = false;
1135   Version::GetStats stats;
1136 
1137   // Unlock while reading from files and memtables
1138   {
1139     mutex_.Unlock();
1140     // First look in the memtable, then in the immutable memtable (if any).
1141     LookupKey lkey(key, snapshot);
1142     if (mem->Get(lkey, value, &s)) {
1143       // Done
1144     } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
1145       // Done
1146     } else {
1147       s = current->Get(options, lkey, value, &stats);
1148       have_stat_update = true;
1149     }
1150     mutex_.Lock();
1151   }
1152 
1153   if (have_stat_update && current->UpdateStats(stats)) {
1154     MaybeScheduleCompaction();
1155   }
1156   mem->Unref();
1157   if (imm != nullptr) imm->Unref();
1158   current->Unref();
1159   return s;
1160 }
1161 
1162 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1163   SequenceNumber latest_snapshot;
1164   uint32_t seed;
1165   Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1166   return NewDBIterator(this, user_comparator(), iter,
1167                        (options.snapshot != nullptr
1168                             ? static_cast<const SnapshotImpl*>(options.snapshot)
1169                                   ->sequence_number()
1170                             : latest_snapshot),
1171                        seed);
1172 }
1173 
1174 void DBImpl::RecordReadSample(Slice key) {
1175   MutexLock l(&mutex_);
1176   if (versions_->current()->RecordReadSample(key)) {
1177     MaybeScheduleCompaction();
1178   }
1179 }
1180 
1181 const Snapshot* DBImpl::GetSnapshot() {
1182   MutexLock l(&mutex_);
1183   return snapshots_.New(versions_->LastSequence());
1184 }
1185 
1186 void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
1187   MutexLock l(&mutex_);
1188   snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
1189 }
1190 
1191 // Convenience methods
1192 Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1193   return DB::Put(o, key, val);
1194 }
1195 
1196 Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1197   return DB::Delete(options, key);
1198 }
1199 
1200 Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1201   Writer w(&mutex_);
1202   w.batch = updates;
1203   w.sync = options.sync;
1204   w.done = false;
1205 
1206   MutexLock l(&mutex_);
1207   writers_.push_back(&w);
1208   while (!w.done && &w != writers_.front()) {
1209     w.cv.Wait();
1210   }
1211   if (w.done) {
1212     return w.status;
1213   }
1214 
1215   // May temporarily unlock and wait.
1216   Status status = MakeRoomForWrite(updates == nullptr);
1217   uint64_t last_sequence = versions_->LastSequence();
1218   Writer* last_writer = &w;
1219   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
1220     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
1221     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
1222     last_sequence += WriteBatchInternal::Count(write_batch);
1223 
1224     // Add to log and apply to memtable.  We can release the lock
1225     // during this phase since &w is currently responsible for logging
1226     // and protects against concurrent loggers and concurrent writes
1227     // into mem_.
1228     {
1229       mutex_.Unlock();
1230       status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
1231       bool sync_error = false;
1232       if (status.ok() && options.sync) {
1233         status = logfile_->Sync();
1234         if (!status.ok()) {
1235           sync_error = true;
1236         }
1237       }
1238       if (status.ok()) {
1239         status = WriteBatchInternal::InsertInto(write_batch, mem_);
1240       }
1241       mutex_.Lock();
1242       if (sync_error) {
1243         // The state of the log file is indeterminate: the log record we
1244         // just added may or may not show up when the DB is re-opened.
1245         // So we force the DB into a mode where all future writes fail.
1246         RecordBackgroundError(status);
1247       }
1248     }
1249     if (write_batch == tmp_batch_) tmp_batch_->Clear();
1250 
1251     versions_->SetLastSequence(last_sequence);
1252   }
1253 
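       // Pop every writer whose batch was folded into this group and signal
       // it; &w itself is the thread running here, so it is not signaled.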
1254   while (true) {
1255     Writer* ready = writers_.front();
1256     writers_.pop_front();
1257     if (ready != &w) {
1258       ready->status = status;
1259       ready->done = true;
1260       ready->cv.Signal();
1261     }
1262     if (ready == last_writer) break;
1263   }
1264 
1265   // Notify new head of write queue
1266   if (!writers_.empty()) {
1267     writers_.front()->cv.Signal();
1268   }
1269 
1270   return status;
1271 }
1272 
1273 // REQUIRES: Writer list must be non-empty
1274 // REQUIRES: First writer must have a non-null batch
1275 WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
1276   mutex_.AssertHeld();
1277   assert(!writers_.empty());
1278   Writer* first = writers_.front();
1279   WriteBatch* result = first->batch;
1280   assert(result != nullptr);
1281 
1282   size_t size = WriteBatchInternal::ByteSize(first->batch);
1283 
1284   // Allow the group to grow up to a maximum size, but if the
1285   // original write is small, limit the growth so we do not slow
1286   // down the small write too much.
1287   size_t max_size = 1 << 20;
1288   if (size <= (128 << 10)) {
1289     max_size = size + (128 << 10);
1290   }
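       // e.g., a 4 KB leading batch caps the group at 4 KB + 128 KB, while a
       // 512 KB leading batch keeps the full 1 MB cap.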
1291 
1292   *last_writer = first;
1293   std::deque<Writer*>::iterator iter = writers_.begin();
1294   ++iter;  // Advance past "first"
1295   for (; iter != writers_.end(); ++iter) {
1296     Writer* w = *iter;
1297     if (w->sync && !first->sync) {
1298       // Do not include a sync write into a batch handled by a non-sync write.
1299       break;
1300     }
1301 
1302     if (w->batch != nullptr) {
1303       size += WriteBatchInternal::ByteSize(w->batch);
1304       if (size > max_size) {
1305         // Do not make batch too big
1306         break;
1307       }
1308 
1309       // Append to *result
1310       if (result == first->batch) {
1311         // Switch to temporary batch instead of disturbing caller's batch
1312         result = tmp_batch_;
1313         assert(WriteBatchInternal::Count(result) == 0);
1314         WriteBatchInternal::Append(result, first->batch);
1315       }
1316       WriteBatchInternal::Append(result, w->batch);
1317     }
1318     *last_writer = w;
1319   }
1320   return result;
1321 }
1322 
1323 // REQUIRES: mutex_ is held
1324 // REQUIRES: this thread is currently at the front of the writer queue
1325 Status DBImpl::MakeRoomForWrite(bool force) {
1326   mutex_.AssertHeld();
1327   assert(!writers_.empty());
1328   bool allow_delay = !force;
1329   Status s;
1330   while (true) {
1331     if (!bg_error_.ok()) {
1332       // Yield previous error
1333       s = bg_error_;
1334       break;
1335     } else if (allow_delay && versions_->NumLevelFiles(0) >=
1336                                   config::kL0_SlowdownWritesTrigger) {
1337       // We are getting close to hitting a hard limit on the number of
1338       // L0 files.  Rather than delaying a single write by several
1339       // seconds when we hit the hard limit, start delaying each
1340       // individual write by 1ms to reduce latency variance.  Also,
1341       // this delay hands over some CPU to the compaction thread in
1342       // case it is sharing the same core as the writer.
1343       mutex_.Unlock();
1344       env_->SleepForMicroseconds(1000);
1345       allow_delay = false;  // Do not delay a single write more than once
1346       mutex_.Lock();
1347     } else if (!force &&
1348                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
1349       // There is room in current memtable
1350       break;
1351     } else if (imm_ != nullptr) {
1352       // We have filled up the current memtable, but the previous
1353       // one is still being compacted, so we wait.
1354       Log(options_.info_log, "Current memtable full; waiting...\n");
1355       background_work_finished_signal_.Wait();
1356     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
1357       // There are too many level-0 files.
1358       Log(options_.info_log, "Too many L0 files; waiting...\n");
1359       background_work_finished_signal_.Wait();
1360     } else {
1361       // Attempt to switch to a new memtable and trigger compaction of old
1362       assert(versions_->PrevLogNumber() == 0);
1363       uint64_t new_log_number = versions_->NewFileNumber();
1364       WritableFile* lfile = nullptr;
1365       s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1366       if (!s.ok()) {
1367         // Avoid chewing through file number space in a tight loop.
1368         versions_->ReuseFileNumber(new_log_number);
1369         break;
1370       }
1371       delete log_;
1372       delete logfile_;
1373       logfile_ = lfile;
1374       logfile_number_ = new_log_number;
1375       log_ = new log::Writer(lfile);
1376       imm_ = mem_;
1377       has_imm_.store(true, std::memory_order_release);
1378       mem_ = new MemTable(internal_comparator_);
1379       mem_->Ref();
1380       force = false;  // Do not force another compaction if have room
1381       MaybeScheduleCompaction();
1382     }
1383   }
1384   return s;
1385 }
1386 
1387 bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1388   value->clear();
1389 
1390   MutexLock l(&mutex_);
1391   Slice in = property;
1392   Slice prefix("leveldb.");
1393   if (!in.starts_with(prefix)) return false;
1394   in.remove_prefix(prefix.size());
1395 
1396   if (in.starts_with("num-files-at-level")) {
1397     in.remove_prefix(strlen("num-files-at-level"));
1398     uint64_t level;
1399     bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
1400     if (!ok || level >= config::kNumLevels) {
1401       return false;
1402     } else {
1403       char buf[100];
1404       std::snprintf(buf, sizeof(buf), "%d",
1405                     versions_->NumLevelFiles(static_cast<int>(level)));
1406       *value = buf;
1407       return true;
1408     }
1409   } else if (in == "stats") {
1410     char buf[200];
1411     std::snprintf(buf, sizeof(buf),
1412                   "                               Compactions\n"
1413                   "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1414                   "--------------------------------------------------\n");
1415     value->append(buf);
1416     for (int level = 0; level < config::kNumLevels; level++) {
1417       int files = versions_->NumLevelFiles(level);
1418       if (stats_[level].micros > 0 || files > 0) {
1419         std::snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
1420                       level, files, versions_->NumLevelBytes(level) / 1048576.0,
1421                       stats_[level].micros / 1e6,
1422                       stats_[level].bytes_read / 1048576.0,
1423                       stats_[level].bytes_written / 1048576.0);
1424         value->append(buf);
1425       }
1426     }
1427     return true;
1428   } else if (in == "sstables") {
1429     *value = versions_->current()->DebugString();
1430     return true;
1431   } else if (in == "approximate-memory-usage") {
1432     size_t total_usage = options_.block_cache->TotalCharge();
1433     if (mem_) {
1434       total_usage += mem_->ApproximateMemoryUsage();
1435     }
1436     if (imm_) {
1437       total_usage += imm_->ApproximateMemoryUsage();
1438     }
1439     char buf[50];
1440     std::snprintf(buf, sizeof(buf), "%llu",
1441                   static_cast<unsigned long long>(total_usage));
1442     value->append(buf);
1443     return true;
1444   }
1445 
1446   return false;
1447 }
1448 
1449 void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
1450   // TODO(opt): better implementation
1451   MutexLock l(&mutex_);
1452   Version* v = versions_->current();
1453   v->Ref();
1454 
1455   for (int i = 0; i < n; i++) {
1456     // Convert user_key into a corresponding internal key.
1457     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1458     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1459     uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1460     uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1461     sizes[i] = (limit >= start ? limit - start : 0);
1462   }
1463 
1464   v->Unref();
1465 }
1466 
1467 // Default implementations of convenience methods that subclasses of DB
1468 // can call if they wish
1469 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1470   WriteBatch batch;
1471   batch.Put(key, value);
1472   return Write(opt, &batch);
1473 }
1474 
1475 Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1476   WriteBatch batch;
1477   batch.Delete(key);
1478   return Write(opt, &batch);
1479 }
1480 
1481 DB::~DB() = default;
1482 
1483 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1484   *dbptr = nullptr;
1485 
1486   DBImpl* impl = new DBImpl(options, dbname);
1487   impl->mutex_.Lock();
1488   VersionEdit edit;
1489   // Recover handles create_if_missing, error_if_exists
1490   bool save_manifest = false;
1491   Status s = impl->Recover(&edit, &save_manifest);
1492   if (s.ok() && impl->mem_ == nullptr) {
1493     // Create new log and a corresponding memtable.
1494     uint64_t new_log_number = impl->versions_->NewFileNumber();
1495     WritableFile* lfile;
1496     s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1497                                      &lfile);
1498     if (s.ok()) {
1499       edit.SetLogNumber(new_log_number);
1500       impl->logfile_ = lfile;
1501       impl->logfile_number_ = new_log_number;
1502       impl->log_ = new log::Writer(lfile);
1503       impl->mem_ = new MemTable(impl->internal_comparator_);
1504       impl->mem_->Ref();
1505     }
1506   }
1507   if (s.ok() && save_manifest) {
1508     edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
1509     edit.SetLogNumber(impl->logfile_number_);
1510     s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1511   }
1512   if (s.ok()) {
1513     impl->RemoveObsoleteFiles();
1514     impl->MaybeScheduleCompaction();
1515   }
1516   impl->mutex_.Unlock();
1517   if (s.ok()) {
1518     assert(impl->mem_ != nullptr);
1519     *dbptr = impl;
1520   } else {
1521     delete impl;
1522   }
1523   return s;
1524 }
1525 
1526 Snapshot::~Snapshot() = default;
1527 
1528 Status DestroyDB(const std::string& dbname, const Options& options) {
1529   Env* env = options.env;
1530   std::vector<std::string> filenames;
1531   Status result = env->GetChildren(dbname, &filenames);
1532   if (!result.ok()) {
1533     // Ignore error in case directory does not exist
1534     return Status::OK();
1535   }
1536 
1537   FileLock* lock;
1538   const std::string lockname = LockFileName(dbname);
1539   result = env->LockFile(lockname, &lock);
1540   if (result.ok()) {
1541     uint64_t number;
1542     FileType type;
1543     for (size_t i = 0; i < filenames.size(); i++) {
1544       if (ParseFileName(filenames[i], &number, &type) &&
1545           type != kDBLockFile) {  // Lock file will be deleted at end
1546         Status del = env->RemoveFile(dbname + "/" + filenames[i]);
1547         if (result.ok() && !del.ok()) {
1548           result = del;
1549         }
1550       }
1551     }
1552     env->UnlockFile(lock);  // Ignore error since state is already gone
1553     env->RemoveFile(lockname);
1554     env->RemoveDir(dbname);  // Ignore error in case dir contains other files
1555   }
1556   return result;
1557 }
1558 
1559 }  // namespace leveldb
1560