xref: /aosp_15_r20/external/leveldb/benchmarks/db_bench.cc (revision 9507f98c5f32dee4b5f9e4a38cd499f3ff5c4490)
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include <sys/types.h>
6 
7 #include <atomic>
8 #include <cstdio>
9 #include <cstdlib>
10 
11 #include "leveldb/cache.h"
12 #include "leveldb/comparator.h"
13 #include "leveldb/db.h"
14 #include "leveldb/env.h"
15 #include "leveldb/filter_policy.h"
16 #include "leveldb/write_batch.h"
17 #include "port/port.h"
18 #include "util/crc32c.h"
19 #include "util/histogram.h"
20 #include "util/mutexlock.h"
21 #include "util/random.h"
22 #include "util/testutil.h"
23 
// Comma-separated list of operations to run, executed in the order given.
//   Actual benchmarks:
//      fillseq          -- write N values in sequential key order in async mode
//      fillbatch        -- like fillseq, but with 1000 entries per write batch
//      fillrandom       -- write N values in random key order in async mode
//      overwrite        -- overwrite N values in random key order in async mode
//      fillsync         -- write N/1000 values in random key order in sync mode
//      fill100K         -- write N/1000 100K values in random order in async
//                          mode
//      deleteseq        -- delete N keys in sequential order
//      deleterandom     -- delete N keys in random order
//      readseq          -- read N times sequentially
//      readreverse      -- read N times in reverse order
//      readrandom       -- read N times in random order
//      readrandomsmall  -- like readrandom, but with 1/1000 of the reads
//      readmissing      -- read N missing keys in random order
//      readhot          -- read N times in random order from 1% section of DB
//      readwhilewriting -- random reads with one extra thread added for writing
//      seekrandom       -- N random seeks
//      seekordered      -- N ordered seeks
//      open             -- cost of opening a DB
//      crc32c           -- repeated crc32c of 4K of data
//      snappycomp       -- repeated snappy compression of one block of data
//      snappyuncomp     -- repeated snappy decompression of one block of data
//   Meta operations:
//      compact     -- Compact the entire DB
//      stats       -- Print DB stats
//      sstables    -- Print sstable info
//      heapprofile -- Dump a heap profile (if supported by this port)
static const char* FLAGS_benchmarks =
    // Populate the database in several ways.
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    // Read it back; the second readrandom is an extra run that allows
    // previous compactions to quiesce.
    "readrandom,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    // Compact, then repeat the reads against the compacted database.
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    // Large values and the stand-alone micro-benchmarks.
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,";
64 
// Number of key/value pairs to place in the database.
static int FLAGS_num{1000000};

// Number of read operations to perform.  A negative value means "do
// FLAGS_num reads".
static int FLAGS_reads{-1};

// Number of concurrent threads to run.
static int FLAGS_threads{1};

// Size of each value, in bytes.
static int FLAGS_value_size{100};

// Generated values are arranged so that they shrink to this fraction of
// their original size after compression.
static double FLAGS_compression_ratio{0.5};

// Print a histogram of operation timings.
static bool FLAGS_histogram{false};

// Count the number of string comparisons performed.
static bool FLAGS_comparisons{false};

// Number of bytes to buffer in the memtable before compacting
// (initialized to default value by "main").
static int FLAGS_write_buffer_size{0};

// Number of bytes written to each file
// (initialized to default value by "main").
static int FLAGS_max_file_size{0};

// Approximate size of user data packed per block (before compression)
// (initialized to default value by "main").
static int FLAGS_block_size{0};

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size{-1};

// Maximum number of files to keep open at the same time (use default if == 0).
static int FLAGS_open_files{0};

// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits{-1};

// Length of the common prefix prepended to every key.
static int FLAGS_key_prefix{0};

// If true, do not destroy the existing database.  If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db{false};

// If true, reuse existing log/MANIFEST files when re-opening a database.
static bool FLAGS_reuse_logs{false};

