1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "KVS"
16 #define PW_LOG_LEVEL PW_KVS_LOG_LEVEL
17
18 #include "pw_kvs/key_value_store.h"
19
20 #include <algorithm>
21 #include <cinttypes>
22 #include <cstring>
23 #include <type_traits>
24
25 #include "pw_assert/check.h"
26 #include "pw_kvs_private/config.h"
27 #include "pw_log/log.h"
28 #include "pw_status/try.h"
29
30 namespace pw::kvs {
31 namespace {
32
33 using std::byte;
34
InvalidKey(std::string_view key)35 constexpr bool InvalidKey(std::string_view key) {
36 return key.empty() || (key.size() > internal::Entry::kMaxKeyLength);
37 }
38
39 } // namespace
40
// Constructs a KVS over `partition`. The metadata containers (sector
// descriptors, key descriptors, redundant-address array) are caller-provided
// storage; this class keeps references/pointers to them, so they must outlive
// the KVS. `partition` is dereferenced here and must be non-null.
// No flash I/O happens in the constructor; call Init() before use.
KeyValueStore::KeyValueStore(FlashPartition* partition,
                             span<const EntryFormat> formats,
                             const Options& options,
                             size_t redundancy,
                             Vector<SectorDescriptor>& sector_descriptor_list,
                             const SectorDescriptor** temp_sectors_to_skip,
                             Vector<KeyDescriptor>& key_descriptor_list,
                             Address* addresses)
    : partition_(*partition),
      formats_(formats),
      sectors_(sector_descriptor_list, *partition, temp_sectors_to_skip),
      entry_cache_(key_descriptor_list, addresses, redundancy),
      options_(options),
      initialized_(InitializationState::kNotInitialized),
      error_detected_(false),
      internal_stats_({}),
      last_transaction_id_(0) {}
58
Init()59 Status KeyValueStore::Init() {
60 initialized_ = InitializationState::kNotInitialized;
61 error_detected_ = false;
62 last_transaction_id_ = 0;
63
64 PW_LOG_INFO("Initializing key value store");
65 if (partition_.sector_count() > sectors_.max_size()) {
66 PW_LOG_ERROR(
67 "KVS init failed: kMaxUsableSectors (=%u) must be at least as "
68 "large as the number of sectors in the flash partition (=%u)",
69 unsigned(sectors_.max_size()),
70 unsigned(partition_.sector_count()));
71 return Status::FailedPrecondition();
72 }
73
74 if (partition_.sector_count() < 2) {
75 PW_LOG_ERROR(
76 "KVS init failed: FlashParition sector count (=%u) must be at 2. KVS "
77 "requires at least 1 working sector + 1 free/reserved sector",
78 unsigned(partition_.sector_count()));
79 return Status::FailedPrecondition();
80 }
81
82 const size_t sector_size_bytes = partition_.sector_size_bytes();
83
84 // TODO(davidrogers): investigate doing this as a static assert/compile-time
85 // check.
86 if (sector_size_bytes > SectorDescriptor::max_sector_size()) {
87 PW_LOG_ERROR(
88 "KVS init failed: sector_size_bytes (=%u) is greater than maximum "
89 "allowed sector size (=%u)",
90 unsigned(sector_size_bytes),
91 unsigned(SectorDescriptor::max_sector_size()));
92 return Status::FailedPrecondition();
93 }
94
95 Status metadata_result = InitializeMetadata();
96
97 if (!error_detected_) {
98 initialized_ = InitializationState::kReady;
99 } else {
100 initialized_ = InitializationState::kNeedsMaintenance;
101
102 if (options_.recovery != ErrorRecovery::kManual) {
103 size_t pre_fix_redundancy_errors =
104 internal_stats_.missing_redundant_entries_recovered;
105 Status recovery_status = FixErrors();
106
107 if (recovery_status.ok()) {
108 if (metadata_result.IsOutOfRange()) {
109 internal_stats_.missing_redundant_entries_recovered =
110 pre_fix_redundancy_errors;
111 PW_LOG_INFO("KVS init: Redundancy level successfully updated");
112 } else {
113 PW_LOG_WARN("KVS init: Corruption detected and fully repaired");
114 }
115 initialized_ = InitializationState::kReady;
116 } else if (recovery_status.IsResourceExhausted()) {
117 PW_LOG_WARN("KVS init: Unable to maintain required free sector");
118 } else {
119 PW_LOG_WARN("KVS init: Corruption detected and unable repair");
120 }
121 } else {
122 PW_LOG_WARN(
123 "KVS init: Corruption detected, no repair attempted due to options");
124 }
125 }
126
127 PW_LOG_INFO(
128 "KeyValueStore init complete: active keys %u, deleted keys %u, sectors "
129 "%u, logical sector size %u bytes",
130 unsigned(size()),
131 unsigned(entry_cache_.total_entries() - size()),
132 unsigned(sectors_.size()),
133 unsigned(partition_.sector_size_bytes()));
134
135 // Report any corruption was not repaired.
136 if (error_detected_) {
137 PW_LOG_WARN(
138 "KVS init: Corruption found but not repaired, KVS unavailable until "
139 "successful maintenance.");
140 return Status::DataLoss();
141 }
142
143 return OkStatus();
144 }
145
InitializeMetadata()146 Status KeyValueStore::InitializeMetadata() {
147 const size_t sector_size_bytes = partition_.sector_size_bytes();
148
149 sectors_.Reset();
150 entry_cache_.Reset();
151
152 PW_LOG_DEBUG("First pass: Read all entries from all sectors");
153 Address sector_address = 0;
154
155 size_t total_corrupt_bytes = 0;
156 size_t corrupt_entries = 0;
157 bool empty_sector_found = false;
158 size_t entry_copies_missing = 0;
159
160 for (SectorDescriptor& sector : sectors_) {
161 Address entry_address = sector_address;
162
163 size_t sector_corrupt_bytes = 0;
164
165 for (int num_entries_in_sector = 0; true; num_entries_in_sector++) {
166 PW_LOG_DEBUG("Load entry: sector=%u, entry#=%d, address=%u",
167 unsigned(sector_address),
168 num_entries_in_sector,
169 unsigned(entry_address));
170
171 if (!sectors_.AddressInSector(sector, entry_address)) {
172 PW_LOG_DEBUG("Fell off end of sector; moving to the next sector");
173 break;
174 }
175
176 Address next_entry_address;
177 Status status = LoadEntry(entry_address, &next_entry_address);
178 if (status.IsNotFound()) {
179 PW_LOG_DEBUG(
180 "Hit un-written data in sector; moving to the next sector");
181 break;
182 } else if (!status.ok()) {
183 // The entry could not be read, indicating likely data corruption within
184 // the sector. Try to scan the remainder of the sector for other
185 // entries.
186
187 error_detected_ = true;
188 corrupt_entries++;
189
190 status = ScanForEntry(sector,
191 entry_address + Entry::kMinAlignmentBytes,
192 &next_entry_address);
193 if (!status.ok()) {
194 // No further entries in this sector. Mark the remaining bytes in the
195 // sector as corrupt (since we can't reliably know the size of the
196 // corrupt entry).
197 sector_corrupt_bytes +=
198 sector_size_bytes - (entry_address - sector_address);
199 break;
200 }
201
202 sector_corrupt_bytes += next_entry_address - entry_address;
203 }
204
205 // Entry loaded successfully; so get ready to load the next one.
206 entry_address = next_entry_address;
207
208 // Update of the number of writable bytes in this sector.
209 sector.set_writable_bytes(sector_size_bytes -
210 (entry_address - sector_address));
211 }
212
213 if (sector_corrupt_bytes > 0) {
214 // If the sector contains corrupt data, prevent any further entries from
215 // being written to it by indicating that it has no space. This should
216 // also make it a decent GC candidate. Valid keys in the sector are still
217 // readable as normal.
218 sector.mark_corrupt();
219 error_detected_ = true;
220
221 PW_LOG_WARN("Sector %u contains %uB of corrupt data",
222 sectors_.Index(sector),
223 unsigned(sector_corrupt_bytes));
224 }
225
226 if (sector.Empty(sector_size_bytes)) {
227 empty_sector_found = true;
228 }
229 sector_address += sector_size_bytes;
230 total_corrupt_bytes += sector_corrupt_bytes;
231 }
232
233 PW_LOG_DEBUG("Second pass: Count valid bytes in each sector");
234 Address newest_key = 0;
235
236 // For every valid entry, for each address, count the valid bytes in that
237 // sector. If the address fails to read, remove the address and mark the
238 // sector as corrupt. Track which entry has the newest transaction ID for
239 // initializing last_new_sector_.
240 for (EntryMetadata& metadata : entry_cache_) {
241 if (metadata.addresses().size() < redundancy()) {
242 PW_LOG_DEBUG("std::string_view 0x%08x missing copies, has %u, needs %u",
243 unsigned(metadata.hash()),
244 unsigned(metadata.addresses().size()),
245 unsigned(redundancy()));
246 entry_copies_missing++;
247 }
248 size_t index = 0;
249 while (index < metadata.addresses().size()) {
250 Address address = metadata.addresses()[index];
251 Entry entry;
252
253 Status read_result = Entry::Read(partition_, address, formats_, &entry);
254
255 SectorDescriptor& sector = sectors_.FromAddress(address);
256
257 if (read_result.ok()) {
258 sector.AddValidBytes(entry.size());
259 index++;
260 } else {
261 corrupt_entries++;
262 total_corrupt_bytes += sector.writable_bytes();
263 error_detected_ = true;
264 sector.mark_corrupt();
265
266 // Remove the bad address and stay at this index. The removal
267 // replaces out the removed address with the back address so
268 // this index needs to be rechecked with the new address.
269 metadata.RemoveAddress(address);
270 }
271 }
272
273 if (metadata.IsNewerThan(last_transaction_id_)) {
274 last_transaction_id_ = metadata.transaction_id();
275 newest_key = metadata.addresses().back();
276 }
277 }
278
279 sectors_.set_last_new_sector(newest_key);
280
281 if (!empty_sector_found) {
282 PW_LOG_DEBUG("No empty sector found");
283 error_detected_ = true;
284 }
285
286 if (entry_copies_missing > 0) {
287 bool other_errors = error_detected_;
288 error_detected_ = true;
289
290 if (!other_errors && entry_copies_missing == entry_cache_.total_entries()) {
291 PW_LOG_INFO(
292 "KVS configuration changed to redundancy of %u total copies per key",
293 unsigned(redundancy()));
294 return Status::OutOfRange();
295 }
296 }
297
298 if (error_detected_) {
299 PW_LOG_WARN(
300 "Corruption detected. Found %u corrupt bytes, %u corrupt entries, "
301 "and %u keys missing redundant copies.",
302 unsigned(total_corrupt_bytes),
303 unsigned(corrupt_entries),
304 unsigned(entry_copies_missing));
305 return Status::FailedPrecondition();
306 }
307 return OkStatus();
308 }
309
GetStorageStats() const310 KeyValueStore::StorageStats KeyValueStore::GetStorageStats() const {
311 StorageStats stats{};
312 const size_t sector_size = partition_.sector_size_bytes();
313 bool found_empty_sector = false;
314 stats.sector_erase_count = internal_stats_.sector_erase_count;
315 stats.corrupt_sectors_recovered = internal_stats_.corrupt_sectors_recovered;
316 stats.missing_redundant_entries_recovered =
317 internal_stats_.missing_redundant_entries_recovered;
318
319 for (const SectorDescriptor& sector : sectors_) {
320 stats.in_use_bytes += sector.valid_bytes();
321 stats.reclaimable_bytes += sector.RecoverableBytes(sector_size);
322
323 if (!found_empty_sector && sector.Empty(sector_size)) {
324 // The KVS tries to always keep an empty sector for GC, so don't count
325 // the first empty sector seen as writable space. However, a free sector
326 // cannot always be assumed to exist; if a GC operation fails, all sectors
327 // may be partially written, in which case the space reported might be
328 // inaccurate.
329 found_empty_sector = true;
330 continue;
331 }
332
333 stats.writable_bytes += sector.writable_bytes();
334 }
335
336 return stats;
337 }
338
339 // Check KVS for any error conditions. Primarily intended for test and
340 // internal use.
CheckForErrors()341 bool KeyValueStore::CheckForErrors() {
342 // Check for corrupted sectors
343 for (SectorDescriptor& sector : sectors_) {
344 if (sector.corrupt()) {
345 error_detected_ = true;
346 return error_detected();
347 }
348 }
349
350 // Check for missing redundancy.
351 if (redundancy() > 1) {
352 for (const EntryMetadata& metadata : entry_cache_) {
353 if (metadata.addresses().size() < redundancy()) {
354 error_detected_ = true;
355 return error_detected();
356 }
357 }
358 }
359
360 return error_detected();
361 }
362
LoadEntry(Address entry_address,Address * next_entry_address)363 Status KeyValueStore::LoadEntry(Address entry_address,
364 Address* next_entry_address) {
365 Entry entry;
366 PW_TRY(Entry::Read(partition_, entry_address, formats_, &entry));
367
368 // Read the key from flash & validate the entry (which reads the value).
369 Entry::KeyBuffer key_buffer;
370 PW_TRY_ASSIGN(size_t key_length, entry.ReadKey(key_buffer));
371 const std::string_view key(key_buffer.data(), key_length);
372
373 PW_TRY(entry.VerifyChecksumInFlash());
374
375 // A valid entry was found, so update the next entry address before doing any
376 // of the checks that happen in AddNewOrUpdateExisting.
377 *next_entry_address = entry.next_address();
378 return entry_cache_.AddNewOrUpdateExisting(
379 entry.descriptor(key), entry.address(), partition_.sector_size_bytes());
380 }
381
382 // Scans flash memory within a sector to find a KVS entry magic.
ScanForEntry(const SectorDescriptor & sector,Address start_address,Address * next_entry_address)383 Status KeyValueStore::ScanForEntry(const SectorDescriptor& sector,
384 Address start_address,
385 Address* next_entry_address) {
386 PW_LOG_DEBUG("Scanning sector %u for entries starting from address %u",
387 sectors_.Index(sector),
388 unsigned(start_address));
389
390 // Entries must start at addresses which are aligned on a multiple of
391 // Entry::kMinAlignmentBytes. However, that multiple can vary between entries.
392 // When scanning, we don't have an entry to tell us what the current alignment
393 // is, so the minimum alignment is used to be exhaustive.
394 for (Address address = AlignUp(start_address, Entry::kMinAlignmentBytes);
395 sectors_.AddressInSector(sector, address);
396 address += Entry::kMinAlignmentBytes) {
397 uint32_t magic;
398 StatusWithSize read_result =
399 partition_.Read(address, as_writable_bytes(span(&magic, 1)));
400 if (!read_result.ok()) {
401 continue;
402 }
403 if (formats_.KnownMagic(magic)) {
404 PW_LOG_DEBUG("Found entry magic at address %u", unsigned(address));
405 *next_entry_address = address;
406 return OkStatus();
407 }
408 }
409
410 return Status::NotFound();
411 }
412
#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

// Removes every cached entry whose state is kDeleted, subtracting each copy's
// size from the owning sector's valid-byte count. Only compiled in when
// deleted-key removal is enabled in the KVS configuration.
Status KeyValueStore::RemoveDeletedKeyEntries() {
  for (internal::EntryCache::iterator it = entry_cache_.begin();
       it != entry_cache_.end();
       ++it) {
    EntryMetadata& entry_metadata = *it;

    // The iterator we are given back from RemoveEntry could also be deleted,
    // so loop until we find one that isn't deleted.
    while (entry_metadata.state() == EntryState::kDeleted) {
      // Read the original entry to get the size for sector accounting purposes.
      Entry entry;
      PW_TRY(ReadEntry(entry_metadata, entry));

      // Every redundant copy contributed valid bytes to its sector.
      for (Address address : entry_metadata.addresses()) {
        sectors_.FromAddress(address).RemoveValidBytes(entry.size());
      }

      it = entry_cache_.RemoveEntry(it);

      if (it == entry_cache_.end()) {
        return OkStatus();  // new iterator is the end, bail
      }

      // NOTE(review): entry_metadata is a reference, so this assigns through
      // it rather than rebinding; presumably the iterator's operator* yields
      // storage owned by the iterator, making this safe — confirm against
      // internal::EntryCache::iterator.
      entry_metadata = *it;  // set entry_metadata to check for deletion again
    }
  }

  return OkStatus();
}

#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
446
// Reads the value stored for `key` into `value_buffer`, starting
// `offset_bytes` into the stored value. Returns the number of bytes read.
StatusWithSize KeyValueStore::Get(std::string_view key,
                                  span<byte> value_buffer,
                                  size_t offset_bytes) const {
  // Rejects invalid keys and reads before Init() has run.
  PW_TRY_WITH_SIZE(CheckReadOperation(key));

  // Deleted keys and hash collisions report NOT_FOUND (see FindExisting).
  EntryMetadata metadata;
  PW_TRY_WITH_SIZE(FindExisting(key, &metadata));

  return Get(key, metadata, value_buffer, offset_bytes);
}
457
PutBytes(std::string_view key,span<const byte> value)458 Status KeyValueStore::PutBytes(std::string_view key, span<const byte> value) {
459 PW_TRY(CheckWriteOperation(key));
460 PW_LOG_DEBUG("Writing key/value; key length=%u, value length=%u",
461 unsigned(key.size()),
462 unsigned(value.size()));
463
464 if (Entry::size(partition_, key, value) > partition_.sector_size_bytes()) {
465 PW_LOG_DEBUG("%u B value with %u B key cannot fit in one sector",
466 unsigned(value.size()),
467 unsigned(key.size()));
468 return Status::InvalidArgument();
469 }
470
471 EntryMetadata metadata;
472 Status status = FindEntry(key, &metadata);
473
474 if (status.ok()) {
475 // TODO(davidrogers): figure out logging how to support multiple addresses.
476 PW_LOG_DEBUG("Overwriting entry for key 0x%08x in %u sectors including %u",
477 unsigned(metadata.hash()),
478 unsigned(metadata.addresses().size()),
479 sectors_.Index(metadata.first_address()));
480 return WriteEntryForExistingKey(metadata, EntryState::kValid, key, value);
481 }
482
483 if (status.IsNotFound()) {
484 return WriteEntryForNewKey(key, value);
485 }
486
487 return status;
488 }
489
// Deletes `key` by writing a tombstone entry (deletion is itself a write, so
// the KVS must be fully ready).
Status KeyValueStore::Delete(std::string_view key) {
  PW_TRY(CheckWriteOperation(key));

  // Only existing, non-deleted keys can be deleted.
  EntryMetadata metadata;
  PW_TRY(FindExisting(key, &metadata));

  // TODO(davidrogers): figure out logging how to support multiple addresses.
  PW_LOG_DEBUG("Writing tombstone for key 0x%08x in %u sectors including %u",
               unsigned(metadata.hash()),
               unsigned(metadata.addresses().size()),
               sectors_.Index(metadata.first_address()));
  return WriteEntryForExistingKey(metadata, EntryState::kDeleted, key, {});
}
503
ReadKey()504 void KeyValueStore::Item::ReadKey() {
505 key_buffer_.fill('\0');
506
507 Entry entry;
508 if (kvs_.ReadEntry(*iterator_, entry).ok()) {
509 entry.ReadKey(key_buffer_)
510 .IgnoreError(); // TODO: b/242598609 - Handle Status properly
511 }
512 }
513
operator ++()514 KeyValueStore::iterator& KeyValueStore::iterator::operator++() {
515 // Skip to the next entry that is valid (not deleted).
516 while (++item_.iterator_ != item_.kvs_.entry_cache_.end() &&
517 item_.iterator_->state() != EntryState::kValid) {
518 }
519 return *this;
520 }
521
begin() const522 KeyValueStore::iterator KeyValueStore::begin() const {
523 internal::EntryCache::const_iterator cache_iterator = entry_cache_.begin();
524 // Skip over any deleted entries at the start of the descriptor list.
525 while (cache_iterator != entry_cache_.end() &&
526 cache_iterator->state() != EntryState::kValid) {
527 ++cache_iterator;
528 }
529 return iterator(*this, cache_iterator);
530 }
531
// Returns the size in bytes of the value stored under `key`, without reading
// the value itself.
StatusWithSize KeyValueStore::ValueSize(std::string_view key) const {
  // Read-only check: allowed even when the KVS needs maintenance.
  PW_TRY_WITH_SIZE(CheckReadOperation(key));

  EntryMetadata metadata;
  PW_TRY_WITH_SIZE(FindExisting(key, &metadata));

  return ValueSize(metadata);
}
540
ReadEntry(const EntryMetadata & metadata,Entry & entry) const541 Status KeyValueStore::ReadEntry(const EntryMetadata& metadata,
542 Entry& entry) const {
543 // Try to read an entry
544 Status read_result = Status::DataLoss();
545 for (Address address : metadata.addresses()) {
546 read_result = Entry::Read(partition_, address, formats_, &entry);
547 if (read_result.ok()) {
548 return read_result;
549 }
550
551 // Found a bad address. Set the sector as corrupt.
552 error_detected_ = true;
553 sectors_.FromAddress(address).mark_corrupt();
554 }
555
556 PW_LOG_ERROR("No valid entries for key. Data has been lost!");
557 return read_result;
558 }
559
FindEntry(std::string_view key,EntryMetadata * metadata_out) const560 Status KeyValueStore::FindEntry(std::string_view key,
561 EntryMetadata* metadata_out) const {
562 StatusWithSize find_result =
563 entry_cache_.Find(partition_, sectors_, formats_, key, metadata_out);
564
565 if (find_result.size() > 0u) {
566 error_detected_ = true;
567 }
568 return find_result.status();
569 }
570
FindExisting(std::string_view key,EntryMetadata * metadata_out) const571 Status KeyValueStore::FindExisting(std::string_view key,
572 EntryMetadata* metadata_out) const {
573 Status status = FindEntry(key, metadata_out);
574
575 // If the key's hash collides with an existing key or if the key is deleted,
576 // treat it as if it is not in the KVS.
577 if (status.IsAlreadyExists() ||
578 (status.ok() && metadata_out->state() == EntryState::kDeleted)) {
579 return Status::NotFound();
580 }
581 return status;
582 }
583
// Reads the value for an already-located entry into value_buffer, starting at
// offset_bytes. When verify_on_read is enabled and the read starts at offset
// 0, the checksum is verified against the bytes actually read; on mismatch
// the buffer is zeroed so callers cannot consume bad data.
StatusWithSize KeyValueStore::Get(std::string_view key,
                                  const EntryMetadata& metadata,
                                  span<std::byte> value_buffer,
                                  size_t offset_bytes) const {
  Entry entry;

  PW_TRY_WITH_SIZE(ReadEntry(metadata, entry));

  StatusWithSize result = entry.ReadValue(value_buffer, offset_bytes);
  // Verification only makes sense for a read of the full value from the
  // start; partial reads (offset != 0) skip it.
  if (result.ok() && options_.verify_on_read && offset_bytes == 0u) {
    Status verify_result =
        entry.VerifyChecksum(key, value_buffer.first(result.size()));
    if (!verify_result.ok()) {
      // Scrub the buffer so corrupt data is not silently returned.
      std::memset(value_buffer.data(), 0, result.size());
      return StatusWithSize(verify_result, 0);
    }

    return StatusWithSize(verify_result, result.size());
  }
  return result;
}
605
// Typed-read entry point: reads exactly size_bytes for `key` into `value`.
Status KeyValueStore::FixedSizeGet(std::string_view key,
                                   void* value,
                                   size_t size_bytes) const {
  // NOTE(review): this read path uses CheckWriteOperation, unlike Get() and
  // ValueSize() which use CheckReadOperation — so fixed-size reads are
  // rejected while the KVS needs maintenance. Confirm whether this stricter
  // check is intentional.
  PW_TRY(CheckWriteOperation(key));

  EntryMetadata metadata;
  PW_TRY(FindExisting(key, &metadata));

  return FixedSizeGet(key, metadata, value, size_bytes);
}
616
FixedSizeGet(std::string_view key,const EntryMetadata & metadata,void * value,size_t size_bytes) const617 Status KeyValueStore::FixedSizeGet(std::string_view key,
618 const EntryMetadata& metadata,
619 void* value,
620 size_t size_bytes) const {
621 // Ensure that the size of the stored value matches the size of the type.
622 // Otherwise, report error. This check avoids potential memory corruption.
623 PW_TRY_ASSIGN(const size_t actual_size, ValueSize(metadata));
624
625 if (actual_size != size_bytes) {
626 PW_LOG_DEBUG("Requested %u B read, but value is %u B",
627 unsigned(size_bytes),
628 unsigned(actual_size));
629 return Status::InvalidArgument();
630 }
631
632 StatusWithSize result =
633 Get(key, metadata, span(static_cast<byte*>(value), size_bytes), 0);
634
635 return result.status();
636 }
637
// Returns the stored value's size for an already-located entry. Requires the
// entry to be readable, since the size lives in the entry header in flash.
StatusWithSize KeyValueStore::ValueSize(const EntryMetadata& metadata) const {
  Entry entry;
  PW_TRY_WITH_SIZE(ReadEntry(metadata, entry));

  return StatusWithSize(entry.value_size());
}
644
CheckWriteOperation(std::string_view key) const645 Status KeyValueStore::CheckWriteOperation(std::string_view key) const {
646 if (InvalidKey(key)) {
647 return Status::InvalidArgument();
648 }
649
650 // For normal write operation the KVS must be fully ready.
651 if (!initialized()) {
652 return Status::FailedPrecondition();
653 }
654 return OkStatus();
655 }
656
CheckReadOperation(std::string_view key) const657 Status KeyValueStore::CheckReadOperation(std::string_view key) const {
658 if (InvalidKey(key)) {
659 return Status::InvalidArgument();
660 }
661
662 // Operations that are explicitly read-only can be done after init() has been
663 // called but not fully ready (when needing maintenance).
664 if (initialized_ == InitializationState::kNotInitialized) {
665 return Status::FailedPrecondition();
666 }
667 return OkStatus();
668 }
669
// Writes a new entry (value update or tombstone, per new_state) for a key
// that already has a descriptor in the cache.
Status KeyValueStore::WriteEntryForExistingKey(EntryMetadata& metadata,
                                               EntryState new_state,
                                               std::string_view key,
                                               span<const byte> value) {
  // Read the original entry to get the size for sector accounting purposes.
  Entry entry;
  PW_TRY(ReadEntry(metadata, entry));

  return WriteEntry(key, value, new_state, &metadata, &entry);
}
680
// Writes the first entry for a key not currently in the cache. Fails with
// RESOURCE_EXHAUSTED if the entry cache has no room for another key.
Status KeyValueStore::WriteEntryForNewKey(std::string_view key,
                                          span<const byte> value) {
  // If there is no room in the cache for a new entry, it is possible some cache
  // entries could be freed by removing deleted keys. If deleted key removal is
  // enabled and the KVS is configured to make all possible writes succeed,
  // attempt heavy maintenance now.
#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
  // NOTE: "GargbageCollectOnWrite" is the (misspelled) name of the project
  // enum declared elsewhere; it cannot be corrected here.
  if (options_.gc_on_write == GargbageCollectOnWrite::kAsManySectorsNeeded &&
      entry_cache_.full()) {
    Status maintenance_status = HeavyMaintenance();
    if (!maintenance_status.ok()) {
      PW_LOG_WARN("KVS Maintenance failed for write: %s",
                  maintenance_status.str());
      return maintenance_status;
    }
  }
#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

  // Re-check: maintenance (if it ran) may or may not have freed a slot.
  if (entry_cache_.full()) {
    PW_LOG_WARN(
        "KVS full: trying to store a new entry, but can't. Have %u entries",
        unsigned(entry_cache_.total_entries()));
    return Status::ResourceExhausted();
  }

  // No prior metadata/entry: this is a brand-new key.
  return WriteEntry(key, value, EntryState::kValid);
}
708
// Core write path shared by new-key, overwrite, and delete (tombstone)
// operations. Reserves redundancy() addresses, writes the first copy, updates
// the key descriptor, then writes the remaining copies. The statement order
// matters: the descriptor must be updated after the first successful write so
// the old entries are only invalidated once a new one exists.
Status KeyValueStore::WriteEntry(std::string_view key,
                                 span<const byte> value,
                                 EntryState new_state,
                                 EntryMetadata* prior_metadata,
                                 const Entry* prior_entry) {
  // If new entry and prior entry have matching value size, state, and checksum,
  // check if the values match. Directly compare the prior and new values
  // because the checksum can not be depended on to establish equality, it can
  // only be depended on to establish inequality.
  if (prior_entry != nullptr && prior_entry->value_size() == value.size() &&
      prior_metadata->state() == new_state &&
      prior_entry->ValueMatches(value).ok()) {
    // The new value matches the prior value, don't need to write anything. Just
    // keep the existing entry.
    PW_LOG_DEBUG("Write for key 0x%08x with matching value skipped",
                 unsigned(prior_metadata->hash()));
    return OkStatus();
  }

  // List of addresses for sectors with space for this entry.
  Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();

  // Find addresses to write the entry to. This may involve garbage collecting
  // one or more sectors.
  const size_t entry_size = Entry::size(partition_, key, value);
  PW_TRY(GetAddressesForWrite(reserved_addresses, entry_size));

  // Write the entry at the first address that was found.
  Entry entry = CreateEntry(reserved_addresses[0], key, value, new_state);
  PW_TRY(AppendEntry(entry, key, value));

  // After writing the first entry successfully, update the key descriptors.
  // Once a single new entry is written, the old entries are invalidated.
  size_t prior_size = prior_entry != nullptr ? prior_entry->size() : 0;
  EntryMetadata new_metadata =
      CreateOrUpdateKeyDescriptor(entry, key, prior_metadata, prior_size);

  // Write the additional copies of the entry, if redundancy is greater than 1.
  for (size_t i = 1; i < redundancy(); ++i) {
    entry.set_address(reserved_addresses[i]);
    PW_TRY(AppendEntry(entry, key, value));
    new_metadata.AddNewAddress(reserved_addresses[i]);
  }
  return OkStatus();
}
754
CreateOrUpdateKeyDescriptor(const Entry & entry,std::string_view key,EntryMetadata * prior_metadata,size_t prior_size)755 KeyValueStore::EntryMetadata KeyValueStore::CreateOrUpdateKeyDescriptor(
756 const Entry& entry,
757 std::string_view key,
758 EntryMetadata* prior_metadata,
759 size_t prior_size) {
760 // If there is no prior descriptor, create a new one.
761 if (prior_metadata == nullptr) {
762 return entry_cache_.AddNew(entry.descriptor(key), entry.address());
763 }
764
765 return UpdateKeyDescriptor(
766 entry, entry.address(), prior_metadata, prior_size);
767 }
768
UpdateKeyDescriptor(const Entry & entry,Address new_address,EntryMetadata * prior_metadata,size_t prior_size)769 KeyValueStore::EntryMetadata KeyValueStore::UpdateKeyDescriptor(
770 const Entry& entry,
771 Address new_address,
772 EntryMetadata* prior_metadata,
773 size_t prior_size) {
774 // Remove valid bytes for the old entry and its copies, which are now stale.
775 for (Address address : prior_metadata->addresses()) {
776 sectors_.FromAddress(address).RemoveValidBytes(prior_size);
777 }
778
779 prior_metadata->Reset(entry.descriptor(prior_metadata->hash()), new_address);
780 return *prior_metadata;
781 }
782
GetAddressesForWrite(Address * write_addresses,size_t write_size)783 Status KeyValueStore::GetAddressesForWrite(Address* write_addresses,
784 size_t write_size) {
785 for (size_t i = 0; i < redundancy(); i++) {
786 SectorDescriptor* sector;
787 PW_TRY(GetSectorForWrite(§or, write_size, span(write_addresses, i)));
788 write_addresses[i] = sectors_.NextWritableAddress(*sector);
789
790 PW_LOG_DEBUG("Found space for entry in sector %u at address %u",
791 sectors_.Index(sector),
792 unsigned(write_addresses[i]));
793 }
794
795 return OkStatus();
796 }
797
// Finds a sector to use for writing a new entry to. Does automatic garbage
// collection if needed and allowed.
//
//                  OK: Sector found with needed space.
// RESOURCE_EXHAUSTED: No sector available with the needed space.
Status KeyValueStore::GetSectorForWrite(
    SectorDescriptor** sector,
    size_t entry_size,
    span<const Address> reserved_addresses) {
  Status result = sectors_.FindSpace(sector, entry_size, reserved_addresses);

  size_t gc_sector_count = 0;
  bool do_auto_gc = options_.gc_on_write != GargbageCollectOnWrite::kDisabled;

  // Do garbage collection as needed, so long as policy allows.
  while (result.IsResourceExhausted() && do_auto_gc) {
    if (options_.gc_on_write == GargbageCollectOnWrite::kOneSector) {
      // If GC config option is kOneSector clear the flag to not do any more
      // GC after this try.
      do_auto_gc = false;
    }
    // Garbage collect and then try again to find the best sector.
    Status gc_status = GarbageCollect(reserved_addresses);
    if (!gc_status.ok()) {
      if (gc_status.IsNotFound()) {
        // Not enough space, and no reclaimable bytes, this KVS is full!
        return Status::ResourceExhausted();
      }
      return gc_status;
    }

    result = sectors_.FindSpace(sector, entry_size, reserved_addresses);

    gc_sector_count++;
    // Allow total sectors + 2 number of GC cycles so that once reclaimable
    // bytes in all the sectors have been reclaimed can try and free up space by
    // moving entries for keys other than the one being worked on in to sectors
    // that have copies of the key trying to be written.
    if (gc_sector_count > (partition_.sector_count() + 2)) {
      // Safety valve: more GC passes than can possibly help means something is
      // wrong; bail rather than loop forever.
      PW_LOG_ERROR("Did more GC sectors than total sectors!!!!");
      return Status::ResourceExhausted();
    }
  }

  if (!result.ok()) {
    PW_LOG_WARN("Unable to find sector to write %u B", unsigned(entry_size));
  }
  return result;
}
847
MarkSectorCorruptIfNotOk(Status status,SectorDescriptor * sector)848 Status KeyValueStore::MarkSectorCorruptIfNotOk(Status status,
849 SectorDescriptor* sector) {
850 if (!status.ok()) {
851 PW_LOG_DEBUG(" Sector %u corrupt", sectors_.Index(sector));
852 sector->mark_corrupt();
853 error_detected_ = true;
854 }
855 return status;
856 }
857
AppendEntry(const Entry & entry,std::string_view key,span<const byte> value)858 Status KeyValueStore::AppendEntry(const Entry& entry,
859 std::string_view key,
860 span<const byte> value) {
861 const StatusWithSize result = entry.Write(key, value);
862
863 SectorDescriptor& sector = sectors_.FromAddress(entry.address());
864
865 if (!result.ok()) {
866 PW_LOG_ERROR("Failed to write %u bytes at %#x. %u actually written",
867 unsigned(entry.size()),
868 unsigned(entry.address()),
869 unsigned(result.size()));
870 PW_TRY(MarkSectorCorruptIfNotOk(result.status(), §or));
871 }
872
873 if (options_.verify_on_write) {
874 PW_TRY(MarkSectorCorruptIfNotOk(entry.VerifyChecksumInFlash(), §or));
875 }
876
877 sector.RemoveWritableBytes(result.size());
878 sector.AddValidBytes(result.size());
879 return OkStatus();
880 }
881
// Copies an existing entry to a new location in flash.
//
// Writes `entry` at `new_address`, optionally read-back verifying the copy,
// and updates `new_sector`'s byte accounting. Any failure marks `new_sector`
// corrupt and propagates the error. Returns the copy status together with
// the number of bytes written.
StatusWithSize KeyValueStore::CopyEntryToSector(Entry& entry,
                                                SectorDescriptor* new_sector,
                                                Address new_address) {
  const StatusWithSize result = entry.Copy(new_address);

  PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(result.status(), new_sector));

  if (options_.verify_on_write) {
    // Re-read the entry from its new flash location so verification checks
    // the bytes actually written, not the in-memory source entry.
    Entry new_entry;
    PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(
        Entry::Read(partition_, new_address, formats_, &new_entry),
        new_sector));
    // TODO(davidrogers): add test that catches doing the verify on the old
    // entry.
    PW_TRY_WITH_SIZE(MarkSectorCorruptIfNotOk(new_entry.VerifyChecksumInFlash(),
                                              new_sector));
  }
  // Entry was written successfully; update descriptor's address and the sector
  // descriptors to reflect the new entry.
  new_sector->RemoveWritableBytes(result.size());
  new_sector->AddValidBytes(result.size());

  return result;
}
906
RelocateEntry(const EntryMetadata & metadata,KeyValueStore::Address & address,span<const Address> reserved_addresses)907 Status KeyValueStore::RelocateEntry(const EntryMetadata& metadata,
908 KeyValueStore::Address& address,
909 span<const Address> reserved_addresses) {
910 Entry entry;
911 PW_TRY(ReadEntry(metadata, entry));
912
913 // Find a new sector for the entry and write it to the new location. For
914 // relocation the find should not not be a sector already containing the key
915 // but can be the always empty sector, since this is part of the GC process
916 // that will result in a new empty sector. Also find a sector that does not
917 // have reclaimable space (mostly for the full GC, where that would result in
918 // an immediate extra relocation).
919 SectorDescriptor* new_sector;
920
921 PW_TRY(sectors_.FindSpaceDuringGarbageCollection(
922 &new_sector, entry.size(), metadata.addresses(), reserved_addresses));
923
924 Address new_address = sectors_.NextWritableAddress(*new_sector);
925 PW_TRY_ASSIGN(const size_t result_size,
926 CopyEntryToSector(entry, new_sector, new_address));
927 sectors_.FromAddress(address).RemoveValidBytes(result_size);
928 address = new_address;
929
930 return OkStatus();
931 }
932
// Runs the full KVS maintenance sequence: repair detected errors, migrate
// entries to the primary format, then garbage collect. kHeavy maintenance
// forces GC of every sector with reclaimable bytes and, if enabled, also
// purges deleted keys. Requires the KVS to have been initialized.
Status KeyValueStore::FullMaintenanceHelper(MaintenanceType maintenance_type) {
  if (initialized_ == InitializationState::kNotInitialized) {
    return Status::FailedPrecondition();
  }

  // Full maintenance can be a potentially heavy operation, and should be
  // relatively infrequent, so log start/end at INFO level.
  PW_LOG_INFO("Beginning full maintenance");
  CheckForErrors();

  // Step 1: Repair errors
  if (error_detected_) {
    PW_TRY(Repair());
  }

  // Step 2: Make sure all the entries are on the primary format.
  StatusWithSize update_status = UpdateEntriesToPrimaryFormat();
  Status overall_status = update_status.status();

  if (!overall_status.ok()) {
    PW_LOG_ERROR("Failed to update all entries to the primary format");
  }

  // GC iteration starts from the most recently written sector.
  SectorDescriptor* sector = sectors_.last_new();

  // Calculate number of bytes for the threshold.
  size_t threshold_bytes =
      (partition_.size_bytes() * kGcUsageThresholdPercentage) / 100;

  // Is bytes in use over the threshold.
  StorageStats stats = GetStorageStats();
  bool over_usage_threshold = stats.in_use_bytes > threshold_bytes;
  bool heavy = (maintenance_type == MaintenanceType::kHeavy);
  // Force GC of any sector with reclaimable bytes when maintenance is heavy,
  // usage is over the threshold, or entries were rewritten in Step 2.
  bool force_gc = heavy || over_usage_threshold || (update_status.size() > 0);

  // Captures `sector` and `overall_status` by reference so repeated passes
  // continue from where the previous pass left off.
  auto do_garbage_collect_pass = [&]() {
    // TODO(drempel): look in to making an iterator method for cycling through
    // sectors starting from last_new_sector_.
    Status gc_status;
    for (size_t j = 0; j < sectors_.size(); j++) {
      sector += 1;
      if (sector == sectors_.end()) {
        sector = sectors_.begin();
      }

      if (sector->RecoverableBytes(partition_.sector_size_bytes()) > 0 &&
          (force_gc || sector->valid_bytes() == 0)) {
        gc_status = GarbageCollectSector(*sector, {});
        if (!gc_status.ok()) {
          PW_LOG_ERROR("Failed to garbage collect all sectors");
          break;
        }
      }
    }
    // Keep the first error; don't overwrite an earlier failure.
    if (overall_status.ok()) {
      overall_status = gc_status;
    }
  };

  // Step 3: Do full garbage collect pass for all sectors. This will erase all
  // old/stale entries from flash and leave only current/valid entries.
  do_garbage_collect_pass();

#if PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE
  // Step 4: (if heavy maintenance) garbage collect all the deleted keys.
  if (heavy) {
    // If enabled, remove deleted keys from the entry cache, including freeing
    // sector bytes used by those keys. This must only be done directly after a
    // full garbage collection, otherwise the current deleted entry could be
    // garbage collected before the older stale entry producing a window for an
    // invalid/corrupted KVS state if there was a power-fault, crash or other
    // interruption.
    overall_status.Update(RemoveDeletedKeyEntries());

    // Do another garbage collect pass that will fully remove the deleted keys
    // from flash. Garbage collect will only touch sectors that have something
    // to garbage collect, which in this case is only sectors containing deleted
    // keys.
    do_garbage_collect_pass();
  }
#endif  // PW_KVS_REMOVE_DELETED_KEYS_IN_HEAVY_MAINTENANCE

  if (overall_status.ok()) {
    PW_LOG_INFO("Full maintenance complete");
  } else {
    PW_LOG_ERROR("Full maintenance finished with some errors");
  }
  return overall_status;
}
1022
PartialMaintenance()1023 Status KeyValueStore::PartialMaintenance() {
1024 if (initialized_ == InitializationState::kNotInitialized) {
1025 return Status::FailedPrecondition();
1026 }
1027
1028 CheckForErrors();
1029 // Do automatic repair, if KVS options allow for it.
1030 if (error_detected_ && options_.recovery != ErrorRecovery::kManual) {
1031 PW_TRY(Repair());
1032 }
1033 return GarbageCollect(span<const Address>());
1034 }
1035
GarbageCollect(span<const Address> reserved_addresses)1036 Status KeyValueStore::GarbageCollect(span<const Address> reserved_addresses) {
1037 PW_LOG_DEBUG("Garbage Collect a single sector");
1038 for ([[maybe_unused]] Address address : reserved_addresses) {
1039 PW_LOG_DEBUG(" Avoid address %u", unsigned(address));
1040 }
1041
1042 // Step 1: Find the sector to garbage collect
1043 SectorDescriptor* sector_to_gc =
1044 sectors_.FindSectorToGarbageCollect(reserved_addresses);
1045
1046 if (sector_to_gc == nullptr) {
1047 // Nothing to GC.
1048 return Status::NotFound();
1049 }
1050
1051 // Step 2: Garbage collect the selected sector.
1052 return GarbageCollectSector(*sector_to_gc, reserved_addresses);
1053 }
1054
RelocateKeyAddressesInSector(SectorDescriptor & sector_to_gc,const EntryMetadata & metadata,span<const Address> reserved_addresses)1055 Status KeyValueStore::RelocateKeyAddressesInSector(
1056 SectorDescriptor& sector_to_gc,
1057 const EntryMetadata& metadata,
1058 span<const Address> reserved_addresses) {
1059 for (FlashPartition::Address& address : metadata.addresses()) {
1060 if (sectors_.AddressInSector(sector_to_gc, address)) {
1061 PW_LOG_DEBUG(" Relocate entry for std::string_view 0x%08" PRIx32
1062 ", sector %u",
1063 metadata.hash(),
1064 sectors_.Index(sectors_.FromAddress(address)));
1065 PW_TRY(RelocateEntry(metadata, address, reserved_addresses));
1066 }
1067 }
1068
1069 return OkStatus();
1070 }
1071
// Garbage collects one sector: relocates all of its valid entries to other
// sectors, then erases it so its entire space becomes writable again.
Status KeyValueStore::GarbageCollectSector(
    SectorDescriptor& sector_to_gc, span<const Address> reserved_addresses) {
  PW_LOG_DEBUG(" Garbage Collect sector %u", sectors_.Index(sector_to_gc));

  // Step 1: Move any valid entries in the GC sector to other sectors
  if (sector_to_gc.valid_bytes() != 0) {
    for (EntryMetadata& metadata : entry_cache_) {
      PW_TRY(RelocateKeyAddressesInSector(
          sector_to_gc, metadata, reserved_addresses));
    }
  }

  // Relocation should have left zero valid bytes; anything remaining means
  // the sector cannot safely be erased.
  if (sector_to_gc.valid_bytes() != 0) {
    PW_LOG_ERROR(
        " Failed to relocate valid entries from sector being garbage "
        "collected, %u valid bytes remain",
        unsigned(sector_to_gc.valid_bytes()));
    return Status::Internal();
  }

  // Step 2: Reinitialize the sector
  if (!sector_to_gc.Empty(partition_.sector_size_bytes())) {
    // Mark corrupt before erasing so an interruption mid-erase leaves the
    // sector flagged rather than trusted.
    sector_to_gc.mark_corrupt();
    internal_stats_.sector_erase_count++;
    PW_TRY(partition_.Erase(sectors_.BaseAddress(sector_to_gc), 1));
    sector_to_gc.set_writable_bytes(partition_.sector_size_bytes());
  }

  PW_LOG_DEBUG(" Garbage Collect sector %u complete",
               sectors_.Index(sector_to_gc));
  return OkStatus();
}
1104
UpdateEntriesToPrimaryFormat()1105 StatusWithSize KeyValueStore::UpdateEntriesToPrimaryFormat() {
1106 size_t entries_updated = 0;
1107 for (EntryMetadata& prior_metadata : entry_cache_) {
1108 Entry entry;
1109 PW_TRY_WITH_SIZE(ReadEntry(prior_metadata, entry));
1110 if (formats_.primary().magic == entry.magic()) {
1111 // Ignore entries that are already on the primary format.
1112 continue;
1113 }
1114
1115 PW_LOG_DEBUG(
1116 "Updating entry 0x%08x from old format [0x%08x] to new format "
1117 "[0x%08x]",
1118 unsigned(prior_metadata.hash()),
1119 unsigned(entry.magic()),
1120 unsigned(formats_.primary().magic));
1121
1122 entries_updated++;
1123
1124 last_transaction_id_ += 1;
1125 PW_TRY_WITH_SIZE(entry.Update(formats_.primary(), last_transaction_id_));
1126
1127 // List of addresses for sectors with space for this entry.
1128 Address* reserved_addresses = entry_cache_.TempReservedAddressesForWrite();
1129
1130 // Find addresses to write the entry to. This may involve garbage collecting
1131 // one or more sectors.
1132 PW_TRY_WITH_SIZE(GetAddressesForWrite(reserved_addresses, entry.size()));
1133
1134 PW_TRY_WITH_SIZE(
1135 CopyEntryToSector(entry,
1136 §ors_.FromAddress(reserved_addresses[0]),
1137 reserved_addresses[0]));
1138
1139 // After writing the first entry successfully, update the key descriptors.
1140 // Once a single new the entry is written, the old entries are invalidated.
1141 EntryMetadata new_metadata = UpdateKeyDescriptor(
1142 entry, reserved_addresses[0], &prior_metadata, entry.size());
1143
1144 // Write the additional copies of the entry, if redundancy is greater
1145 // than 1.
1146 for (size_t i = 1; i < redundancy(); ++i) {
1147 PW_TRY_WITH_SIZE(
1148 CopyEntryToSector(entry,
1149 §ors_.FromAddress(reserved_addresses[i]),
1150 reserved_addresses[i]));
1151 new_metadata.AddNewAddress(reserved_addresses[i]);
1152 }
1153 }
1154
1155 return StatusWithSize(entries_updated);
1156 }
1157
1158 // Add any missing redundant entries/copies for a key.
AddRedundantEntries(EntryMetadata & metadata)1159 Status KeyValueStore::AddRedundantEntries(EntryMetadata& metadata) {
1160 Entry entry;
1161 PW_TRY(ReadEntry(metadata, entry));
1162 PW_TRY(entry.VerifyChecksumInFlash());
1163
1164 while (metadata.addresses().size() < redundancy()) {
1165 SectorDescriptor* new_sector;
1166 PW_TRY(GetSectorForWrite(&new_sector, entry.size(), metadata.addresses()));
1167
1168 Address new_address = sectors_.NextWritableAddress(*new_sector);
1169 PW_TRY(CopyEntryToSector(entry, new_sector, new_address));
1170
1171 metadata.AddNewAddress(new_address);
1172 }
1173 return OkStatus();
1174 }
1175
RepairCorruptSectors()1176 Status KeyValueStore::RepairCorruptSectors() {
1177 // Try to GC each corrupt sector, even if previous sectors fail. If GC of a
1178 // sector failed on the first pass, then do a second pass, since a later
1179 // sector might have cleared up space or otherwise unblocked the earlier
1180 // failed sector.
1181 Status repair_status = OkStatus();
1182
1183 size_t loop_count = 0;
1184 do {
1185 loop_count++;
1186 // Error of RESOURCE_EXHAUSTED indicates no space found for relocation.
1187 // Reset back to OK for the next pass.
1188 if (repair_status.IsResourceExhausted()) {
1189 repair_status = OkStatus();
1190 }
1191
1192 PW_LOG_DEBUG(" Pass %u", unsigned(loop_count));
1193 for (SectorDescriptor& sector : sectors_) {
1194 if (sector.corrupt()) {
1195 PW_LOG_DEBUG(" Found sector %u with corruption",
1196 sectors_.Index(sector));
1197 Status sector_status = GarbageCollectSector(sector, {});
1198 if (sector_status.ok()) {
1199 internal_stats_.corrupt_sectors_recovered += 1;
1200 } else if (repair_status.ok() || repair_status.IsResourceExhausted()) {
1201 repair_status = sector_status;
1202 }
1203 }
1204 }
1205 PW_LOG_DEBUG(" Pass %u complete", unsigned(loop_count));
1206 } while (!repair_status.ok() && loop_count < 2);
1207
1208 return repair_status;
1209 }
1210
EnsureFreeSectorExists()1211 Status KeyValueStore::EnsureFreeSectorExists() {
1212 Status repair_status = OkStatus();
1213 bool empty_sector_found = false;
1214
1215 PW_LOG_DEBUG(" Find empty sector");
1216 for (SectorDescriptor& sector : sectors_) {
1217 if (sector.Empty(partition_.sector_size_bytes())) {
1218 empty_sector_found = true;
1219 PW_LOG_DEBUG(" Empty sector found");
1220 break;
1221 }
1222 }
1223 if (empty_sector_found == false) {
1224 PW_LOG_DEBUG(" No empty sector found, attempting to GC a free sector");
1225 Status sector_status = GarbageCollect(span<const Address, 0>());
1226 if (repair_status.ok() && !sector_status.ok()) {
1227 PW_LOG_DEBUG(" Unable to free an empty sector");
1228 repair_status = sector_status;
1229 }
1230 }
1231
1232 return repair_status;
1233 }
1234
EnsureEntryRedundancy()1235 Status KeyValueStore::EnsureEntryRedundancy() {
1236 Status repair_status = OkStatus();
1237
1238 if (redundancy() == 1) {
1239 PW_LOG_DEBUG(" Redundancy not in use, nothting to check");
1240 return OkStatus();
1241 }
1242
1243 PW_LOG_DEBUG(
1244 " Write any needed additional duplicate copies of keys to fulfill %u"
1245 " redundancy",
1246 unsigned(redundancy()));
1247 for (EntryMetadata& metadata : entry_cache_) {
1248 if (metadata.addresses().size() >= redundancy()) {
1249 continue;
1250 }
1251
1252 PW_LOG_DEBUG(
1253 " std::string_view with %u of %u copies found, adding missing copies",
1254 unsigned(metadata.addresses().size()),
1255 unsigned(redundancy()));
1256 Status fill_status = AddRedundantEntries(metadata);
1257 if (fill_status.ok()) {
1258 internal_stats_.missing_redundant_entries_recovered += 1;
1259 PW_LOG_DEBUG(" std::string_view missing copies added");
1260 } else {
1261 PW_LOG_DEBUG(" Failed to add key missing copies");
1262 if (repair_status.ok()) {
1263 repair_status = fill_status;
1264 }
1265 }
1266 }
1267
1268 return repair_status;
1269 }
1270
FixErrors()1271 Status KeyValueStore::FixErrors() {
1272 PW_LOG_DEBUG("Fixing KVS errors");
1273
1274 // Step 1: Garbage collect any sectors marked as corrupt.
1275 Status overall_status = RepairCorruptSectors();
1276
1277 // Step 2: Make sure there is at least 1 empty sector. This needs to be a
1278 // seperate check of sectors from step 1, because a found empty sector might
1279 // get written to by a later GC that fails and does not result in a free
1280 // sector.
1281 Status repair_status = EnsureFreeSectorExists();
1282 if (overall_status.ok()) {
1283 overall_status = repair_status;
1284 }
1285
1286 // Step 3: Make sure each stored key has the full number of redundant
1287 // entries.
1288 repair_status = EnsureEntryRedundancy();
1289 if (overall_status.ok()) {
1290 overall_status = repair_status;
1291 }
1292
1293 if (overall_status.ok()) {
1294 error_detected_ = false;
1295 initialized_ = InitializationState::kReady;
1296 }
1297 return overall_status;
1298 }
1299
// Repairs the KVS by rebuilding the in-RAM metadata — which performs a full
// deep error check — and then fixing any errors that were found.
Status KeyValueStore::Repair() {
  // If errors have been detected, just reinit the KVS metadata. This does a
  // full deep error check and any needed repairs. Then repair any errors.
  PW_LOG_INFO("Starting KVS repair");

  PW_LOG_DEBUG("Reinitialize KVS metadata");
  InitializeMetadata()
      .IgnoreError();  // TODO: b/242598609 - Handle Status properly

  return FixErrors();
}
1311
CreateEntry(Address address,std::string_view key,span<const byte> value,EntryState state)1312 KeyValueStore::Entry KeyValueStore::CreateEntry(Address address,
1313 std::string_view key,
1314 span<const byte> value,
1315 EntryState state) {
1316 // Always bump the transaction ID when creating a new entry.
1317 //
1318 // Burning transaction IDs prevents inconsistencies between flash and memory
1319 // that which could happen if a write succeeds, but for some reason the read
1320 // and verify step fails. Here's how this would happen:
1321 //
1322 // 1. The entry is written but for some reason the flash reports failure OR
1323 // The write succeeds, but the read / verify operation fails.
1324 // 2. The transaction ID is NOT incremented, because of the failure
1325 // 3. (later) A new entry is written, re-using the transaction ID (oops)
1326 //
1327 // By always burning transaction IDs, the above problem can't happen.
1328 last_transaction_id_ += 1;
1329
1330 if (state == EntryState::kDeleted) {
1331 return Entry::Tombstone(
1332 partition_, address, formats_.primary(), key, last_transaction_id_);
1333 }
1334 return Entry::Valid(partition_,
1335 address,
1336 formats_.primary(),
1337 key,
1338 value,
1339 last_transaction_id_);
1340 }
1341
// Dumps the full KVS state (partition info, key descriptors, sector
// descriptors, and a bounded amount of raw sector data) at DEBUG log level.
// Debug/development aid only; see TODOs below about stack usage and limits.
void KeyValueStore::LogDebugInfo() const {
  const size_t sector_size_bytes = partition_.sector_size_bytes();
  PW_LOG_DEBUG(
      "====================== KEY VALUE STORE DUMP =========================");
  PW_LOG_DEBUG(" ");
  PW_LOG_DEBUG("Flash partition:");
  PW_LOG_DEBUG(" Sector count = %u", unsigned(partition_.sector_count()));
  PW_LOG_DEBUG(" Sector max count = %u", unsigned(sectors_.max_size()));
  PW_LOG_DEBUG(" Sectors in use = %u", unsigned(sectors_.size()));
  PW_LOG_DEBUG(" Sector size = %u", unsigned(sector_size_bytes));
  PW_LOG_DEBUG(" Total size = %u", unsigned(partition_.size_bytes()));
  PW_LOG_DEBUG(" Alignment = %u",
               unsigned(partition_.alignment_bytes()));
  PW_LOG_DEBUG(" ");
  PW_LOG_DEBUG("std::string_view descriptors:");
  PW_LOG_DEBUG(" Entry count = %u",
               unsigned(entry_cache_.total_entries()));
  PW_LOG_DEBUG(" Max entry count = %u", unsigned(entry_cache_.max_entries()));
  PW_LOG_DEBUG(" ");
  PW_LOG_DEBUG(" # hash version address address (hex)");
  size_t count = 0;
  for (const EntryMetadata& metadata : entry_cache_) {
    // first_address() is logged twice: once decimal, once hex.
    PW_LOG_DEBUG(" |%3zu: | %8zx |%8zu | %8zu | %8zx",
                 count++,
                 size_t(metadata.hash()),
                 size_t(metadata.transaction_id()),
                 size_t(metadata.first_address()),
                 size_t(metadata.first_address()));
  }
  PW_LOG_DEBUG(" ");

  PW_LOG_DEBUG("Sector descriptors:");
  PW_LOG_DEBUG(" # tail free valid has_space");
  for (const SectorDescriptor& sd : sectors_) {
    PW_LOG_DEBUG(" |%3u: | %8zu |%8zu | %s",
                 sectors_.Index(sd),
                 size_t(sd.writable_bytes()),
                 sd.valid_bytes(),
                 sd.writable_bytes() ? "YES" : "");
  }
  PW_LOG_DEBUG(" ");

  // TODO(keir): This should stop logging after some threshold.
  // size_t dumped_bytes = 0;
  PW_LOG_DEBUG("Sector raw data:");
  for (size_t sector_id = 0; sector_id < sectors_.size(); ++sector_id) {
    // Read sector data. Yes, this will blow the stack on embedded.
    std::array<byte, 500> raw_sector_data;  // TODO!!!
    [[maybe_unused]] StatusWithSize sws =
        partition_.Read(sector_id * sector_size_bytes, raw_sector_data);
    PW_LOG_DEBUG("Read: %u bytes", unsigned(sws.size()));

    PW_LOG_DEBUG(" base addr offs 0 1 2 3 4 5 6 7");
    for (size_t i = 0; i < sector_size_bytes; i += 8) {
      // Log one row of 8 bytes per iteration.
      PW_LOG_DEBUG(" %3zu %8zx %5zu | %02x %02x %02x %02x %02x %02x %02x %02x",
                   sector_id,
                   (sector_id * sector_size_bytes) + i,
                   i,
                   static_cast<unsigned int>(raw_sector_data[i + 0]),
                   static_cast<unsigned int>(raw_sector_data[i + 1]),
                   static_cast<unsigned int>(raw_sector_data[i + 2]),
                   static_cast<unsigned int>(raw_sector_data[i + 3]),
                   static_cast<unsigned int>(raw_sector_data[i + 4]),
                   static_cast<unsigned int>(raw_sector_data[i + 5]),
                   static_cast<unsigned int>(raw_sector_data[i + 6]),
                   static_cast<unsigned int>(raw_sector_data[i + 7]));

      // TODO(keir): Fix exit condition.
      if (i > 128) {
        break;
      }
    }
    PW_LOG_DEBUG(" ");
  }

  PW_LOG_DEBUG(
      "////////////////////// KEY VALUE STORE DUMP END /////////////////////");
}
1420
LogSectors() const1421 void KeyValueStore::LogSectors() const {
1422 PW_LOG_DEBUG("Sector descriptors: count %u", unsigned(sectors_.size()));
1423 for (auto& sector : sectors_) {
1424 PW_LOG_DEBUG(
1425 " - Sector %u: valid %u, recoverable %u, free %u",
1426 sectors_.Index(sector),
1427 unsigned(sector.valid_bytes()),
1428 unsigned(sector.RecoverableBytes(partition_.sector_size_bytes())),
1429 unsigned(sector.writable_bytes()));
1430 }
1431 }
1432
LogKeyDescriptor() const1433 void KeyValueStore::LogKeyDescriptor() const {
1434 PW_LOG_DEBUG("std::string_view descriptors: count %u",
1435 unsigned(entry_cache_.total_entries()));
1436 for (const EntryMetadata& metadata : entry_cache_) {
1437 PW_LOG_DEBUG(
1438 " - std::string_view: %s, hash %#x, transaction ID %u, first address "
1439 "%#x",
1440 metadata.state() == EntryState::kDeleted ? "Deleted" : "Valid",
1441 unsigned(metadata.hash()),
1442 unsigned(metadata.transaction_id()),
1443 unsigned(metadata.first_address()));
1444 }
1445 }
1446
1447 } // namespace pw::kvs
1448