// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <sys/types.h>

#include <atomic>
#include <cassert>
#include <cctype>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <string>
#include <vector>

#include "leveldb/cache.h"
#include "leveldb/comparator.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/testutil.h"

// Comma-separated list of operations to run in the specified order
//   Actual benchmarks:
//      fillseq       -- write N values in sequential key order in async mode
//      fillrandom    -- write N values in random key order in async mode
//      overwrite     -- overwrite N values in random key order in async mode
//      fillsync      -- write N/1000 values in random key order in sync mode
//      fill100K      -- write N/1000 100K values in random order in async mode
//      deleteseq     -- delete N keys in sequential order
//      deleterandom  -- delete N keys in random order
//      readseq       -- read N times sequentially
//      readreverse   -- read N times in reverse order
//      readrandom    -- read N times in random order
//      readmissing   -- read N missing keys in random order
//      readhot       -- read N times in random order from 1% section of DB
//      seekrandom    -- N random seeks
//      seekordered   -- N ordered seeks
//      open          -- cost of opening a DB
//      crc32c        -- repeated crc32c of 4K of data
//   Meta operations:
//      compact     -- Compact the entire DB
//      stats       -- Print DB stats
//      sstables    -- Print sstable info
//      heapprofile -- Dump a heap profile (if supported by this port)
static const char* FLAGS_benchmarks =
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    "readrandom,"
    "readrandom,"  // Extra run to allow previous compactions to quiesce
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,";
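
// Example invocation (a sketch, assuming the benchmark binary is built under
// its usual name "db_bench"; any subset of the benchmarks above may be
// given, and the flags below override the defaults):
//
//   ./db_bench --benchmarks=fillseq,readrandom --num=100000 --value_size=256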

// Number of key/values to place in database
static int FLAGS_num = 1000000;

// Number of read operations to do. If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;

// Number of concurrent threads to run.
static int FLAGS_threads = 1;

// Size of each value
static int FLAGS_value_size = 100;

// Arrange to generate values that shrink to this fraction of
// their original size after compression
static double FLAGS_compression_ratio = 0.5;

// Print histogram of operation timings
static bool FLAGS_histogram = false;

// Count the number of string comparisons performed
static bool FLAGS_comparisons = false;

// Number of bytes to buffer in memtable before compacting
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;

// Number of bytes written to each file.
// (initialized to default value by "main")
static int FLAGS_max_file_size = 0;
// Approximate size of user data packed per block (before compression).
// (initialized to default value by "main")
static int FLAGS_block_size = 0;

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size = -1;

// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;

// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;

// Common key prefix length.
static int FLAGS_key_prefix = 0;

// If true, do not destroy the existing database. If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;

// If true, reuse existing log/MANIFEST files when re-opening a database.
static bool FLAGS_reuse_logs = false;

// Use the db with the following name.
static const char* FLAGS_db = nullptr;

namespace leveldb {

namespace {
leveldb::Env* g_env = nullptr;

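// Wraps another comparator and counts how many key comparisons the DB
// performs; installed as Options::comparator when --comparisons=1 (see
// Open() below) so RunBenchmark() can report the total.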
class CountComparator : public Comparator {
 public:
  CountComparator(const Comparator* wrapped) : wrapped_(wrapped) {}
  ~CountComparator() override {}
  int Compare(const Slice& a, const Slice& b) const override {
    count_.fetch_add(1, std::memory_order_relaxed);
    return wrapped_->Compare(a, b);
  }
  const char* Name() const override { return wrapped_->Name(); }
  void FindShortestSeparator(std::string* start,
                             const Slice& limit) const override {
    wrapped_->FindShortestSeparator(start, limit);
  }

  void FindShortSuccessor(std::string* key) const override {
    return wrapped_->FindShortSuccessor(key);
  }

  size_t comparisons() const { return count_.load(std::memory_order_relaxed); }

  void reset() { count_.store(0, std::memory_order_relaxed); }

 private:
  mutable std::atomic<size_t> count_{0};
  const Comparator* const wrapped_;
};

// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;
  int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < 1048576) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  Slice Generate(size_t len) {
    // Wrap around to the start of the buffer when fewer than len bytes
    // remain; the requested length must fit within the buffer.
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};

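// Builds fixed-layout keys: FLAGS_key_prefix bytes of 'a' followed by the
// key number as a zero-padded 16-digit decimal. For example, with
// --key_prefix=4, Set(42) yields the 20-byte key "aaaa0000000000000042".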
class KeyBuffer {
 public:
  KeyBuffer() {
    assert(FLAGS_key_prefix < sizeof(buffer_));
    memset(buffer_, 'a', FLAGS_key_prefix);
  }
  KeyBuffer& operator=(KeyBuffer& other) = delete;
  KeyBuffer(KeyBuffer& other) = delete;

  void Set(int k) {
    std::snprintf(buffer_ + FLAGS_key_prefix,
                  sizeof(buffer_) - FLAGS_key_prefix, "%016d", k);
  }

  Slice slice() const { return Slice(buffer_, FLAGS_key_prefix + 16); }

 private:
  char buffer_[1024];
};

#if defined(__linux)
static Slice TrimSpace(Slice s) {
  size_t start = 0;
  while (start < s.size() && isspace(s[start])) {
    start++;
  }
  size_t limit = s.size();
  while (limit > start && isspace(s[limit - 1])) {
    limit--;
  }
  return Slice(s.data() + start, limit - start);
}
#endif

static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}

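// Accumulates one thread's results: operation count, byte total, elapsed
// time, an optional latency histogram, and a free-form message. Merge()
// folds the per-thread instances together before Report() prints them.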
class Stats {
 private:
  double start_;
  double finish_;
  double seconds_;
  int done_;
  int next_report_;
  int64_t bytes_;
  double last_op_finish_;
  Histogram hist_;
  std::string message_;

 public:
  Stats() { Start(); }

  void Start() {
    next_report_ = 100;
    hist_.Clear();
    done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    message_.clear();
    start_ = finish_ = last_op_finish_ = g_env->NowMicros();
  }

  void Merge(const Stats& other) {
    hist_.Merge(other.hist_);
    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
    finish_ = g_env->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) { AppendWithSpace(&message_, msg); }

  void FinishedSingleOp() {
    if (FLAGS_histogram) {
      double now = g_env->NowMicros();
      double micros = now - last_op_finish_;
      hist_.Add(micros);
      if (micros > 20000) {
        std::fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
        std::fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_++;
    if (done_ >= next_report_) {
      if (next_report_ < 1000)
        next_report_ += 100;
      else if (next_report_ < 5000)
        next_report_ += 500;
      else if (next_report_ < 10000)
        next_report_ += 1000;
      else if (next_report_ < 50000)
        next_report_ += 5000;
      else if (next_report_ < 100000)
        next_report_ += 10000;
      else if (next_report_ < 500000)
        next_report_ += 50000;
      else
        next_report_ += 100000;
      std::fprintf(stderr, "... finished %d ops%30s\r", done_, "");
      std::fflush(stderr);
    }
  }

  void AddBytes(int64_t n) { bytes_ += n; }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedSingleOp().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      std::snprintf(rate, sizeof(rate), "%6.1f MB/s",
                    (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);

    std::fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
                 name.ToString().c_str(), seconds_ * 1e6 / done_,
                 (extra.empty() ? "" : " "), extra.c_str());
    if (FLAGS_histogram) {
      std::fprintf(stdout, "Microseconds per op:\n%s\n",
                   hist_.ToString().c_str());
    }
    std::fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv GUARDED_BY(mu);
  int total GUARDED_BY(mu);

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  int num_initialized GUARDED_BY(mu);
  int num_done GUARDED_BY(mu);
  bool start GUARDED_BY(mu);

  SharedState(int total)
      : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) {}
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;      // 0..n-1 when running in n threads
  Random rand;  // Has different seeds for different threads
  Stats stats;
  SharedState* shared;

  ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
};

}  // namespace

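// Owns the DB handle, block cache, and filter policy; parses the
// comma-separated benchmark list and dispatches each name to the
// corresponding method in Run().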
class Benchmark {
 private:
  Cache* cache_;
  const FilterPolicy* filter_policy_;
  DB* db_;
  int num_;
  int value_size_;
  int entries_per_batch_;
  WriteOptions write_options_;
  int reads_;
  int heap_counter_;
  CountComparator count_comparator_;
  int total_thread_count_;

  void PrintHeader() {
    const int kKeySize = 16 + FLAGS_key_prefix;
    PrintEnvironment();
    std::fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
    std::fprintf(
        stdout, "Values:     %d bytes each (%d bytes after compression)\n",
        FLAGS_value_size,
        static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    std::fprintf(stdout, "Entries:    %d\n", num_);
    std::fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
                 ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_) /
                  1048576.0));
    std::fprintf(
        stdout, "FileSize:   %.1f MB (estimated)\n",
        (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) /
         1048576.0));
    PrintWarnings();
    std::fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    std::fprintf(
        stdout,
        "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n");
#endif
#ifndef NDEBUG
    std::fprintf(
        stdout,
        "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // See if snappy is working by attempting to compress a compressible string
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
      std::fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(text)) {
      std::fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }

  void PrintEnvironment() {
    std::fprintf(stderr, "LevelDB:    version %d.%d\n", kMajorVersion,
                 kMinorVersion);

#if defined(__linux)
    time_t now = time(nullptr);
    std::fprintf(stderr, "Date:       %s",
                 ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = std::fopen("/proc/cpuinfo", "r");
    if (cpuinfo != nullptr) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
        const char* sep = strchr(line, ':');
        if (sep == nullptr) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      std::fclose(cpuinfo);
      std::fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      std::fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

 public:
  Benchmark()
      : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : nullptr),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                           : nullptr),
        db_(nullptr),
        num_(FLAGS_num),
        value_size_(FLAGS_value_size),
        entries_per_batch_(1),
        reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
        heap_counter_(0),
        count_comparator_(BytewiseComparator()),
        total_thread_count_(0) {
    std::vector<std::string> files;
    g_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        g_env->RemoveFile(std::string(FLAGS_db) + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }

  ~Benchmark() {
    delete db_;
    delete cache_;
    delete filter_policy_;
  }

  void Run() {
    PrintHeader();
    Open();

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != nullptr) {
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == nullptr) {
        name = benchmarks;
        benchmarks = nullptr;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Reset parameters that may be overridden below
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();

      void (Benchmark::*method)(ThreadState*) = nullptr;
      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      if (name == Slice("open")) {
        method = &Benchmark::OpenBench;
        num_ /= 10000;
        if (num_ < 1) num_ = 1;
      } else if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("seekordered")) {
        method = &Benchmark::SeekOrdered;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
      } else {
        if (!name.empty()) {  // No error message for empty name
          std::fprintf(stderr, "unknown benchmark '%s'\n",
                       name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          std::fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                       name.ToString().c_str());
          method = nullptr;
        } else {
          delete db_;
          db_ = nullptr;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != nullptr) {
        RunBenchmark(num_threads, name, method);
      }
    }
  }

 private:
  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    thread->stats.Start();
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

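  // Runs `method` on n threads, using SharedState's mutex and condvar as a
  // start barrier so every thread enters the timed region together, then
  // merges the per-thread Stats and reports them under `name`.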
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared(n);

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      ++total_thread_count_;
      // Seed the thread's random state deterministically based upon thread
      // creation across all benchmarks. This ensures that the seeds are unique
      // but reproducible when rerunning the same set of benchmarks.
      arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_);
      arg[i].thread->shared = &shared;
      g_env->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    for (int i = 1; i < n; i++) {
      arg[0].thread->stats.Merge(arg[i].thread->stats);
    }
    arg[0].thread->stats.Report(name);
    if (FLAGS_comparisons) {
      std::fprintf(stdout, "Comparisons: %zu\n",
                   count_comparator_.comparisons());
      count_comparator_.reset();
      std::fflush(stdout);
    }

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }

  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp();
      bytes += size;
    }
    // Print so result is not dead
    std::fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      char buf[100];
      std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
                    (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  void Open() {
    assert(db_ == nullptr);
    Options options;
    options.env = g_env;
    options.create_if_missing = !FLAGS_use_existing_db;
    options.block_cache = cache_;
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_file_size = FLAGS_max_file_size;
    options.block_size = FLAGS_block_size;
    if (FLAGS_comparisons) {
      options.comparator = &count_comparator_;
    }
    options.max_open_files = FLAGS_open_files;
    options.filter_policy = filter_policy_;
    options.reuse_logs = FLAGS_reuse_logs;
    Status s = DB::Open(options, FLAGS_db, &db_);
    if (!s.ok()) {
      std::fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      std::exit(1);
    }
  }

  void OpenBench(ThreadState* thread) {
    for (int i = 0; i < num_; i++) {
      delete db_;
      Open();
      thread->stats.FinishedSingleOp();
    }
  }

  void WriteSeq(ThreadState* thread) { DoWrite(thread, true); }

  void WriteRandom(ThreadState* thread) { DoWrite(thread, false); }

  void DoWrite(ThreadState* thread, bool seq) {
    if (num_ != FLAGS_num) {
      char msg[100];
      std::snprintf(msg, sizeof(msg), "(%d ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    KeyBuffer key;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
        key.Set(k);
        batch.Put(key.slice(), gen.Generate(value_size_));
        bytes += value_size_ + key.slice().size();
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        std::exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  void ReadSequential(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions());
    int i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadRandom(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    int found = 0;
    KeyBuffer key;
    for (int i = 0; i < reads_; i++) {
      const int k = thread->rand.Uniform(FLAGS_num);
      key.Set(k);
      if (db_->Get(options, key.slice(), &value).ok()) {
        found++;
      }
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }

  void ReadMissing(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    KeyBuffer key;
    for (int i = 0; i < reads_; i++) {
      const int k = thread->rand.Uniform(FLAGS_num);
      key.Set(k);
      Slice s = Slice(key.slice().data(), key.slice().size() - 1);
      db_->Get(options, s, &value);
      thread->stats.FinishedSingleOp();
    }
  }

  void ReadHot(ThreadState* thread) {
    ReadOptions options;
    std::string value;
    const int range = (FLAGS_num + 99) / 100;
    KeyBuffer key;
    for (int i = 0; i < reads_; i++) {
      const int k = thread->rand.Uniform(range);
      key.Set(k);
      db_->Get(options, key.slice(), &value);
      thread->stats.FinishedSingleOp();
    }
  }

  void SeekRandom(ThreadState* thread) {
    ReadOptions options;
    int found = 0;
    KeyBuffer key;
    for (int i = 0; i < reads_; i++) {
      Iterator* iter = db_->NewIterator(options);
      const int k = thread->rand.Uniform(FLAGS_num);
      key.Set(k);
      iter->Seek(key.slice());
      if (iter->Valid() && iter->key() == key.slice()) found++;
      delete iter;
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }

  void SeekOrdered(ThreadState* thread) {
    ReadOptions options;
    Iterator* iter = db_->NewIterator(options);
    int found = 0;
    int k = 0;
    KeyBuffer key;
    for (int i = 0; i < reads_; i++) {
      k = (k + (thread->rand.Uniform(100))) % FLAGS_num;
      key.Set(k);
      iter->Seek(key.slice());
      if (iter->Valid() && iter->key() == key.slice()) found++;
      thread->stats.FinishedSingleOp();
    }
    delete iter;
    char msg[100];
    std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
    thread->stats.AddMessage(msg);
  }

  void DoDelete(ThreadState* thread, bool seq) {
    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    KeyBuffer key;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num));
        key.Set(k);
        batch.Delete(key.slice());
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        std::fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        std::exit(1);
      }
    }
  }

  void DeleteSeq(ThreadState* thread) { DoDelete(thread, true); }

  void DeleteRandom(ThreadState* thread) { DoDelete(thread, false); }

  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      KeyBuffer key;
      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        const int k = thread->rand.Uniform(FLAGS_num);
        key.Set(k);
        Status s =
            db_->Put(write_options_, key.slice(), gen.Generate(value_size_));
        if (!s.ok()) {
          std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          std::exit(1);
        }
      }

      // Do not count any of the preceding work/delay in stats.
      thread->stats.Start();
    }
  }

  void Compact(ThreadState* thread) { db_->CompactRange(nullptr, nullptr); }

  void PrintStats(const char* key) {
    std::string stats;
    if (!db_->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    std::fprintf(stdout, "\n%s\n", stats.c_str());
  }

  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }

  void HeapProfile() {
    char fname[100];
    std::snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db,
                  ++heap_counter_);
    WritableFile* file;
    Status s = g_env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      std::fprintf(stderr, "%s\n", s.ToString().c_str());
      return;
    }
    bool ok = port::GetHeapProfile(WriteToFile, file);
    delete file;
    if (!ok) {
      std::fprintf(stderr, "heap profiling not supported\n");
      g_env->RemoveFile(fname);
    }
  }
};

}  // namespace leveldb

int main(int argc, char** argv) {
  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
  FLAGS_max_file_size = leveldb::Options().max_file_size;
  FLAGS_block_size = leveldb::Options().block_size;
  FLAGS_open_files = leveldb::Options().max_open_files;
  std::string default_db_path;

  for (int i = 1; i < argc; i++) {
    double d;
    int n;
    char junk;
    if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
      FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
    } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
      FLAGS_compression_ratio = d;
    } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_histogram = n;
    } else if (sscanf(argv[i], "--comparisons=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_comparisons = n;
    } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_use_existing_db = n;
    } else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_reuse_logs = n;
    } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
      FLAGS_num = n;
    } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
      FLAGS_reads = n;
    } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
      FLAGS_threads = n;
    } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
      FLAGS_value_size = n;
    } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
      FLAGS_write_buffer_size = n;
    } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
      FLAGS_max_file_size = n;
    } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
      FLAGS_block_size = n;
    } else if (sscanf(argv[i], "--key_prefix=%d%c", &n, &junk) == 1) {
      FLAGS_key_prefix = n;
    } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
      FLAGS_cache_size = n;
    } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
      FLAGS_bloom_bits = n;
    } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
      FLAGS_open_files = n;
    } else if (strncmp(argv[i], "--db=", 5) == 0) {
      FLAGS_db = argv[i] + 5;
    } else {
      std::fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
      std::exit(1);
    }
  }

  leveldb::g_env = leveldb::Env::Default();

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db == nullptr) {
    leveldb::g_env->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path.c_str();
  }

  leveldb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}