// Name (path) of the database to use.
static const char* FLAGS_db{nullptr};
123 
124 namespace leveldb {
125 
126 namespace {
127 leveldb::Env* g_env = nullptr;
128 
129 class CountComparator : public Comparator {
130  public:
CountComparator(const Comparator * wrapped)131   CountComparator(const Comparator* wrapped) : wrapped_(wrapped) {}
~CountComparator()132   ~CountComparator() override {}
Compare(const Slice & a,const Slice & b) const133   int Compare(const Slice& a, const Slice& b) const override {
134     count_.fetch_add(1, std::memory_order_relaxed);
135     return wrapped_->Compare(a, b);
136   }
Name() const137   const char* Name() const override { return wrapped_->Name(); }
FindShortestSeparator(std::string * start,const Slice & limit) const138   void FindShortestSeparator(std::string* start,
139                              const Slice& limit) const override {
140     wrapped_->FindShortestSeparator(start, limit);
141   }
142 
FindShortSuccessor(std::string * key) const143   void FindShortSuccessor(std::string* key) const override {
144     return wrapped_->FindShortSuccessor(key);
145   }
146 
comparisons() const147   size_t comparisons() const { return count_.load(std::memory_order_relaxed); }
148 
reset()149   void reset() { count_.store(0, std::memory_order_relaxed); }
150 
151  private:
152   mutable std::atomic<size_t> count_{0};
153   const Comparator* const wrapped_;
154 };
155 
156 // Helper for quickly generating random data.
157 class RandomGenerator {
158  private:
159   std::string data_;
160   int pos_;
161 
162  public:
RandomGenerator()163   RandomGenerator() {
164     // We use a limited amount of data over and over again and ensure
165     // that it is larger than the compression window (32KB), and also
166     // large enough to serve all typical value sizes we want to write.
167     Random rnd(301);
168     std::string piece;
169     while (data_.size() < 1048576) {
170       // Add a short fragment that is as compressible as specified
171       // by FLAGS_compression_ratio.
172       test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
173       data_.append(piece);
174     }
175     pos_ = 0;
176   }
177 
Generate(size_t len)178   Slice Generate(size_t len) {
179     if (pos_ + len > data_.size()) {
180       pos_ = 0;
181       assert(len < data_.size());
182     }
183     pos_ += len;
184     return Slice(data_.data() + pos_ - len, len);
185   }
186 };
187 
188 class KeyBuffer {
189  public:
KeyBuffer()190   KeyBuffer() {
191     assert(FLAGS_key_prefix < sizeof(buffer_));
192     memset(buffer_, 'a', FLAGS_key_prefix);
193   }
194   KeyBuffer& operator=(KeyBuffer& other) = delete;
195   KeyBuffer(KeyBuffer& other) = delete;
196 
Set(int k)197   void Set(int k) {
198     std::snprintf(buffer_ + FLAGS_key_prefix,
199                   sizeof(buffer_) - FLAGS_key_prefix, "%016d", k);
200   }
201 
slice() const202   Slice slice() const { return Slice(buffer_, FLAGS_key_prefix + 16); }
203 
204  private:
205   char buffer_[1024];
206 };
207 
208 #if defined(__linux)
TrimSpace(Slice s)209 static Slice TrimSpace(Slice s) {
210   size_t start = 0;
211   while (start < s.size() && isspace(s[start])) {
212     start++;
213   }
214   size_t limit = s.size();
215   while (limit > start && isspace(s[limit - 1])) {
216     limit--;
217   }
218   return Slice(s.data() + start, limit - start);
219 }
220 #endif
221 
AppendWithSpace(std::string * str,Slice msg)222 static void AppendWithSpace(std::string* str, Slice msg) {
223   if (msg.empty()) return;
224   if (!str->empty()) {
225     str->push_back(' ');
226   }
227   str->append(msg.data(), msg.size());
228 }
229 
230 class Stats {
231  private:
232   double start_;
233   double finish_;
234   double seconds_;
235   int done_;
236   int next_report_;
237   int64_t bytes_;
238   double last_op_finish_;
239   Histogram hist_;
240   std::string message_;
241 
242  public:
Stats()243   Stats() { Start(); }
244 
Start()245   void Start() {
246     next_report_ = 100;
247     hist_.Clear();
248     done_ = 0;
249     bytes_ = 0;
250     seconds_ = 0;
251     message_.clear();
252     start_ = finish_ = last_op_finish_ = g_env->NowMicros();
253   }
254 
Merge(const Stats & other)255   void Merge(const Stats& other) {
256     hist_.Merge(other.hist_);
257     done_ += other.done_;
258     bytes_ += other.bytes_;
259     seconds_ += other.seconds_;
260     if (other.start_ < start_) start_ = other.start_;
261     if (other.finish_ > finish_) finish_ = other.finish_;
262 
263     // Just keep the messages from one thread
264     if (message_.empty()) message_ = other.message_;
265   }
266 
Stop()267   void Stop() {
268     finish_ = g_env->NowMicros();
269     seconds_ = (finish_ - start_) * 1e-6;
270   }
271 
AddMessage(Slice msg)272   void AddMessage(Slice msg) { AppendWithSpace(&message_, msg); }
273 
FinishedSingleOp()274   void FinishedSingleOp() {
275     if (FLAGS_histogram) {
276       double now = g_env->NowMicros();
277       double micros = now - last_op_finish_;
278       hist_.Add(micros);
279       if (micros > 20000) {
280         std::fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
281         std::fflush(stderr);
282       }
283       last_op_finish_ = now;
284     }
285 
286     done_++;
287     if (done_ >= next_report_) {
288       if (next_report_ < 1000)
289         next_report_ += 100;
290       else if (next_report_ < 5000)
291         next_report_ += 500;
292       else if (next_report_ < 10000)
293         next_report_ += 1000;
294       else if (next_report_ < 50000)
295         next_report_ += 5000;
296       else if (next_report_ < 100000)
297         next_report_ += 10000;
298       else if (next_report_ < 500000)
299         next_report_ += 50000;
300       else
301         next_report_ += 100000;
302       std::fprintf(stderr, "... finished %d ops%30s\r", done_, "");
303       std::fflush(stderr);
304     }
305   }
306 
AddBytes(int64_t n)307   void AddBytes(int64_t n) { bytes_ += n; }
308 
Report(const Slice & name)309   void Report(const Slice& name) {
310     // Pretend at least one op was done in case we are running a benchmark
311     // that does not call FinishedSingleOp().
312     if (done_ < 1) done_ = 1;
313 
314     std::string extra;
315     if (bytes_ > 0) {
316       // Rate is computed on actual elapsed time, not the sum of per-thread
317       // elapsed times.
318       double elapsed = (finish_ - start_) * 1e-6;
319       char rate[100];
320       std::snprintf(rate, sizeof(rate), "%6.1f MB/s",
321                     (bytes_ / 1048576.0) / elapsed);
322       extra = rate;
323     }
324     AppendWithSpace(&extra, message_);
325 
326     std::fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
327                  name.ToString().c_str(), seconds_ * 1e6 / done_,
328                  (extra.empty() ? "" : " "), extra.c_str());
329     if (FLAGS_histogram) {
330       std::fprintf(stdout, "Microseconds per op:\n%s\n",
331                    hist_.ToString().c_str());
332     }
333     std::fflush(stdout);
334   }
335 };
336 
337 // State shared by all concurrent executions of the same benchmark.
338 struct SharedState {
339   port::Mutex mu;
340   port::CondVar cv GUARDED_BY(mu);
341   int total GUARDED_BY(mu);
342 
343   // Each thread goes through the following states:
344   //    (1) initializing
345   //    (2) waiting for others to be initialized
346   //    (3) running
347   //    (4) done
348 
349   int num_initialized GUARDED_BY(mu);
350   int num_done GUARDED_BY(mu);
351   bool start GUARDED_BY(mu);
352 
SharedStateleveldb::__anon7755747a0111::SharedState353   SharedState(int total)
354       : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) {}
355 };
356 
357 // Per-thread state for concurrent executions of the same benchmark.
358 struct ThreadState {
359   int tid;      // 0..n-1 when running in n threads
360   Random rand;  // Has different seeds for different threads
361   Stats stats;
362   SharedState* shared;
363 
ThreadStateleveldb::__anon7755747a0111::ThreadState364   ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
365 };
366 
367 }  // namespace
368 
369 class Benchmark {
370  private:
  // Block cache passed to the database options on every open; null when
  // FLAGS_cache_size is negative.  Deleted by the destructor.
  Cache* cache_;
  // Bloom filter policy passed to the database options on every open; null
  // when FLAGS_bloom_bits is negative.  Deleted by the destructor.
  const FilterPolicy* filter_policy_;
  // Currently open database, or null.  Deleted by the destructor.
  DB* db_;
  // Number of operations for the benchmark being run; reset from FLAGS_num
  // before each benchmark and scaled down by some of them.
  int num_;
  // Size in bytes of each value written by the current benchmark.
  int value_size_;
  // Number of entries placed in each write batch by the current benchmark.
  int entries_per_batch_;
  // Write options used by the current benchmark (e.g. sync for fillsync).
  WriteOptions write_options_;
  // Number of read operations for the current benchmark.
  int reads_;
  // Starts at zero; NOTE(review): presumably numbers heap-profile dumps, but
  // its use is outside the visible part of the file -- confirm.
  int heap_counter_;
  // Wraps the bytewise comparator; installed as the database comparator only
  // when FLAGS_comparisons is set.
  CountComparator count_comparator_;
  // Number of benchmark threads created so far across all benchmarks; used to
  // derive a distinct, reproducible random seed for each thread.
  int total_thread_count_;
382 
PrintHeader()383   void PrintHeader() {
384     const int kKeySize = 16 + FLAGS_key_prefix;
385     PrintEnvironment();
386     std::fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
387     std::fprintf(
388         stdout, "Values:     %d bytes each (%d bytes after compression)\n",
389         FLAGS_value_size,
390         static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
391     std::fprintf(stdout, "Entries:    %d\n", num_);
392     std::fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
393                  ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_) /
394                   1048576.0));
395     std::fprintf(
396         stdout, "FileSize:   %.1f MB (estimated)\n",
397         (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) /
398          1048576.0));
399     PrintWarnings();
400     std::fprintf(stdout, "------------------------------------------------\n");
401   }
402 
PrintWarnings()403   void PrintWarnings() {
404 #if defined(__GNUC__) && !defined(__OPTIMIZE__)
405     std::fprintf(
406         stdout,
407         "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n");
408 #endif
409 #ifndef NDEBUG
410     std::fprintf(
411         stdout,
412         "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
413 #endif
414 
415     // See if snappy is working by attempting to compress a compressible string
416     const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
417     std::string compressed;
418     if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
419       std::fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
420     } else if (compressed.size() >= sizeof(text)) {
421       std::fprintf(stdout, "WARNING: Snappy compression is not effective\n");
422     }
423   }
424 
PrintEnvironment()425   void PrintEnvironment() {
426     std::fprintf(stderr, "LevelDB:    version %d.%d\n", kMajorVersion,
427                  kMinorVersion);
428 
429 #if defined(__linux)
430     time_t now = time(nullptr);
431     std::fprintf(stderr, "Date:       %s",
432                  ctime(&now));  // ctime() adds newline
433 
434     FILE* cpuinfo = std::fopen("/proc/cpuinfo", "r");
435     if (cpuinfo != nullptr) {
436       char line[1000];
437       int num_cpus = 0;
438       std::string cpu_type;
439       std::string cache_size;
440       while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
441         const char* sep = strchr(line, ':');
442         if (sep == nullptr) {
443           continue;
444         }
445         Slice key = TrimSpace(Slice(line, sep - 1 - line));
446         Slice val = TrimSpace(Slice(sep + 1));
447         if (key == "model name") {
448           ++num_cpus;
449           cpu_type = val.ToString();
450         } else if (key == "cache size") {
451           cache_size = val.ToString();
452         }
453       }
454       std::fclose(cpuinfo);
455       std::fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
456       std::fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
457     }
458 #endif
459   }
460 
461  public:
Benchmark()462   Benchmark()
463       : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : nullptr),
464         filter_policy_(FLAGS_bloom_bits >= 0
465                            ? NewBloomFilterPolicy(FLAGS_bloom_bits)
466                            : nullptr),
467         db_(nullptr),
468         num_(FLAGS_num),
469         value_size_(FLAGS_value_size),
470         entries_per_batch_(1),
471         reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
472         heap_counter_(0),
473         count_comparator_(BytewiseComparator()),
474         total_thread_count_(0) {
475     std::vector<std::string> files;
476     g_env->GetChildren(FLAGS_db, &files);
477     for (size_t i = 0; i < files.size(); i++) {
478       if (Slice(files[i]).starts_with("heap-")) {
479         g_env->RemoveFile(std::string(FLAGS_db) + "/" + files[i]);
480       }
481     }
482     if (!FLAGS_use_existing_db) {
483       DestroyDB(FLAGS_db, Options());
484     }
485   }
486 
~Benchmark()487   ~Benchmark() {
488     delete db_;
489     delete cache_;
490     delete filter_policy_;
491   }
492 
Run()493   void Run() {
494     PrintHeader();
495     Open();
496 
497     const char* benchmarks = FLAGS_benchmarks;
498     while (benchmarks != nullptr) {
499       const char* sep = strchr(benchmarks, ',');
500       Slice name;
501       if (sep == nullptr) {
502         name = benchmarks;
503         benchmarks = nullptr;
504       } else {
505         name = Slice(benchmarks, sep - benchmarks);
506         benchmarks = sep + 1;
507       }
508 
509       // Reset parameters that may be overridden below
510       num_ = FLAGS_num;
511       reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
512       value_size_ = FLAGS_value_size;
513       entries_per_batch_ = 1;
514       write_options_ = WriteOptions();
515 
516       void (Benchmark::*method)(ThreadState*) = nullptr;
517       bool fresh_db = false;
518       int num_threads = FLAGS_threads;
519 
520       if (name == Slice("open")) {
521         method = &Benchmark::OpenBench;
522         num_ /= 10000;
523         if (num_ < 1) num_ = 1;
524       } else if (name == Slice("fillseq")) {
525         fresh_db = true;
526         method = &Benchmark::WriteSeq;
527       } else if (name == Slice("fillbatch")) {
528         fresh_db = true;
529         entries_per_batch_ = 1000;
530         method = &Benchmark::WriteSeq;
531       } else if (name == Slice("fillrandom")) {
532         fresh_db = true;
533         method = &Benchmark::WriteRandom;
534       } else if (name == Slice("overwrite")) {
535         fresh_db = false;
536         method = &Benchmark::WriteRandom;
537       } else if (name == Slice("fillsync")) {
538         fresh_db = true;
539         num_ /= 1000;
540         write_options_.sync = true;
541         method = &Benchmark::WriteRandom;
542       } else if (name == Slice("fill100K")) {
543         fresh_db = true;
544         num_ /= 1000;
545         value_size_ = 100 * 1000;
546         method = &Benchmark::WriteRandom;
547       } else if (name == Slice("readseq")) {
548         method = &Benchmark::ReadSequential;
549       } else if (name == Slice("readreverse")) {
550         method = &Benchmark::ReadReverse;
551       } else if (name == Slice("readrandom")) {
552         method = &Benchmark::ReadRandom;
553       } else if (name == Slice("readmissing")) {
554         method = &Benchmark::ReadMissing;
555       } else if (name == Slice("seekrandom")) {
556         method = &Benchmark::SeekRandom;
557       } else if (name == Slice("seekordered")) {
558         method = &Benchmark::SeekOrdered;
559       } else if (name == Slice("readhot")) {
560         method = &Benchmark::ReadHot;
561       } else if (name == Slice("readrandomsmall")) {
562         reads_ /= 1000;
563         method = &Benchmark::ReadRandom;
564       } else if (name == Slice("deleteseq")) {
565         method = &Benchmark::DeleteSeq;
566       } else if (name == Slice("deleterandom")) {
567         method = &Benchmark::DeleteRandom;
568       } else if (name == Slice("readwhilewriting")) {
569         num_threads++;  // Add extra thread for writing
570         method = &Benchmark::ReadWhileWriting;
571       } else if (name == Slice("compact")) {
572         method = &Benchmark::Compact;
573       } else if (name == Slice("crc32c")) {
574         method = &Benchmark::Crc32c;
575       } else if (name == Slice("snappycomp")) {
576         method = &Benchmark::SnappyCompress;
577       } else if (name == Slice("snappyuncomp")) {
578         method = &Benchmark::SnappyUncompress;
579       } else if (name == Slice("heapprofile")) {
580         HeapProfile();
581       } else if (name == Slice("stats")) {
582         PrintStats("leveldb.stats");
583       } else if (name == Slice("sstables")) {
584         PrintStats("leveldb.sstables");
585       } else {
586         if (!name.empty()) {  // No error message for empty name
587           std::fprintf(stderr, "unknown benchmark '%s'\n",
588                        name.ToString().c_str());
589         }
590       }
591 
592       if (fresh_db) {
593         if (FLAGS_use_existing_db) {
594           std::fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
595                        name.ToString().c_str());
596           method = nullptr;
597         } else {
598           delete db_;
599           db_ = nullptr;
600           DestroyDB(FLAGS_db, Options());
601           Open();
602         }
603       }
604 
605       if (method != nullptr) {
606         RunBenchmark(num_threads, name, method);
607       }
608     }
609   }
610 
611  private:
612   struct ThreadArg {
613     Benchmark* bm;
614     SharedState* shared;
615     ThreadState* thread;
616     void (Benchmark::*method)(ThreadState*);
617   };
618 
ThreadBody(void * v)619   static void ThreadBody(void* v) {
620     ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
621     SharedState* shared = arg->shared;
622     ThreadState* thread = arg->thread;
623     {
624       MutexLock l(&shared->mu);
625       shared->num_initialized++;
626       if (shared->num_initialized >= shared->total) {
627         shared->cv.SignalAll();
628       }
629       while (!shared->start) {
630         shared->cv.Wait();
631       }
632     }
633 
634     thread->stats.Start();
635     (arg->bm->*(arg->method))(thread);
636     thread->stats.Stop();
637 
638     {
639       MutexLock l(&shared->mu);
640       shared->num_done++;
641       if (shared->num_done >= shared->total) {
642         shared->cv.SignalAll();
643       }
644     }
645   }
646 
RunBenchmark(int n,Slice name,void (Benchmark::* method)(ThreadState *))647   void RunBenchmark(int n, Slice name,
648                     void (Benchmark::*method)(ThreadState*)) {
649     SharedState shared(n);
650 
651     ThreadArg* arg = new ThreadArg[n];
652     for (int i = 0; i < n; i++) {
653       arg[i].bm = this;
654       arg[i].method = method;
655       arg[i].shared = &shared;
656       ++total_thread_count_;
657       // Seed the thread's random state deterministically based upon thread
658       // creation across all benchmarks. This ensures that the seeds are unique
659       // but reproducible when rerunning the same set of benchmarks.
660       arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_);
661       arg[i].thread->shared = &shared;
662       g_env->StartThread(ThreadBody, &arg[i]);
663     }
664 
665     shared.mu.Lock();
666     while (shared.num_initialized < n) {
667       shared.cv.Wait();
668     }
669 
670     shared.start = true;
671     shared.cv.SignalAll();
672     while (shared.num_done < n) {
673       shared.cv.Wait();
674     }
675     shared.mu.Unlock();
676 
677     for (int i = 1; i < n; i++) {
678       arg[0].thread->stats.Merge(arg[i].thread->stats);
679     }
680     arg[0].thread->stats.Report(name);
681     if (FLAGS_comparisons) {
682       fprintf(stdout, "Comparisons: %zu\n", count_comparator_.comparisons());
683       count_comparator_.reset();
684       fflush(stdout);
685     }
686 
687     for (int i = 0; i < n; i++) {
688       delete arg[i].thread;
689     }
690     delete[] arg;
691   }
692 
Crc32c(ThreadState * thread)693   void Crc32c(ThreadState* thread) {
694     // Checksum about 500MB of data total
695     const int size = 4096;
696     const char* label = "(4K per op)";
697     std::string data(size, 'x');
698     int64_t bytes = 0;
699     uint32_t crc = 0;
700     while (bytes < 500 * 1048576) {
701       crc = crc32c::Value(data.data(), size);
702       thread->stats.FinishedSingleOp();
703       bytes += size;
704     }
705     // Print so result is not dead
706     std::fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
707 
708     thread->stats.AddBytes(bytes);
709     thread->stats.AddMessage(label);
710   }
711 
SnappyCompress(ThreadState * thread)712   void SnappyCompress(ThreadState* thread) {
713     RandomGenerator gen;
714     Slice input = gen.Generate(Options().block_size);
715     int64_t bytes = 0;
716     int64_t produced = 0;
717     bool ok = true;
718     std::string compressed;
719     while (ok && bytes < 1024 * 1048576) {  // Compress 1G
720       ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
721       produced += compressed.size();
722       bytes += input.size();
723       thread->stats.FinishedSingleOp();
724     }
725 
726     if (!ok) {
727       thread->stats.AddMessage("(snappy failure)");
728     } else {
729       char buf[100];
730       std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
731                     (produced * 100.0) / bytes);
732       thread->stats.AddMessage(buf);
733       thread->stats.AddBytes(bytes);
734     }
735   }
736 
SnappyUncompress(ThreadState * thread)737   void SnappyUncompress(ThreadState* thread) {
738     RandomGenerator gen;
739     Slice input = gen.Generate(Options().block_size);
740     std::string compressed;
741     bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
742     int64_t bytes = 0;
743     char* uncompressed = new char[input.size()];
744     while (ok && bytes < 1024 * 1048576) {  // Compress 1G
745       ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
746                                    uncompressed);
747       bytes += input.size();
748       thread->stats.FinishedSingleOp();
749     }
750     delete[] uncompressed;
751 
752     if (!ok) {
753       thread->stats.AddMessage("(snappy failure)");
754     } else {
755       thread->stats.AddBytes(bytes);
756     }
757   }
758 
Open()759   void Open() {
760     assert(db_ == nullptr);
761     Options options;
762     options.env = g_env;
763     options.create_if_missing = !FLAGS_use_existing_db;
764     options.block_cache = cache_;
765     options.write_buffer_size = FLAGS_write_buffer_size;
766     options.max_file_size = FLAGS_max_file_size;
767     options.block_size = FLAGS_block_size;
768     if (FLAGS_comparisons) {
769       options.comparator = &count_comparator_;
770     }
771     options.max_open_files = FLAGS_open_files;
772     options.filter_policy = filter_policy_;
773     options.reuse_logs = FLAGS_reuse_logs;
774     Status s = DB::Open(options, FLAGS_db, &db_);
775     if (!s.ok()) {
776       std::fprintf(stderr, "open error: %s\n", s.ToString().c_str());
777       std::exit(1);
778     }
779   }
780 
OpenBench(ThreadState * thread)781   void OpenBench(ThreadState* thread) {
782     for (int i = 0; i < num_; i++) {
783       delete db_;
784       Open();
785       thread->stats.FinishedSingleOp();
786     }
787   }
788 
WriteSeq(ThreadState * thread)789   void WriteSeq(ThreadState* thread) { DoWrite(thread, true); }
790 
WriteRandom(ThreadState * thread)791   void WriteRandom(ThreadState* thread) { DoWrite(thread, false); }
792 
DoWrite(ThreadState * thread,bool seq)793   void DoWrite(ThreadState* thread, bool seq) {
794     if (num_ != FLAGS_num) {
795       char msg[100];
796       std::snprintf(msg, sizeof(msg), "(%d ops)", num_);
797       thread->stats.AddMessage(msg);
798     }
799 
800     RandomGenerator gen;
801     WriteBatch batch;
802     Status s;
803     int64_t bytes = 0;
804     KeyBuffer key;
805     for (int i = 0; i < num_; i += entries_per_batch_) {
806       batch.Clear();
807       for (int j = 0; j < entries_per_batch_; j++) {
808         const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
809         key.Set(k);
810         batch.Put(key.slice(), gen.Generate(value_size_));
811         bytes += value_size_ + key.slice().size();
812         thread->stats.FinishedSingleOp();
813       }
814       s = db_->Write(write_options_, &batch);
815       if (!s.ok()) {
816         std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
817         std::exit(1);
818       }
819     }
820     thread->stats.AddBytes(bytes);
821   }
822 
ReadSequential(ThreadState * thread)823   void ReadSequential(ThreadState* thread) {
824     Iterator* iter = db_->NewIterator(ReadOptions());
825     int i = 0;
826     int64_t bytes = 0;
827     for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
828       bytes += iter->key().size() + iter->value().size();
829       thread->stats.FinishedSingleOp();
830       ++i;
831     }
832     delete iter;
833     thread->stats.AddBytes(bytes);
834   }
835 
ReadReverse(ThreadState * thread)836   void ReadReverse(ThreadState* thread) {
837     Iterator* iter = db_->NewIterator(ReadOptions());
838     int i = 0;
839     int64_t bytes = 0;
840     for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
841       bytes += iter->key().size() + iter->value().size();
842       thread->stats.FinishedSingleOp();
843       ++i;
844     }
845     delete iter;
846     thread->stats.AddBytes(bytes);
847   }
848 
ReadRandom(ThreadState * thread)849   void ReadRandom(ThreadState* thread) {
850     ReadOptions options;
851     std::string value;
852     int found = 0;
853     KeyBuffer key;
854     for (int i = 0; i < reads_; i++) {
855       const int k = thread->rand.Uniform(FLAGS_num);
856       key.Set(k);
857       if (db_->Get(options, key.slice(), &value).ok()) {
858         found++;
859       }
860       thread->stats.FinishedSingleOp();
861     }
862     char msg[100];
863     std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
864     thread->stats.AddMessage(msg);
865   }
866 
ReadMissing(ThreadState * thread)867   void ReadMissing(ThreadState* thread) {
868     ReadOptions options;
869     std::string value;
870     KeyBuffer key;
871     for (int i = 0; i < reads_; i++) {
872       const int k = thread->rand.Uniform(FLAGS_num);
873       key.Set(k);
874       Slice s = Slice(key.slice().data(), key.slice().size() - 1);
875       db_->Get(options, s, &value);
876       thread->stats.FinishedSingleOp();
877     }
878   }
879 
ReadHot(ThreadState * thread)880   void ReadHot(ThreadState* thread) {
881     ReadOptions options;
882     std::string value;
883     const int range = (FLAGS_num + 99) / 100;
884     KeyBuffer key;
885     for (int i = 0; i < reads_; i++) {
886       const int k = thread->rand.Uniform(range);
887       key.Set(k);
888       db_->Get(options, key.slice(), &value);
889       thread->stats.FinishedSingleOp();
890     }
891   }
892 
SeekRandom(ThreadState * thread)893   void SeekRandom(ThreadState* thread) {
894     ReadOptions options;
895     int found = 0;
896     KeyBuffer key;
897     for (int i = 0; i < reads_; i++) {
898       Iterator* iter = db_->NewIterator(options);
899       const int k = thread->rand.Uniform(FLAGS_num);
900       key.Set(k);
901       iter->Seek(key.slice());
902       if (iter->Valid() && iter->key() == key.slice()) found++;
903       delete iter;
904       thread->stats.FinishedSingleOp();
905     }
906     char msg[100];
907     snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
908     thread->stats.AddMessage(msg);
909   }
910 
SeekOrdered(ThreadState * thread)911   void SeekOrdered(ThreadState* thread) {
912     ReadOptions options;
913     Iterator* iter = db_->NewIterator(options);
914     int found = 0;
915     int k = 0;
916     KeyBuffer key;
917     for (int i = 0; i < reads_; i++) {
918       k = (k + (thread->rand.Uniform(100))) % FLAGS_num;
919       key.Set(k);
920       iter->Seek(key.slice());
921       if (iter->Valid() && iter->key() == key.slice()) found++;
922       thread->stats.FinishedSingleOp();
923     }
924     delete iter;
925     char msg[100];
926     std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
927     thread->stats.AddMessage(msg);
928   }
929 
DoDelete(ThreadState * thread,bool seq)930   void DoDelete(ThreadState* thread, bool seq) {
931     RandomGenerator gen;
932     WriteBatch batch;
933     Status s;
934     KeyBuffer key;
935     for (int i = 0; i < num_; i += entries_per_batch_) {
936       batch.Clear();
937       for (int j = 0; j < entries_per_batch_; j++) {
938         const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num));
939         key.Set(k);
940         batch.Delete(key.slice());
941         thread->stats.FinishedSingleOp();
942       }
943       s = db_->Write(write_options_, &batch);
944       if (!s.ok()) {
945         std::fprintf(stderr, "del error: %s\n", s.ToString().c_str());
946         std::exit(1);
947       }
948     }
949   }
950 
DeleteSeq(ThreadState * thread)951   void DeleteSeq(ThreadState* thread) { DoDelete(thread, true); }
952 
DeleteRandom(ThreadState * thread)953   void DeleteRandom(ThreadState* thread) { DoDelete(thread, false); }
954 
ReadWhileWriting(ThreadState * thread)955   void ReadWhileWriting(ThreadState* thread) {
956     if (thread->tid > 0) {
957       ReadRandom(thread);
958     } else {
959       // Special thread that keeps writing until other threads are done.
960       RandomGenerator gen;
961       KeyBuffer key;
962       while (true) {
963         {
964           MutexLock l(&thread->shared->mu);
965           if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
966             // Other threads have finished
967             break;
968           }
969         }
970 
971         const int k = thread->rand.Uniform(FLAGS_num);
972         key.Set(k);
973         Status s =
974             db_->Put(write_options_, key.slice(), gen.Generate(value_size_));
975         if (!s.ok()) {
976           std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
977           std::exit(1);
978         }
979       }
980 
981       // Do not count any of the preceding work/delay in stats.
982       thread->stats.Start();
983     }
984   }
985 
Compact(ThreadState * thread)986   void Compact(ThreadState* thread) { db_->CompactRange(nullptr, nullptr); }
987 
PrintStats(const char * key)988   void PrintStats(const char* key) {
989     std::string stats;
990     if (!db_->GetProperty(key, &stats)) {
991       stats = "(failed)";
992     }
993     std::fprintf(stdout, "\n%s\n", stats.c_str());
994   }
995 
WriteToFile(void * arg,const char * buf,int n)996   static void WriteToFile(void* arg, const char* buf, int n) {
997     reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
998   }
999 
HeapProfile()1000   void HeapProfile() {
1001     char fname[100];
1002     std::snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db,
1003                   ++heap_counter_);
1004     WritableFile* file;
1005     Status s = g_env->NewWritableFile(fname, &file);
1006     if (!s.ok()) {
1007       std::fprintf(stderr, "%s\n", s.ToString().c_str());
1008       return;
1009     }
1010     bool ok = port::GetHeapProfile(WriteToFile, file);
1011     delete file;
1012     if (!ok) {
1013       std::fprintf(stderr, "heap profiling not supported\n");
1014       g_env->RemoveFile(fname);
1015     }
1016   }
1017 };
1018 
1019 }  // namespace leveldb
1020 
main(int argc,char ** argv)1021 int main(int argc, char** argv) {
1022   FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
1023   FLAGS_max_file_size = leveldb::Options().max_file_size;
1024   FLAGS_block_size = leveldb::Options().block_size;
1025   FLAGS_open_files = leveldb::Options().max_open_files;
1026   std::string default_db_path;
1027 
1028   for (int i = 1; i < argc; i++) {
1029     double d;
1030     int n;
1031     char junk;
1032     if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
1033       FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
1034     } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
1035       FLAGS_compression_ratio = d;
1036     } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
1037                (n == 0 || n == 1)) {
1038       FLAGS_histogram = n;
1039     } else if (sscanf(argv[i], "--comparisons=%d%c", &n, &junk) == 1 &&
1040                (n == 0 || n == 1)) {
1041       FLAGS_comparisons = n;
1042     } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
1043                (n == 0 || n == 1)) {
1044       FLAGS_use_existing_db = n;
1045     } else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
1046                (n == 0 || n == 1)) {
1047       FLAGS_reuse_logs = n;
1048     } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
1049       FLAGS_num = n;
1050     } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
1051       FLAGS_reads = n;
1052     } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
1053       FLAGS_threads = n;
1054     } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
1055       FLAGS_value_size = n;
1056     } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
1057       FLAGS_write_buffer_size = n;
1058     } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
1059       FLAGS_max_file_size = n;
1060     } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
1061       FLAGS_block_size = n;
1062     } else if (sscanf(argv[i], "--key_prefix=%d%c", &n, &junk) == 1) {
1063       FLAGS_key_prefix = n;
1064     } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
1065       FLAGS_cache_size = n;
1066     } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
1067       FLAGS_bloom_bits = n;
1068     } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
1069       FLAGS_open_files = n;
1070     } else if (strncmp(argv[i], "--db=", 5) == 0) {
1071       FLAGS_db = argv[i] + 5;
1072     } else {
1073       std::fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
1074       std::exit(1);
1075     }
1076   }
1077 
1078   leveldb::g_env = leveldb::Env::Default();
1079 
1080   // Choose a location for the test database if none given with --db=<path>
1081   if (FLAGS_db == nullptr) {
1082     leveldb::g_env->GetTestDirectory(&default_db_path);
1083     default_db_path += "/dbbench";
1084     FLAGS_db = default_db_path.c_str();
1085   }
1086 
1087   leveldb::Benchmark benchmark;
1088   benchmark.Run();
1089   return 0;
1090 }
1091