1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 #include "slab-depot.h"
7
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/err.h>
11 #include <linux/log2.h>
12 #include <linux/min_heap.h>
13 #include <linux/minmax.h>
14
15 #include "logger.h"
16 #include "memory-alloc.h"
17 #include "numeric.h"
18 #include "permassert.h"
19 #include "string-utils.h"
20
21 #include "action-manager.h"
22 #include "admin-state.h"
23 #include "completion.h"
24 #include "constants.h"
25 #include "data-vio.h"
26 #include "encodings.h"
27 #include "io-submitter.h"
28 #include "physical-zone.h"
29 #include "priority-table.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "status-codes.h"
33 #include "types.h"
34 #include "vdo.h"
35 #include "vio.h"
36 #include "wait-queue.h"
37
38 static const u64 BYTES_PER_WORD = sizeof(u64);
39 static const bool NORMAL_OPERATION = true;
40
41 /**
42 * get_lock() - Get the lock object for a slab journal block by sequence number.
43 * @journal: vdo_slab journal to retrieve from.
44 * @sequence_number: Sequence number of the block.
45 *
46 * Return: The lock object for the given sequence number.
47 */
48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49 sequence_number_t sequence_number)
50 {
51 return &journal->locks[sequence_number % journal->size];
52 }
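
/*
 * Illustrative example for get_lock() above (the numbers are hypothetical,
 * not taken from any real configuration): if journal->size were 8, sequence
 * numbers 3, 11, and 19 would all map to &journal->locks[3]; the lock array
 * is a ring indexed by sequence_number modulo the journal size.
 */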
53
54 static bool is_slab_open(struct vdo_slab *slab)
55 {
56 return (!vdo_is_state_quiescing(&slab->state) &&
57 !vdo_is_state_quiescent(&slab->state));
58 }
59
60 /**
61 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
62 * @journal: The journal to check.
63 *
64 * Return: true if the slab is not rebuilding and there are entry waiters which should delay a flush.
65 */
66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67 {
68 return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69 vdo_waitq_has_waiters(&journal->entry_waiters));
70 }
71
72 /**
73 * is_reaping() - Check whether a reap is currently in progress.
74 * @journal: The journal which may be reaping.
75 *
76 * Return: true if the journal is reaping.
77 */
78 static inline bool __must_check is_reaping(struct slab_journal *journal)
79 {
80 return (journal->head != journal->unreapable);
81 }
82
83 /**
84 * initialize_tail_block() - Initialize tail block as a new block.
85 * @journal: The journal whose tail block is being initialized.
86 */
87 static void initialize_tail_block(struct slab_journal *journal)
88 {
89 struct slab_journal_block_header *header = &journal->tail_header;
90
91 header->sequence_number = journal->tail;
92 header->entry_count = 0;
93 header->has_block_map_increments = false;
94 }
95
96 /**
97 * initialize_journal_state() - Set all journal fields appropriately to start journaling.
98 * @journal: The journal to be reset, based on its tail sequence number.
99 */
100 static void initialize_journal_state(struct slab_journal *journal)
101 {
102 journal->unreapable = journal->head;
103 journal->reap_lock = get_lock(journal, journal->unreapable);
104 journal->next_commit = journal->tail;
105 journal->summarized = journal->last_summarized = journal->tail;
106 initialize_tail_block(journal);
107 }
108
109 /**
110 * block_is_full() - Check whether a journal block is full.
111 * @journal: The slab journal for the block.
112 *
113 * Return: true if the tail block is full.
114 */
115 static bool __must_check block_is_full(struct slab_journal *journal)
116 {
117 journal_entry_count_t count = journal->tail_header.entry_count;
118
119 return (journal->tail_header.has_block_map_increments ?
120 (journal->full_entries_per_block == count) :
121 (journal->entries_per_block == count));
122 }
123
124 static void add_entries(struct slab_journal *journal);
125 static void update_tail_block_location(struct slab_journal *journal);
126 static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127
128 /**
129 * is_slab_journal_blank() - Check whether a slab's journal is blank.
130 *
131 * A slab journal is blank if it has never had any entries recorded in it.
132 *
133 * Return: true if the slab's journal has never been modified.
134 */
135 static bool is_slab_journal_blank(const struct vdo_slab *slab)
136 {
137 return ((slab->journal.tail == 1) &&
138 (slab->journal.tail_header.entry_count == 0));
139 }
140
141 /**
142 * mark_slab_journal_dirty() - Put a slab journal on the dirty ring of its allocator in the correct
143 * order.
144 * @journal: The journal to be marked dirty.
145 * @lock: The recovery journal lock held by the slab journal.
146 */
147 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
148 {
149 struct slab_journal *dirty_journal;
150 struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
151
152 VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
153
154 journal->recovery_lock = lock;
155 list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
156 if (dirty_journal->recovery_lock <= journal->recovery_lock)
157 break;
158 }
159
160 list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
161 }
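
/*
 * Example of the ordering maintained by mark_slab_journal_dirty() (recovery
 * lock values are hypothetical): if the allocator's dirty list already holds
 * journals with recovery locks 10, 12, and 17, a journal taking lock 15 is
 * inserted after the entry holding 12, keeping the list sorted by recovery
 * lock so the journal with the oldest recovery journal dependency stays at
 * the head.
 */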
162
163 static void mark_slab_journal_clean(struct slab_journal *journal)
164 {
165 journal->recovery_lock = 0;
166 list_del_init(&journal->dirty_entry);
167 }
168
169 static void check_if_slab_drained(struct vdo_slab *slab)
170 {
171 bool read_only;
172 struct slab_journal *journal = &slab->journal;
173 const struct admin_state_code *code;
174
175 if (!vdo_is_state_draining(&slab->state) ||
176 must_make_entries_to_flush(journal) ||
177 is_reaping(journal) ||
178 journal->waiting_to_commit ||
179 !list_empty(&journal->uncommitted_blocks) ||
180 journal->updating_slab_summary ||
181 (slab->active_count > 0))
182 return;
183
184 /* When not suspending or recovering, the slab must be clean. */
185 code = vdo_get_admin_state_code(&slab->state);
186 read_only = vdo_is_read_only(slab->allocator->depot->vdo);
187 if (!read_only &&
188 vdo_waitq_has_waiters(&slab->dirty_blocks) &&
189 (code != VDO_ADMIN_STATE_SUSPENDING) &&
190 (code != VDO_ADMIN_STATE_RECOVERING))
191 return;
192
193 vdo_finish_draining_with_result(&slab->state,
194 (read_only ? VDO_READ_ONLY : VDO_SUCCESS));
195 }
196
197 /* FULLNESS HINT COMPUTATION */
198
199 /**
200 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
201 * stored in a slab_summary_entry's 7 bits that are dedicated to its free
202 * count.
203 * @depot: The depot whose summary is being updated.
204 * @free_blocks: The number of free blocks.
205 *
206 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
207 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
208 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
209 * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
210 * is 0, which would make it impossible to distinguish completely full from completely empty.
211 *
212 * Return: A fullness hint, which can be stored in 7 bits.
213 */
214 static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
215 block_count_t free_blocks)
216 {
217 block_count_t hint;
218
219 VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
220
221 if (free_blocks == 0)
222 return 0;
223
224 hint = free_blocks >> depot->hint_shift;
225 return ((hint == 0) ? 1 : hint);
226 }
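
/*
 * Worked example for compute_fullness_hint(), assuming a hint_shift of 16
 * purely for illustration (the real shift is derived from the slab size
 * elsewhere): 300000 free blocks gives a hint of 300000 >> 16 = 4, while
 * 40000 free blocks gives 40000 >> 16 = 0, which is rounded up to 1 so a
 * slab with a few free blocks remains distinguishable from a completely
 * full slab (hint 0).
 */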
227
228 /**
229 * check_summary_drain_complete() - Check whether an allocator's summary has finished draining.
230 */
231 static void check_summary_drain_complete(struct block_allocator *allocator)
232 {
233 if (!vdo_is_state_draining(&allocator->summary_state) ||
234 (allocator->summary_write_count > 0))
235 return;
236
237 vdo_finish_operation(&allocator->summary_state,
238 (vdo_is_read_only(allocator->depot->vdo) ?
239 VDO_READ_ONLY : VDO_SUCCESS));
240 }
241
242 /**
243 * notify_summary_waiters() - Wake all the waiters in a given queue.
244 * @allocator: The block allocator summary which owns the queue.
245 * @queue: The queue to notify.
246 */
247 static void notify_summary_waiters(struct block_allocator *allocator,
248 struct vdo_wait_queue *queue)
249 {
250 int result = (vdo_is_read_only(allocator->depot->vdo) ?
251 VDO_READ_ONLY : VDO_SUCCESS);
252
253 vdo_waitq_notify_all_waiters(queue, NULL, &result);
254 }
255
256 static void launch_write(struct slab_summary_block *summary_block);
257
258 /**
259 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
260 * whether or not the attempt succeeded.
261 * @block: The block.
262 */
263 static void finish_updating_slab_summary_block(struct slab_summary_block *block)
264 {
265 notify_summary_waiters(block->allocator, &block->current_update_waiters);
266 block->writing = false;
267 block->allocator->summary_write_count--;
268 if (vdo_waitq_has_waiters(&block->next_update_waiters))
269 launch_write(block);
270 else
271 check_summary_drain_complete(block->allocator);
272 }
273
274 /**
275 * finish_update() - This is the callback for a successful summary block write.
276 * @completion: The write vio.
277 */
278 static void finish_update(struct vdo_completion *completion)
279 {
280 struct slab_summary_block *block =
281 container_of(as_vio(completion), struct slab_summary_block, vio);
282
283 atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
284 finish_updating_slab_summary_block(block);
285 }
286
287 /**
288 * handle_write_error() - Handle an error writing a slab summary block.
289 * @completion: The write VIO.
290 */
291 static void handle_write_error(struct vdo_completion *completion)
292 {
293 struct slab_summary_block *block =
294 container_of(as_vio(completion), struct slab_summary_block, vio);
295
296 vio_record_metadata_io_error(as_vio(completion));
297 vdo_enter_read_only_mode(completion->vdo, completion->result);
298 finish_updating_slab_summary_block(block);
299 }
300
301 static void write_slab_summary_endio(struct bio *bio)
302 {
303 struct vio *vio = bio->bi_private;
304 struct slab_summary_block *block =
305 container_of(vio, struct slab_summary_block, vio);
306
307 continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
308 }
309
310 /**
311 * launch_write() - Write a slab summary block unless it is currently out for writing.
312 * @block: The block that needs to be committed.
313 */
314 static void launch_write(struct slab_summary_block *block)
315 {
316 struct block_allocator *allocator = block->allocator;
317 struct slab_depot *depot = allocator->depot;
318 physical_block_number_t pbn;
319
320 if (block->writing)
321 return;
322
323 allocator->summary_write_count++;
324 vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
325 &block->current_update_waiters);
326 block->writing = true;
327
328 if (vdo_is_read_only(depot->vdo)) {
329 finish_updating_slab_summary_block(block);
330 return;
331 }
332
333 memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
334
335 /*
336 * Flush before writing to ensure that the slab journal tail blocks and reference updates
337 * covered by this summary update are stable. Otherwise, a subsequent recovery could
338 * encounter a slab summary update that refers to a slab journal tail block that has not
339 * actually been written. In such cases, the slab journal referenced will be treated as
340 * empty, causing any data within the slab which predates the existing recovery journal
341 * entries to be lost.
342 */
343 pbn = (depot->summary_origin +
344 (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
345 block->index);
346 vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
347 handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
348 }
349
350 /**
351 * update_slab_summary_entry() - Update the entry for a slab.
352 * @slab: The slab whose entry is to be updated.
353 * @waiter: The waiter that is updating the summary.
354 * @tail_block_offset: The offset of the slab journal's tail block.
355 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
356 * @is_clean: Whether the slab is clean.
357 * @free_blocks: The number of free blocks.
358 */
359 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
360 tail_block_offset_t tail_block_offset,
361 bool load_ref_counts, bool is_clean,
362 block_count_t free_blocks)
363 {
364 u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
365 struct block_allocator *allocator = slab->allocator;
366 struct slab_summary_block *block = &allocator->summary_blocks[index];
367 int result;
368 struct slab_summary_entry *entry;
369
370 if (vdo_is_read_only(block->vio.completion.vdo)) {
371 result = VDO_READ_ONLY;
372 waiter->callback(waiter, &result);
373 return;
374 }
375
376 if (vdo_is_state_draining(&allocator->summary_state) ||
377 vdo_is_state_quiescent(&allocator->summary_state)) {
378 result = VDO_INVALID_ADMIN_STATE;
379 waiter->callback(waiter, &result);
380 return;
381 }
382
383 entry = &allocator->summary_entries[slab->slab_number];
384 *entry = (struct slab_summary_entry) {
385 .tail_block_offset = tail_block_offset,
386 .load_ref_counts = (entry->load_ref_counts || load_ref_counts),
387 .is_dirty = !is_clean,
388 .fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
389 };
390 vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
391 launch_write(block);
392 }
393
394 /**
395 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
396 * complete.
397 * @journal: The journal to be reaped.
398 */
399 static void finish_reaping(struct slab_journal *journal)
400 {
401 journal->head = journal->unreapable;
402 add_entries(journal);
403 check_if_slab_drained(journal->slab);
404 }
405
406 static void reap_slab_journal(struct slab_journal *journal);
407
408 /**
409 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
410 * reaping again in case we deferred reaping due to an outstanding vio.
411 * @completion: The flush vio.
412 */
413 static void complete_reaping(struct vdo_completion *completion)
414 {
415 struct slab_journal *journal = completion->parent;
416
417 return_vio_to_pool(journal->slab->allocator->vio_pool,
418 vio_as_pooled_vio(as_vio(vdo_forget(completion))));
419 finish_reaping(journal);
420 reap_slab_journal(journal);
421 }
422
423 /**
424 * handle_flush_error() - Handle an error flushing the lower layer.
425 * @completion: The flush vio.
426 */
427 static void handle_flush_error(struct vdo_completion *completion)
428 {
429 vio_record_metadata_io_error(as_vio(completion));
430 vdo_enter_read_only_mode(completion->vdo, completion->result);
431 complete_reaping(completion);
432 }
433
434 static void flush_endio(struct bio *bio)
435 {
436 struct vio *vio = bio->bi_private;
437 struct slab_journal *journal = vio->completion.parent;
438
439 continue_vio_after_io(vio, complete_reaping,
440 journal->slab->allocator->thread_id);
441 }
442
443 /**
444 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
445 * prior to reaping.
446 * @waiter: The journal as a flush waiter.
447 * @context: The newly acquired flush vio.
448 */
449 static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
450 {
451 struct slab_journal *journal =
452 container_of(waiter, struct slab_journal, flush_waiter);
453 struct pooled_vio *pooled = context;
454 struct vio *vio = &pooled->vio;
455
456 vio->completion.parent = journal;
457 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
458 }
459
460 /**
461 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
462 * @journal: The slab journal.
463 */
464 static void reap_slab_journal(struct slab_journal *journal)
465 {
466 bool reaped = false;
467
468 if (is_reaping(journal)) {
469 /* We already have a reap in progress so wait for it to finish. */
470 return;
471 }
472
473 if ((journal->slab->status != VDO_SLAB_REBUILT) ||
474 !vdo_is_state_normal(&journal->slab->state) ||
475 vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
476 /*
477 * We must not reap in the first two cases, and there's no point in read-only mode.
478 */
479 return;
480 }
481
482 /*
483 * Start reclaiming blocks only when the journal head has no references. Then stop when a
484 * block is referenced or reap reaches the most recently written block, referenced by the
485 * slab summary, which has the sequence number just before the tail.
486 */
487 while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
488 reaped = true;
489 journal->unreapable++;
490 journal->reap_lock++;
491 if (journal->reap_lock == &journal->locks[journal->size])
492 journal->reap_lock = &journal->locks[0];
493 }
494
495 if (!reaped)
496 return;
497
498 /*
499 * It is never safe to reap a slab journal block without first issuing a flush, regardless
500 * of whether a user flush has been received or not. In the absence of the flush, the
501 * reference block write which released the locks allowing the slab journal to reap may not
502 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
503 * journal block writes can be issued while previous slab summary updates have not yet been
504 * made. Even though those slab journal block writes will be ignored if the slab summary
505 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
506 * resulting in a loss of reference count updates.
507 */
508 journal->flush_waiter.callback = flush_for_reaping;
509 acquire_vio_from_pool(journal->slab->allocator->vio_pool,
510 &journal->flush_waiter);
511 }
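
/*
 * Sketch of the reaping loop in reap_slab_journal() with hypothetical
 * numbers: with head = unreapable = 5 and tail = 9, if the locks for blocks
 * 5 and 6 have a count of 0 but block 7 is still locked, unreapable advances
 * to 7 (with reap_lock wrapping around the lock ring as needed); the flush
 * issued above then allows finish_reaping() to move head up to 7.
 */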
512
513 /**
514 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
515 * @journal: The slab journal.
516 * @sequence_number: The journal sequence number of the referenced block.
517 * @adjustment: Amount to adjust the reference counter.
518 *
519 * Note that when the adjustment is negative, the slab journal will be reaped.
520 */
521 static void adjust_slab_journal_block_reference(struct slab_journal *journal,
522 sequence_number_t sequence_number,
523 int adjustment)
524 {
525 struct journal_lock *lock;
526
527 if (sequence_number == 0)
528 return;
529
530 if (journal->slab->status == VDO_SLAB_REPLAYING) {
531 /* Locks should not be used during offline replay. */
532 return;
533 }
534
535 VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
536 lock = get_lock(journal, sequence_number);
537 if (adjustment < 0) {
538 VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
539 "adjustment %d of lock count %u for slab journal block %llu must not underflow",
540 adjustment, lock->count,
541 (unsigned long long) sequence_number);
542 }
543
544 lock->count += adjustment;
545 if (lock->count == 0)
546 reap_slab_journal(journal);
547 }
548
549 /**
550 * release_journal_locks() - Callback invoked after a slab summary update completes.
551 * @waiter: The slab summary waiter that has just been notified.
552 * @context: The result code of the update.
553 *
554 * Registered in the constructor on behalf of update_tail_block_location().
555 *
556 * Implements waiter_callback_fn.
557 */
558 static void release_journal_locks(struct vdo_waiter *waiter, void *context)
559 {
560 sequence_number_t first, i;
561 struct slab_journal *journal =
562 container_of(waiter, struct slab_journal, slab_summary_waiter);
563 int result = *((int *) context);
564
565 if (result != VDO_SUCCESS) {
566 if (result != VDO_READ_ONLY) {
567 /*
568 * Don't bother logging what might be lots of errors if we are already in
569 * read-only mode.
570 */
571 vdo_log_error_strerror(result, "failed slab summary update %llu",
572 (unsigned long long) journal->summarized);
573 }
574
575 journal->updating_slab_summary = false;
576 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
577 check_if_slab_drained(journal->slab);
578 return;
579 }
580
581 if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
582 journal->partial_write_in_progress = false;
583 add_entries(journal);
584 }
585
586 first = journal->last_summarized;
587 journal->last_summarized = journal->summarized;
588 for (i = journal->summarized - 1; i >= first; i--) {
589 /*
590 * Release the lock the summarized block held on the recovery journal. (During
591 * replay, recovery_start will always be 0.)
592 */
593 if (journal->recovery_journal != NULL) {
594 zone_count_t zone_number = journal->slab->allocator->zone_number;
595 struct journal_lock *lock = get_lock(journal, i);
596
597 vdo_release_recovery_journal_block_reference(journal->recovery_journal,
598 lock->recovery_start,
599 VDO_ZONE_TYPE_PHYSICAL,
600 zone_number);
601 }
602
603 /*
604 * Release our own lock against reaping for blocks that are committed. (This
605 * function will not change locks during replay.)
606 */
607 adjust_slab_journal_block_reference(journal, i, -1);
608 }
609
610 journal->updating_slab_summary = false;
611
612 reap_slab_journal(journal);
613
614 /* Check if the slab summary needs to be updated again. */
615 update_tail_block_location(journal);
616 }
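
/*
 * Sketch of the release loop in release_journal_locks() (sequence numbers
 * hypothetical): if last_summarized was 3 and summarized is now 6, the loop
 * runs for i = 5, 4, 3, dropping both the recovery journal reference and the
 * per-block reaping lock for each newly summarized slab journal block.
 */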
617
618 /**
619 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
620 * @journal: The slab journal that is updating its tail block location.
621 */
622 static void update_tail_block_location(struct slab_journal *journal)
623 {
624 block_count_t free_block_count;
625 struct vdo_slab *slab = journal->slab;
626
627 if (journal->updating_slab_summary ||
628 vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
629 (journal->last_summarized >= journal->next_commit)) {
630 check_if_slab_drained(slab);
631 return;
632 }
633
634 if (slab->status != VDO_SLAB_REBUILT) {
635 u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
636
637 free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
638 } else {
639 free_block_count = slab->free_blocks;
640 }
641
642 journal->summarized = journal->next_commit;
643 journal->updating_slab_summary = true;
644
645 /*
646 * Update slab summary as dirty.
647 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
648 * slab have been written to the layer. Therefore, indicate that the ref counts must be
649 * loaded when the journal head has reaped past sequence number 1.
650 */
651 update_slab_summary_entry(slab, &journal->slab_summary_waiter,
652 journal->summarized % journal->size,
653 (journal->head > 1), false, free_block_count);
654 }
655
656 /**
657 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
658 */
659 static void reopen_slab_journal(struct vdo_slab *slab)
660 {
661 struct slab_journal *journal = &slab->journal;
662 sequence_number_t block;
663
664 VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
665 "vdo_slab journal's active block empty before reopening");
666 journal->head = journal->tail;
667 initialize_journal_state(journal);
668
669 /* Ensure no locks are spuriously held on an empty journal. */
670 for (block = 1; block <= journal->size; block++) {
671 VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
672 "Scrubbed journal's block %llu is not locked",
673 (unsigned long long) block);
674 }
675
676 add_entries(journal);
677 }
678
679 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
680 {
681 const struct packed_slab_journal_block *block =
682 (const struct packed_slab_journal_block *) vio->vio.data;
683
684 return __le64_to_cpu(block->header.sequence_number);
685 }
686
687 /**
688 * complete_write() - Handle post-commit processing.
689 * @completion: The write vio as a completion.
690 *
691 * This is the callback registered by write_slab_journal_block().
692 */
693 static void complete_write(struct vdo_completion *completion)
694 {
695 int result = completion->result;
696 struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
697 struct slab_journal *journal = completion->parent;
698 sequence_number_t committed = get_committing_sequence_number(pooled);
699
700 list_del_init(&pooled->list_entry);
701 return_vio_to_pool(journal->slab->allocator->vio_pool, vdo_forget(pooled));
702
703 if (result != VDO_SUCCESS) {
704 vio_record_metadata_io_error(as_vio(completion));
705 vdo_log_error_strerror(result, "cannot write slab journal block %llu",
706 (unsigned long long) committed);
707 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
708 check_if_slab_drained(journal->slab);
709 return;
710 }
711
712 WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
713
714 if (list_empty(&journal->uncommitted_blocks)) {
715 /* If no blocks are outstanding, then the commit point is at the tail. */
716 journal->next_commit = journal->tail;
717 } else {
718 /* The commit point is always the beginning of the oldest incomplete block. */
719 pooled = container_of(journal->uncommitted_blocks.next,
720 struct pooled_vio, list_entry);
721 journal->next_commit = get_committing_sequence_number(pooled);
722 }
723
724 update_tail_block_location(journal);
725 }
726
727 static void write_slab_journal_endio(struct bio *bio)
728 {
729 struct vio *vio = bio->bi_private;
730 struct slab_journal *journal = vio->completion.parent;
731
732 continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
733 }
734
735 /**
736 * write_slab_journal_block() - Write a slab journal block.
737 * @waiter: The vio pool waiter which was just notified.
738 * @context: The vio pool entry for the write.
739 *
740 * Callback from acquire_vio_from_pool() registered in commit_tail().
741 */
742 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
743 {
744 struct pooled_vio *pooled = context;
745 struct vio *vio = &pooled->vio;
746 struct slab_journal *journal =
747 container_of(waiter, struct slab_journal, resource_waiter);
748 struct slab_journal_block_header *header = &journal->tail_header;
749 int unused_entries = journal->entries_per_block - header->entry_count;
750 physical_block_number_t block_number;
751 const struct admin_state_code *operation;
752
753 header->head = journal->head;
754 list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
755 vdo_pack_slab_journal_block_header(header, &journal->block->header);
756
757 /* Copy the tail block into the vio. */
758 memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
759
760 VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
761 if (unused_entries > 0) {
762 /*
763 * Release the per-entry locks for any unused entries in the block we are about to
764 * write.
765 */
766 adjust_slab_journal_block_reference(journal, header->sequence_number,
767 -unused_entries);
768 journal->partial_write_in_progress = !block_is_full(journal);
769 }
770
771 block_number = journal->slab->journal_origin +
772 (header->sequence_number % journal->size);
773 vio->completion.parent = journal;
774
775 /*
776 * This block won't be read in recovery until the slab summary is updated to refer to it.
777 * The slab summary update does a flush which is sufficient to protect us from corruption
778 * due to out of order slab journal, reference block, or block map writes.
779 */
780 vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
781 complete_write, REQ_OP_WRITE);
782
783 /* Since the write is submitted, the tail block structure can be reused. */
784 journal->tail++;
785 initialize_tail_block(journal);
786 journal->waiting_to_commit = false;
787
788 operation = vdo_get_admin_state_code(&journal->slab->state);
789 if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
790 vdo_finish_operation(&journal->slab->state,
791 (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
792 VDO_READ_ONLY : VDO_SUCCESS));
793 return;
794 }
795
796 add_entries(journal);
797 }
798
799 /**
800 * commit_tail() - Commit the tail block of the slab journal.
801 * @journal: The journal whose tail block should be committed.
802 */
803 static void commit_tail(struct slab_journal *journal)
804 {
805 if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
806 /*
807 * There are no entries at the moment, but there are some waiters, so defer
808 * initiating the flush until those entries are ready to write.
809 */
810 return;
811 }
812
813 if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
814 journal->waiting_to_commit ||
815 (journal->tail_header.entry_count == 0)) {
816 /*
817 * There is nothing to do since the tail block is empty, or writing, or the journal
818 * is in read-only mode.
819 */
820 return;
821 }
822
823 /*
824 * Since we are about to commit the tail block, this journal no longer needs to be on the
825 * ring of journals which the recovery journal might ask to commit.
826 */
827 mark_slab_journal_clean(journal);
828
829 journal->waiting_to_commit = true;
830
831 journal->resource_waiter.callback = write_slab_journal_block;
832 acquire_vio_from_pool(journal->slab->allocator->vio_pool,
833 &journal->resource_waiter);
834 }
835
836 /**
837 * encode_slab_journal_entry() - Encode a slab journal entry.
838 * @tail_header: The unpacked header for the block.
839 * @payload: The journal block payload to hold the entry.
840 * @sbn: The slab block number of the entry to encode.
841 * @operation: The type of the entry.
842 * @increment: True if this is an increment.
843 *
844 * Exposed for unit tests.
845 */
846 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
847 slab_journal_payload *payload,
848 slab_block_number sbn,
849 enum journal_operation operation,
850 bool increment)
851 {
852 journal_entry_count_t entry_number = tail_header->entry_count++;
853
854 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
855 if (!tail_header->has_block_map_increments) {
856 memset(payload->full_entries.entry_types, 0,
857 VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
858 tail_header->has_block_map_increments = true;
859 }
860
861 payload->full_entries.entry_types[entry_number / 8] |=
862 ((u8)1 << (entry_number % 8));
863 }
864
865 vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
866 }
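
/*
 * Illustration of the entry_types bitmap in encode_slab_journal_entry()
 * (entry numbers hypothetical): a block map increment recorded as entry 10
 * sets bit 2 of entry_types[1] (10 / 8 == 1, 10 % 8 == 2), marking entry 10
 * as a block map increment rather than a data entry.
 */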
867
868 /**
869 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
870 * increment and a decrement to a single point which refers to one or the
871 * other.
872 * @recovery_point: The journal point to convert.
873 * @increment: Whether the current entry is an increment.
874 *
875 * Return: The expanded journal point
876 *
877 * Each data_vio has only a single recovery journal point, but may need to make both increment
878 * and decrement entries in the same slab journal. In order to distinguish the two entries, the
879 * entry count of the expanded journal point is twice the actual recovery journal entry count
880 * for increments, and one more than that for decrements.
881 */
882 static struct journal_point expand_journal_point(struct journal_point recovery_point,
883 bool increment)
884 {
885 recovery_point.entry_count *= 2;
886 if (!increment)
887 recovery_point.entry_count++;
888
889 return recovery_point;
890 }
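
/*
 * Worked example for expand_journal_point() (sequence and entry numbers
 * hypothetical): a recovery journal point of sequence 17, entry 5 expands to
 * entry 10 (5 * 2) for the increment and entry 11 (5 * 2 + 1) for the
 * decrement, so the two slab journal entries made on behalf of one data_vio
 * remain distinct and strictly ordered.
 */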
891
892 /**
893 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
894 * block becomes full.
895 * @journal: The slab journal to append to.
896 * @pbn: The pbn being adjusted.
897 * @operation: The type of entry to make.
898 * @increment: True if this is an increment.
899 * @recovery_point: The expanded recovery point.
900 *
901 * This function is synchronous.
902 */
903 static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
904 enum journal_operation operation, bool increment,
905 struct journal_point recovery_point)
906 {
907 struct packed_slab_journal_block *block = journal->block;
908 int result;
909
910 result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
911 &recovery_point),
912 "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
913 (unsigned long long) recovery_point.sequence_number,
914 recovery_point.entry_count,
915 (unsigned long long) journal->tail_header.recovery_point.sequence_number,
916 journal->tail_header.recovery_point.entry_count);
917 if (result != VDO_SUCCESS) {
918 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
919 return;
920 }
921
922 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
923 result = VDO_ASSERT((journal->tail_header.entry_count <
924 journal->full_entries_per_block),
925 "block has room for full entries");
926 if (result != VDO_SUCCESS) {
927 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
928 result);
929 return;
930 }
931 }
932
933 encode_slab_journal_entry(&journal->tail_header, &block->payload,
934 pbn - journal->slab->start, operation, increment);
935 journal->tail_header.recovery_point = recovery_point;
936 if (block_is_full(journal))
937 commit_tail(journal);
938 }
939
940 static inline block_count_t journal_length(const struct slab_journal *journal)
941 {
942 return journal->tail - journal->head;
943 }
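
/*
 * For example (values hypothetical), head = 5 and tail = 9 gives a
 * journal_length() of 4: blocks 5 through 8 are still held in the journal,
 * while block 9 is the tail block currently being filled.
 */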
944
945 /**
946 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
947 * @slab: The slab to play into.
948 * @pbn: The PBN for the entry.
949 * @operation: The type of entry to add.
950 * @increment: True if this entry is an increment.
951 * @recovery_point: The recovery journal point corresponding to this entry.
952 * @parent: The completion to notify when there is space to add the entry if the entry could not be
953 * added immediately.
954 *
955 * Return: true if the entry was added immediately.
956 */
957 bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
958 enum journal_operation operation, bool increment,
959 struct journal_point *recovery_point,
960 struct vdo_completion *parent)
961 {
962 struct slab_journal *journal = &slab->journal;
963 struct slab_journal_block_header *header = &journal->tail_header;
964 struct journal_point expanded = expand_journal_point(*recovery_point, increment);
965
966 /* Only accept entries after the current recovery point. */
967 if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
968 return true;
969
970 if ((header->entry_count >= journal->full_entries_per_block) &&
971 (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
972 /*
973 * The tail block does not have room for the entry we are attempting to add so
974 * commit the tail block now.
975 */
976 commit_tail(journal);
977 }
978
979 if (journal->waiting_to_commit) {
980 vdo_start_operation_with_waiter(&journal->slab->state,
981 VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
982 parent, NULL);
983 return false;
984 }
985
986 if (journal_length(journal) >= journal->size) {
987 /*
988 * We must have reaped the current head before the crash, since the blocked
989 * threshold keeps us from having more entries than fit in a slab journal; hence we
990 * can just advance the head (and unreapable block), as needed.
991 */
992 journal->head++;
993 journal->unreapable++;
994 }
995
996 if (journal->slab->status == VDO_SLAB_REBUILT)
997 journal->slab->status = VDO_SLAB_REPLAYING;
998
999 add_entry(journal, pbn, operation, increment, expanded);
1000 return true;
1001 }
1002
1003 /**
1004 * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1005 * @journal: The journal to check.
1006 *
1007 * Return: true if the journal must be reaped.
1008 */
1009 static bool requires_reaping(const struct slab_journal *journal)
1010 {
1011 return (journal_length(journal) >= journal->blocking_threshold);
1012 }
1013
1014 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
1015 static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1016 {
1017 struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1018 int result = *((int *) context);
1019
1020 slab->active_count--;
1021
1022 if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1023 vdo_log_error_strerror(result, "failed to update slab summary");
1024 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1025 }
1026
1027 check_if_slab_drained(slab);
1028 }
1029
1030 static void write_reference_block(struct vdo_waiter *waiter, void *context);
1031
1032 /**
1033 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1034 * a VIO for it from the pool.
1035 * @waiter: The waiter of the block which is starting to write.
1036 * @context: The parent slab of the block.
1037 *
1038 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1039 * currently in use.
1040 */
1041 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1042 {
1043 struct vdo_slab *slab = context;
1044
1045 if (vdo_is_read_only(slab->allocator->depot->vdo))
1046 return;
1047
1048 slab->active_count++;
1049 container_of(waiter, struct reference_block, waiter)->is_writing = true;
1050 waiter->callback = write_reference_block;
1051 acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1052 }
1053
1054 static void save_dirty_reference_blocks(struct vdo_slab *slab)
1055 {
1056 vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1057 launch_reference_block_write, slab);
1058 check_if_slab_drained(slab);
1059 }
1060
1061 /**
1062 * finish_reference_block_write() - After a reference block has been written, clean it, release its
1063 * locks, and return its VIO to the pool.
1064 * @completion: The VIO that just finished writing.
1065 */
1066 static void finish_reference_block_write(struct vdo_completion *completion)
1067 {
1068 struct vio *vio = as_vio(completion);
1069 struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1070 struct reference_block *block = completion->parent;
1071 struct vdo_slab *slab = block->slab;
1072 tail_block_offset_t offset;
1073
1074 slab->active_count--;
1075
1076 /* Release the slab journal lock. */
1077 adjust_slab_journal_block_reference(&slab->journal,
1078 block->slab_journal_lock_to_release, -1);
1079 return_vio_to_pool(slab->allocator->vio_pool, pooled);
1080
1081 /*
1082 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
1083 * us to be dirtied again, but we don't want to double enqueue.
1084 */
1085 block->is_writing = false;
1086
1087 if (vdo_is_read_only(completion->vdo)) {
1088 check_if_slab_drained(slab);
1089 return;
1090 }
1091
1092 /* Re-queue the block if it was re-dirtied while it was writing. */
1093 if (block->is_dirty) {
1094 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1095 if (vdo_is_state_draining(&slab->state)) {
1096 /* We must be saving, and this block will otherwise not be relaunched. */
1097 save_dirty_reference_blocks(slab);
1098 }
1099
1100 return;
1101 }
1102
1103 /*
1104 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
1105 * and no summary update in progress.
1106 */
1107 if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
1108 check_if_slab_drained(slab);
1109 return;
1110 }
1111
1112 offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
1113 slab->active_count++;
1114 slab->summary_waiter.callback = finish_summary_update;
1115 update_slab_summary_entry(slab, &slab->summary_waiter, offset,
1116 true, true, slab->free_blocks);
1117 }
1118
1119 /**
1120 * get_reference_counters_for_block() - Find the reference counters for a given block.
1121 * @block: The reference_block in question.
1122 *
1123 * Return: A pointer to the reference counters for this block.
1124 */
1125 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1126 {
1127 size_t block_index = block - block->slab->reference_blocks;
1128
1129 return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1130 }
1131
1132 /**
1133 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1134 * @block: The block to copy.
1135 * @buffer: The char buffer to fill with the packed block.
1136 */
1137 static void pack_reference_block(struct reference_block *block, void *buffer)
1138 {
1139 struct packed_reference_block *packed = buffer;
1140 vdo_refcount_t *counters = get_reference_counters_for_block(block);
1141 sector_count_t i;
1142 struct packed_journal_point commit_point;
1143
1144 vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1145
1146 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1147 packed->sectors[i].commit_point = commit_point;
1148 memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1149 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1150 }
1151 }
1152
1153 static void write_reference_block_endio(struct bio *bio)
1154 {
1155 struct vio *vio = bio->bi_private;
1156 struct reference_block *block = vio->completion.parent;
1157 thread_id_t thread_id = block->slab->allocator->thread_id;
1158
1159 continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1160 }
1161
1162 /**
1163 * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1164 * @completion: The VIO doing the I/O as a completion.
1165 */
1166 static void handle_io_error(struct vdo_completion *completion)
1167 {
1168 int result = completion->result;
1169 struct vio *vio = as_vio(completion);
1170 struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1171
1172 vio_record_metadata_io_error(vio);
1173 return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
1174 slab->active_count--;
1175 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1176 check_if_slab_drained(slab);
1177 }
1178
1179 /**
1180 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
1181 * its counters and associated data into the VIO, and launch the write.
1182 * @waiter: The waiter of the dirty block.
1183 * @context: The VIO returned by the pool.
1184 */
1185 static void write_reference_block(struct vdo_waiter *waiter, void *context)
1186 {
1187 size_t block_offset;
1188 physical_block_number_t pbn;
1189 struct pooled_vio *pooled = context;
1190 struct vdo_completion *completion = &pooled->vio.completion;
1191 struct reference_block *block = container_of(waiter, struct reference_block,
1192 waiter);
1193
1194 pack_reference_block(block, pooled->vio.data);
1195 block_offset = (block - block->slab->reference_blocks);
1196 pbn = (block->slab->ref_counts_origin + block_offset);
1197 block->slab_journal_lock_to_release = block->slab_journal_lock;
1198 completion->parent = block;
1199
1200 /*
1201 * Mark the block as clean, since we won't be committing any updates that happen after this
1202 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
1203 * cause complications.
1204 */
1205 block->is_dirty = false;
1206
1207 /*
1208 * Flush before writing to ensure that the recovery journal and slab journal entries which
1209 * cover this reference update are stable. This prevents data corruption that can be caused
1210 * by out of order writes.
1211 */
1212 WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
1213 block->slab->allocator->ref_counts_statistics.blocks_written + 1);
1214
1215 completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
1216 vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
1217 handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
1218 }
1219
1220 static void reclaim_journal_space(struct slab_journal *journal)
1221 {
1222 block_count_t length = journal_length(journal);
1223 struct vdo_slab *slab = journal->slab;
1224 block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1225 block_count_t written;
1226
1227 if ((length < journal->flushing_threshold) || (write_count == 0))
1228 return;
1229
1230 /* The slab journal is over the first threshold, schedule some reference block writes. */
1231 WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1232 if (length < journal->flushing_deadline) {
1233 /* Schedule more writes the closer to the deadline we get. */
1234 write_count /= journal->flushing_deadline - length + 1;
1235 write_count = max_t(block_count_t, write_count, 1);
1236 }
1237
1238 for (written = 0; written < write_count; written++) {
1239 vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1240 launch_reference_block_write, slab);
1241 }
1242 }
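
/*
 * Worked example of the scaling in reclaim_journal_space() (thresholds
 * hypothetical): with a flushing_threshold of 8, a flushing_deadline of 11,
 * a journal length of 9, and 12 dirty reference blocks waiting, write_count
 * becomes 12 / (11 - 9 + 1) = 4, so four reference block writes are
 * launched; as the length nears the deadline the divisor shrinks and more of
 * the dirty blocks are written per call.
 */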
1243
1244 /**
1245 * reference_count_to_status() - Convert a reference count to a reference status.
1246 * @count: The count to convert.
1247 *
1248 * Return: The appropriate reference status.
1249 */
1250 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1251 {
1252 if (count == EMPTY_REFERENCE_COUNT)
1253 return RS_FREE;
1254 else if (count == 1)
1255 return RS_SINGLE;
1256 else if (count == PROVISIONAL_REFERENCE_COUNT)
1257 return RS_PROVISIONAL;
1258 else
1259 return RS_SHARED;
1260 }
1261
1262 /**
1263 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1264 * if it wasn't already dirty.
1265 * @block: The reference block to mark as dirty.
1266 */
1267 static void dirty_block(struct reference_block *block)
1268 {
1269 if (block->is_dirty)
1270 return;
1271
1272 block->is_dirty = true;
1273 if (!block->is_writing)
1274 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1275 }
1276
1277 /**
1278 * get_reference_block() - Get the reference block that covers the given block index.
1279 */
1280 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1281 slab_block_number index)
1282 {
1283 return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1284 }
1285
1286 /**
1287 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1288 * block number.
1289 * @slab: The slab.
1290 * @pbn: The physical block number.
1291 * @slab_block_number_ptr: A pointer to the slab block number.
1292 *
1293 * Return: VDO_SUCCESS or an error code.
1294 */
1295 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1296 physical_block_number_t pbn,
1297 slab_block_number *slab_block_number_ptr)
1298 {
1299 u64 slab_block_number;
1300
1301 if (pbn < slab->start)
1302 return VDO_OUT_OF_RANGE;
1303
1304 slab_block_number = pbn - slab->start;
1305 if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1306 return VDO_OUT_OF_RANGE;
1307
1308 *slab_block_number_ptr = slab_block_number;
1309 return VDO_SUCCESS;
1310 }
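
/*
 * Usage sketch for slab_block_number_from_pbn() (all numbers hypothetical):
 * for a slab whose start is PBN 1000 with 500 data blocks, PBN 1005 maps to
 * slab block number 5, while PBN 999 and PBN 1600 both fail with
 * VDO_OUT_OF_RANGE because they fall before the slab or beyond its data
 * blocks.
 */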
1311
1312 /**
1313 * get_reference_counter() - Get the reference counter that covers the given physical block number.
1314 * @slab: The slab to query.
1315 * @pbn: The physical block number.
1316 * @counter_ptr: A pointer to the reference counter.
1317 */
1318 static int __must_check get_reference_counter(struct vdo_slab *slab,
1319 physical_block_number_t pbn,
1320 vdo_refcount_t **counter_ptr)
1321 {
1322 slab_block_number index;
1323 int result = slab_block_number_from_pbn(slab, pbn, &index);
1324
1325 if (result != VDO_SUCCESS)
1326 return result;
1327
1328 *counter_ptr = &slab->counters[index];
1329
1330 return VDO_SUCCESS;
1331 }
1332
1333 static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1334 {
1335 block_count_t free_blocks = slab->free_blocks;
1336 unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1337 unsigned int priority;
1338
1339 /*
1340 * Wholly full slabs must be the only ones with lowest priority, 0.
1341 *
1342 * Slabs that have never been opened (empty, newly initialized, and never been written to)
1343 * have lower priority than previously opened slabs that have a significant number of free
1344 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1345 * unless there are very few free blocks that have been previously written to.
1346 *
1347 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1348 * a better client of any underlying storage that is thinly-provisioned (though discarding
1349 * would be better).
1350 *
1351 * For all other slabs, the priority is derived from the logarithm of the number of free
1352 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1353 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1354 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
1355 */
1356
1357 if (free_blocks == 0)
1358 return 0;
1359
1360 if (is_slab_journal_blank(slab))
1361 return unopened_slab_priority;
1362
1363 priority = (1 + ilog2(free_blocks));
1364 return ((priority < unopened_slab_priority) ? priority : priority + 1);
1365 }
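
/*
 * Worked example for calculate_slab_priority() (free block counts and the
 * unopened priority are hypothetical): with an unopened_slab_priority of 6,
 * a full slab gets priority 0, a blank slab gets 6, a slab with 20 free
 * blocks gets 1 + ilog2(20) = 5 (below 6, so unchanged), and a slab with 300
 * free blocks gets 1 + ilog2(300) = 9, which is bumped to 10 to skip the
 * reserved unopened priority.
 */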
1366
1367 /*
1368 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1369 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
1370 * blocks.
1371 */
1372 static void prioritize_slab(struct vdo_slab *slab)
1373 {
1374 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1375 "a slab must not already be on a ring when prioritizing");
1376 slab->priority = calculate_slab_priority(slab);
1377 vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1378 slab->priority, &slab->allocq_entry);
1379 }
1380
1381 /**
1382 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
1383 * @incremented: true if the free block count went up.
1384 */
1385 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
1386 {
1387 struct block_allocator *allocator = slab->allocator;
1388
1389 WRITE_ONCE(allocator->allocated_blocks,
1390 allocator->allocated_blocks + (incremented ? -1 : 1));
1391
1392 /* The open slab doesn't need to be reprioritized until it is closed. */
1393 if (slab == allocator->open_slab)
1394 return;
1395
1396 /* Don't bother adjusting the priority table if unneeded. */
1397 if (slab->priority == calculate_slab_priority(slab))
1398 return;
1399
1400 /*
1401 * Reprioritize the slab to reflect the new free block count by removing it from the table
1402 * and re-enqueuing it with the new priority.
1403 */
1404 vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1405 prioritize_slab(slab);
1406 }
1407
1408 /**
1409 * increment_for_data() - Increment the reference count for a data block.
1410 * @slab: The slab which owns the block.
1411 * @block: The reference block which contains the block being updated.
1412 * @block_number: The block to update.
1413 * @old_status: The reference status of the data block before this increment.
1414 * @lock: The pbn_lock associated with this increment (may be NULL).
1415 * @counter_ptr: A pointer to the count for the data block (in, out).
1416 * @adjust_block_count: Whether to update the allocator's free block count.
1417 *
1418 * Return: VDO_SUCCESS or an error.
1419 */
1420 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1421 slab_block_number block_number,
1422 enum reference_status old_status,
1423 struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1424 bool adjust_block_count)
1425 {
1426 switch (old_status) {
1427 case RS_FREE:
1428 *counter_ptr = 1;
1429 block->allocated_count++;
1430 slab->free_blocks--;
1431 if (adjust_block_count)
1432 adjust_free_block_count(slab, false);
1433
1434 break;
1435
1436 case RS_PROVISIONAL:
1437 *counter_ptr = 1;
1438 break;
1439
1440 default:
1441 /* Single or shared */
1442 if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1443 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1444 "Incrementing a block already having 254 references (slab %u, offset %u)",
1445 slab->slab_number, block_number);
1446 }
1447 (*counter_ptr)++;
1448 }
1449
1450 if (lock != NULL)
1451 vdo_unassign_pbn_lock_provisional_reference(lock);
1452 return VDO_SUCCESS;
1453 }
1454
1455 /**
1456 * decrement_for_data() - Decrement the reference count for a data block.
1457 * @slab: The slab which owns the block.
1458 * @block: The reference block which contains the block being updated.
1459 * @block_number: The block to update.
1460 * @old_status: The reference status of the data block before this decrement.
1461 * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
1462 * @counter_ptr: A pointer to the count for the data block (in, out).
1463 * @adjust_block_count: Whether to update the allocator's free block count.
1464 *
1465 * Return: VDO_SUCCESS or an error.
1466 */
1467 static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
1468 slab_block_number block_number,
1469 enum reference_status old_status,
1470 struct reference_updater *updater,
1471 vdo_refcount_t *counter_ptr, bool adjust_block_count)
1472 {
1473 switch (old_status) {
1474 case RS_FREE:
1475 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1476 "Decrementing free block at offset %u in slab %u",
1477 block_number, slab->slab_number);
1478
1479 case RS_PROVISIONAL:
1480 case RS_SINGLE:
1481 if (updater->zpbn.zone != NULL) {
1482 struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
1483 updater->zpbn.pbn);
1484
1485 if (lock != NULL) {
1486 /*
1487 * There is a read lock on this block, so the block must not become
1488 * unreferenced.
1489 */
1490 *counter_ptr = PROVISIONAL_REFERENCE_COUNT;
1491 vdo_assign_pbn_lock_provisional_reference(lock);
1492 break;
1493 }
1494 }
1495
1496 *counter_ptr = EMPTY_REFERENCE_COUNT;
1497 block->allocated_count--;
1498 slab->free_blocks++;
1499 if (adjust_block_count)
1500 adjust_free_block_count(slab, true);
1501
1502 break;
1503
1504 default:
1505 /* Shared */
1506 (*counter_ptr)--;
1507 }
1508
1509 return VDO_SUCCESS;
1510 }
1511
1512 /**
1513 * increment_for_block_map() - Increment the reference count for a block map page.
1514 * @slab: The slab which owns the block.
1515 * @block: The reference block which contains the block being updated.
1516 * @block_number: The block to update.
1517 * @old_status: The reference status of the block before this increment.
1518 * @lock: The pbn_lock associated with this increment (may be NULL).
1519 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1520 * @counter_ptr: A pointer to the count for the block (in, out).
1521 * @adjust_block_count: Whether to update the allocator's free block count.
1522 *
1523 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1524 * blocks never dedupe they should never be adjusted from any other state. The adjustment always
1525 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1526 * blocks.
1527 *
1528 * Return: VDO_SUCCESS or an error.
1529 */
1530 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1531 slab_block_number block_number,
1532 enum reference_status old_status,
1533 struct pbn_lock *lock, bool normal_operation,
1534 vdo_refcount_t *counter_ptr, bool adjust_block_count)
1535 {
1536 switch (old_status) {
1537 case RS_FREE:
1538 if (normal_operation) {
1539 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1540 "Incrementing unallocated block map block (slab %u, offset %u)",
1541 slab->slab_number, block_number);
1542 }
1543
1544 *counter_ptr = MAXIMUM_REFERENCE_COUNT;
1545 block->allocated_count++;
1546 slab->free_blocks--;
1547 if (adjust_block_count)
1548 adjust_free_block_count(slab, false);
1549
1550 return VDO_SUCCESS;
1551
1552 case RS_PROVISIONAL:
1553 if (!normal_operation)
1554 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1555 "Block map block had provisional reference during replay (slab %u, offset %u)",
1556 slab->slab_number, block_number);
1557
1558 *counter_ptr = MAXIMUM_REFERENCE_COUNT;
1559 if (lock != NULL)
1560 vdo_unassign_pbn_lock_provisional_reference(lock);
1561 return VDO_SUCCESS;
1562
1563 default:
1564 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1565 "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1566 *counter_ptr, slab->slab_number,
1567 block_number);
1568 }
1569 }
1570
1571 static bool __must_check is_valid_journal_point(const struct journal_point *point)
1572 {
1573 return ((point != NULL) && (point->sequence_number > 0));
1574 }
1575
1576 /**
1577 * update_reference_count() - Update the reference count of a block.
1578 * @slab: The slab which owns the block.
1579 * @block: The reference block which contains the block being updated.
1580 * @block_number: The block to update.
1581 * @slab_journal_point: The slab journal point at which this update is journaled.
1582 * @updater: The reference updater.
1583 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1584 * @adjust_block_count: Whether to update the slab's free block count.
1585 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1586 * of a provisional reference.
1587 *
1588 * Return: VDO_SUCCESS or an error.
1589 */
1590 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1591 slab_block_number block_number,
1592 const struct journal_point *slab_journal_point,
1593 struct reference_updater *updater,
1594 bool normal_operation, bool adjust_block_count,
1595 bool *provisional_decrement_ptr)
1596 {
1597 vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1598 enum reference_status old_status = reference_count_to_status(*counter_ptr);
1599 int result;
1600
1601 if (!updater->increment) {
1602 result = decrement_for_data(slab, block, block_number, old_status,
1603 updater, counter_ptr, adjust_block_count);
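/*
 * Releasing a provisional reference needs no further bookkeeping here; signal the
 * caller so it can skip dirtying the block and adjusting slab journal locks.
 */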
1604 if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1605 if (provisional_decrement_ptr != NULL)
1606 *provisional_decrement_ptr = true;
1607 return VDO_SUCCESS;
1608 }
1609 } else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1610 result = increment_for_data(slab, block, block_number, old_status,
1611 updater->lock, counter_ptr, adjust_block_count);
1612 } else {
1613 result = increment_for_block_map(slab, block, block_number, old_status,
1614 updater->lock, normal_operation,
1615 counter_ptr, adjust_block_count);
1616 }
1617
1618 if (result != VDO_SUCCESS)
1619 return result;
1620
1621 if (is_valid_journal_point(slab_journal_point))
1622 slab->slab_journal_point = *slab_journal_point;
1623
1624 return VDO_SUCCESS;
1625 }
1626
1627 static int __must_check adjust_reference_count(struct vdo_slab *slab,
1628 struct reference_updater *updater,
1629 const struct journal_point *slab_journal_point)
1630 {
1631 slab_block_number block_number;
1632 int result;
1633 struct reference_block *block;
1634 bool provisional_decrement = false;
1635
1636 if (!is_slab_open(slab))
1637 return VDO_INVALID_ADMIN_STATE;
1638
1639 result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1640 if (result != VDO_SUCCESS)
1641 return result;
1642
1643 block = get_reference_block(slab, block_number);
1644 result = update_reference_count(slab, block, block_number, slab_journal_point,
1645 updater, NORMAL_OPERATION, true,
1646 &provisional_decrement);
1647 if ((result != VDO_SUCCESS) || provisional_decrement)
1648 return result;
1649
1650 if (block->is_dirty && (block->slab_journal_lock > 0)) {
1651 sequence_number_t entry_lock = slab_journal_point->sequence_number;
1652 /*
1653 * This block is already dirty and a slab journal entry has been made for it since
1654 * the last time it was clean. We must release the per-entry slab journal lock for
1655 * the entry associated with the update we are now doing.
1656 */
1657 result = VDO_ASSERT(is_valid_journal_point(slab_journal_point),
1658 "Reference count adjustments need slab journal points.");
1659 if (result != VDO_SUCCESS)
1660 return result;
1661
1662 adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1663 return VDO_SUCCESS;
1664 }
1665
1666 /*
1667 * This may be the first time we are applying an update for which there is a slab journal
1668 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1669 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1670 */
1671 if (is_valid_journal_point(slab_journal_point))
1672 block->slab_journal_lock = slab_journal_point->sequence_number;
1673 else
1674 block->slab_journal_lock = 0;
1675
1676 dirty_block(block);
1677 return VDO_SUCCESS;
1678 }
1679
1680 /**
1681 * add_entry_from_waiter() - Add an entry to the slab journal.
1682 * @waiter: The vio which should make an entry now.
1683 * @context: The slab journal to make an entry in.
1684 *
1685 * This callback is invoked by add_entries() once it has determined that we are ready to make
1686 * another entry in the slab journal. Implements waiter_callback_fn.
1687 */
1688 static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
1689 {
1690 int result;
1691 struct reference_updater *updater =
1692 container_of(waiter, struct reference_updater, waiter);
1693 struct data_vio *data_vio = data_vio_from_reference_updater(updater);
1694 struct slab_journal *journal = context;
1695 struct slab_journal_block_header *header = &journal->tail_header;
1696 struct journal_point slab_journal_point = {
1697 .sequence_number = header->sequence_number,
1698 .entry_count = header->entry_count,
1699 };
1700 sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;
1701
1702 if (header->entry_count == 0) {
1703 /*
1704 * This is the first entry in the current tail block, so get a lock on the recovery
1705 * journal which we will hold until this tail block is committed.
1706 */
1707 get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
1708 if (journal->recovery_journal != NULL) {
1709 zone_count_t zone_number = journal->slab->allocator->zone_number;
1710
1711 vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
1712 recovery_block,
1713 VDO_ZONE_TYPE_PHYSICAL,
1714 zone_number);
1715 }
1716
1717 mark_slab_journal_dirty(journal, recovery_block);
1718 reclaim_journal_space(journal);
1719 }
1720
1721 add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
1722 expand_journal_point(data_vio->recovery_journal_point,
1723 updater->increment));
1724
1725 if (journal->slab->status != VDO_SLAB_REBUILT) {
1726 /*
1727 * If the slab is unrecovered, scrubbing will take care of the count since the
1728 * update is now recorded in the journal.
1729 */
1730 adjust_slab_journal_block_reference(journal,
1731 slab_journal_point.sequence_number, -1);
1732 result = VDO_SUCCESS;
1733 } else {
1734 /* Now that an entry has been made in the slab journal, update the counter. */
1735 result = adjust_reference_count(journal->slab, updater,
1736 &slab_journal_point);
1737 }
1738
1739 if (updater->increment)
1740 continue_data_vio_with_error(data_vio, result);
1741 else
1742 vdo_continue_completion(&data_vio->decrement_completion, result);
1743 }
1744
1745 /**
1746 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1747 * increment.
1748 * @journal: The journal.
1749 *
1750 * Return: true if the first entry waiter's operation is a block map increment.
1751 */
1752 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1753 {
1754 struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1755 struct reference_updater *updater =
1756 container_of(waiter, struct reference_updater, waiter);
1757
1758 return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1759 }
1760
1761 /**
1762 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1763 * @journal: The journal to which entries may be added.
1764 *
1765 * By processing the queue in order, we ensure that slab journal entries are made in the same order
1766 * as recovery journal entries for the same increment or decrement.
1767 */
1768 static void add_entries(struct slab_journal *journal)
1769 {
1770 if (journal->adding_entries) {
1771 /* Protect against re-entrancy. */
1772 return;
1773 }
1774
1775 journal->adding_entries = true;
1776 while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
1777 struct slab_journal_block_header *header = &journal->tail_header;
1778
1779 if (journal->partial_write_in_progress ||
1780 (journal->slab->status == VDO_SLAB_REBUILDING)) {
1781 /*
1782 * Don't add entries while rebuilding or while a partial write is
1783 * outstanding, as it could result in reference count corruption.
1784 */
1785 break;
1786 }
1787
1788 if (journal->waiting_to_commit) {
1789 /*
1790 * If we are waiting for resources to write the tail block, and the tail
1791 * block is full, we can't make another entry.
1792 */
1793 WRITE_ONCE(journal->events->tail_busy_count,
1794 journal->events->tail_busy_count + 1);
1795 break;
1796 } else if (is_next_entry_a_block_map_increment(journal) &&
1797 (header->entry_count >= journal->full_entries_per_block)) {
1798 /*
1799 * The tail block does not have room for a block map increment, so commit
1800 * it now.
1801 */
1802 commit_tail(journal);
1803 if (journal->waiting_to_commit) {
1804 WRITE_ONCE(journal->events->tail_busy_count,
1805 journal->events->tail_busy_count + 1);
1806 break;
1807 }
1808 }
1809
1810 /* If the slab is over the blocking threshold, make the vio wait. */
1811 if (requires_reaping(journal)) {
1812 WRITE_ONCE(journal->events->blocked_count,
1813 journal->events->blocked_count + 1);
1814 save_dirty_reference_blocks(journal->slab);
1815 break;
1816 }
1817
1818 if (header->entry_count == 0) {
1819 struct journal_lock *lock =
1820 get_lock(journal, header->sequence_number);
1821
1822 /*
1823 * Check if the on disk slab journal is full. Because of the blocking and
1824 * scrubbing thresholds, this should never happen.
1825 */
1826 if (lock->count > 0) {
1827 VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
1828 "New block has locks, but journal is not full");
1829
1830 /*
1831 * The blocking threshold must let the journal fill up if the new
1832 * block has locks; if the blocking threshold is smaller than the
1833 * journal size, the new block cannot possibly have locks already.
1834 */
1835 VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
1836 "New block can have locks already iff blocking threshold is at the end of the journal");
1837
1838 WRITE_ONCE(journal->events->disk_full_count,
1839 journal->events->disk_full_count + 1);
1840 save_dirty_reference_blocks(journal->slab);
1841 break;
1842 }
1843
1844 /*
1845 * Don't allow the new block to be reaped until all of the reference count
1846 * blocks are written and the journal block has been fully committed as
1847 * well.
1848 */
1849 lock->count = journal->entries_per_block + 1;
1850
1851 if (header->sequence_number == 1) {
1852 struct vdo_slab *slab = journal->slab;
1853 block_count_t i;
1854
1855 /*
1856 * This is the first entry in this slab journal, ever. Dirty all of
1857 * the reference count blocks. Each will acquire a lock on the tail
1858 * block so that the journal won't be reaped until the reference
1859 * counts are initialized. The lock acquisition must be done by the
1860 * ref_counts since here we don't know how many reference blocks
1861 * the ref_counts has.
1862 */
1863 for (i = 0; i < slab->reference_block_count; i++) {
1864 slab->reference_blocks[i].slab_journal_lock = 1;
1865 dirty_block(&slab->reference_blocks[i]);
1866 }
1867
1868 adjust_slab_journal_block_reference(journal, 1,
1869 slab->reference_block_count);
1870 }
1871 }
1872
1873 vdo_waitq_notify_next_waiter(&journal->entry_waiters,
1874 add_entry_from_waiter, journal);
1875 }
1876
1877 journal->adding_entries = false;
1878
1879 /* If there are no waiters, and we are flushing or saving, commit the tail block. */
1880 if (vdo_is_state_draining(&journal->slab->state) &&
1881 !vdo_is_state_suspending(&journal->slab->state) &&
1882 !vdo_waitq_has_waiters(&journal->entry_waiters))
1883 commit_tail(journal);
1884 }
1885
1886 /**
1887 * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1888 * first reference block of a slab.
1889 */
1890 static void reset_search_cursor(struct vdo_slab *slab)
1891 {
1892 struct search_cursor *cursor = &slab->search_cursor;
1893
1894 cursor->block = cursor->first_block;
1895 cursor->index = 0;
1896 /* Unit tests have slabs with only one reference block (and it's a runt). */
1897 cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1898 }
1899
1900 /**
1901 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1902  *                           a slab.
1903 *
1904 * Wraps around to the first reference block if the current block is the last reference block.
1905 *
1906 * Return: true unless the cursor was at the last reference block.
1907 */
1908 static bool advance_search_cursor(struct vdo_slab *slab)
1909 {
1910 struct search_cursor *cursor = &slab->search_cursor;
1911
1912 /*
1913 * If we just finished searching the last reference block, then wrap back around to the
1914 * start of the array.
1915 */
1916 if (cursor->block == cursor->last_block) {
1917 reset_search_cursor(slab);
1918 return false;
1919 }
1920
1921 /* We're not already at the end, so advance the cursor to the next block. */
1922 cursor->block++;
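/* The next block's search range begins where the previous block's range ended. */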
1923 cursor->index = cursor->end_index;
1924
1925 if (cursor->block == cursor->last_block) {
1926 /* The last reference block will usually be a runt. */
1927 cursor->end_index = slab->block_count;
1928 } else {
1929 cursor->end_index += COUNTS_PER_BLOCK;
1930 }
1931
1932 return true;
1933 }
1934
1935 /**
1936 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1937 *
1938 * Return: VDO_SUCCESS or an error.
1939 */
1940 int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
1941 physical_block_number_t pbn,
1942 enum journal_operation operation)
1943 {
1944 int result;
1945 slab_block_number block_number;
1946 struct reference_block *block;
1947 struct vdo_slab *slab = vdo_get_slab(depot, pbn);
1948 struct reference_updater updater = {
1949 .operation = operation,
1950 .increment = true,
1951 };
1952
1953 result = slab_block_number_from_pbn(slab, pbn, &block_number);
1954 if (result != VDO_SUCCESS)
1955 return result;
1956
1957 block = get_reference_block(slab, block_number);
1958 result = update_reference_count(slab, block, block_number, NULL,
1959 &updater, !NORMAL_OPERATION, false, NULL);
1960 if (result != VDO_SUCCESS)
1961 return result;
1962
1963 dirty_block(block);
1964 return VDO_SUCCESS;
1965 }
1966
1967 /**
1968 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1969 * entry into the reference count for a block.
1970 * @slab: The slab.
1971 * @entry_point: The slab journal point for the entry.
1972 * @entry: The slab journal entry being replayed.
1973 *
1974 * The adjustment will be ignored if it was already recorded in the reference count.
1975 *
1976 * Return: VDO_SUCCESS or an error code.
1977 */
1978 static int replay_reference_count_change(struct vdo_slab *slab,
1979 const struct journal_point *entry_point,
1980 struct slab_journal_entry entry)
1981 {
1982 int result;
1983 struct reference_block *block = get_reference_block(slab, entry.sbn);
1984 sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
1985 struct reference_updater updater = {
1986 .operation = entry.operation,
1987 .increment = entry.increment,
1988 };
1989
1990 if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
1991 /* This entry is already reflected in the existing counts, so do nothing. */
1992 return VDO_SUCCESS;
1993 }
1994
1995 /* This entry is not yet counted in the reference counts. */
1996 result = update_reference_count(slab, block, entry.sbn, entry_point,
1997 &updater, !NORMAL_OPERATION, false, NULL);
1998 if (result != VDO_SUCCESS)
1999 return result;
2000
2001 dirty_block(block);
2002 return VDO_SUCCESS;
2003 }
2004
2005 /**
2006  * find_zero_byte_in_word() - Find the array index of the first zero byte in a word-sized range of
2007 * reference counters.
2008 * @word_ptr: A pointer to the eight counter bytes to check.
2009 * @start_index: The array index corresponding to word_ptr[0].
2010 * @fail_index: The array index to return if no zero byte is found.
2011 *
2012 * The search does no bounds checking; the function relies on the array being sufficiently padded.
2013 *
2014 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2015 * no zero byte was found.
2016 */
2017 static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
2018 slab_block_number start_index,
2019 slab_block_number fail_index)
2020 {
2021 u64 word = get_unaligned_le64(word_ptr);
2022
2023 /* This looks like a loop, but GCC will unroll the eight iterations for us. */
2024 unsigned int offset;
2025
2026 for (offset = 0; offset < BYTES_PER_WORD; offset++) {
2027 /* Assumes little-endian byte order, which we have on X86. */
2028 if ((word & 0xFF) == 0)
2029 return (start_index + offset);
2030 word >>= 8;
2031 }
2032
2033 return fail_index;
2034 }
2035
2036 /**
2037 * find_free_block() - Find the first block with a reference count of zero in the specified
2038 * range of reference counter indexes.
2039 * @slab: The slab counters to scan.
2040 * @index_ptr: A pointer to hold the array index of the free block.
2041 *
2042 * Exposed for unit testing.
2043 *
2044 * Return: true if a free block was found in the specified range.
2045 */
2046 static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
2047 {
2048 slab_block_number zero_index;
2049 slab_block_number next_index = slab->search_cursor.index;
2050 slab_block_number end_index = slab->search_cursor.end_index;
2051 u8 *next_counter = &slab->counters[next_index];
2052 u8 *end_counter = &slab->counters[end_index];
2053
2054 /*
2055 * Search every byte of the first unaligned word. (Array is padded so reading past end is
2056 * safe.)
2057 */
2058 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2059 if (zero_index < end_index) {
2060 *index_ptr = zero_index;
2061 return true;
2062 }
2063
2064 /*
2065 * On architectures where unaligned word access is expensive, this would be a good place to
2066 * advance to an alignment boundary.
2067 */
2068 next_index += BYTES_PER_WORD;
2069 next_counter += BYTES_PER_WORD;
2070
2071 /*
2072 * Now we're word-aligned; check one word at a time until we find a word containing a zero.
2073 * (Array is padded so reading past end is safe.)
2074 */
2075 while (next_counter < end_counter) {
2076 /*
2077 * The following code is currently an exact copy of the code preceding the loop,
2078 * but if you try to merge them by using a do loop, it runs slower because a jump
2079 * instruction gets added at the start of the iteration.
2080 */
2081 zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
2082 if (zero_index < end_index) {
2083 *index_ptr = zero_index;
2084 return true;
2085 }
2086
2087 next_index += BYTES_PER_WORD;
2088 next_counter += BYTES_PER_WORD;
2089 }
2090
2091 return false;
2092 }
2093
2094 /**
2095 * search_current_reference_block() - Search the reference block currently saved in the search
2096 * cursor for a reference count of zero, starting at the saved
2097 * counter index.
2098 * @slab: The slab to search.
2099 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2100 *
2101 * Return: true if an unreferenced counter was found.
2102 */
2103 static bool search_current_reference_block(const struct vdo_slab *slab,
2104 slab_block_number *free_index_ptr)
2105 {
2106 /* Don't bother searching if the current block is known to be full. */
2107 return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2108 find_free_block(slab, free_index_ptr));
2109 }
2110
2111 /**
2112 * search_reference_blocks() - Search each reference block for a reference count of zero.
2113 * @slab: The slab to search.
2114 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2115 *
2116 * Searches each reference block for a reference count of zero, starting at the reference block and
2117 * counter index saved in the search cursor and searching up to the end of the last reference
2118 * block. The search does not wrap.
2119 *
2120 * Return: true if an unreferenced counter was found.
2121 */
2122 static bool search_reference_blocks(struct vdo_slab *slab,
2123 slab_block_number *free_index_ptr)
2124 {
2125 /* Start searching at the saved search position in the current block. */
2126 if (search_current_reference_block(slab, free_index_ptr))
2127 return true;
2128
2129 /* Search each reference block up to the end of the slab. */
2130 while (advance_search_cursor(slab)) {
2131 if (search_current_reference_block(slab, free_index_ptr))
2132 return true;
2133 }
2134
2135 return false;
2136 }
2137
2138 /**
2139 * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2140 */
2141 static void make_provisional_reference(struct vdo_slab *slab,
2142 slab_block_number block_number)
2143 {
2144 struct reference_block *block = get_reference_block(slab, block_number);
2145
2146 /*
2147 * Make the initial transition from an unreferenced block to a
2148 * provisionally allocated block.
2149 */
2150 slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;
2151
2152 /* Account for the allocation. */
2153 block->allocated_count++;
2154 slab->free_blocks--;
2155 }
2156
2157 /**
2158 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2159 */
2160 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2161 {
2162 block_count_t i;
2163
2164 for (i = 0; i < slab->reference_block_count; i++)
2165 dirty_block(&slab->reference_blocks[i]);
2166 }
2167
2168 /**
2169 * clear_provisional_references() - Clear the provisional reference counts from a reference block.
2170 * @block: The block to clear.
2171 */
2172 static void clear_provisional_references(struct reference_block *block)
2173 {
2174 vdo_refcount_t *counters = get_reference_counters_for_block(block);
2175 block_count_t j;
2176
2177 for (j = 0; j < COUNTS_PER_BLOCK; j++) {
2178 if (counters[j] == PROVISIONAL_REFERENCE_COUNT) {
2179 counters[j] = EMPTY_REFERENCE_COUNT;
2180 block->allocated_count--;
2181 }
2182 }
2183 }
2184
2185 static inline bool journal_points_equal(struct journal_point first,
2186 struct journal_point second)
2187 {
2188 return ((first.sequence_number == second.sequence_number) &&
2189 (first.entry_count == second.entry_count));
2190 }
2191
2192 /**
2193  * unpack_reference_block() - Unpack a packed reference block into the internal memory structure.
2194 * @packed: The written reference block to be unpacked.
2195 * @block: The internal reference block to be loaded.
2196 */
2197 static void unpack_reference_block(struct packed_reference_block *packed,
2198 struct reference_block *block)
2199 {
2200 block_count_t index;
2201 sector_count_t i;
2202 struct vdo_slab *slab = block->slab;
2203 vdo_refcount_t *counters = get_reference_counters_for_block(block);
2204
2205 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
2206 struct packed_reference_sector *sector = &packed->sectors[i];
2207
2208 vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
2209 memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
2210 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
2211 /* The slab_journal_point must be the latest point found in any sector. */
2212 if (vdo_before_journal_point(&slab->slab_journal_point,
2213 &block->commit_points[i]))
2214 slab->slab_journal_point = block->commit_points[i];
2215
2216 if ((i > 0) &&
2217 !journal_points_equal(block->commit_points[0],
2218 block->commit_points[i])) {
2219 size_t block_index = block - block->slab->reference_blocks;
2220
2221 vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
2222 i, block_index, block->slab->slab_number);
2223 }
2224 }
2225
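/* Recompute this block's allocated count from the freshly unpacked counters. */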
2226 block->allocated_count = 0;
2227 for (index = 0; index < COUNTS_PER_BLOCK; index++) {
2228 if (counters[index] != EMPTY_REFERENCE_COUNT)
2229 block->allocated_count++;
2230 }
2231 }
2232
2233 /**
2234 * finish_reference_block_load() - After a reference block has been read, unpack it.
2235 * @completion: The VIO that just finished reading.
2236 */
2237 static void finish_reference_block_load(struct vdo_completion *completion)
2238 {
2239 struct vio *vio = as_vio(completion);
2240 struct pooled_vio *pooled = vio_as_pooled_vio(vio);
2241 struct reference_block *block = completion->parent;
2242 struct vdo_slab *slab = block->slab;
2243
2244 unpack_reference_block((struct packed_reference_block *) vio->data, block);
2245 return_vio_to_pool(slab->allocator->vio_pool, pooled);
2246 slab->active_count--;
2247 clear_provisional_references(block);
2248
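/*
 * The free block count started at the full slab size in load_reference_blocks(), so
 * deduct this block's allocations now that they are known.
 */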
2249 slab->free_blocks -= block->allocated_count;
2250 check_if_slab_drained(slab);
2251 }
2252
2253 static void load_reference_block_endio(struct bio *bio)
2254 {
2255 struct vio *vio = bio->bi_private;
2256 struct reference_block *block = vio->completion.parent;
2257
2258 continue_vio_after_io(vio, finish_reference_block_load,
2259 block->slab->allocator->thread_id);
2260 }
2261
2262 /**
2263 * load_reference_block() - After a block waiter has gotten a VIO from the VIO pool, load the
2264 * block.
2265 * @waiter: The waiter of the block to load.
2266 * @context: The VIO returned by the pool.
2267 */
2268 static void load_reference_block(struct vdo_waiter *waiter, void *context)
2269 {
2270 struct pooled_vio *pooled = context;
2271 struct vio *vio = &pooled->vio;
2272 struct reference_block *block =
2273 container_of(waiter, struct reference_block, waiter);
2274 size_t block_offset = (block - block->slab->reference_blocks);
2275
2276 vio->completion.parent = block;
2277 vdo_submit_metadata_vio(vio, block->slab->ref_counts_origin + block_offset,
2278 load_reference_block_endio, handle_io_error,
2279 REQ_OP_READ);
2280 }
2281
2282 /**
2283 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2284 * pre-allocated reference counter.
2285 */
2286 static void load_reference_blocks(struct vdo_slab *slab)
2287 {
2288 block_count_t i;
2289
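/* Assume every block is free to start; each loaded reference block deducts its allocations. */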
2290 slab->free_blocks = slab->block_count;
2291 slab->active_count = slab->reference_block_count;
2292 for (i = 0; i < slab->reference_block_count; i++) {
2293 struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;
2294
2295 waiter->callback = load_reference_block;
2296 acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
2297 }
2298 }
2299
2300 /**
2301 * drain_slab() - Drain all reference count I/O.
2302 *
2303 * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the
2304 * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2305 */
2306 static void drain_slab(struct vdo_slab *slab)
2307 {
2308 bool save;
2309 bool load;
2310 const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);
2311
2312 if (state == VDO_ADMIN_STATE_SUSPENDING)
2313 return;
2314
2315 if ((state != VDO_ADMIN_STATE_REBUILDING) &&
2316 (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
2317 commit_tail(&slab->journal);
2318
2319 if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
2320 return;
2321
2322 save = false;
2323 load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
2324 if (state == VDO_ADMIN_STATE_SCRUBBING) {
2325 if (load) {
2326 load_reference_blocks(slab);
2327 return;
2328 }
2329 } else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
2330 if (!load) {
2331 /* These reference counts were never written, so mark them all dirty. */
2332 dirty_all_reference_blocks(slab);
2333 }
2334 save = true;
2335 } else if (state == VDO_ADMIN_STATE_REBUILDING) {
2336 /*
2337 * Write out the counters if the slab has written them before, or it has any
2338 * non-zero reference counts, or there are any slab journal blocks.
2339 */
2340 block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;
2341
2342 if (load || (slab->free_blocks != data_blocks) ||
2343 !is_slab_journal_blank(slab)) {
2344 dirty_all_reference_blocks(slab);
2345 save = true;
2346 }
2347 } else if (state == VDO_ADMIN_STATE_SAVING) {
2348 save = (slab->status == VDO_SLAB_REBUILT);
2349 } else {
2350 vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
2351 return;
2352 }
2353
2354 if (save)
2355 save_dirty_reference_blocks(slab);
2356 }
2357
2358 static int allocate_slab_counters(struct vdo_slab *slab)
2359 {
2360 int result;
2361 size_t index, bytes;
2362
2363 result = VDO_ASSERT(slab->reference_blocks == NULL,
2364 "vdo_slab %u doesn't allocate refcounts twice",
2365 slab->slab_number);
2366 if (result != VDO_SUCCESS)
2367 return result;
2368
2369 result = vdo_allocate(slab->reference_block_count, struct reference_block,
2370 __func__, &slab->reference_blocks);
2371 if (result != VDO_SUCCESS)
2372 return result;
2373
2374 /*
2375 * Allocate such that the runt slab has a full-length memory array, plus a little padding
2376 * so we can word-search even at the very end.
2377 */
2378 bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
2379 result = vdo_allocate(bytes, vdo_refcount_t, "ref counts array",
2380 &slab->counters);
2381 if (result != VDO_SUCCESS) {
2382 vdo_free(vdo_forget(slab->reference_blocks));
2383 return result;
2384 }
2385
2386 slab->search_cursor.first_block = slab->reference_blocks;
2387 slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
2388 reset_search_cursor(slab);
2389
2390 for (index = 0; index < slab->reference_block_count; index++) {
2391 slab->reference_blocks[index] = (struct reference_block) {
2392 .slab = slab,
2393 };
2394 }
2395
2396 return VDO_SUCCESS;
2397 }
2398
2399 static int allocate_counters_if_clean(struct vdo_slab *slab)
2400 {
2401 if (vdo_is_state_clean_load(&slab->state))
2402 return allocate_slab_counters(slab);
2403
2404 return VDO_SUCCESS;
2405 }
2406
2407 static void finish_loading_journal(struct vdo_completion *completion)
2408 {
2409 struct vio *vio = as_vio(completion);
2410 struct slab_journal *journal = completion->parent;
2411 struct vdo_slab *slab = journal->slab;
2412 struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
2413 struct slab_journal_block_header header;
2414
2415 vdo_unpack_slab_journal_block_header(&block->header, &header);
2416
2417 /* FIXME: should it be an error if the following conditional fails? */
2418 if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
2419 (header.nonce == slab->allocator->nonce)) {
2420 journal->tail = header.sequence_number + 1;
2421
2422 /*
2423 * If the slab is clean, this implies the slab journal is empty, so advance the
2424 * head appropriately.
2425 */
2426 journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
2427 header.head : journal->tail);
2428 journal->tail_header = header;
2429 initialize_journal_state(journal);
2430 }
2431
2432 return_vio_to_pool(slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2433 vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
2434 }
2435
2436 static void read_slab_journal_tail_endio(struct bio *bio)
2437 {
2438 struct vio *vio = bio->bi_private;
2439 struct slab_journal *journal = vio->completion.parent;
2440
2441 continue_vio_after_io(vio, finish_loading_journal,
2442 journal->slab->allocator->thread_id);
2443 }
2444
2445 static void handle_load_error(struct vdo_completion *completion)
2446 {
2447 int result = completion->result;
2448 struct slab_journal *journal = completion->parent;
2449 struct vio *vio = as_vio(completion);
2450
2451 vio_record_metadata_io_error(vio);
2452 return_vio_to_pool(journal->slab->allocator->vio_pool, vio_as_pooled_vio(vio));
2453 vdo_finish_loading_with_result(&journal->slab->state, result);
2454 }
2455
2456 /**
2457 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
2458 * pool.
2459 * @waiter: The vio pool waiter which has just been notified.
2460 * @context: The vio pool entry given to the waiter.
2461 *
2462 * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
2463 */
2464 static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
2465 {
2466 struct slab_journal *journal =
2467 container_of(waiter, struct slab_journal, resource_waiter);
2468 struct vdo_slab *slab = journal->slab;
2469 struct pooled_vio *pooled = context;
2470 struct vio *vio = &pooled->vio;
2471 tail_block_offset_t last_commit_point =
2472 slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2473
2474 /*
2475 * Slab summary keeps the commit point offset, so the tail block is the block before that.
2476 * Calculation supports small journals in unit tests.
2477 */
2478 tail_block_offset_t tail_block = ((last_commit_point == 0) ?
2479 (tail_block_offset_t)(journal->size - 1) :
2480 (last_commit_point - 1));
2481
2482 vio->completion.parent = journal;
2483 vio->completion.callback_thread_id = slab->allocator->thread_id;
2484 vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
2485 read_slab_journal_tail_endio, handle_load_error,
2486 REQ_OP_READ);
2487 }
2488
2489 /**
2490 * load_slab_journal() - Load a slab's journal by reading the journal's tail.
2491 */
2492 static void load_slab_journal(struct vdo_slab *slab)
2493 {
2494 struct slab_journal *journal = &slab->journal;
2495 tail_block_offset_t last_commit_point;
2496
2497 last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
2498 if ((last_commit_point == 0) &&
2499 !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
2500 /*
2501 * This slab claims that it has a tail block at (journal->size - 1), but a head of
2502 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
2503 * don't bother reading the (bogus) data off disk.
2504 */
2505 VDO_ASSERT_LOG_ONLY(((journal->size < 16) ||
2506 (journal->scrubbing_threshold < (journal->size - 1))),
2507 "Scrubbing threshold protects against reads of unwritten slab journal blocks");
2508 vdo_finish_loading_with_result(&slab->state,
2509 allocate_counters_if_clean(slab));
2510 return;
2511 }
2512
2513 journal->resource_waiter.callback = read_slab_journal_tail;
2514 acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
2515 }
2516
2517 static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
2518 {
2519 struct slab_scrubber *scrubber = &slab->allocator->scrubber;
2520
2521 VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
2522 "slab to be scrubbed is unrecovered");
2523
2524 if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
2525 return;
2526
2527 list_del_init(&slab->allocq_entry);
2528 if (!slab->was_queued_for_scrubbing) {
2529 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
2530 slab->was_queued_for_scrubbing = true;
2531 }
2532
2533 if (high_priority) {
2534 slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
2535 list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
2536 return;
2537 }
2538
2539 list_add_tail(&slab->allocq_entry, &scrubber->slabs);
2540 }
2541
2542 /* Queue a slab for allocation or scrubbing. */
2543 static void queue_slab(struct vdo_slab *slab)
2544 {
2545 struct block_allocator *allocator = slab->allocator;
2546 block_count_t free_blocks;
2547 int result;
2548
2549 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
2550 "a requeued slab must not already be on a ring");
2551
2552 if (vdo_is_read_only(allocator->depot->vdo))
2553 return;
2554
2555 free_blocks = slab->free_blocks;
2556 result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
2557 "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
2558 slab->slab_number, (unsigned long long) free_blocks,
2559 (unsigned long long) allocator->depot->slab_config.data_blocks);
2560 if (result != VDO_SUCCESS) {
2561 vdo_enter_read_only_mode(allocator->depot->vdo, result);
2562 return;
2563 }
2564
2565 if (slab->status != VDO_SLAB_REBUILT) {
2566 register_slab_for_scrubbing(slab, false);
2567 return;
2568 }
2569
2570 if (!vdo_is_state_resuming(&slab->state)) {
2571 /*
2572 * If the slab is resuming, we've already accounted for it here, so don't do it
2573 * again.
2574 * FIXME: under what situation would the slab be resuming here?
2575 */
2576 WRITE_ONCE(allocator->allocated_blocks,
2577 allocator->allocated_blocks - free_blocks);
2578 if (!is_slab_journal_blank(slab)) {
2579 WRITE_ONCE(allocator->statistics.slabs_opened,
2580 allocator->statistics.slabs_opened + 1);
2581 }
2582 }
2583
2584 if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
2585 reopen_slab_journal(slab);
2586
2587 prioritize_slab(slab);
2588 }
2589
2590 /**
2591 * initiate_slab_action() - Initiate a slab action.
2592 *
2593 * Implements vdo_admin_initiator_fn.
2594 */
2595 static void initiate_slab_action(struct admin_state *state)
2596 {
2597 struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2598
2599 if (vdo_is_state_draining(state)) {
2600 const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2601
2602 if (operation == VDO_ADMIN_STATE_SCRUBBING)
2603 slab->status = VDO_SLAB_REBUILDING;
2604
2605 drain_slab(slab);
2606 check_if_slab_drained(slab);
2607 return;
2608 }
2609
2610 if (vdo_is_state_loading(state)) {
2611 load_slab_journal(slab);
2612 return;
2613 }
2614
2615 if (vdo_is_state_resuming(state)) {
2616 queue_slab(slab);
2617 vdo_finish_resuming(state);
2618 return;
2619 }
2620
2621 vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2622 }
2623
2624 /**
2625 * get_next_slab() - Get the next slab to scrub.
2626 * @scrubber: The slab scrubber.
2627 *
2628 * Return: The next slab to scrub or NULL if there are none.
2629 */
2630 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2631 {
2632 struct vdo_slab *slab;
2633
2634 slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2635 struct vdo_slab, allocq_entry);
2636 if (slab != NULL)
2637 return slab;
2638
2639 return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2640 allocq_entry);
2641 }
2642
2643 /**
2644 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2645 * @scrubber: The scrubber to check.
2646 *
2647 * Return: true if the scrubber has slabs to scrub.
2648 */
2649 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2650 {
2651 return (get_next_slab(scrubber) != NULL);
2652 }
2653
2654 /**
2655 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2656 * @scrubber: The scrubber.
2657 */
2658 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2659 {
2660 vdo_free(vdo_forget(scrubber->vio.data));
2661 free_vio_components(&scrubber->vio);
2662 }
2663
2664 /**
2665 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
2666 * there's been an error.
2667 * @scrubber: The scrubber.
2668 */
2669 static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
2670 {
2671 bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
2672 bool done = !has_slabs_to_scrub(scrubber);
2673 struct block_allocator *allocator =
2674 container_of(scrubber, struct block_allocator, scrubber);
2675
2676 if (done)
2677 uninitialize_scrubber_vio(scrubber);
2678
2679 if (scrubber->high_priority_only) {
2680 scrubber->high_priority_only = false;
2681 vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result);
2682 } else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
2683 /* All of our slabs were scrubbed, and we're the last allocator to finish. */
2684 enum vdo_state prior_state =
2685 atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
2686 VDO_DIRTY);
2687
2688 /*
2689 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
2690 * respect to whatever state change did happen.
2691 */
2692 smp_mb__after_atomic();
2693
2694 /*
2695 * We must check the VDO state here and not the depot's read_only_notifier since
2696 * the compare-swap-above could have failed due to a read-only entry which our own
2697 * thread does not yet know about.
2698 */
2699 if (prior_state == VDO_DIRTY)
2700 vdo_log_info("VDO commencing normal operation");
2701 else if (prior_state == VDO_RECOVERING)
2702 vdo_log_info("Exiting recovery mode");
2703 }
2704
2705 /*
2706 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
2707 * happen.
2708 */
2709 if (!vdo_finish_draining(&scrubber->admin_state))
2710 WRITE_ONCE(scrubber->admin_state.current_state,
2711 VDO_ADMIN_STATE_SUSPENDED);
2712
2713 /*
2714 * We can't notify waiters until after we've finished draining or they'll just requeue.
2715 * Fortunately if there were waiters, we can't have been freed yet.
2716 */
2717 if (notify)
2718 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2719 }
2720
2721 static void scrub_next_slab(struct slab_scrubber *scrubber);
2722
2723 /**
2724 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
2725 * @completion: The slab rebuild completion.
2726 *
2727 * This callback is registered in apply_journal_entries().
2728 */
2729 static void slab_scrubbed(struct vdo_completion *completion)
2730 {
2731 struct slab_scrubber *scrubber =
2732 container_of(as_vio(completion), struct slab_scrubber, vio);
2733 struct vdo_slab *slab = scrubber->slab;
2734
2735 slab->status = VDO_SLAB_REBUILT;
2736 queue_slab(slab);
2737 reopen_slab_journal(slab);
2738 WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
2739 scrub_next_slab(scrubber);
2740 }
2741
2742 /**
2743 * abort_scrubbing() - Abort scrubbing due to an error.
2744 * @scrubber: The slab scrubber.
2745 * @result: The error.
2746 */
2747 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2748 {
2749 vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2750 finish_scrubbing(scrubber, result);
2751 }
2752
2753 /**
2754 * handle_scrubber_error() - Handle errors while rebuilding a slab.
2755 * @completion: The slab rebuild completion.
2756 */
2757 static void handle_scrubber_error(struct vdo_completion *completion)
2758 {
2759 struct vio *vio = as_vio(completion);
2760
2761 vio_record_metadata_io_error(vio);
2762 abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2763 completion->result);
2764 }
2765
2766 /**
2767 * apply_block_entries() - Apply all the entries in a block to the reference counts.
2768 * @block: A block with entries to apply.
2769 * @entry_count: The number of entries to apply.
2770 * @block_number: The sequence number of the block.
2771 * @slab: The slab to apply the entries to.
2772 *
2773 * Return: VDO_SUCCESS or an error code.
2774 */
2775 static int apply_block_entries(struct packed_slab_journal_block *block,
2776 journal_entry_count_t entry_count,
2777 sequence_number_t block_number, struct vdo_slab *slab)
2778 {
2779 struct journal_point entry_point = {
2780 .sequence_number = block_number,
2781 .entry_count = 0,
2782 };
2783 int result;
2784 slab_block_number max_sbn = slab->end - slab->start;
2785
2786 while (entry_point.entry_count < entry_count) {
2787 struct slab_journal_entry entry =
2788 vdo_decode_slab_journal_entry(block, entry_point.entry_count);
2789
2790 if (entry.sbn > max_sbn) {
2791 /* This entry is out of bounds. */
2792 return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
2793 "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
2794 (unsigned long long) block_number,
2795 entry_point.entry_count,
2796 entry.sbn, max_sbn);
2797 }
2798
2799 result = replay_reference_count_change(slab, &entry_point, entry);
2800 if (result != VDO_SUCCESS) {
2801 vdo_log_error_strerror(result,
2802 "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
2803 (unsigned long long) block_number,
2804 entry_point.entry_count,
2805 vdo_get_journal_operation_name(entry.operation),
2806 entry.sbn, slab->slab_number);
2807 return result;
2808 }
2809 entry_point.entry_count++;
2810 }
2811
2812 return VDO_SUCCESS;
2813 }
2814
2815 /**
2816 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
2817 * @completion: The metadata read vio completion.
2818 *
2819 * This is a callback registered in start_scrubbing().
2820 */
2821 static void apply_journal_entries(struct vdo_completion *completion)
2822 {
2823 int result;
2824 struct slab_scrubber *scrubber =
2825 container_of(as_vio(completion), struct slab_scrubber, vio);
2826 struct vdo_slab *slab = scrubber->slab;
2827 struct slab_journal *journal = &slab->journal;
2828
2829 /* Find the boundaries of the useful part of the journal. */
2830 sequence_number_t tail = journal->tail;
2831 tail_block_offset_t end_index = (tail - 1) % journal->size;
2832 char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
2833 struct packed_slab_journal_block *end_block =
2834 (struct packed_slab_journal_block *) end_data;
2835
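/* The tail block's header records the journal head, which bounds the range to replay. */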
2836 sequence_number_t head = __le64_to_cpu(end_block->header.head);
2837 tail_block_offset_t head_index = head % journal->size;
2838 block_count_t index = head_index;
2839
2840 struct journal_point ref_counts_point = slab->slab_journal_point;
2841 struct journal_point last_entry_applied = ref_counts_point;
2842 sequence_number_t sequence;
2843
2844 for (sequence = head; sequence < tail; sequence++) {
2845 char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
2846 struct packed_slab_journal_block *block =
2847 (struct packed_slab_journal_block *) block_data;
2848 struct slab_journal_block_header header;
2849
2850 vdo_unpack_slab_journal_block_header(&block->header, &header);
2851
2852 if ((header.nonce != slab->allocator->nonce) ||
2853 (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
2854 (header.sequence_number != sequence) ||
2855 (header.entry_count > journal->entries_per_block) ||
2856 (header.has_block_map_increments &&
2857 (header.entry_count > journal->full_entries_per_block))) {
2858 /* The block is not what we expect it to be. */
2859 vdo_log_error("vdo_slab journal block for slab %u was invalid",
2860 slab->slab_number);
2861 abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
2862 return;
2863 }
2864
2865 result = apply_block_entries(block, header.entry_count, sequence, slab);
2866 if (result != VDO_SUCCESS) {
2867 abort_scrubbing(scrubber, result);
2868 return;
2869 }
2870
2871 last_entry_applied.sequence_number = sequence;
2872 last_entry_applied.entry_count = header.entry_count - 1;
2873 index++;
2874 if (index == journal->size)
2875 index = 0;
2876 }
2877
2878 /*
2879 * At the end of rebuild, the reference counters should be accurate to the end of the
2880 * journal we just applied.
2881 */
2882 result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied,
2883 &ref_counts_point),
2884 "Refcounts are not more accurate than the slab journal");
2885 if (result != VDO_SUCCESS) {
2886 abort_scrubbing(scrubber, result);
2887 return;
2888 }
2889
2890 /* Save out the rebuilt reference blocks. */
2891 vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
2892 slab->allocator->thread_id, completion->parent);
2893 vdo_start_operation_with_waiter(&slab->state,
2894 VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
2895 completion, initiate_slab_action);
2896 }
2897
2898 static void read_slab_journal_endio(struct bio *bio)
2899 {
2900 struct vio *vio = bio->bi_private;
2901 struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2902
2903 continue_vio_after_io(bio->bi_private, apply_journal_entries,
2904 scrubber->slab->allocator->thread_id);
2905 }
2906
2907 /**
2908 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2909 * @completion: The scrubber's vio completion.
2910 *
2911 * This callback is registered in scrub_next_slab().
2912 */
2913 static void start_scrubbing(struct vdo_completion *completion)
2914 {
2915 struct slab_scrubber *scrubber =
2916 container_of(as_vio(completion), struct slab_scrubber, vio);
2917 struct vdo_slab *slab = scrubber->slab;
2918
2919 if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
2920 slab_scrubbed(completion);
2921 return;
2922 }
2923
2924 vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
2925 read_slab_journal_endio, handle_scrubber_error,
2926 REQ_OP_READ);
2927 }
2928
2929 /**
2930 * scrub_next_slab() - Scrub the next slab if there is one.
2931 * @scrubber: The scrubber.
2932 */
2933 static void scrub_next_slab(struct slab_scrubber *scrubber)
2934 {
2935 struct vdo_completion *completion = &scrubber->vio.completion;
2936 struct vdo_slab *slab;
2937
2938 /*
2939 * Note: this notify call is only safe because scrubbing can only be started when
2940 * the VDO is quiescent.
2941 */
2942 vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
2943
2944 if (vdo_is_read_only(completion->vdo)) {
2945 finish_scrubbing(scrubber, VDO_READ_ONLY);
2946 return;
2947 }
2948
2949 slab = get_next_slab(scrubber);
2950 if ((slab == NULL) ||
2951 (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
2952 finish_scrubbing(scrubber, VDO_SUCCESS);
2953 return;
2954 }
2955
2956 if (vdo_finish_draining(&scrubber->admin_state))
2957 return;
2958
2959 list_del_init(&slab->allocq_entry);
2960 scrubber->slab = slab;
2961 vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
2962 slab->allocator->thread_id, completion->parent);
2963 vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
2964 completion, initiate_slab_action);
2965 }
2966
2967 /**
2968 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
2969 * @allocator: The block_allocator to scrub.
2970 * @parent: The completion to notify when scrubbing is done; may be NULL. A non-NULL parent implies high-priority-only scrubbing.
2971 */
2972 static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
2973 {
2974 struct slab_scrubber *scrubber = &allocator->scrubber;
2975
2976 scrubber->vio.completion.parent = parent;
2977 scrubber->high_priority_only = (parent != NULL);
2978 if (!has_slabs_to_scrub(scrubber)) {
2979 finish_scrubbing(scrubber, VDO_SUCCESS);
2980 return;
2981 }
2982
2983 if (scrubber->high_priority_only &&
2984 vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
2985 list_empty(&scrubber->high_priority_slabs))
2986 register_slab_for_scrubbing(get_next_slab(scrubber), true);
2987
2988 vdo_resume_if_quiescent(&scrubber->admin_state);
2989 scrub_next_slab(scrubber);
2990 }
2991
2992 static inline void assert_on_allocator_thread(thread_id_t thread_id,
2993 const char *function_name)
2994 {
2995 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == thread_id),
2996 "%s called on correct thread", function_name);
2997 }
2998
2999 static void register_slab_with_allocator(struct block_allocator *allocator,
3000 struct vdo_slab *slab)
3001 {
3002 allocator->slab_count++;
3003 allocator->last_slab = slab->slab_number;
3004 }
3005
3006 /**
3007 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3008 * @depot: The depot over which to iterate.
3009 * @start: The number of the slab to start iterating from.
3010 * @end: The number of the last slab which may be returned.
3011 * @stride: The difference in slab number between successive slabs.
3012 *
3013 * Iteration always occurs from higher to lower numbered slabs.
3014 *
3015 * Return: An initialized iterator structure.
3016 */
3017 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3018 slab_count_t start, slab_count_t end,
3019 slab_count_t stride)
3020 {
3021 struct vdo_slab **slabs = depot->slabs;
3022
3023 return (struct slab_iterator) {
3024 .slabs = slabs,
3025 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3026 .end = end,
3027 .stride = stride,
3028 };
3029 }
3030
3031 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3032 {
3033 return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3034 allocator->zone_number,
3035 allocator->depot->zone_count);
3036 }
3037
3038 /**
3039 * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3040 * @iterator: The slab_iterator.
3041 *
3042 * Return: The next slab or NULL if the iterator is exhausted.
3043 */
3044 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3045 {
3046 struct vdo_slab *slab = iterator->next;
3047
3048 if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3049 iterator->next = NULL;
3050 else
3051 iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3052
3053 return slab;
3054 }
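
/*
 * Illustrative example (hypothetical numbers, not from the original source): with
 * zone_count = 3 and zone_number = 1, iterating with next_slab() over the iterator from
 * get_slab_iterator() for an allocator whose last_slab is 10 visits slabs 10, 7, 4 and 1,
 * which are exactly the slabs assigned to zone 1, from highest to lowest number.
 */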
3055
3056 /**
3057 * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3058 *
3059 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3060 * into read-only mode. Implements waiter_callback_fn.
3061 */
3062 static void abort_waiter(struct vdo_waiter *waiter, void *context __always_unused)
3063 {
3064 struct reference_updater *updater =
3065 container_of(waiter, struct reference_updater, waiter);
3066 struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3067
3068 if (updater->increment) {
3069 continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3070 return;
3071 }
3072
3073 vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3074 }
3075
3076 /* Implements vdo_read_only_notification_fn. */
3077 static void notify_block_allocator_of_read_only_mode(void *listener,
3078 struct vdo_completion *parent)
3079 {
3080 struct block_allocator *allocator = listener;
3081 struct slab_iterator iterator;
3082
3083 assert_on_allocator_thread(allocator->thread_id, __func__);
3084 iterator = get_slab_iterator(allocator);
3085 while (iterator.next != NULL) {
3086 struct vdo_slab *slab = next_slab(&iterator);
3087
3088 vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
3089 abort_waiter, &slab->journal);
3090 check_if_slab_drained(slab);
3091 }
3092
3093 vdo_finish_completion(parent);
3094 }
3095
3096 /**
3097 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
3098 * the block it locks is unreferenced.
3099 * @slab: The slab which contains the block.
3100 * @pbn: The physical block to reference.
3101 * @lock: The lock.
3102 *
3103 * Return: VDO_SUCCESS or an error.
3104 */
3105 int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
3106 struct pbn_lock *lock)
3107 {
3108 slab_block_number block_number;
3109 int result;
3110
3111 if (vdo_pbn_lock_has_provisional_reference(lock))
3112 return VDO_SUCCESS;
3113
3114 if (!is_slab_open(slab))
3115 return VDO_INVALID_ADMIN_STATE;
3116
3117 result = slab_block_number_from_pbn(slab, pbn, &block_number);
3118 if (result != VDO_SUCCESS)
3119 return result;
3120
3121 if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
3122 make_provisional_reference(slab, block_number);
3123 if (lock != NULL)
3124 vdo_assign_pbn_lock_provisional_reference(lock);
3125 }
3126
3127 if (vdo_pbn_lock_has_provisional_reference(lock))
3128 adjust_free_block_count(slab, false);
3129
3130 return VDO_SUCCESS;
3131 }
3132
3133 static int __must_check allocate_slab_block(struct vdo_slab *slab,
3134 physical_block_number_t *block_number_ptr)
3135 {
3136 slab_block_number free_index;
3137
3138 if (!is_slab_open(slab))
3139 return VDO_INVALID_ADMIN_STATE;
3140
3141 if (!search_reference_blocks(slab, &free_index))
3142 return VDO_NO_SPACE;
3143
3144 VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
3145 "free block must have ref count of zero");
3146 make_provisional_reference(slab, free_index);
3147 adjust_free_block_count(slab, false);
3148
3149 /*
3150 * Update the search hint so the next search will start at the array index just past the
3151 * free block we just found.
3152 */
3153 slab->search_cursor.index = (free_index + 1);
3154
3155 *block_number_ptr = slab->start + free_index;
3156 return VDO_SUCCESS;
3157 }
3158
3159 /**
3160 * open_slab() - Prepare a slab to be allocated from.
3161 * @slab: The slab.
3162 */
3163 static void open_slab(struct vdo_slab *slab)
3164 {
3165 reset_search_cursor(slab);
3166 if (is_slab_journal_blank(slab)) {
3167 WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3168 slab->allocator->statistics.slabs_opened + 1);
3169 dirty_all_reference_blocks(slab);
3170 } else {
3171 WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3172 slab->allocator->statistics.slabs_reopened + 1);
3173 }
3174
3175 slab->allocator->open_slab = slab;
3176 }
3177
3178
3179 /*
3180 * The block allocated will have a provisional reference and the reference must be either confirmed
3181 * with a subsequent increment or vacated with a subsequent decrement via
3182 * vdo_release_block_reference().
3183 */
3184 int vdo_allocate_block(struct block_allocator *allocator,
3185 physical_block_number_t *block_number_ptr)
3186 {
3187 int result;
3188
3189 if (allocator->open_slab != NULL) {
3190 /* Try to allocate the next block in the currently open slab. */
3191 result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3192 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3193 return result;
3194
3195 /* Put the exhausted open slab back into the priority table. */
3196 prioritize_slab(allocator->open_slab);
3197 }
3198
3199 /* Remove the highest priority slab from the priority table and make it the open slab. */
3200 open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3201 struct vdo_slab, allocq_entry));
3202
3203 /*
3204 * Try allocating again. If we're out of space immediately after opening a slab, then every
3205 * slab must be fully allocated.
3206 */
3207 return allocate_slab_block(allocator->open_slab, block_number_ptr);
3208 }
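
/*
 * Illustrative sketch, not part of the driver: one plausible way a caller might pair
 * vdo_allocate_block() with vdo_release_block_reference() so that the provisional
 * reference is always either confirmed by a later increment or vacated. The helper
 * name and the keep_block flag are hypothetical.
 */
static int __maybe_unused example_allocate_or_release(struct block_allocator *allocator,
						      bool keep_block,
						      physical_block_number_t *pbn_ptr)
{
	int result = vdo_allocate_block(allocator, pbn_ptr);

	if (result != VDO_SUCCESS)
		return result;

	if (keep_block) {
		/* The caller must now confirm the provisional reference with an increment. */
		return VDO_SUCCESS;
	}

	/* Otherwise, vacate the provisional reference immediately. */
	return vdo_release_block_reference(allocator, *pbn_ptr);
}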
3209
3210 /**
3211 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3212 * @allocator: The block_allocator on which to wait.
3213 * @waiter: The waiter.
3214 *
3215 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3216 * some other error otherwise.
3217 */
3218 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3219 struct vdo_waiter *waiter)
3220 {
3221 if (vdo_is_read_only(allocator->depot->vdo))
3222 return VDO_READ_ONLY;
3223
3224 if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3225 return VDO_NO_SPACE;
3226
3227 vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3228 return VDO_SUCCESS;
3229 }
3230
3231 /**
3232 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
3233 * journal entry and then updating the reference counter.
3234 * @completion: The data_vio completion for which to add the entry.
3235 * @updater: Which of the data_vio's reference updaters is being submitted.
3236 */
3237 void vdo_modify_reference_count(struct vdo_completion *completion,
3238 struct reference_updater *updater)
3239 {
3240 struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);
3241
3242 if (!is_slab_open(slab)) {
3243 vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
3244 return;
3245 }
3246
3247 if (vdo_is_read_only(completion->vdo)) {
3248 vdo_continue_completion(completion, VDO_READ_ONLY);
3249 return;
3250 }
3251
3252 vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
3253 if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
3254 register_slab_for_scrubbing(slab, true);
3255
3256 add_entries(&slab->journal);
3257 }
3258
3259 /* Release an unused provisional reference. */
3260 int vdo_release_block_reference(struct block_allocator *allocator,
3261 physical_block_number_t pbn)
3262 {
3263 struct reference_updater updater;
3264
3265 if (pbn == VDO_ZERO_BLOCK)
3266 return VDO_SUCCESS;
3267
3268 updater = (struct reference_updater) {
3269 .operation = VDO_JOURNAL_DATA_REMAPPING,
3270 .increment = false,
3271 .zpbn = {
3272 .pbn = pbn,
3273 },
3274 };
3275
3276 return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
3277 &updater, NULL);
3278 }
3279
3280 /*
3281 * This is a min_heap callback function that orders slab_status structures using the 'is_clean' field as
3282 * the primary key and the 'emptiness' field as the secondary key.
3283 *
3284 * Slabs need to be pushed onto the rings in the same order they are to be popped off. Popping
3285 * should always get the most empty first, so pushing should be from most empty to least empty.
3286 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
3287 * before larger ones.
3288 */
3289 static bool slab_status_is_less_than(const void *item1, const void *item2,
3290 void __always_unused *args)
3291 {
3292 const struct slab_status *info1 = item1;
3293 const struct slab_status *info2 = item2;
3294
3295 if (info1->is_clean != info2->is_clean)
3296 return info1->is_clean;
3297 if (info1->emptiness != info2->emptiness)
3298 return info1->emptiness > info2->emptiness;
3299 return info1->slab_number < info2->slab_number;
3300 }
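
/*
 * Worked example (hypothetical values): given
 *   A = { .slab_number = 0, .is_clean = true,  .emptiness = 10 }
 *   B = { .slab_number = 1, .is_clean = true,  .emptiness = 40 }
 *   C = { .slab_number = 2, .is_clean = false, .emptiness = 99 }
 * B is "less than" A (both clean, B is emptier) and both are "less than" C (dirty),
 * so a min_heap using these callbacks pops B, then A, then C.
 */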
3301
3302 static const struct min_heap_callbacks slab_status_min_heap = {
3303 .less = slab_status_is_less_than,
3304 .swp = NULL,
3305 };
3306
3307 /* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
3308 static void slab_action_callback(struct vdo_completion *completion)
3309 {
3310 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3311 struct slab_actor *actor = &allocator->slab_actor;
3312
3313 if (--actor->slab_action_count == 0) {
3314 actor->callback(completion);
3315 return;
3316 }
3317
3318 vdo_reset_completion(completion);
3319 }
3320
3321 /* Preserve the error from part of an action and continue. */
3322 static void handle_operation_error(struct vdo_completion *completion)
3323 {
3324 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3325
3326 if (allocator->state.waiter != NULL)
3327 vdo_set_completion_result(allocator->state.waiter, completion->result);
3328 completion->callback(completion);
3329 }
3330
3331 /* Perform an action on each of an allocator's slabs in parallel. */
3332 static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
3333 {
3334 struct slab_iterator iterator;
3335
3336 vdo_prepare_completion(&allocator->completion, slab_action_callback,
3337 handle_operation_error, allocator->thread_id, NULL);
3338 allocator->completion.requeue = false;
3339
3340 /*
3341 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
3342 * clear it.
3343 */
3344 allocator->open_slab = NULL;
3345
3346 /* Ensure that we don't finish before we're done starting. */
3347 allocator->slab_actor = (struct slab_actor) {
3348 .slab_action_count = 1,
3349 .callback = callback,
3350 };
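	/*
	 * Illustrative walk-through, not from the original source: with three slabs the
	 * count goes 1 -> 2 -> 3 -> 4 as the operations are started below. Each slab's
	 * completion and this function's final explicit slab_action_callback() call each
	 * decrement it, so it can only reach zero, and invoke the callback, once every
	 * slab operation has finished and the final decrement has run.
	 */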
3351
3352 iterator = get_slab_iterator(allocator);
3353 while (iterator.next != NULL) {
3354 const struct admin_state_code *operation =
3355 vdo_get_admin_state_code(&allocator->state);
3356 struct vdo_slab *slab = next_slab(&iterator);
3357
3358 list_del_init(&slab->allocq_entry);
3359 allocator->slab_actor.slab_action_count++;
3360 vdo_start_operation_with_waiter(&slab->state, operation,
3361 &allocator->completion,
3362 initiate_slab_action);
3363 }
3364
3365 slab_action_callback(&allocator->completion);
3366 }
3367
3368 static void finish_loading_allocator(struct vdo_completion *completion)
3369 {
3370 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3371 const struct admin_state_code *operation =
3372 vdo_get_admin_state_code(&allocator->state);
3373
3374 if (allocator->eraser != NULL)
3375 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
3376
3377 if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3378 void *context =
3379 vdo_get_current_action_context(allocator->depot->action_manager);
3380
3381 vdo_replay_into_slab_journals(allocator, context);
3382 return;
3383 }
3384
3385 vdo_finish_loading(&allocator->state);
3386 }
3387
3388 static void erase_next_slab_journal(struct block_allocator *allocator);
3389
3390 static void copy_callback(int read_err, unsigned long write_err, void *context)
3391 {
3392 struct block_allocator *allocator = context;
3393 int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3394
3395 if (result != VDO_SUCCESS) {
3396 vdo_fail_completion(&allocator->completion, result);
3397 return;
3398 }
3399
3400 erase_next_slab_journal(allocator);
3401 }
3402
3403 /* erase_next_slab_journal() - Erase the next slab journal. */
3404 static void erase_next_slab_journal(struct block_allocator *allocator)
3405 {
3406 struct vdo_slab *slab;
3407 physical_block_number_t pbn;
3408 struct dm_io_region regions[1];
3409 struct slab_depot *depot = allocator->depot;
3410 block_count_t blocks = depot->slab_config.slab_journal_blocks;
3411
3412 if (allocator->slabs_to_erase.next == NULL) {
3413 vdo_finish_completion(&allocator->completion);
3414 return;
3415 }
3416
3417 slab = next_slab(&allocator->slabs_to_erase);
3418 pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3419 regions[0] = (struct dm_io_region) {
3420 .bdev = vdo_get_backing_device(depot->vdo),
3421 .sector = pbn * VDO_SECTORS_PER_BLOCK,
3422 .count = blocks * VDO_SECTORS_PER_BLOCK,
3423 };
3424 dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3425 }
3426
3427 /* Implements vdo_admin_initiator_fn. */
3428 static void initiate_load(struct admin_state *state)
3429 {
3430 struct block_allocator *allocator =
3431 container_of(state, struct block_allocator, state);
3432 const struct admin_state_code *operation = vdo_get_admin_state_code(state);
3433
3434 if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
3435 /*
3436 * Must requeue because the kcopyd client cannot be freed in the same stack frame
3437 * as the kcopyd callback, lest it deadlock.
3438 */
3439 vdo_prepare_completion_for_requeue(&allocator->completion,
3440 finish_loading_allocator,
3441 handle_operation_error,
3442 allocator->thread_id, NULL);
3443 allocator->eraser = dm_kcopyd_client_create(NULL);
3444 if (IS_ERR(allocator->eraser)) {
3445 vdo_fail_completion(&allocator->completion,
3446 PTR_ERR(allocator->eraser));
3447 allocator->eraser = NULL;
3448 return;
3449 }
3450 allocator->slabs_to_erase = get_slab_iterator(allocator);
3451
3452 erase_next_slab_journal(allocator);
3453 return;
3454 }
3455
3456 apply_to_slabs(allocator, finish_loading_allocator);
3457 }
3458
3459 /**
3460 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3461 * been recovered from the recovery journal.
3462 * @completion: The allocator completion.
3463 */
3464 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3465 {
3466 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3467
3468 vdo_finish_loading_with_result(&allocator->state, completion->result);
3469 }
3470
3471 static int get_slab_statuses(struct block_allocator *allocator,
3472 struct slab_status **statuses_ptr)
3473 {
3474 int result;
3475 struct slab_status *statuses;
3476 struct slab_iterator iterator = get_slab_iterator(allocator);
3477
3478 result = vdo_allocate(allocator->slab_count, struct slab_status, __func__,
3479 &statuses);
3480 if (result != VDO_SUCCESS)
3481 return result;
3482
3483 *statuses_ptr = statuses;
3484
3485 while (iterator.next != NULL) {
3486 slab_count_t slab_number = next_slab(&iterator)->slab_number;
3487
3488 *statuses++ = (struct slab_status) {
3489 .slab_number = slab_number,
3490 .is_clean = !allocator->summary_entries[slab_number].is_dirty,
3491 .emptiness = allocator->summary_entries[slab_number].fullness_hint,
3492 };
3493 }
3494
3495 return VDO_SUCCESS;
3496 }
3497
3498 /* Prepare slabs for allocation or scrubbing. */
3499 static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
3500 {
3501 struct slab_status current_slab_status;
3502 DEFINE_MIN_HEAP(struct slab_status, heap) heap;
3503 int result;
3504 struct slab_status *slab_statuses;
3505 struct slab_depot *depot = allocator->depot;
3506
3507 WRITE_ONCE(allocator->allocated_blocks,
3508 allocator->slab_count * depot->slab_config.data_blocks);
3509 result = get_slab_statuses(allocator, &slab_statuses);
3510 if (result != VDO_SUCCESS)
3511 return result;
3512
3513 /* Sort the slabs by cleanliness, then by emptiness hint. */
3514 heap = (struct heap) {
3515 .data = slab_statuses,
3516 .nr = allocator->slab_count,
3517 .size = allocator->slab_count,
3518 };
3519 min_heapify_all(&heap, &slab_status_min_heap, NULL);
3520
3521 while (heap.nr > 0) {
3522 bool high_priority;
3523 struct vdo_slab *slab;
3524 struct slab_journal *journal;
3525
3526 current_slab_status = slab_statuses[0];
3527 min_heap_pop(&heap, &slab_status_min_heap, NULL);
3528 slab = depot->slabs[current_slab_status.slab_number];
3529
3530 if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
3531 (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
3532 current_slab_status.is_clean)) {
3533 queue_slab(slab);
3534 continue;
3535 }
3536
3537 slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
3538 journal = &slab->journal;
3539 high_priority = ((current_slab_status.is_clean &&
3540 (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
3541 (journal_length(journal) >= journal->scrubbing_threshold));
3542 register_slab_for_scrubbing(slab, high_priority);
3543 }
3544
3545 vdo_free(slab_statuses);
3546 return VDO_SUCCESS;
3547 }
3548
3549 static const char *status_to_string(enum slab_rebuild_status status)
3550 {
3551 switch (status) {
3552 case VDO_SLAB_REBUILT:
3553 return "REBUILT";
3554 case VDO_SLAB_REQUIRES_SCRUBBING:
3555 return "SCRUBBING";
3556 case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3557 return "PRIORITY_SCRUBBING";
3558 case VDO_SLAB_REBUILDING:
3559 return "REBUILDING";
3560 case VDO_SLAB_REPLAYING:
3561 return "REPLAYING";
3562 default:
3563 return "UNKNOWN";
3564 }
3565 }
3566
3567 void vdo_dump_block_allocator(const struct block_allocator *allocator)
3568 {
3569 unsigned int pause_counter = 0;
3570 struct slab_iterator iterator = get_slab_iterator(allocator);
3571 const struct slab_scrubber *scrubber = &allocator->scrubber;
3572
3573 vdo_log_info("block_allocator zone %u", allocator->zone_number);
3574 while (iterator.next != NULL) {
3575 struct vdo_slab *slab = next_slab(&iterator);
3576 struct slab_journal *journal = &slab->journal;
3577
3578 if (slab->reference_blocks != NULL) {
3579 /* Terse because there are a lot of slabs to dump and syslog is lossy. */
3580 vdo_log_info("slab %u: P%u, %llu free", slab->slab_number,
3581 slab->priority,
3582 (unsigned long long) slab->free_blocks);
3583 } else {
3584 vdo_log_info("slab %u: status %s", slab->slab_number,
3585 status_to_string(slab->status));
3586 }
3587
3588 vdo_log_info(" slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
3589 vdo_waitq_num_waiters(&journal->entry_waiters),
3590 vdo_bool_to_string(journal->waiting_to_commit),
3591 vdo_bool_to_string(journal->updating_slab_summary),
3592 (unsigned long long) journal->head,
3593 (unsigned long long) journal->unreapable,
3594 (unsigned long long) journal->tail,
3595 (unsigned long long) journal->next_commit,
3596 (unsigned long long) journal->summarized,
3597 (unsigned long long) journal->last_summarized,
3598 (unsigned long long) journal->recovery_lock,
3599 vdo_bool_to_string(journal->recovery_lock != 0));
3600 /*
3601 * Given the frequency with which the locks are just a tiny bit off, it might be
3602 * worth dumping all the locks, but that might be too much logging.
3603 */
3604
3605 if (slab->counters != NULL) {
3606 /* Terse because there are a lot of slabs to dump and syslog is lossy. */
3607 vdo_log_info(" slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
3608 slab->free_blocks, slab->block_count,
3609 slab->reference_block_count,
3610 vdo_waitq_num_waiters(&slab->dirty_blocks),
3611 slab->active_count,
3612 (unsigned long long) slab->slab_journal_point.sequence_number,
3613 slab->slab_journal_point.entry_count);
3614 } else {
3615 vdo_log_info(" no counters");
3616 }
3617
3618 /*
3619 * Pause after each batch of 32 slabs dumped (an arbitrary number), giving the
3620 * kernel log a chance to be flushed instead of being overrun.
3621 */
3622 if (pause_counter++ == 31) {
3623 pause_counter = 0;
3624 vdo_pause_for_logger();
3625 }
3626 }
3627
3628 vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
3629 READ_ONCE(scrubber->slab_count),
3630 vdo_waitq_num_waiters(&scrubber->waiters),
3631 vdo_get_admin_state_code(&scrubber->admin_state)->name,
3632 scrubber->high_priority_only ? ", high_priority_only " : "");
3633 }
3634
3635 static void free_slab(struct vdo_slab *slab)
3636 {
3637 if (slab == NULL)
3638 return;
3639
3640 list_del(&slab->allocq_entry);
3641 vdo_free(vdo_forget(slab->journal.block));
3642 vdo_free(vdo_forget(slab->journal.locks));
3643 vdo_free(vdo_forget(slab->counters));
3644 vdo_free(vdo_forget(slab->reference_blocks));
3645 vdo_free(slab);
3646 }
3647
3648 static int initialize_slab_journal(struct vdo_slab *slab)
3649 {
3650 struct slab_journal *journal = &slab->journal;
3651 const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
3652 int result;
3653
3654 result = vdo_allocate(slab_config->slab_journal_blocks, struct journal_lock,
3655 __func__, &journal->locks);
3656 if (result != VDO_SUCCESS)
3657 return result;
3658
3659 result = vdo_allocate(VDO_BLOCK_SIZE, char, "struct packed_slab_journal_block",
3660 (char **) &journal->block);
3661 if (result != VDO_SUCCESS)
3662 return result;
3663
3664 journal->slab = slab;
3665 journal->size = slab_config->slab_journal_blocks;
3666 journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
3667 journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
3668 journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
3669 journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
3670 journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
3671 journal->events = &slab->allocator->slab_journal_statistics;
3672 journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
3673 journal->tail = 1;
3674 journal->head = 1;
3675
3676 journal->flushing_deadline = journal->flushing_threshold;
3677 /*
3678 * Leave some gap between the flushing deadline and the blocking threshold, so that
3679 * hopefully all the flushing completes before anything has to block.
3680 */
3681 if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
3682 journal->flushing_deadline = journal->blocking_threshold - 5;
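	/*
	 * Worked example (hypothetical thresholds): with flushing_threshold = 60 and
	 * blocking_threshold = 100, the gap of 40 exceeds 5, so flushing_deadline is 95.
	 */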
3683
3684 journal->slab_summary_waiter.callback = release_journal_locks;
3685
3686 INIT_LIST_HEAD(&journal->dirty_entry);
3687 INIT_LIST_HEAD(&journal->uncommitted_blocks);
3688
3689 journal->tail_header.nonce = slab->allocator->nonce;
3690 journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
3691 initialize_journal_state(journal);
3692 return VDO_SUCCESS;
3693 }
3694
3695 /**
3696 * make_slab() - Construct a new, empty slab.
3697 * @slab_origin: The physical block number within the block allocator partition of the first block
3698 * in the slab.
3699 * @allocator: The block allocator to which the slab belongs.
3700 * @slab_number: The slab number of the slab.
3701 * @is_new: true if this slab is being allocated as part of a resize.
3702 * @slab_ptr: A pointer to receive the new slab.
3703 *
3704 * Return: VDO_SUCCESS or an error code.
3705 */
3706 static int __must_check make_slab(physical_block_number_t slab_origin,
3707 struct block_allocator *allocator,
3708 slab_count_t slab_number, bool is_new,
3709 struct vdo_slab **slab_ptr)
3710 {
3711 const struct slab_config *slab_config = &allocator->depot->slab_config;
3712 struct vdo_slab *slab;
3713 int result;
3714
3715 result = vdo_allocate(1, struct vdo_slab, __func__, &slab);
3716 if (result != VDO_SUCCESS)
3717 return result;
3718
3719 *slab = (struct vdo_slab) {
3720 .allocator = allocator,
3721 .start = slab_origin,
3722 .end = slab_origin + slab_config->slab_blocks,
3723 .slab_number = slab_number,
3724 .ref_counts_origin = slab_origin + slab_config->data_blocks,
3725 .journal_origin =
3726 vdo_get_slab_journal_start_block(slab_config, slab_origin),
3727 .block_count = slab_config->data_blocks,
3728 .free_blocks = slab_config->data_blocks,
3729 .reference_block_count =
3730 vdo_get_saved_reference_count_size(slab_config->data_blocks),
3731 };
3732 INIT_LIST_HEAD(&slab->allocq_entry);
3733
3734 result = initialize_slab_journal(slab);
3735 if (result != VDO_SUCCESS) {
3736 free_slab(slab);
3737 return result;
3738 }
3739
3740 if (is_new) {
3741 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3742 result = allocate_slab_counters(slab);
3743 if (result != VDO_SUCCESS) {
3744 free_slab(slab);
3745 return result;
3746 }
3747 } else {
3748 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3749 }
3750
3751 *slab_ptr = slab;
3752 return VDO_SUCCESS;
3753 }
3754
3755 /**
3756 * allocate_slabs() - Allocate a new slab pointer array.
3757 * @depot: The depot.
3758 * @slab_count: The number of slabs the depot should have in the new array.
3759 *
3760 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3761 * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3762 *
3763 * Return: VDO_SUCCESS or an error code.
3764 */
3765 static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
3766 {
3767 block_count_t slab_size;
3768 bool resizing = false;
3769 physical_block_number_t slab_origin;
3770 int result;
3771
3772 result = vdo_allocate(slab_count, struct vdo_slab *,
3773 "slab pointer array", &depot->new_slabs);
3774 if (result != VDO_SUCCESS)
3775 return result;
3776
3777 if (depot->slabs != NULL) {
3778 memcpy(depot->new_slabs, depot->slabs,
3779 depot->slab_count * sizeof(struct vdo_slab *));
3780 resizing = true;
3781 }
3782
3783 slab_size = depot->slab_config.slab_blocks;
3784 slab_origin = depot->first_block + (depot->slab_count * slab_size);
3785
3786 for (depot->new_slab_count = depot->slab_count;
3787 depot->new_slab_count < slab_count;
3788 depot->new_slab_count++, slab_origin += slab_size) {
3789 struct block_allocator *allocator =
3790 &depot->allocators[depot->new_slab_count % depot->zone_count];
3791 struct vdo_slab **slab_ptr = &depot->new_slabs[depot->new_slab_count];
3792
3793 result = make_slab(slab_origin, allocator, depot->new_slab_count,
3794 resizing, slab_ptr);
3795 if (result != VDO_SUCCESS)
3796 return result;
3797 }
3798
3799 return VDO_SUCCESS;
3800 }
3801
3802 /**
3803 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3804 * @depot: The depot.
3805 */
3806 void vdo_abandon_new_slabs(struct slab_depot *depot)
3807 {
3808 slab_count_t i;
3809
3810 if (depot->new_slabs == NULL)
3811 return;
3812
3813 for (i = depot->slab_count; i < depot->new_slab_count; i++)
3814 free_slab(vdo_forget(depot->new_slabs[i]));
3815 depot->new_slab_count = 0;
3816 depot->new_size = 0;
3817 vdo_free(vdo_forget(depot->new_slabs));
3818 }
3819
3820 /**
3821 * get_allocator_thread_id() - Get the ID of the thread on which a given allocator operates.
3822 *
3823 * Implements vdo_zone_thread_getter_fn.
3824 */
3825 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3826 {
3827 return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3828 }
3829
3830 /**
3831 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3832 * it may hold on a specified recovery journal block.
3833 * @journal: The slab journal.
3834 * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3835 * released.
3836 *
3837 * Return: true if the journal does hold a lock on the specified block (which it will release).
3838 */
3839 static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
3840 sequence_number_t recovery_lock)
3841 {
3842 if (recovery_lock > journal->recovery_lock) {
3843 VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
3844 "slab journal recovery lock is not older than the recovery journal head");
3845 return false;
3846 }
3847
3848 if ((recovery_lock < journal->recovery_lock) ||
3849 vdo_is_read_only(journal->slab->allocator->depot->vdo))
3850 return false;
3851
3852 /* All locks are held by the block which is in progress; write it. */
3853 commit_tail(journal);
3854 return true;
3855 }
3856
3857 /*
3858 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3859 * is seeking to release.
3860 *
3861 * Implements vdo_zone_action_fn.
3862 */
3863 static void release_tail_block_locks(void *context, zone_count_t zone_number,
3864 struct vdo_completion *parent)
3865 {
3866 struct slab_journal *journal, *tmp;
3867 struct slab_depot *depot = context;
3868 struct list_head *list = &depot->allocators[zone_number].dirty_slab_journals;
3869
3870 list_for_each_entry_safe(journal, tmp, list, dirty_entry) {
3871 if (!release_recovery_journal_lock(journal,
3872 depot->active_release_request))
3873 break;
3874 }
3875
3876 vdo_finish_completion(parent);
3877 }
3878
3879 /**
3880 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3881 *
3882 * Implements vdo_action_preamble_fn.
3883 */
3884 static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
3885 {
3886 struct slab_depot *depot = context;
3887
3888 depot->active_release_request = depot->new_release_request;
3889 vdo_finish_completion(parent);
3890 }
3891
3892 /**
3893 * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3894 *
3895 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3896 * depot's action manager.
3897 *
3898 * Implements vdo_action_scheduler_fn.
3899 */
3900 static bool schedule_tail_block_commit(void *context)
3901 {
3902 struct slab_depot *depot = context;
3903
3904 if (depot->new_release_request == depot->active_release_request)
3905 return false;
3906
3907 return vdo_schedule_action(depot->action_manager,
3908 prepare_for_tail_block_commit,
3909 release_tail_block_locks,
3910 NULL, NULL);
3911 }
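
/*
 * Illustrative note: callers trigger this path through the action manager, for example
 * (assuming the usual single-argument form) vdo_schedule_default_action(depot->action_manager),
 * rather than by calling schedule_tail_block_commit() directly.
 */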
3912
3913 /**
3914 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
3915 * @allocator: The allocator being initialized
3916 *
3917 * Return: VDO_SUCCESS or an error.
3918 */
3919 static int initialize_slab_scrubber(struct block_allocator *allocator)
3920 {
3921 struct slab_scrubber *scrubber = &allocator->scrubber;
3922 block_count_t slab_journal_size =
3923 allocator->depot->slab_config.slab_journal_blocks;
3924 char *journal_data;
3925 int result;
3926
3927 result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size,
3928 char, __func__, &journal_data);
3929 if (result != VDO_SUCCESS)
3930 return result;
3931
3932 result = allocate_vio_components(allocator->completion.vdo,
3933 VIO_TYPE_SLAB_JOURNAL,
3934 VIO_PRIORITY_METADATA,
3935 allocator, slab_journal_size,
3936 journal_data, &scrubber->vio);
3937 if (result != VDO_SUCCESS) {
3938 vdo_free(journal_data);
3939 return result;
3940 }
3941
3942 INIT_LIST_HEAD(&scrubber->high_priority_slabs);
3943 INIT_LIST_HEAD(&scrubber->slabs);
3944 vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
3945 return VDO_SUCCESS;
3946 }
3947
3948 /**
3949 * initialize_slab_summary_block() - Initialize a slab_summary_block.
3950 * @allocator: The allocator which owns the block.
3951 * @index: The index of this block in its zone's summary.
3952 *
3953 * Return: VDO_SUCCESS or an error.
3954 */
3955 static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
3956 block_count_t index)
3957 {
3958 struct slab_summary_block *block = &allocator->summary_blocks[index];
3959 int result;
3960
3961 result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &block->outgoing_entries);
3962 if (result != VDO_SUCCESS)
3963 return result;
3964
3965 result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
3966 VIO_PRIORITY_METADATA, NULL, 1,
3967 block->outgoing_entries, &block->vio);
3968 if (result != VDO_SUCCESS)
3969 return result;
3970
3971 block->allocator = allocator;
3972 block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
3973 block->index = index;
3974 return VDO_SUCCESS;
3975 }
3976
3977 static int __must_check initialize_block_allocator(struct slab_depot *depot,
3978 zone_count_t zone)
3979 {
3980 int result;
3981 block_count_t i;
3982 struct block_allocator *allocator = &depot->allocators[zone];
3983 struct vdo *vdo = depot->vdo;
3984 block_count_t max_free_blocks = depot->slab_config.data_blocks;
3985 unsigned int max_priority = (2 + ilog2(max_free_blocks));
3986
3987 *allocator = (struct block_allocator) {
3988 .depot = depot,
3989 .zone_number = zone,
3990 .thread_id = vdo->thread_config.physical_threads[zone],
3991 .nonce = vdo->states.vdo.nonce,
3992 };
3993
3994 INIT_LIST_HEAD(&allocator->dirty_slab_journals);
3995 vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3996 result = vdo_register_read_only_listener(vdo, allocator,
3997 notify_block_allocator_of_read_only_mode,
3998 allocator->thread_id);
3999 if (result != VDO_SUCCESS)
4000 return result;
4001
4002 vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
4003 result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, allocator->thread_id,
4004 VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
4005 allocator, &allocator->vio_pool);
4006 if (result != VDO_SUCCESS)
4007 return result;
4008
4009 result = initialize_slab_scrubber(allocator);
4010 if (result != VDO_SUCCESS)
4011 return result;
4012
4013 result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
4014 if (result != VDO_SUCCESS)
4015 return result;
4016
4017 result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE,
4018 struct slab_summary_block, __func__,
4019 &allocator->summary_blocks);
4020 if (result != VDO_SUCCESS)
4021 return result;
4022
4023 vdo_set_admin_state_code(&allocator->summary_state,
4024 VDO_ADMIN_STATE_NORMAL_OPERATION);
4025 allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);
4026
4027 /* Initialize each summary block. */
4028 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4029 result = initialize_slab_summary_block(allocator, i);
4030 if (result != VDO_SUCCESS)
4031 return result;
4032 }
4033
4034 /*
4035 * Performing well atop thin provisioned storage requires either that VDO discards freed
4036 * blocks, or that the block allocator try to use slabs that already have allocated blocks
4037 * in preference to slabs that have never been opened. For reasons we have not been able to
4038 * fully understand, some SSD machines have been very sensitive (50% reduction in
4039 * test throughput) to very slight differences in the timing and locality of block
4040 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
4041 * ideal for the story, but anything less than a very high threshold (max_priority - 1)
4042 * hurts on these machines.
4043 *
4044 * This sets the free block threshold for preferring to open an unopened slab to the binary
4045 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
4046 * to about half the slab size.
4047 */
4048 allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));
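	/*
	 * Worked example (hypothetical slab size): with max_free_blocks = 32768,
	 * (32768 * 3) / 4 = 24576 and ilog2(24576) = 14, so unopened_slab_priority is 15,
	 * while max_priority is 2 + ilog2(32768) = 17.
	 */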
4049
4050 return VDO_SUCCESS;
4051 }
4052
4053 static int allocate_components(struct slab_depot *depot,
4054 struct partition *summary_partition)
4055 {
4056 int result;
4057 zone_count_t zone;
4058 slab_count_t slab_count;
4059 u8 hint;
4060 u32 i;
4061 const struct thread_config *thread_config = &depot->vdo->thread_config;
4062
4063 result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
4064 thread_config->journal_thread, depot,
4065 schedule_tail_block_commit,
4066 depot->vdo, &depot->action_manager);
4067 if (result != VDO_SUCCESS)
4068 return result;
4069
4070 depot->origin = depot->first_block;
4071
4072 /* block size must be a multiple of entry size */
4073 BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);
4074
4075 depot->summary_origin = summary_partition->offset;
4076 depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
4077 result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES,
4078 struct slab_summary_entry, __func__,
4079 &depot->summary_entries);
4080 if (result != VDO_SUCCESS)
4081 return result;
4082
4083
4084 /* Initialize all the entries. */
4085 hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
4086 for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
4087 /*
4088 * This default tail block offset must be reflected in
4089 * read_slab_journal_tail().
4090 */
4091 depot->summary_entries[i] = (struct slab_summary_entry) {
4092 .tail_block_offset = 0,
4093 .fullness_hint = hint,
4094 .load_ref_counts = false,
4095 .is_dirty = false,
4096 };
4097 }
4098
4099 slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
4100 depot->slab_size_shift);
4101 if (thread_config->physical_zone_count > slab_count) {
4102 return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
4103 "%u physical zones exceeds slab count %u",
4104 thread_config->physical_zone_count,
4105 slab_count);
4106 }
4107
4108 /* Initialize the block allocators. */
4109 for (zone = 0; zone < depot->zone_count; zone++) {
4110 result = initialize_block_allocator(depot, zone);
4111 if (result != VDO_SUCCESS)
4112 return result;
4113 }
4114
4115 /* Allocate slabs. */
4116 result = allocate_slabs(depot, slab_count);
4117 if (result != VDO_SUCCESS)
4118 return result;
4119
4120 /* Use the new slabs. */
4121 for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4122 struct vdo_slab *slab = depot->new_slabs[i];
4123
4124 register_slab_with_allocator(slab->allocator, slab);
4125 WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
4126 }
4127
4128 depot->slabs = depot->new_slabs;
4129 depot->new_slabs = NULL;
4130 depot->new_slab_count = 0;
4131
4132 return VDO_SUCCESS;
4133 }
4134
4135 /**
4136 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
4137 * block.
4138 * @state: The slab depot state from the super block.
4139 * @vdo: The VDO which will own the depot.
4140 * @summary_partition: The partition which holds the slab summary.
4141 * @depot_ptr: A pointer to hold the depot.
4142 *
4143 * Return: A success or error code.
4144 */
4145 int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
4146 struct partition *summary_partition,
4147 struct slab_depot **depot_ptr)
4148 {
4149 unsigned int slab_size_shift;
4150 struct slab_depot *depot;
4151 int result;
4152
4153 /*
4154 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
4155 * requires that the slab size be a power of two.
4156 */
4157 block_count_t slab_size = state.slab_config.slab_blocks;
4158
4159 if (!is_power_of_2(slab_size)) {
4160 return vdo_log_error_strerror(UDS_INVALID_ARGUMENT,
4161 "slab size must be a power of two");
4162 }
4163 slab_size_shift = ilog2(slab_size);
4164
4165 result = vdo_allocate_extended(struct slab_depot,
4166 vdo->thread_config.physical_zone_count,
4167 struct block_allocator, __func__, &depot);
4168 if (result != VDO_SUCCESS)
4169 return result;
4170
4171 depot->vdo = vdo;
4172 depot->old_zone_count = state.zone_count;
4173 depot->zone_count = vdo->thread_config.physical_zone_count;
4174 depot->slab_config = state.slab_config;
4175 depot->first_block = state.first_block;
4176 depot->last_block = state.last_block;
4177 depot->slab_size_shift = slab_size_shift;
4178
4179 result = allocate_components(depot, summary_partition);
4180 if (result != VDO_SUCCESS) {
4181 vdo_free_slab_depot(depot);
4182 return result;
4183 }
4184
4185 *depot_ptr = depot;
4186 return VDO_SUCCESS;
4187 }
4188
4189 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4190 {
4191 block_count_t i;
4192
4193 if (allocator->summary_blocks == NULL)
4194 return;
4195
4196 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4197 free_vio_components(&allocator->summary_blocks[i].vio);
4198 vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries));
4199 }
4200
4201 vdo_free(vdo_forget(allocator->summary_blocks));
4202 }
4203
4204 /**
4205 * vdo_free_slab_depot() - Destroy a slab depot.
4206 * @depot: The depot to destroy.
4207 */
4208 void vdo_free_slab_depot(struct slab_depot *depot)
4209 {
4210 zone_count_t zone = 0;
4211
4212 if (depot == NULL)
4213 return;
4214
4215 vdo_abandon_new_slabs(depot);
4216
4217 for (zone = 0; zone < depot->zone_count; zone++) {
4218 struct block_allocator *allocator = &depot->allocators[zone];
4219
4220 if (allocator->eraser != NULL)
4221 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
4222
4223 uninitialize_allocator_summary(allocator);
4224 uninitialize_scrubber_vio(&allocator->scrubber);
4225 free_vio_pool(vdo_forget(allocator->vio_pool));
4226 vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs));
4227 }
4228
4229 if (depot->slabs != NULL) {
4230 slab_count_t i;
4231
4232 for (i = 0; i < depot->slab_count; i++)
4233 free_slab(vdo_forget(depot->slabs[i]));
4234 }
4235
4236 vdo_free(vdo_forget(depot->slabs));
4237 vdo_free(vdo_forget(depot->action_manager));
4238 vdo_free(vdo_forget(depot->summary_entries));
4239 vdo_free(depot);
4240 }
4241
4242 /**
4243 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4244 * @depot: The depot to encode.
4245 *
4246 * Return: The depot state.
4247 */
4248 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4249 {
4250 /*
4251 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4252 * tool and is now being saved. We did not load and combine the slab summary, so we still
4253 * need to do that next time we load with the old zone count rather than 0.
4254 */
4255 struct slab_depot_state_2_0 state;
4256 zone_count_t zones_to_record = depot->zone_count;
4257
4258 if (depot->zone_count == 0)
4259 zones_to_record = depot->old_zone_count;
4260
4261 state = (struct slab_depot_state_2_0) {
4262 .slab_config = depot->slab_config,
4263 .first_block = depot->first_block,
4264 .last_block = depot->last_block,
4265 .zone_count = zones_to_record,
4266 };
4267
4268 return state;
4269 }
4270
4271 /**
4272 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4273 *
4274 * Context: This method may be called only before entering normal operation from the load thread.
4275 *
4276 * Return: VDO_SUCCESS or an error.
4277 */
4278 int vdo_allocate_reference_counters(struct slab_depot *depot)
4279 {
4280 struct slab_iterator iterator =
4281 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4282
4283 while (iterator.next != NULL) {
4284 int result = allocate_slab_counters(next_slab(&iterator));
4285
4286 if (result != VDO_SUCCESS)
4287 return result;
4288 }
4289
4290 return VDO_SUCCESS;
4291 }
4292
4293 /**
4294 * get_slab_number() - Get the number of the slab that contains a specified block.
4295 * @depot: The slab depot.
4296 * @pbn: The physical block number.
4297 * @slab_number_ptr: A pointer to hold the slab number.
4298 *
4299 * Return: VDO_SUCCESS or an error.
4300 */
4301 static int __must_check get_slab_number(const struct slab_depot *depot,
4302 physical_block_number_t pbn,
4303 slab_count_t *slab_number_ptr)
4304 {
4305 slab_count_t slab_number;
4306
4307 if (pbn < depot->first_block)
4308 return VDO_OUT_OF_RANGE;
4309
4310 slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4311 if (slab_number >= depot->slab_count)
4312 return VDO_OUT_OF_RANGE;
4313
4314 *slab_number_ptr = slab_number;
4315 return VDO_SUCCESS;
4316 }
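
/*
 * Illustrative example (hypothetical geometry): with first_block = 1024 and
 * slab_size_shift = 15 (32768-block slabs), pbn 100000 maps to slab
 * (100000 - 1024) >> 15 = 3.
 */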
4317
4318 /**
4319 * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4320 * @depot: The slab depot.
4321 * @pbn: The physical block number.
4322 *
4323 * Will put the VDO in read-only mode if the PBN is neither a valid data block nor the zero block.
4324 *
4325 * Return: The slab containing the block, or NULL if the block number is the zero block or
4326 * otherwise out of range.
4327 */
4328 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4329 physical_block_number_t pbn)
4330 {
4331 slab_count_t slab_number;
4332 int result;
4333
4334 if (pbn == VDO_ZERO_BLOCK)
4335 return NULL;
4336
4337 result = get_slab_number(depot, pbn, &slab_number);
4338 if (result != VDO_SUCCESS) {
4339 vdo_enter_read_only_mode(depot->vdo, result);
4340 return NULL;
4341 }
4342
4343 return depot->slabs[slab_number];
4344 }
4345
4346 /**
4347 * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4348 * @depot: The slab depot.
4349 * @pbn: The physical block number that is being queried.
4350 *
4351 * Context: This method must be called from the physical zone thread of the PBN.
4352 *
4353 * Return: The number of available references.
4354 */
4355 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4356 {
4357 struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4358 vdo_refcount_t *counter_ptr = NULL;
4359 int result;
4360
4361 if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4362 return 0;
4363
4364 result = get_reference_counter(slab, pbn, &counter_ptr);
4365 if (result != VDO_SUCCESS)
4366 return 0;
4367
4368 if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4369 return (MAXIMUM_REFERENCE_COUNT - 1);
4370
4371 return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4372 }
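
/*
 * Illustrative example: if MAXIMUM_REFERENCE_COUNT were 254 (a value assumed here only
 * for illustration), a block whose counter reads 3 could acquire 251 more references,
 * and a provisionally referenced block could acquire 253.
 */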
4373
4374 /**
4375 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4376 * @depot: The depot.
4377 * @pbn: The physical block number to ask about.
4378 *
4379 * Return: True if the PBN corresponds to a data block.
4380 */
4381 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4382 physical_block_number_t pbn)
4383 {
4384 slab_count_t slab_number;
4385 slab_block_number sbn;
4386
4387 return ((pbn == VDO_ZERO_BLOCK) ||
4388 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4389 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4390 VDO_SUCCESS)));
4391 }
4392
4393 /**
4394 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4395 * the slabs in the depot.
4396 * @depot: The slab depot.
4397 *
4398 * This is the total number of blocks with a non-zero reference count.
4399 *
4400 * Context: This may be called from any thread.
4401 *
4402 * Return: The total number of blocks with a non-zero reference count.
4403 */
4404 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4405 {
4406 block_count_t total = 0;
4407 zone_count_t zone;
4408
4409 for (zone = 0; zone < depot->zone_count; zone++) {
4410 /* The allocators are responsible for thread safety. */
4411 total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4412 }
4413
4414 return total;
4415 }
4416
4417 /**
4418 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4419 * depot.
4420 * @depot: The slab depot.
4421 *
4422 * Context: This may be called from any thread.
4423 *
4424 * Return: The total number of data blocks in all slabs.
4425 */
4426 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4427 {
4428 return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4429 }
4430
4431 /**
4432 * finish_combining_zones() - Clean up after saving out the combined slab summary.
4433 * @completion: The vio which was used to write the summary data.
4434 */
4435 static void finish_combining_zones(struct vdo_completion *completion)
4436 {
4437 int result = completion->result;
4438 struct vdo_completion *parent = completion->parent;
4439
4440 free_vio(as_vio(vdo_forget(completion)));
4441 vdo_fail_completion(parent, result);
4442 }
4443
4444 static void handle_combining_error(struct vdo_completion *completion)
4445 {
4446 vio_record_metadata_io_error(as_vio(completion));
4447 finish_combining_zones(completion);
4448 }
4449
4450 static void write_summary_endio(struct bio *bio)
4451 {
4452 struct vio *vio = bio->bi_private;
4453 struct vdo *vdo = vio->completion.vdo;
4454
4455 continue_vio_after_io(vio, finish_combining_zones,
4456 vdo->thread_config.admin_thread);
4457 }
4458
4459 /**
4460 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
4461 * update every zone to the correct values for every slab.
4462 * @depot: The depot whose summary entries should be combined.
4463 */
4464 static void combine_summaries(struct slab_depot *depot)
4465 {
4466 /*
4467 * Combine all the old summary data into the portion of the buffer corresponding to the
4468 * first zone.
4469 */
4470 zone_count_t zone = 0;
4471 struct slab_summary_entry *entries = depot->summary_entries;
4472
4473 if (depot->old_zone_count > 1) {
4474 slab_count_t entry_number;
4475
4476 for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
4477 if (zone != 0) {
4478 memcpy(entries + entry_number,
4479 entries + (zone * MAX_VDO_SLABS) + entry_number,
4480 sizeof(struct slab_summary_entry));
4481 }
4482
4483 zone++;
4484 if (zone == depot->old_zone_count)
4485 zone = 0;
4486 }
4487 }
4488
	/* Copy the combined data to each zone's region of the buffer. */
	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
	}
}
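
/*
 * Worked example for combine_summaries(): slabs are assigned to zones round-robin, so with
 * old_zone_count == 3 the authoritative on-disk entry for slab n lives in zone (n % 3)'s region
 * of the buffer, at offset n within that region. The first loop folds those entries into zone 0's
 * region:
 *
 *	slab 0 -> already in zone 0's region (no copy)
 *	slab 1 -> copied from entries[(1 * MAX_VDO_SLABS) + 1]
 *	slab 2 -> copied from entries[(2 * MAX_VDO_SLABS) + 2]
 *	slab 3 -> already in zone 0's region (no copy)
 *	...
 *
 * The second loop then replicates zone 0's now-complete region into every other zone's region, so
 * every zone sees correct values for all slabs. (The zone count of 3 is only an example.)
 */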

/**
 * finish_loading_summary() - Finish loading slab summary data.
 * @completion: The vio which was used to read the summary data.
 *
 * Combines the slab summary data from all the previously written zones, copies the combined
 * summary to each zone's portion of the summary buffer, and then writes the combined summary back
 * out to disk. This callback is registered in load_summary_endio().
 */
static void finish_loading_summary(struct vdo_completion *completion)
{
	struct slab_depot *depot = completion->vdo->depot;

	/* Combine the summary from each zone so each zone is correct for all slabs. */
	combine_summaries(depot);

	/* Write the combined summary back out. */
	vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
				write_summary_endio, handle_combining_error,
				REQ_OP_WRITE);
}

static void load_summary_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;

	continue_vio_after_io(vio, finish_loading_summary,
			      vdo->thread_config.admin_thread);
}

/**
 * load_slab_summary() - The preamble of a load operation.
 *
 * Implements vdo_action_preamble_fn.
 */
static void load_slab_summary(void *context, struct vdo_completion *parent)
{
	int result;
	struct vio *vio;
	struct slab_depot *depot = context;
	const struct admin_state_code *operation =
		vdo_get_current_manager_operation(depot->action_manager);

	result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
						 VIO_PRIORITY_METADATA, parent,
						 VDO_SLAB_SUMMARY_BLOCKS,
						 (char *) depot->summary_entries, &vio);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(parent, result);
		return;
	}

	if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
	    (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
		finish_loading_summary(&vio->completion);
		return;
	}

	vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
				handle_combining_error, REQ_OP_READ);
}

/* Implements vdo_zone_action_fn. */
static void load_allocator(void *context, zone_count_t zone_number,
			   struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_loading(&depot->allocators[zone_number].state,
			  vdo_get_current_manager_operation(depot->action_manager),
			  parent, initiate_load);
}

/**
 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
 *                         super_block component.
 * @depot: The depot to load.
 * @operation: The type of load to perform.
 * @parent: The completion to notify when the load is complete.
 * @context: Additional context for the load operation; may be NULL.
 *
 * This method may be called only before entering normal operation from the load thread.
 */
void vdo_load_slab_depot(struct slab_depot *depot,
			 const struct admin_state_code *operation,
			 struct vdo_completion *parent, void *context)
{
	if (!vdo_assert_load_operation(operation, parent))
		return;

	vdo_schedule_operation_with_context(depot->action_manager, operation,
					    load_slab_summary, load_allocator,
					    NULL, context, parent);
}

/* Implements vdo_zone_action_fn. */
static void prepare_to_allocate(void *context, zone_count_t zone_number,
				struct vdo_completion *parent)
{
	struct slab_depot *depot = context;
	struct block_allocator *allocator = &depot->allocators[zone_number];
	int result;

	result = vdo_prepare_slabs_for_allocation(allocator);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(parent, result);
		return;
	}

	scrub_slabs(allocator, parent);
}

/**
 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
 *                                        allocating blocks.
 * @depot: The depot to prepare.
 * @load_type: The load type.
 * @parent: The completion to notify when the operation is complete.
 *
 * This method may be called only before entering normal operation from the load thread. It must be
 * called before allocation may proceed.
 */
void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
					enum slab_depot_load_type load_type,
					struct vdo_completion *parent)
{
	depot->load_type = load_type;
	atomic_set(&depot->zones_to_scrub, depot->zone_count);
	vdo_schedule_action(depot->action_manager, NULL,
			    prepare_to_allocate, NULL, parent);
}

/**
 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
 * @depot: The depot to update.
 *
 * This size is saved to disk as part of the super block.
 */
void vdo_update_slab_depot_size(struct slab_depot *depot)
{
	depot->last_block = depot->new_last_block;
}

/**
 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
 *                                    the given size.
 * @depot: The depot to prepare to resize.
 * @partition: The new depot partition.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
				   const struct partition *partition)
{
	struct slab_depot_state_2_0 new_state;
	int result;
	slab_count_t new_slab_count;

	if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
		return VDO_INCREMENT_TOO_SMALL;

	/* Generate the depot configuration for the new block count. */
	VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset,
			    "New slab depot partition doesn't change origin");
	result = vdo_configure_slab_depot(partition, depot->slab_config,
					  depot->zone_count, &new_state);
	if (result != VDO_SUCCESS)
		return result;

	new_slab_count = vdo_compute_slab_count(depot->first_block,
						new_state.last_block,
						depot->slab_size_shift);
	if (new_slab_count <= depot->slab_count)
		return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
					      "Depot can only grow");
	if (new_slab_count == depot->new_slab_count) {
		/* Check it out, we've already got all the new slabs allocated! */
		return VDO_SUCCESS;
	}

	vdo_abandon_new_slabs(depot);
	result = allocate_slabs(depot, new_slab_count);
	if (result != VDO_SUCCESS) {
		vdo_abandon_new_slabs(depot);
		return result;
	}

	depot->new_size = partition->count;
	depot->old_last_block = depot->last_block;
	depot->new_last_block = new_state.last_block;

	return VDO_SUCCESS;
}
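
/*
 * Example (illustrative sketch): the growth path is split across several calls so that the new
 * slab memory can be allocated before the depot is suspended. Under that assumption, a
 * hypothetical caller growing the depot would do roughly the following; error handling and the
 * surrounding suspend/resume logic are omitted, and this ordering is an illustration rather than
 * a complete recipe.
 *
 *	result = vdo_prepare_to_grow_slab_depot(depot, new_partition);
 *	if (result != VDO_SUCCESS)
 *		return result;
 *
 *	// Later, once the depot has been suspended:
 *	vdo_use_new_slabs(depot, parent);	// register the pre-allocated slabs
 *	vdo_update_slab_depot_size(depot);	// commit the new last_block for the super block
 */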

/**
 * finish_registration() - Finish registering new slabs now that all of the allocators have
 *                         received their new slabs.
 *
 * Implements vdo_action_conclusion_fn.
 */
static int finish_registration(void *context)
{
	struct slab_depot *depot = context;

	WRITE_ONCE(depot->slab_count, depot->new_slab_count);
	vdo_free(depot->slabs);
	depot->slabs = depot->new_slabs;
	depot->new_slabs = NULL;
	depot->new_slab_count = 0;
	return VDO_SUCCESS;
}

/* Implements vdo_zone_action_fn. */
static void register_new_slabs(void *context, zone_count_t zone_number,
			       struct vdo_completion *parent)
{
	struct slab_depot *depot = context;
	struct block_allocator *allocator = &depot->allocators[zone_number];
	slab_count_t i;

	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
		struct vdo_slab *slab = depot->new_slabs[i];

		if (slab->allocator == allocator)
			register_slab_with_allocator(allocator, slab);
	}

	vdo_finish_completion(parent);
}

/**
 * vdo_use_new_slabs() - Use the new slabs allocated for resize.
 * @depot: The depot.
 * @parent: The object to notify when complete.
 */
void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
{
	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
	vdo_schedule_operation(depot->action_manager,
			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			       NULL, register_new_slabs,
			       finish_registration, parent);
}

/**
 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
 *                    currently working on.
 * @allocator: The block allocator owning the scrubber to stop.
 */
static void stop_scrubbing(struct block_allocator *allocator)
{
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
		vdo_finish_completion(&allocator->completion);
	} else {
		vdo_start_draining(&scrubber->admin_state,
				   VDO_ADMIN_STATE_SUSPENDING,
				   &allocator->completion, NULL);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_summary_drain(struct admin_state *state)
{
	check_summary_drain_complete(container_of(state, struct block_allocator,
						  summary_state));
}

static void do_drain_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
					   handle_operation_error, allocator->thread_id,
					   NULL);
	switch (++allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		stop_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_drain_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_start_draining(&allocator->summary_state,
				   vdo_get_admin_state_code(&allocator->state),
				   completion, initiate_summary_drain);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
				    "vio pool not busy");
		vdo_finish_draining_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
	}
}
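
/*
 * Note on the step ordering (an illustration of the state machine above): initiate_drain() sets
 * drain_step to VDO_DRAIN_ALLOCATOR_START, and each invocation of do_drain_step() pre-increments
 * it, so a drain proceeds
 *
 *	START -> SCRUBBER -> SLABS -> SUMMARY -> FINISHED
 *
 * while do_resume_step() below walks the same steps in reverse by pre-decrementing. This assumes
 * the drain-step enum values are declared in that order, which is how the drain/resume pairing is
 * intended to work.
 */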

/* Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
	do_drain_step(&allocator->completion);
}

/*
 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
 * written to disk. The type of drain will be determined from the state of the allocator's depot.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_allocator(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_draining(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_drain);
}

/**
 * vdo_drain_slab_depot() - Drain all slab depot I/O.
 * @depot: The depot to drain.
 * @operation: The drain operation (flush, rebuild, suspend, or save).
 * @parent: The completion to finish when the drain is complete.
 *
 * If saving or flushing, all dirty depot metadata will be written out. If saving or suspending,
 * the depot will be left in a suspended state.
 */
void vdo_drain_slab_depot(struct slab_depot *depot,
			  const struct admin_state_code *operation,
			  struct vdo_completion *parent)
{
	vdo_schedule_operation(depot->action_manager, operation,
			       NULL, drain_allocator, NULL, parent);
}

/**
 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
 * @allocator: The allocator being resumed.
 */
static void resume_scrubbing(struct block_allocator *allocator)
{
	int result;
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (!has_slabs_to_scrub(scrubber)) {
		vdo_finish_completion(&allocator->completion);
		return;
	}

	result = vdo_resume_if_quiescent(&scrubber->admin_state);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(&allocator->completion, result);
		return;
	}

	scrub_next_slab(scrubber);
	vdo_finish_completion(&allocator->completion);
}

static void do_resume_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
					   handle_operation_error,
					   allocator->thread_id, NULL);
	switch (--allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_fail_completion(completion,
				    vdo_resume_if_quiescent(&allocator->summary_state));
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_resume_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		resume_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_START:
		vdo_finish_resuming_with_result(&allocator->state, completion->result);
		return;

	default:
		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
	}
}

/* Implements vdo_admin_initiator_fn. */
static void initiate_resume(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
	do_resume_step(&allocator->completion);
}

/* Implements vdo_zone_action_fn. */
static void resume_allocator(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_resuming(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_resume);
}

/**
 * vdo_resume_slab_depot() - Resume a suspended slab depot.
 * @depot: The depot to resume.
 * @parent: The completion to finish when the depot has resumed.
 */
void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
{
	if (vdo_is_read_only(depot->vdo)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
			       NULL, resume_allocator, NULL, parent);
}

/**
 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
 *                                                given recovery journal block.
 * @depot: The depot.
 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
 *                         released.
 *
 * Context: This method must be called from the journal zone thread.
 */
void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
						sequence_number_t recovery_block_number)
{
	if (depot == NULL)
		return;

	depot->new_release_request = recovery_block_number;
	vdo_schedule_default_action(depot->action_manager);
}
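
/*
 * Example (illustrative): when the recovery journal wants to release the lock on, say, its block
 * 42, it calls vdo_commit_oldest_slab_journal_tail_blocks(depot, 42). Recording the request and
 * scheduling the action manager's default action lets each allocator zone commit any slab journal
 * tail blocks that are still holding locks on recovery journal blocks up through that sequence
 * number. The block number here is only an example of the mechanism: record the request, then fan
 * it out to every zone.
 */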

/* Implements vdo_zone_action_fn. */
static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
					struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	scrub_slabs(&depot->allocators[zone_number], NULL);
	vdo_launch_completion(parent);
}

/**
 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
 * @depot: The depot to scrub.
 * @parent: The object to notify when scrubbing has been launched for all zones.
 */
void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
				     struct vdo_completion *parent)
{
	vdo_schedule_action(depot->action_manager, NULL,
			    scrub_all_unrecovered_slabs,
			    NULL, parent);
}

/**
 * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
 *                                    in the depot.
 * @depot: The slab depot.
 *
 * Return: The statistics from all block allocators in the depot.
 */
static struct block_allocator_statistics __must_check
get_block_allocator_statistics(const struct slab_depot *depot)
{
	struct block_allocator_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct block_allocator *allocator = &depot->allocators[zone];
		const struct block_allocator_statistics *stats = &allocator->statistics;

		totals.slab_count += allocator->slab_count;
		totals.slabs_opened += READ_ONCE(stats->slabs_opened);
		totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
	}

	return totals;
}

/**
 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The cumulative statistics for all ref_counts in the depot.
 */
static struct ref_counts_statistics __must_check
get_ref_counts_statistics(const struct slab_depot *depot)
{
	struct ref_counts_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		totals.blocks_written +=
			READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
	}

	return totals;
}

/**
 * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
 * @depot: The slab depot.
 *
 * Return: The aggregated statistics for all slab journals in the depot.
 */
static struct slab_journal_statistics __must_check
get_slab_journal_statistics(const struct slab_depot *depot)
{
	struct slab_journal_statistics totals;
	zone_count_t zone;

	memset(&totals, 0, sizeof(totals));

	for (zone = 0; zone < depot->zone_count; zone++) {
		const struct slab_journal_statistics *stats =
			&depot->allocators[zone].slab_journal_statistics;

		totals.disk_full_count += READ_ONCE(stats->disk_full_count);
		totals.flush_count += READ_ONCE(stats->flush_count);
		totals.blocked_count += READ_ONCE(stats->blocked_count);
		totals.blocks_written += READ_ONCE(stats->blocks_written);
		totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
	}

	return totals;
}

/**
 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
 *                                   slab depot.
 * @depot: The slab depot.
 * @stats: The vdo statistics structure to partially fill.
 */
void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
				   struct vdo_statistics *stats)
{
	slab_count_t slab_count = READ_ONCE(depot->slab_count);
	slab_count_t unrecovered = 0;
	zone_count_t zone;

	for (zone = 0; zone < depot->zone_count; zone++) {
		/* The allocators are responsible for thread safety. */
		unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
	}

	stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
	stats->allocator = get_block_allocator_statistics(depot);
	stats->ref_counts = get_ref_counts_statistics(depot);
	stats->slab_journal = get_slab_journal_statistics(depot);
	stats->slab_summary = (struct slab_summary_statistics) {
		.blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
	};
}
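
/*
 * Worked example for the recovery_percentage computation above: with slab_count == 200 and 30
 * slabs still registered with the scrubbers, recovery_percentage = (200 - 30) * 100 / 200 = 85.
 * The integer division truncates, so 199 of 200 slabs recovered reports 99. The counts here are
 * illustrative only.
 */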

/**
 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
 * @depot: The slab depot.
 */
void vdo_dump_slab_depot(const struct slab_depot *depot)
{
	vdo_log_info("vdo slab depot");
	vdo_log_info(" zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
		     (unsigned int) depot->zone_count,
		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
		     (unsigned long long) depot->active_release_request,
		     (unsigned long long) depot->new_release_request);
}