// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "block-map.h"

#include <linux/bio.h>
#include <linux/ratelimit.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/**
 * DOC: Block map eras
 *
 * The block map era, or maximum age, is used as follows:
 *
 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
 * according to the sequence number they record.
 *
 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
 * this era are issued for write. In all older eras, pages are issued for write immediately.
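 *
 * As an illustrative example (the maximum age used here is arbitrary, not the configured
 * default): with a maximum age of 10, journal sequence numbers 0-9 would belong to era 0,
 * 10-19 to era 1, and so on, so a dirty page recording earliest sequence number 17 would be
 * assigned to era 1.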
 */

struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;

union page_key {
	struct page_descriptor descriptor;
	u64 key;
};

struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};

struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

struct block_map_tree {
	struct block_map_tree_segment *segments;
};

struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	struct block_map_tree trees[];
};

struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};

struct cursors;

struct cursor {
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};

struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	vdo_entry_callback_fn entry_callback;
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};

static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))

static inline bool is_dirty(const struct page_info *info)
{
	return info->state == PS_DIRTY;
}

static inline bool is_present(const struct page_info *info)
{
	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
}

static inline bool is_in_flight(const struct page_info *info)
{
	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
}

static inline bool is_incoming(const struct page_info *info)
{
	return info->state == PS_INCOMING;
}

static inline bool is_outgoing(const struct page_info *info)
{
	return info->state == PS_OUTGOING;
}

static inline bool is_valid(const struct page_info *info)
{
	return is_present(info) || is_outgoing(info);
}

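/** get_page_buffer() - Get the buffer for a page info, indexed into the cache's page array. */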
static char *get_page_buffer(struct page_info *info)
{
	struct vdo_page_cache *cache = info->cache;

	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
}

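/** page_completion_from_waiter() - Convert a waiter into its enclosing vdo_page_completion. */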
static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
	struct vdo_page_completion *completion;

	if (waiter == NULL)
		return NULL;

	completion = container_of(waiter, struct vdo_page_completion, waiter);
	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
	return completion;
}

/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_info(struct vdo_page_cache *cache)
{
	struct page_info *info;

	INIT_LIST_HEAD(&cache->free_list);
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result;

		info->cache = cache;
		info->state = PS_FREE;
		info->pbn = NO_PAGE;

		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
					     VIO_PRIORITY_METADATA, info,
					     get_page_buffer(info), &info->vio);
		if (result != VDO_SUCCESS)
			return result;

		/* The thread ID should never change. */
		info->vio->completion.callback_thread_id = cache->zone->thread_id;

		INIT_LIST_HEAD(&info->state_entry);
		list_add_tail(&info->state_entry, &cache->free_list);
		INIT_LIST_HEAD(&info->lru_entry);
	}

	return VDO_SUCCESS;
}

/**
 * allocate_cache_components() - Allocate components of the cache which require their own
 *                               allocation.
 *
 * The caller is responsible for all clean up on errors.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
	int result;

	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
			      &cache->infos);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_int_map_create(cache->page_count, &cache->page_map);
	if (result != VDO_SUCCESS)
		return result;

	return initialize_info(cache);
}

/**
 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
 *                            thread.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
					  const char *function_name)
{
	thread_id_t thread_id = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
			    "%s() must only be called on cache thread %d, not thread %d",
			    function_name, cache->zone->thread_id, thread_id);
}

/** assert_io_allowed() - Assert that a page cache may issue I/O. */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
			    "VDO page cache may issue I/O");
}

/** report_cache_pressure() - Log and, if enabled, report cache pressure. */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
	ADD_ONCE(cache->stats.cache_pressure, 1);
	if (cache->waiter_count > cache->page_count) {
		if ((cache->pressure_report % LOG_INTERVAL) == 0)
			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

		if (++cache->pressure_report >= DISPLAY_INTERVAL)
			cache->pressure_report = 0;
	}
}

/**
 * get_page_state_name() - Return the name of a page state.
 *
 * If the page state is invalid a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
	int result;
	static const char * const state_names[] = {
		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
	};

	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
			    "Unknown page_state value %d", state);
	if (result != VDO_SUCCESS)
		return "[UNKNOWN PAGE STATE]";

	return state_names[state];
}

/**
 * update_counter() - Update the counter associated with a given state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter.
 */
static void update_counter(struct page_info *info, s32 delta)
{
	struct block_map_statistics *stats = &info->cache->stats;

	switch (info->state) {
	case PS_FREE:
		ADD_ONCE(stats->free_pages, delta);
		return;

	case PS_INCOMING:
		ADD_ONCE(stats->incoming_pages, delta);
		return;

	case PS_OUTGOING:
		ADD_ONCE(stats->outgoing_pages, delta);
		return;

	case PS_FAILED:
		ADD_ONCE(stats->failed_pages, delta);
		return;

	case PS_RESIDENT:
		ADD_ONCE(stats->clean_pages, delta);
		return;

	case PS_DIRTY:
		ADD_ONCE(stats->dirty_pages, delta);
		return;

	default:
		return;
	}
}

/** update_lru() - Update the lru information for an active page. */
static void update_lru(struct page_info *info)
{
	if (info->cache->lru_list.prev != &info->lru_entry)
		list_move_tail(&info->lru_entry, &info->cache->lru_list);
}

/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 */
static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
	if (new_state == info->state)
		return;

	update_counter(info, -1);
	info->state = new_state;
	update_counter(info, 1);

	switch (info->state) {
	case PS_FREE:
	case PS_FAILED:
		list_move_tail(&info->state_entry, &info->cache->free_list);
		return;

	case PS_OUTGOING:
		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
		return;

	case PS_DIRTY:
		return;

	default:
		list_del_init(&info->state_entry);
	}
}

/** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
	struct vdo_page_cache *cache = info->cache;

	/* Either the new or the old page number must be NO_PAGE. */
	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
				"Must free a page before reusing it.");
	if (result != VDO_SUCCESS)
		return result;

	if (info->pbn != NO_PAGE)
		vdo_int_map_remove(cache->page_map, info->pbn);

	info->pbn = pbn;

	if (pbn != NO_PAGE) {
		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
		if (result != VDO_SUCCESS)
			return result;
	}
	return VDO_SUCCESS;
}

/** reset_page_info() - Reset page info to represent an unallocated page. */
static int reset_page_info(struct page_info *info)
{
	int result;

	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
			    "VDO Page must not have waiters");
	if (result != VDO_SUCCESS)
		return result;

	result = set_info_pbn(info, NO_PAGE);
	set_info_state(info, PS_FREE);
	list_del_init(&info->lru_entry);
	return result;
}

/**
 * find_free_page() - Find a free page.
 *
 * Return: A pointer to the page info structure (if found), NULL otherwise.
 */
static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	info = list_first_entry_or_null(&cache->free_list, struct page_info,
					state_entry);
	if (info != NULL)
		list_del_init(&info->state_entry);

	return info;
}

/**
 * find_page() - Find the page info (if any) associated with a given pbn.
 * @pbn: The absolute physical block number of the page.
 *
 * Return: The page info for the page if available, or NULL if not.
 */
static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
						 physical_block_number_t pbn)
{
	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
		return cache->last_found;

	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
	return cache->last_found;
}

/**
 * select_lru_page() - Determine which page is least recently used.
 *
 * Picks the least recently used page from among the non-busy entries at the front of the LRU
 * ring. Since whenever we mark a page busy we also move it to the end of the ring, it is unlikely
 * (though not impossible) that the entries at the front are busy unless the queue is very short.
 *
 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
 *         found. The page can be dirty or resident.
 */
static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	list_for_each_entry(info, &cache->lru_list, lru_entry)
		if ((info->busy == 0) && !is_in_flight(info))
			return info;

	return NULL;
}

/* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */

/**
 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
 * @info: The page info representing the result page.
 * @vdo_page_comp: The VDO page completion to complete.
 */
static void complete_with_page(struct page_info *info,
			       struct vdo_page_completion *vdo_page_comp)
{
	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);

	if (!available) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Requested cache page %llu in state %s is not %s",
				       (unsigned long long) info->pbn,
				       get_page_state_name(info->state),
				       vdo_page_comp->writable ? "present" : "valid");
		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
		return;
	}

	vdo_page_comp->info = info;
	vdo_page_comp->ready = true;
	vdo_finish_completion(&vdo_page_comp->completion);
}

/**
 * complete_waiter_with_error() - Complete a page completion with an error code.
 * @waiter: The page completion, as a waiter.
 * @result_ptr: A pointer to the error code.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
{
	int *result = result_ptr;

	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
}

/**
 * complete_waiter_with_page() - Complete a page completion with a page.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	complete_with_page(page_info, page_completion_from_waiter(waiter));
}

/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 *
 * Upon completion the waitq will be empty.
 *
 * Return: The number of pages distributed.
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
					       struct vdo_wait_queue *waitq)
{
	size_t num_pages;

	update_lru(info);
	num_pages = vdo_waitq_num_waiters(waitq);

	/*
	 * Increment the busy count once for each pending completion so that this page does not
	 * stop being busy until all completions have been processed.
	 */
	info->busy += num_pages;

	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
	return num_pages;
}

/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @context: A string describing what triggered the error.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well.
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}

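/** check_for_drain_complete() - Finish draining the zone if it has no more outstanding work. */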
static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}

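/**
 * enter_zone_read_only_mode() - Enter read-only mode and release any flush waiters so that the
 *                               zone can drain.
 */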
static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}

static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
 */
static void handle_load_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
	ADD_ONCE(cache->stats.failed_reads, 1);
	set_info_state(info, PS_FAILED);
	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
	reset_page_info(info);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	nonce_t nonce = info->cache->zone->block_map->nonce;
	struct block_map_page *page;
	enum block_map_page_validity validity;

	assert_on_cache_thread(cache, __func__);

	page = (struct block_map_page *) get_page_buffer(info);
	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
						    "Expected page %llu but got page %llu instead",
						    (unsigned long long) info->pbn,
						    (unsigned long long) pbn);

		vdo_continue_completion(completion, result);
		return;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
		vdo_format_block_map_page(page, nonce, info->pbn, false);

	info->recovery_lock = 0;
	set_info_state(info, PS_RESIDENT);
	distribute_page_over_waitq(info, &info->waiting);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}

/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	/*
	 * We are doing a read-only rebuild, so treat this as a successful read
	 * of an uninitialized page.
	 */
	vio_record_metadata_io_error(as_vio(completion));
	ADD_ONCE(cache->stats.failed_reads, 1);
	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
	vdo_reset_completion(completion);
	page_is_loaded(completion);
}

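/** load_cache_page_endio() - Endio for a cache page read, run on the cache's zone thread. */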
static void load_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
}

/**
 * launch_page_load() - Begin the process of loading a page.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
					 physical_block_number_t pbn)
{
	int result;
	vdo_action_fn callback;
	struct vdo_page_cache *cache = info->cache;

	assert_io_allowed(cache);

	result = set_info_pbn(info, pbn);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
	if (result != VDO_SUCCESS)
		return result;

	set_info_state(info, PS_INCOMING);
	cache->outstanding_reads++;
	ADD_ONCE(cache->stats.pages_loaded, 1);
	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
				callback, REQ_OP_READ | REQ_PRIO);
	return VDO_SUCCESS;
}

static void write_pages(struct vdo_completion *completion);

/** handle_flush_error() - Handle errors flushing the layer. */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	set_persistent_error(info->cache, "flush failed", completion->result);
	write_pages(completion);
}

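/** flush_endio() - Endio for the pre-write flush; continue by writing the flushed batch. */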
static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
}

/** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
static void save_pages(struct vdo_page_cache *cache)
{
	struct page_info *info;
	struct vio *vio;

	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
		return;

	assert_io_allowed(cache);

	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);

	cache->pages_in_flush = cache->pages_to_flush;
	cache->pages_to_flush = 0;
	ADD_ONCE(cache->stats.flush_count, 1);

	vio = info->vio;

	/*
	 * We must make sure that the recovery journal entries that changed these pages were
	 * successfully persisted, and thus must issue a flush before each batch of pages is
	 * written to ensure this.
	 */
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}

/**
 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
 *
 * Once in the list, a page may not be used until it has been written out.
 */
static void schedule_page_save(struct page_info *info)
{
	if (info->busy > 0) {
		info->write_status = WRITE_STATUS_DEFERRED;
		return;
	}

	info->cache->pages_to_flush++;
	info->cache->outstanding_writes++;
	set_info_state(info, PS_OUTGOING);
}

/**
 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
 *                      pages if another save is not in progress.
 */
static void launch_page_save(struct page_info *info)
{
	schedule_page_save(info);
	save_pages(info->cache);
}

/**
 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
 *                           requesting a given page number.
 * @context: A pointer to the pbn of the desired page.
 *
 * Implements waiter_match_fn.
 *
 * Return: true if the page completion is for the desired page number.
 */
static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
{
	physical_block_number_t *pbn = context;

	return (page_completion_from_waiter(waiter)->pbn == *pbn);
}

/**
 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
 *                        any other completions that match it in page number.
 */
static void allocate_free_page(struct page_info *info)
{
	int result;
	struct vdo_waiter *oldest_waiter;
	physical_block_number_t pbn;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
		if (cache->stats.cache_pressure > 0) {
			vdo_log_info("page cache pressure relieved");
			WRITE_ONCE(cache->stats.cache_pressure, 0);
		}

		return;
	}

	result = reset_page_info(info);
	if (result != VDO_SUCCESS) {
		set_persistent_error(cache, "cannot reset page info", result);
		return;
	}

	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
	pbn = page_completion_from_waiter(oldest_waiter)->pbn;

	/*
	 * Remove all entries which match the page number in question and push them onto the page
	 * info's waitq.
	 */
	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
					   &pbn, &info->waiting);
	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);

	result = launch_page_load(info, pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * discard_a_page() - Begin the process of discarding a page.
 *
 * If no page is discardable, increments a count of deferred frees so that the next release of a
 * page which is no longer busy will kick off another discard cycle. This is an indication that the
 * cache is not big enough.
 *
 * If the selected page is not dirty, immediately allocates the page to the oldest completion
 * waiting for a free page.
 */
static void discard_a_page(struct vdo_page_cache *cache)
{
	struct page_info *info = select_lru_page(cache);

	if (info == NULL) {
		report_cache_pressure(cache);
		return;
	}

	if (!is_dirty(info)) {
		allocate_free_page(info);
		return;
	}

	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
			    "page selected for discard is not in flight");

	cache->discard_count++;
	info->write_status = WRITE_STATUS_DISCARD;
	launch_page_save(info);
}

/**
 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
 *                                 a different page.
 */
static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
{
	struct vdo_page_cache *cache = vdo_page_comp->cache;

	cache->waiter_count++;
	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
	discard_a_page(cache);
}

/**
 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
 *                            page.
 * @cache: The page cache.
 */
static void discard_page_if_needed(struct vdo_page_cache *cache)
{
	if (cache->waiter_count > cache->discard_count)
		discard_a_page(cache);
}

/**
 * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
 * @info: The info structure for the page whose write just completed.
 *
 * Return: true if the page write was a discard.
 */
static bool write_has_finished(struct page_info *info)
{
	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);

	assert_on_cache_thread(info->cache, __func__);
	info->cache->outstanding_writes--;

	info->write_status = WRITE_STATUS_NORMAL;
	return was_discard;
}

/**
 * handle_page_write_error() - Handler for page write errors.
 * @completion: The page write vio.
 */
static void handle_page_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	vio_record_metadata_io_error(as_vio(completion));

	/* If we're already read-only, write failures are to be expected. */
	if (result != VDO_READ_ONLY) {
		vdo_log_ratelimit(vdo_log_error,
				  "failed to write block map page %llu",
				  (unsigned long long) info->pbn);
	}

	set_info_state(info, PS_DIRTY);
	ADD_ONCE(cache->stats.failed_writes, 1);
	set_persistent_error(cache, "cannot write page", result);

	if (!write_has_finished(info))
		discard_page_if_needed(cache);

	check_for_drain_complete(cache->zone);
}

static void page_is_written_out(struct vdo_completion *completion);

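/** write_cache_page_endio() - Endio for a cache page write, run on the cache's zone thread. */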
static void write_cache_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct page_info *info = vio->completion.parent;

	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
}

/**
 * page_is_written_out() - Callback used when a page has been written out.
 * @completion: The vio which wrote the page. Its parent is a page_info.
 */
static void page_is_written_out(struct vdo_completion *completion)
{
	bool was_discard, reclaimed;
	u32 reclamations;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

	if (!page->header.initialized) {
		page->header.initialized = true;
		vdo_submit_metadata_vio(info->vio, info->pbn,
					write_cache_page_endio,
					handle_page_write_error,
					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
		return;
	}

	/* Handle journal updates and torn write protection. */
	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
						     info->recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     cache->zone->zone_number);
	info->recovery_lock = 0;
	was_discard = write_has_finished(info);
	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

	set_info_state(info, PS_RESIDENT);

	reclamations = distribute_page_over_waitq(info, &info->waiting);
	ADD_ONCE(cache->stats.reclaimed, reclamations);

	if (was_discard)
		cache->discard_count--;

	if (reclaimed)
		discard_page_if_needed(cache);
	else
		allocate_free_page(info);

	check_for_drain_complete(cache->zone);
}

/**
 * write_pages() - Write the batch of pages which were covered by the layer flush which just
 *                 completed.
 * @flush_completion: The flush vio.
 *
 * This callback is registered in save_pages().
 */
static void write_pages(struct vdo_completion *flush_completion)
{
	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

	/*
	 * We need to cache these two values on the stack since it is possible for the last
	 * page info to cause the page cache to get freed. Hence once we launch the last page,
	 * it may be unsafe to dereference the cache.
	 */
	bool has_unflushed_pages = (cache->pages_to_flush > 0);
	page_count_t pages_in_flush = cache->pages_in_flush;

	cache->pages_in_flush = 0;
	while (pages_in_flush-- > 0) {
		struct page_info *info =
			list_first_entry(&cache->outgoing_list, struct page_info,
					 state_entry);

		list_del_init(&info->state_entry);
		if (vdo_is_read_only(info->cache->vdo)) {
			struct vdo_completion *completion = &info->vio->completion;

			vdo_reset_completion(completion);
			completion->callback = page_is_written_out;
			completion->error_handler = handle_page_write_error;
			vdo_fail_completion(completion, VDO_READ_ONLY);
			continue;
		}
		ADD_ONCE(info->cache->stats.pages_saved, 1);
		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
	}

	if (has_unflushed_pages) {
		/*
		 * If there are unflushed pages, the cache can't have been freed, so this call is
		 * safe.
		 */
		save_pages(cache);
	}
}

/**
 * vdo_release_page_completion() - Release a VDO Page Completion.
 *
 * The page referenced by this completion (if any) will no longer be held busy by this completion.
 * If a page becomes discardable and there are completions awaiting free pages then a new round of
 * page discarding is started.
 */
void vdo_release_page_completion(struct vdo_completion *completion)
{
	struct page_info *discard_info = NULL;
	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
	struct vdo_page_cache *cache;

	if (completion->result == VDO_SUCCESS) {
		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
			return;

		if (--page_completion->info->busy == 0)
			discard_info = page_completion->info;
	}

	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "Page being released after leaving all queues");

	page_completion->info = NULL;
	cache = page_completion->cache;
	assert_on_cache_thread(cache, __func__);

	if (discard_info != NULL) {
		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
			discard_info->write_status = WRITE_STATUS_NORMAL;
			launch_page_save(discard_info);
		}

		/*
		 * If there are excess requests for pages (that have not already started
		 * discards), we need to discard some page (which may be this one).
		 */
		discard_page_if_needed(cache);
	}
}

/**
 * load_page_for_completion() - Helper function to load a page as described by a VDO Page
 *                              Completion.
 */
static void load_page_for_completion(struct page_info *info,
				     struct vdo_page_completion *vdo_page_comp)
{
	int result;

	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
	result = launch_page_load(info, vdo_page_comp->pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}

/**
 * vdo_get_page() - Initialize a page completion and get a block map page.
 * @page_completion: The vdo_page_completion to initialize.
 * @zone: The block map zone of the desired page.
 * @pbn: The absolute physical block of the desired page.
 * @writable: Whether the page can be modified.
 * @parent: The object to notify when the fetch is complete.
 * @callback: The notification callback.
 * @error_handler: The handler for fetch errors.
 * @requeue: Whether we must requeue when notifying the parent.
 *
 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
 * by the completion to be loaded from disk. When the callback is invoked, the page will be
 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
 * when they are done with the page to clear the busy mark.
 */
void vdo_get_page(struct vdo_page_completion *page_completion,
		  struct block_map_zone *zone, physical_block_number_t pbn,
		  bool writable, void *parent, vdo_action_fn callback,
		  vdo_action_fn error_handler, bool requeue)
{
	struct vdo_page_cache *cache = &zone->page_cache;
	struct vdo_completion *completion = &page_completion->completion;
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);
	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "New page completion was not already on a wait queue");

	*page_completion = (struct vdo_page_completion) {
		.pbn = pbn,
		.writable = writable,
		.cache = cache,
	};

	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
	vdo_prepare_completion(completion, callback, error_handler,
			       cache->zone->thread_id, parent);
	completion->requeue = requeue;

	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
		vdo_fail_completion(completion, VDO_READ_ONLY);
		return;
	}

	if (page_completion->writable)
		ADD_ONCE(cache->stats.write_count, 1);
	else
		ADD_ONCE(cache->stats.read_count, 1);

	info = find_page(cache, page_completion->pbn);
	if (info != NULL) {
		/* The page is in the cache already. */
		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
		    is_incoming(info) ||
		    (is_outgoing(info) && page_completion->writable)) {
			/* The page is unusable until it has finished I/O. */
			ADD_ONCE(cache->stats.wait_for_page, 1);
			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
			return;
		}

		if (is_valid(info)) {
			/* The page is usable. */
			ADD_ONCE(cache->stats.found_in_cache, 1);
			if (!is_present(info))
				ADD_ONCE(cache->stats.read_outgoing, 1);
			update_lru(info);
			info->busy++;
			complete_with_page(info, page_completion);
			return;
		}

		/* Something horrible has gone wrong. */
		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
	}

	/* The page must be fetched. */
	info = find_free_page(cache);
	if (info != NULL) {
		ADD_ONCE(cache->stats.fetch_required, 1);
		load_page_for_completion(info, page_completion);
		return;
	}

	/* The page must wait for a page to be discarded. */
	ADD_ONCE(cache->stats.discard_required, 1);
	discard_page_for_completion(page_completion);
}

/**
 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
 * @completion: The vdo_page_completion containing the page.
 */
void vdo_request_page_write(struct vdo_completion *completion)
{
	struct page_info *info;
	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);

	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
		return;

	info = vdo_page_comp->info;
	set_info_state(info, PS_DIRTY);
	launch_page_save(info);
}

/**
 * vdo_get_cached_page() - Get the block map page from a page completion.
 * @completion: A vdo page completion whose callback has been called.
 * @page_ptr: A pointer to hold the page
 *
 * Return: VDO_SUCCESS or an error
 */
int vdo_get_cached_page(struct vdo_completion *completion,
			struct block_map_page **page_ptr)
{
	int result;
	struct vdo_page_completion *vpc;

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result == VDO_SUCCESS)
		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);

	return result;
}

/**
 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
 *
 * There must not be any dirty pages in the cache.
 *
 * Return: A success or error code.
 */
int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
{
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);

	/* Make sure we don't throw away any dirty pages. */
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");

		if (result != VDO_SUCCESS)
			return result;
	}

	/* Reset the page map by re-allocating it. */
	vdo_int_map_free(vdo_forget(cache->page_map));
	return vdo_int_map_create(cache->page_count, &cache->page_map);
}

/**
 * get_tree_page_by_index() - Get the tree page for a given height and page index.
 *
 * Return: The requested page.
 */
static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
							      root_count_t root_index,
							      height_t height,
							      page_number_t page_index)
{
	page_number_t offset = 0;
	size_t segment;

	for (segment = 0; segment < forest->segments; segment++) {
		page_number_t border = forest->boundaries[segment].levels[height - 1];

		if (page_index < border) {
			struct block_map_tree *tree = &forest->trees[root_index];

			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
		}

		offset = border;
	}

	return NULL;
}

/* Get the page referred to by the lock's tree slot at its current height. */
static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
					      const struct tree_lock *lock)
{
	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
				      lock->height,
				      lock->tree_slots[lock->height].page_index);
}

/** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
			 physical_block_number_t pbn,
			 struct block_map_page *page)
{
	struct block_map_page *loaded = (struct block_map_page *) buffer;
	enum block_map_page_validity validity =
		vdo_validate_block_map_page(loaded, nonce, pbn);

	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
		memcpy(page, loaded, VDO_BLOCK_SIZE);
		return true;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Expected page %llu but got page %llu instead",
				       (unsigned long long) pbn,
				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
	}

	return false;
}

/**
 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
 *                     a cyclic range of values from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	if (value < lower)
		value += modulus;
	if (upper < lower)
		upper += modulus;
	return (value <= upper);
}

/**
 * is_not_older() - Check whether a generation is strictly older than some other generation in the
 *                  context of a zone's current generation range.
 * @zone: The zone in which to do the comparison.
 * @a: The generation in question.
 * @b: The generation to compare to.
 *
 * Return: true if generation @a is not strictly older than generation @b in the context of @zone
 */
static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
{
	int result;

	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
			    "generation(s) %u, %u are out of range [%u, %u]",
			    a, b, zone->oldest_generation, zone->generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return true;
	}

	return in_cyclic_range(b, a, zone->generation, 1 << 8);
}

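/**
 * release_generation() - Decrement the dirty page count of a generation, advancing the zone's
 *                        oldest generation past any generations which are now empty.
 */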
static void release_generation(struct block_map_zone *zone, u8 generation)
{
	int result;

	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
			    "dirty page count underflow for generation %u", generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	zone->dirty_page_counts[generation]--;
	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
	       (zone->oldest_generation != zone->generation))
		zone->oldest_generation++;
}

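/**
 * set_generation() - Assign a tree page to a new generation, updating the zone's per-generation
 *                    dirty page counts.
 */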
static void set_generation(struct block_map_zone *zone, struct tree_page *page,
			   u8 new_generation)
{
	u32 new_count;
	int result;
	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
	u8 old_generation = page->generation;

	if (decrement_old && (old_generation == new_generation))
		return;

	page->generation = new_generation;
	new_count = ++zone->dirty_page_counts[new_generation];
	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
			    new_generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	if (decrement_old)
		release_generation(zone, old_generation);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);

/* Implements waiter_callback_fn */
static void write_page_callback(struct vdo_waiter *waiter, void *context)
{
	write_page(container_of(waiter, struct tree_page, waiter), context);
}

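/** acquire_vio() - Acquire a vio from the zone's pool with which to write out a tree page. */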
static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
{
	waiter->callback = write_page_callback;
	acquire_vio_from_pool(zone->vio_pool, waiter);
}

/* Return: true if all possible generations were not already active */
static bool attempt_increment(struct block_map_zone *zone)
{
	u8 generation = zone->generation + 1;

	if (zone->oldest_generation == generation)
		return false;

	zone->generation = generation;
	return true;
}

/* Launches a flush if one is not already in progress. */
static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
{
	if ((zone->flusher == NULL) && attempt_increment(zone)) {
		zone->flusher = page;
		acquire_vio(&page->waiter, zone);
		return;
	}

	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
}

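/*
 * Implements waiter_callback_fn. Write out a waiting page if it is in the generation which was
 * just flushed; otherwise, re-enqueue it so it is written after another flush.
 */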
static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
{
	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
	struct write_if_not_dirtied_context *write_context = context;

	if (page->generation == write_context->generation) {
		acquire_vio(waiter, write_context->zone);
		return;
	}

	enqueue_page(page, write_context->zone);
}

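/** return_to_pool() - Return a vio to the zone's pool and check whether the zone has drained. */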
static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
	return_vio_to_pool(zone->vio_pool, vio);
	check_for_drain_complete(zone);
}

/* This callback is registered in write_initialized_page(). */
static void finish_page_write(struct vdo_completion *completion)
{
	bool dirty;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct tree_page *page = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
						     page->writing_recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     zone->zone_number);

	dirty = (page->writing_generation != page->generation);
	release_generation(zone, page->writing_generation);
	page->writing = false;

	if (zone->flusher == page) {
		struct write_if_not_dirtied_context context = {
			.zone = zone,
			.generation = page->writing_generation,
		};

		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
					     write_page_if_not_dirtied, &context);
		if (dirty && attempt_increment(zone)) {
			write_page(page, pooled);
			return;
		}

		zone->flusher = NULL;
	}

	if (dirty) {
		enqueue_page(page, zone);
	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
		   attempt_increment(zone)) {
		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
					     struct tree_page, waiter);
		write_page(zone->flusher, pooled);
		return;
	}

	return_to_pool(zone, pooled);
}

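/** handle_write_error() - Handle an error writing a tree page by entering read-only mode. */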
static void handle_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	enter_zone_read_only_mode(zone, result);
	return_to_pool(zone, pooled);
}

static void write_page_endio(struct bio *bio);

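/**
 * write_initialized_page() - Write a copy of a tree page which is known to have been written
 *                            before, issuing a preflush if the page is the zone's flusher.
 */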
static void write_initialized_page(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;
	struct tree_page *tree_page = completion->parent;
	struct block_map_page *page = (struct block_map_page *) vio->data;
	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

	/*
	 * Now that we know the page has been written at least once, mark the copy we are writing
	 * as initialized.
	 */
	page->header.initialized = true;

	if (zone->flusher == tree_page)
		operation |= REQ_PREFLUSH;

	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				operation);
}

write_page_endio(struct bio * bio)1635 static void write_page_endio(struct bio *bio)
1636 {
1637 struct pooled_vio *vio = bio->bi_private;
1638 struct block_map_zone *zone = vio->context;
1639 struct block_map_page *page = (struct block_map_page *) vio->vio.data;
1640
1641 continue_vio_after_io(&vio->vio,
1642 (page->header.initialized ?
1643 finish_page_write : write_initialized_page),
1644 zone->thread_id);
1645 }
1646
write_page(struct tree_page * tree_page,struct pooled_vio * vio)1647 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
1648 {
1649 struct vdo_completion *completion = &vio->vio.completion;
1650 struct block_map_zone *zone = vio->context;
1651 struct block_map_page *page = vdo_as_block_map_page(tree_page);
1652
1653 if ((zone->flusher != tree_page) &&
1654 is_not_older(zone, tree_page->generation, zone->generation)) {
1655 /*
1656 * This page was re-dirtied after the last flush was issued, hence we need to do
1657 * another flush.
1658 */
1659 enqueue_page(tree_page, zone);
1660 return_to_pool(zone, vio);
1661 return;
1662 }
1663
1664 completion->parent = tree_page;
1665 memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
1666 completion->callback_thread_id = zone->thread_id;
1667
1668 tree_page->writing = true;
1669 tree_page->writing_generation = tree_page->generation;
1670 tree_page->writing_recovery_lock = tree_page->recovery_lock;
1671
1672 /* Clear this now so that we know this page is not on any dirty list. */
1673 tree_page->recovery_lock = 0;
1674
1675 /*
1676 * We've already copied the page into the vio which will write it, so if it was not yet
1677 * initialized, the first write will indicate that (for torn write protection). It is now
1678 * safe to mark it as initialized in memory since if the write fails, the in memory state
1679 * will become irrelevant.
1680 */
1681 if (page->header.initialized) {
1682 write_initialized_page(completion);
1683 return;
1684 }
1685
1686 page->header.initialized = true;
1687 vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
1688 write_page_endio, handle_write_error,
1689 REQ_OP_WRITE | REQ_PRIO);
1690 }
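
/*
 * Note on the write sequence above, for a page that has never been written
 * (header.initialized still false): the copy in the vio also carries initialized == false,
 * since the memcpy() happens before the in-memory page is flagged. write_page_endio()
 * therefore routes the first completion to write_initialized_page(), which marks the vio's
 * copy initialized and writes the page a second time; only the second completion reaches
 * finish_page_write(). A torn first write is thus indistinguishable from an uninitialized
 * page. Pages which have been written before take the single-write path straight through
 * write_initialized_page().
 */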

/* Release a lock on a page which was being loaded or allocated. */
static void release_page_lock(struct data_vio *data_vio, char *what)
{
	struct block_map_zone *zone;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;

	VDO_ASSERT_LOG_ONLY(lock->locked,
			"release of unlocked block map page %s for key %llu in tree %u",
			what, (unsigned long long) lock->key, lock->root_index);

	zone = data_vio->logical.zone->block_map_zone;
	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
			"block map page %s mismatch for key %llu in tree %u",
			what, (unsigned long long) lock->key, lock->root_index);
	lock->locked = false;
}

static void finish_lookup(struct data_vio *data_vio, int result)
{
	data_vio->tree_lock.height = 0;

	--data_vio->logical.zone->block_map_zone->active_lookups;

	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
	data_vio->vio.completion.error_handler = handle_data_vio_error;
	continue_data_vio_with_error(data_vio, result);
}

static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	int result = *((int *) context);

	if (!data_vio->write) {
		if (result == VDO_NO_SPACE)
			result = VDO_SUCCESS;
	} else if (result != VDO_NO_SPACE) {
		result = VDO_READ_ONLY;
	}

	finish_lookup(data_vio, result);
}

static void abort_lookup(struct data_vio *data_vio, int result, char *what)
{
	if (result != VDO_NO_SPACE)
		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);

	if (data_vio->tree_lock.locked) {
		release_page_lock(data_vio, what);
		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
				abort_lookup_for_waiter,
				&result);
	}

	finish_lookup(data_vio, result);
}

static void abort_load(struct data_vio *data_vio, int result)
{
	abort_lookup(data_vio, result, "load");
}

static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
		const struct data_location *mapping,
		height_t height)
{
	if (!vdo_is_valid_location(mapping) ||
	    vdo_is_state_compressed(mapping->state) ||
	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
		return true;

	/* Roots aren't physical data blocks, so we can't check their PBNs. */
	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
		return false;

	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
}

static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
static void allocate_block_map_page(struct block_map_zone *zone,
		struct data_vio *data_vio);

static void continue_with_loaded_page(struct data_vio *data_vio,
		struct block_map_page *page)
{
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
	struct data_location mapping =
		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);

	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				"Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				(unsigned long long) mapping.pbn, mapping.state,
				lock->tree_slots[lock->height - 1].page_index,
				lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we need is unallocated */
		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
				data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load next */
	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);

	data_vio->tree_lock.height--;
	continue_with_loaded_page(data_vio, context);
}

static void finish_block_map_page_load(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	nonce_t nonce;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	tree_lock->height--;
	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
	tree_page = get_tree_page(zone, tree_lock);
	page = (struct block_map_page *) tree_page->page_buffer;
	nonce = zone->block_map->nonce;

	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
		vdo_format_block_map_page(page, nonce, pbn, false);
	return_vio_to_pool(zone->vio_pool, pooled);

	/* Release our claim to the load and wake any waiters */
	release_page_lock(data_vio, "load");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
	continue_with_loaded_page(data_vio, page);
}

static void handle_io_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;

	vio_record_metadata_io_error(vio);
	return_vio_to_pool(zone->vio_pool, pooled);
	abort_load(data_vio, result);
}

static void load_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct data_vio *data_vio = vio->completion.parent;

	continue_vio_after_io(vio, finish_block_map_page_load,
			data_vio->logical.zone->thread_id);
}

static void load_page(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;

	pooled->vio.completion.parent = data_vio;
	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
			handle_io_error, REQ_OP_READ | REQ_PRIO);
}

/*
 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
 * acquired, @data_vio->tree_lock.locked will be true.
 */
static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;
	height_t height = lock->height;
	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
	union page_key key;

	key.descriptor = (struct page_descriptor) {
		.root_index = lock->root_index,
		.height = height,
		.page_index = tree_slot.page_index,
		.slot = tree_slot.block_map_slot.slot,
	};
	lock->key = key.key;

	result = vdo_int_map_put(zone->loading_pages, lock->key,
			lock, false, (void **) &lock_holder);
	if (result != VDO_SUCCESS)
		return result;

	if (lock_holder == NULL) {
		/* We got the lock */
		data_vio->tree_lock.locked = true;
		return VDO_SUCCESS;
	}

	/* Someone else is loading or allocating the page we need */
	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
	return VDO_SUCCESS;
}
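
/*
 * The locking key above identifies the page being loaded or allocated by its root, height,
 * and position, packed into a single u64 (the BUILD_BUG_ON in initialize_block_map_zone()
 * checks that the descriptor is exactly u64-sized). Any two data_vios that need the same
 * tree page in a zone therefore compute the same key, so only the first becomes the loader
 * or allocator and later arrivals queue on that holder's tree_lock waiters.
 */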

/* Load a block map tree page from disk, for the next level in the data vio tree lock. */
static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;

	result = attempt_page_lock(zone, data_vio);
	if (result != VDO_SUCCESS) {
		abort_load(data_vio, result);
		return;
	}

	if (data_vio->tree_lock.locked) {
		data_vio->waiter.callback = load_page;
		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
	}
}

static void allocation_failure(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	if (vdo_requeue_completion_if_needed(completion,
			data_vio->logical.zone->thread_id))
		return;

	abort_lookup(data_vio, completion->result, "allocation");
}

static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	physical_block_number_t pbn = *((physical_block_number_t *) context);

	tree_lock->height--;
	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;

	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

/** expire_oldest_list() - Expire the oldest list. */
static void expire_oldest_list(struct dirty_lists *dirty_lists)
{
	block_count_t i = dirty_lists->offset++;

	dirty_lists->oldest_period++;
	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
				&dirty_lists->expired[VDO_TREE_PAGE]);
	}

	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
				&dirty_lists->expired[VDO_CACHE_PAGE]);
	}

	if (dirty_lists->offset == dirty_lists->maximum_age)
		dirty_lists->offset = 0;
}


/** update_period() - Update the dirty_lists period if necessary. */
static void update_period(struct dirty_lists *dirty, sequence_number_t period)
{
	while (dirty->next_period <= period) {
		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
			expire_oldest_list(dirty);
		dirty->next_period++;
	}
}
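
/*
 * A worked example of the bookkeeping above, assuming maximum_age == 4 with
 * oldest_period == 10 and next_period == 14 (all four eras in use): update_period(dirty, 15)
 * expires period 10 (splicing eras[offset] onto the expired lists and advancing offset and
 * oldest_period) and advances next_period to 15, then expires period 11 and advances
 * next_period to 16. The eras array is a ring indexed by offset, so period N always lives
 * in eras[N % maximum_age].
 */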

/** write_expired_elements() - Write out the expired list. */
static void write_expired_elements(struct block_map_zone *zone)
{
	struct tree_page *page, *ttmp;
	struct page_info *info, *ptmp;
	struct list_head *expired;
	u8 generation = zone->generation;

	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
	list_for_each_entry_safe(page, ttmp, expired, entry) {
		int result;

		list_del_init(&page->entry);

		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
				"Newly expired page not already waiting to write");
		if (result != VDO_SUCCESS) {
			enter_zone_read_only_mode(zone, result);
			continue;
		}

		set_generation(zone, page, generation);
		if (!page->writing)
			enqueue_page(page, zone);
	}

	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
		list_del_init(&info->state_entry);
		schedule_page_save(info);
	}

	save_pages(&zone->page_cache);
}

/**
 * add_to_dirty_lists() - Add an element to the dirty lists.
 * @zone: The zone in which we are operating.
 * @entry: The list entry of the element to add.
 * @type: The type of page.
 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
 *              lock.
 */
static void add_to_dirty_lists(struct block_map_zone *zone,
		struct list_head *entry,
		enum block_map_page_type type,
		sequence_number_t old_period,
		sequence_number_t new_period)
{
	struct dirty_lists *dirty_lists = zone->dirty_lists;

	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
		return;

	if (new_period < dirty_lists->oldest_period) {
		list_move_tail(entry, &dirty_lists->expired[type]);
	} else {
		update_period(dirty_lists, new_period);
		list_move_tail(entry,
				&dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
	}

	write_expired_elements(zone);
}
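
/*
 * Placement examples for add_to_dirty_lists(): a page dirtied for the first time
 * (old_period == 0) in period N goes onto eras[N % maximum_age]; a page already dirty in
 * an earlier, still-tracked period stays where it is (the early return above), since it
 * must be written no later than that earlier era; and a page dirtied in a period older
 * than oldest_period goes straight onto the expired list and is written out immediately
 * by write_expired_elements().
 */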

/*
 * Record the allocation in the tree and wake any waiters now that the write lock has been
 * released.
 */
static void finish_block_map_allocation(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	sequence_number_t old_lock;
	struct data_vio *data_vio = as_data_vio(completion);
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	height_t height = tree_lock->height;

	assert_data_vio_in_logical_zone(data_vio);

	tree_page = get_tree_page(zone, tree_lock);
	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;

	/* Record the allocation. */
	page = (struct block_map_page *) tree_page->page_buffer;
	old_lock = tree_page->recovery_lock;
	vdo_update_block_map_page(page, data_vio, pbn,
			VDO_MAPPING_STATE_UNCOMPRESSED,
			&tree_page->recovery_lock);

	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
		/* This page is waiting to be written out. */
		if (zone->flusher != tree_page) {
			/*
			 * The outstanding flush won't cover the update we just made,
			 * so mark the page as needing another flush.
			 */
			set_generation(zone, tree_page, zone->generation);
		}
	} else {
		/* Put the page on a dirty list */
		if (old_lock == 0)
			INIT_LIST_HEAD(&tree_page->entry);
		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
				old_lock, tree_page->recovery_lock);
	}

	tree_lock->height--;
	if (height > 1) {
		/* Format the interior node we just allocated (in memory). */
		tree_page = get_tree_page(zone, tree_lock);
		vdo_format_block_map_page(tree_page->page_buffer,
				zone->block_map->nonce,
				pbn, false);
	}

	/* Release our claim to the allocation and wake any waiters */
	release_page_lock(data_vio, "allocation");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
			continue_allocation_for_waiter, &pbn);
	if (tree_lock->height == 0) {
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	allocate_block_map_page(zone, data_vio);
}

static void release_block_map_write_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	release_data_vio_allocation_lock(data_vio, true);
	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
}

/*
 * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
 * to prevent deduplication against the block after we release the write lock on it, but before we
 * write out the page.
 */
static void set_block_map_page_reference_count(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	completion->callback = release_block_map_write_lock;
	vdo_modify_reference_count(completion, &data_vio->increment_updater);
}

static void journal_block_map_allocation(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_journal_zone(data_vio);

	set_data_vio_allocated_zone_callback(data_vio,
			set_block_map_page_reference_count);
	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
}

static void allocate_block(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn;

	assert_data_vio_in_allocated_zone(data_vio);

	if (!vdo_allocate_block_in_zone(data_vio))
		return;

	pbn = data_vio->allocation.pbn;
	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
	data_vio->increment_updater = (struct reference_updater) {
		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
		.increment = true,
		.zpbn = {
			.pbn = pbn,
			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
		},
		.lock = data_vio->allocation.lock,
	};

	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
}

static void allocate_block_map_page(struct block_map_zone *zone,
		struct data_vio *data_vio)
{
	int result;

	if (!data_vio->write || data_vio->is_discard) {
		/* This is a pure read or a discard, so there's nothing left to do here. */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	result = attempt_page_lock(zone, data_vio);
	if (result != VDO_SUCCESS) {
		abort_lookup(data_vio, result, "allocation");
		return;
	}

	if (!data_vio->tree_lock.locked)
		return;

	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
			allocate_block, allocation_failure);
}

/**
 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
 *                             resides and cache that result in the data_vio.
 *
 * All ancestors in the tree will be allocated or loaded, as needed.
 */
void vdo_find_block_map_slot(struct data_vio *data_vio)
{
	page_number_t page_index;
	struct block_map_tree_slot tree_slot;
	struct data_location mapping;
	struct block_map_page *page = NULL;
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;

	zone->active_lookups++;
	if (vdo_is_state_draining(&zone->state)) {
		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
		return;
	}

	lock->tree_slots[0].block_map_slot.slot =
		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
	tree_slot = (struct block_map_tree_slot) {
		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		.block_map_slot = {
			.pbn = 0,
			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		},
	};

	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
		physical_block_number_t pbn;

		lock->tree_slots[lock->height] = tree_slot;
		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
		pbn = vdo_get_block_map_page_pbn(page);
		if (pbn != VDO_ZERO_BLOCK) {
			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
			break;
		}

		/* Calculate the index and slot for the next level. */
		tree_slot.block_map_slot.slot =
			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	}

	/* The page at this height has been allocated and loaded. */
	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				"Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				(unsigned long long) mapping.pbn, mapping.state,
				lock->tree_slots[lock->height - 1].page_index,
				lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we want one level down has not been allocated, so allocate it. */
		allocate_block_map_page(zone, data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		/* This is the ultimate block map page, so we're done */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load. */
	load_block_map_page(zone, data_vio);
}
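
/*
 * A worked example of the slot arithmetic above, assuming 4 KB blocks
 * (VDO_BLOCK_MAP_ENTRIES_PER_PAGE == 812) and a hypothetical 16-root map: for
 * lbn == 10,000,000, the leaf slot is 10,000,000 % 812 == 220 and the leaf page number
 * (stored in tree_slots[0].page_index by vdo_compute_logical_zone()) is
 * 10,000,000 / 812 == 12,315. That page belongs to root 12,315 % 16 == 11 and is page
 * 12,315 / 16 == 769 within that root, so the height-1 tree slot is page_index
 * 769 / 812 == 0, slot 769 % 812 == 769. The loop then walks up the tree until it finds an
 * ancestor whose PBN is already known in memory.
 */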

/*
 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
 * pages have been loaded; otherwise, it may give the wrong answer (0).
 */
physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
		page_number_t page_number)
{
	struct data_location mapping;
	struct tree_page *tree_page;
	struct block_map_page *page;
	root_count_t root_index = page_number % map->root_count;
	page_number_t page_index = page_number / map->root_count;
	slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;

	page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;

	tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
	page = (struct block_map_page *) tree_page->page_buffer;
	if (!page->header.initialized)
		return VDO_ZERO_BLOCK;

	mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
		return VDO_ZERO_BLOCK;
	return mapping.pbn;
}

/*
 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
 * method is used when correcting errors in the tree during read-only rebuild.
 */
void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
{
	bool waiting = vdo_waiter_is_waiting(&page->waiter);

	if (waiting && (zone->flusher == page))
		return;

	set_generation(zone, page, zone->generation);
	if (waiting || page->writing)
		return;

	enqueue_page(page, zone);
}

static int make_segment(struct forest *old_forest, block_count_t new_pages,
		struct boundary *new_boundary, struct forest *forest)
{
	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
	struct tree_page *page_ptr;
	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
	height_t height;
	root_count_t root;
	int result;

	forest->segments = index + 1;

	result = vdo_allocate(forest->segments, struct boundary,
			"forest boundary array", &forest->boundaries);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(forest->segments, struct tree_page *,
			"forest page pointers", &forest->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(new_pages, struct tree_page,
			"new forest pages", &forest->pages[index]);
	if (result != VDO_SUCCESS)
		return result;

	if (index > 0) {
		memcpy(forest->boundaries, old_forest->boundaries,
				index * sizeof(struct boundary));
		memcpy(forest->pages, old_forest->pages,
				index * sizeof(struct tree_page *));
	}

	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));

	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
		segment_sizes[height] = new_boundary->levels[height];
		if (index > 0)
			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
	}

	page_ptr = forest->pages[index];
	for (root = 0; root < forest->map->root_count; root++) {
		struct block_map_tree_segment *segment;
		struct block_map_tree *tree = &(forest->trees[root]);
		height_t height;

		int result = vdo_allocate(forest->segments,
				struct block_map_tree_segment,
				"tree root segments", &tree->segments);
		if (result != VDO_SUCCESS)
			return result;

		if (index > 0) {
			memcpy(tree->segments, old_forest->trees[root].segments,
					index * sizeof(struct block_map_tree_segment));
		}

		segment = &(tree->segments[index]);
		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
			if (segment_sizes[height] == 0)
				continue;

			segment->levels[height] = page_ptr;
			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
				/* Record the root. */
				struct block_map_page *page =
					vdo_format_block_map_page(page_ptr->page_buffer,
							forest->map->nonce,
							VDO_INVALID_PBN, true);
				page->entries[0] =
					vdo_pack_block_map_entry(forest->map->root_origin + root,
							VDO_MAPPING_STATE_UNCOMPRESSED);
			}
			page_ptr += segment_sizes[height];
		}
	}

	return VDO_SUCCESS;
}
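
/*
 * Growth example for make_segment(): if the previous boundary for a tree had 63 leaf pages
 * and one page at each level above, and the new boundary calls for 100 leaf pages, the new
 * segment holds 37 leaf pages and zero pages for any level whose count did not change (such
 * levels are skipped when the segment's level pointers are assigned). The boundaries, pages,
 * and per-tree segment arrays are copied from the old forest and extended by one entry, so
 * the old forest remains usable until replace_forest() runs.
 */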

static void deforest(struct forest *forest, size_t first_page_segment)
{
	root_count_t root;

	if (forest->pages != NULL) {
		size_t segment;

		for (segment = first_page_segment; segment < forest->segments; segment++)
			vdo_free(forest->pages[segment]);
		vdo_free(forest->pages);
	}

	for (root = 0; root < forest->map->root_count; root++)
		vdo_free(forest->trees[root].segments);

	vdo_free(forest->boundaries);
	vdo_free(forest);
}

/**
 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
 *                 there is one.
 * @entries: The number of entries the block map will hold.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int make_forest(struct block_map *map, block_count_t entries)
{
	struct forest *forest, *old_forest = map->forest;
	struct boundary new_boundary, *old_boundary = NULL;
	block_count_t new_pages;
	int result;

	if (old_forest != NULL)
		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);

	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
			entries, &new_boundary);
	if (new_pages == 0) {
		map->next_entry_count = entries;
		return VDO_SUCCESS;
	}

	result = vdo_allocate_extended(struct forest, map->root_count,
			struct block_map_tree, __func__,
			&forest);
	if (result != VDO_SUCCESS)
		return result;

	forest->map = map;
	result = make_segment(old_forest, new_pages, &new_boundary, forest);
	if (result != VDO_SUCCESS) {
		deforest(forest, forest->segments - 1);
		return result;
	}

	map->next_forest = forest;
	map->next_entry_count = entries;
	return VDO_SUCCESS;
}

/**
 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
 */
static void replace_forest(struct block_map *map)
{
	if (map->next_forest != NULL) {
		if (map->forest != NULL)
			deforest(map->forest, map->forest->segments);
		map->forest = vdo_forget(map->next_forest);
	}

	map->entry_count = map->next_entry_count;
	map->next_entry_count = 0;
}

/**
 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
 *                   traversal.
 */
static void finish_cursor(struct cursor *cursor)
{
	struct cursors *cursors = cursor->parent;
	struct vdo_completion *completion = cursors->completion;

	return_vio_to_pool(cursors->pool, vdo_forget(cursor->vio));
	if (--cursors->active_roots > 0)
		return;

	vdo_free(cursors);

	vdo_finish_completion(completion);
}

static void traverse(struct cursor *cursor);

/**
 * continue_traversal() - Continue traversing a block map tree.
 * @completion: The VIO doing a read or write.
 */
static void continue_traversal(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	traverse(completion->parent);
}

/**
 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
 * @completion: The VIO doing the read.
 */
static void finish_traversal_load(struct vdo_completion *completion)
{
	struct cursor *cursor = completion->parent;
	height_t height = cursor->height;
	struct cursor_level *level = &cursor->levels[height];
	struct tree_page *tree_page =
		&(cursor->tree->segments[0].levels[height][level->page_index]);
	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;

	vdo_copy_valid_page(cursor->vio->vio.data,
			cursor->parent->zone->block_map->nonce,
			pbn_from_vio_bio(cursor->vio->vio.bio), page);
	traverse(cursor);
}

static void traversal_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct cursor *cursor = vio->completion.parent;

	continue_vio_after_io(vio, finish_traversal_load,
			cursor->parent->zone->thread_id);
}

/**
 * traverse() - Traverse a single block map tree.
 *
 * This is the recursive heart of the traversal process.
 */
static void traverse(struct cursor *cursor)
{
	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
		height_t height = cursor->height;
		struct cursor_level *level = &cursor->levels[height];
		struct tree_page *tree_page =
			&(cursor->tree->segments[0].levels[height][level->page_index]);
		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;

		if (!page->header.initialized)
			continue;

		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
			struct cursor_level *next_level;
			page_number_t entry_index =
				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
			struct data_location location =
				vdo_unpack_block_map_entry(&page->entries[level->slot]);

			if (!vdo_is_valid_location(&location)) {
				/* This entry is invalid, so remove it from the page. */
				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
				vdo_write_tree_page(tree_page, cursor->parent->zone);
				continue;
			}

			if (!vdo_is_mapped_location(&location))
				continue;

			/* Erase mapped entries past the end of the logical space. */
			if (entry_index >= cursor->boundary.levels[height]) {
				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
				vdo_write_tree_page(tree_page, cursor->parent->zone);
				continue;
			}

			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
				int result = cursor->parent->entry_callback(location.pbn,
						cursor->parent->completion);
				if (result != VDO_SUCCESS) {
					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
					vdo_write_tree_page(tree_page, cursor->parent->zone);
					continue;
				}
			}

			if (cursor->height == 0)
				continue;

			cursor->height--;
			next_level = &cursor->levels[cursor->height];
			next_level->page_index = entry_index;
			next_level->slot = 0;
			level->slot++;
			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
					traversal_endio, continue_traversal,
					REQ_OP_READ | REQ_PRIO);
			return;
		}
	}

	finish_cursor(cursor);
}
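
/*
 * Note on the traversal order above: descending to a child stores the child's page index,
 * resets the child's slot to 0, advances the parent's slot, and only then issues the read,
 * so when the child (and everything below it) has been fully scanned, the outer loop's
 * height++ resumes the parent at the next slot. Invalid entries are cleared, as are mapped
 * entries past the logical-space boundary, and the containing page is rewritten via
 * vdo_write_tree_page().
 */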

/**
 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
 *                   which to load pages.
 * @context: The pooled_vio just acquired.
 *
 * Implements waiter_callback_fn.
 */
static void launch_cursor(struct vdo_waiter *waiter, void *context)
{
	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
	struct pooled_vio *pooled = context;

	cursor->vio = pooled;
	pooled->vio.completion.parent = cursor;
	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
	traverse(cursor);
}

/**
 * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
 *
 * Return: The list of page counts as a boundary structure.
 */
static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
{
	struct boundary boundary;
	height_t height;
	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
	/*
	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
	 * roots starting from tree 0.
	 */
	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
	page_count_t level_pages = leaf_pages / map->root_count;

	if (root_index <= last_tree_root)
		level_pages++;

	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
		boundary.levels[height] = level_pages;
		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
	}

	/* The root node always exists, even if the root is otherwise unused. */
	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;

	return boundary;
}
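
/*
 * Boundary example, assuming 812 entries per page and a tree height of 5: with 1,000 leaf
 * pages spread over 16 roots, roots 0-7 get 63 leaf pages and roots 8-15 get 62 (the
 * remainder of 8 extra pages goes to the lowest-numbered trees). For a root with 63 leaf
 * pages each level above needs DIV_ROUND_UP(63, 812) == 1 page, giving a boundary of
 * {63, 1, 1, 1, 1}; the top entry is always 1 because the root page always exists.
 */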

/**
 * vdo_traverse_forest() - Walk the entire forest of a block map.
 * @callback: A function to call with the pbn of each allocated node in the forest.
 * @completion: The completion to notify on each traversed PBN, and when traversal completes.
 */
void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
		struct vdo_completion *completion)
{
	root_count_t root;
	struct cursors *cursors;
	int result;

	result = vdo_allocate_extended(struct cursors, map->root_count,
			struct cursor, __func__, &cursors);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(completion, result);
		return;
	}

	cursors->zone = &map->zones[0];
	cursors->pool = cursors->zone->vio_pool;
	cursors->entry_callback = callback;
	cursors->completion = completion;
	cursors->active_roots = map->root_count;
	for (root = 0; root < map->root_count; root++) {
		struct cursor *cursor = &cursors->cursors[root];

		*cursor = (struct cursor) {
			.tree = &map->forest->trees[root],
			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
			.parent = cursors,
			.boundary = compute_boundary(map, root),
		};

		cursor->waiter.callback = launch_cursor;
		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
	}
}

/**
 * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
 *               written out.
 */
static int __must_check initialize_block_map_zone(struct block_map *map,
		zone_count_t zone_number,
		page_count_t cache_size,
		block_count_t maximum_age)
{
	int result;
	block_count_t i;
	struct vdo *vdo = map->vdo;
	struct block_map_zone *zone = &map->zones[zone_number];

	BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));

	zone->zone_number = zone_number;
	zone->thread_id = vdo->thread_config.logical_threads[zone_number];
	zone->block_map = map;

	result = vdo_allocate_extended(struct dirty_lists, maximum_age,
			dirty_era_t, __func__,
			&zone->dirty_lists);
	if (result != VDO_SUCCESS)
		return result;

	zone->dirty_lists->maximum_age = maximum_age;
	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);

	for (i = 0; i < maximum_age; i++) {
		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
	}

	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
	if (result != VDO_SUCCESS)
		return result;

	result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE,
			zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
			VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
	if (result != VDO_SUCCESS)
		return result;

	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);

	zone->page_cache.zone = zone;
	zone->page_cache.vdo = vdo;
	zone->page_cache.page_count = cache_size / map->zone_count;
	zone->page_cache.stats.free_pages = zone->page_cache.page_count;

	result = allocate_cache_components(&zone->page_cache);
	if (result != VDO_SUCCESS)
		return result;

	/* initialize empty circular queues */
	INIT_LIST_HEAD(&zone->page_cache.lru_list);
	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);

	return VDO_SUCCESS;
}
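
/*
 * Sizing example for the cache setup above: the configured cache_size is the total number of
 * cache pages for the whole block map, so with a hypothetical cache_size of 2048 pages and
 * four logical zones, each zone's page cache gets 512 pages, all of which are initially
 * counted as free.
 */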

/* Implements vdo_zone_thread_getter_fn */
static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
{
	struct block_map *map = context;

	return map->zones[zone_number].thread_id;
}

/* Implements vdo_action_preamble_fn */
static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
{
	struct block_map *map = context;

	map->current_era_point = map->pending_era_point;
	vdo_finish_completion(parent);
}

/* Implements vdo_zone_action_fn */
static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
		struct vdo_completion *parent)
{
	struct block_map *map = context;
	struct block_map_zone *zone = &map->zones[zone_number];

	update_period(zone->dirty_lists, map->current_era_point);
	write_expired_elements(zone);
	vdo_finish_completion(parent);
}

/*
 * Schedule an era advance if necessary. This method should not be called directly. Rather, call
 * vdo_schedule_default_action() on the block map's action manager.
 *
 * Implements vdo_action_scheduler_fn.
 */
static bool schedule_era_advance(void *context)
{
	struct block_map *map = context;

	if (map->current_era_point == map->pending_era_point)
		return false;

	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
			advance_block_map_zone_era, NULL, NULL);
}

static void uninitialize_block_map_zone(struct block_map_zone *zone)
{
	struct vdo_page_cache *cache = &zone->page_cache;

	vdo_free(vdo_forget(zone->dirty_lists));
	free_vio_pool(vdo_forget(zone->vio_pool));
	vdo_int_map_free(vdo_forget(zone->loading_pages));
	if (cache->infos != NULL) {
		struct page_info *info;

		for (info = cache->infos; info < cache->infos + cache->page_count; info++)
			free_vio(vdo_forget(info->vio));
	}

	vdo_int_map_free(vdo_forget(cache->page_map));
	vdo_free(vdo_forget(cache->infos));
	vdo_free(vdo_forget(cache->pages));
}

void vdo_free_block_map(struct block_map *map)
{
	zone_count_t zone;

	if (map == NULL)
		return;

	for (zone = 0; zone < map->zone_count; zone++)
		uninitialize_block_map_zone(&map->zones[zone]);

	vdo_abandon_block_map_growth(map);
	if (map->forest != NULL)
		deforest(vdo_forget(map->forest), 0);
	vdo_free(vdo_forget(map->action_manager));
	vdo_free(map);
}

/* @journal may be NULL. */
int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
		struct vdo *vdo, struct recovery_journal *journal,
		nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
		struct block_map **map_ptr)
{
	struct block_map *map;
	int result;
	zone_count_t zone = 0;

	BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
		     ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
		      sizeof(struct block_map_entry)));
	result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate_extended(struct block_map,
			vdo->thread_config.logical_zone_count,
			struct block_map_zone, __func__, &map);
	if (result != VDO_SUCCESS)
		return result;

	map->vdo = vdo;
	map->root_origin = state.root_origin;
	map->root_count = state.root_count;
	map->entry_count = logical_blocks;
	map->journal = journal;
	map->nonce = nonce;

	result = make_forest(map, map->entry_count);
	if (result != VDO_SUCCESS) {
		vdo_free_block_map(map);
		return result;
	}

	replace_forest(map);

	map->zone_count = vdo->thread_config.logical_zone_count;
	for (zone = 0; zone < map->zone_count; zone++) {
		result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
		if (result != VDO_SUCCESS) {
			vdo_free_block_map(map);
			return result;
		}
	}

	result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
			vdo_get_recovery_journal_thread_id(journal),
			map, schedule_era_advance, vdo,
			&map->action_manager);
	if (result != VDO_SUCCESS) {
		vdo_free_block_map(map);
		return result;
	}

	*map_ptr = map;
	return VDO_SUCCESS;
}

struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
{
	return (struct block_map_state_2_0) {
		.flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
		/* This is the flat page count, which has turned out to always be 0. */
		.flat_page_count = 0,
		.root_origin = map->root_origin,
		.root_count = map->root_count,
	};
}

/* The block map needs to know the journal's sequence number to initialize the eras. */
void vdo_initialize_block_map_from_journal(struct block_map *map,
		struct recovery_journal *journal)
{
	zone_count_t z = 0;

	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
	map->pending_era_point = map->current_era_point;

	for (z = 0; z < map->zone_count; z++) {
		struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;

		VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
		dirty_lists->oldest_period = map->current_era_point;
		dirty_lists->next_period = map->current_era_point + 1;
		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
	}
}

/* Compute the logical zone for the LBN of a data vio. */
zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
{
	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;

	tree_lock->tree_slots[0].page_index = page_number;
	tree_lock->root_index = page_number % map->root_count;
	return (tree_lock->root_index % map->zone_count);
}
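
/*
 * A worked example for the zone computation above, assuming 812 entries per page, a
 * hypothetical 16-root map, and three logical zones: lbn 10,000,000 lives on leaf page
 * 10,000,000 / 812 == 12,315, which hashes to root 12,315 % 16 == 11, and root 11 is
 * handled by logical zone 11 % 3 == 2.
 */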

void vdo_advance_block_map_era(struct block_map *map,
		sequence_number_t recovery_block_number)
{
	if (map == NULL)
		return;

	map->pending_era_point = recovery_block_number;
	vdo_schedule_default_action(map->action_manager);
}

/* Implements vdo_admin_initiator_fn */
static void initiate_drain(struct admin_state *state)
{
	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);

	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
			"%s() called with no active lookups", __func__);

	if (!vdo_is_state_suspending(state)) {
		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
			expire_oldest_list(zone->dirty_lists);
		write_expired_elements(zone);
	}

	check_for_drain_complete(zone);
}

/* Implements vdo_zone_action_fn. */
static void drain_zone(void *context, zone_count_t zone_number,
		struct vdo_completion *parent)
{
	struct block_map *map = context;
	struct block_map_zone *zone = &map->zones[zone_number];

	vdo_start_draining(&zone->state,
			vdo_get_current_manager_operation(map->action_manager),
			parent, initiate_drain);
}

void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
		struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
			parent);
}

/* Implements vdo_zone_action_fn. */
static void resume_block_map_zone(void *context, zone_count_t zone_number,
		struct vdo_completion *parent)
{
	struct block_map *map = context;
	struct block_map_zone *zone = &map->zones[zone_number];

	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
}

void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
			NULL, resume_block_map_zone, NULL, parent);
}

/* Allocate an expanded collection of trees, for a future growth. */
int vdo_prepare_to_grow_block_map(struct block_map *map,
		block_count_t new_logical_blocks)
{
	if (map->next_entry_count == new_logical_blocks)
		return VDO_SUCCESS;

	if (map->next_entry_count > 0)
		vdo_abandon_block_map_growth(map);

	if (new_logical_blocks < map->entry_count) {
		map->next_entry_count = map->entry_count;
		return VDO_SUCCESS;
	}

	return make_forest(map, new_logical_blocks);
}

/* Implements vdo_action_preamble_fn */
static void grow_forest(void *context, struct vdo_completion *completion)
{
	replace_forest(context);
	vdo_finish_completion(completion);
}

/* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
{
	vdo_schedule_operation(map->action_manager,
			VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			grow_forest, NULL, NULL, parent);
}

void vdo_abandon_block_map_growth(struct block_map *map)
{
	struct forest *forest = vdo_forget(map->next_forest);

	if (forest != NULL)
		deforest(forest, forest->segments - 1);

	map->next_entry_count = 0;
}

/* Release the page completion and then continue the requester. */
static inline void finish_processing_page(struct vdo_completion *completion, int result)
{
	struct vdo_completion *parent = completion->parent;

	vdo_release_page_completion(completion);
	vdo_continue_completion(parent, result);
}

static void handle_page_error(struct vdo_completion *completion)
{
	finish_processing_page(completion, completion->result);
}

/* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
		vdo_action_fn action)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;

	if (vdo_is_state_draining(&zone->state)) {
		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
		return;
	}

	vdo_get_page(&data_vio->page_completion, zone,
			data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
			modifiable, &data_vio->vio.completion,
			action, handle_page_error, false);
}

/**
 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
 *
 * This indicates the block map entry for the logical block is either unmapped or corrupted.
 */
static void clear_mapped_location(struct data_vio *data_vio)
{
	data_vio->mapped = (struct zoned_pbn) {
		.state = VDO_MAPPING_STATE_UNMAPPED,
	};
}

/**
 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
 *                         data_vio.
 *
 * Return: VDO_SUCCESS, VDO_BAD_MAPPING if the map entry is invalid, or an error code for any
 *         other failure.
3111 */
static int __must_check set_mapped_location(struct data_vio *data_vio,
					    const struct block_map_entry *entry)
{
	/* Unpack the PBN for logging purposes even if the entry is invalid. */
	struct data_location mapped = vdo_unpack_block_map_entry(entry);

	if (vdo_is_valid_location(&mapped)) {
		int result;

		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
					       mapped.pbn, &data_vio->mapped.zone);
		if (result == VDO_SUCCESS) {
			data_vio->mapped.pbn = mapped.pbn;
			data_vio->mapped.state = mapped.state;
			return VDO_SUCCESS;
		}

		/*
		 * Return all errors not specifically known to be errors from validating the
		 * location.
		 */
		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
			return result;
	}

	/*
	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
	 * to VDO_BAD_MAPPING.
	 */
	vdo_log_error_strerror(VDO_BAD_MAPPING,
			       "PBN %llu with state %u read from the block map was invalid",
			       (unsigned long long) mapped.pbn, mapped.state);

	/*
	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
	 * known data loss.
	 */
	if (!data_vio->write)
		return VDO_BAD_MAPPING;

	/*
	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
	 * entry rather than fail the write.
	 */
	clear_mapped_location(data_vio);
	return VDO_SUCCESS;
}

/* This callback is registered in vdo_get_mapped_block(). */
static void get_mapping_from_fetched_page(struct vdo_completion *completion)
{
	int result;
	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
	const struct block_map_page *page;
	const struct block_map_entry *entry;
	struct data_vio *data_vio = as_data_vio(completion->parent);
	struct block_map_tree_slot *tree_slot;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	result = validate_completed_page(vpc, false);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	page = (const struct block_map_page *) get_page_buffer(vpc->info);
	tree_slot = &data_vio->tree_lock.tree_slots[0];
	entry = &page->entries[tree_slot->block_map_slot.slot];

	result = set_mapped_location(data_vio, entry);
	finish_processing_page(completion, result);
}

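/**
 * vdo_update_block_map_page() - Encode a mapping into the appropriate slot of a block map page,
 *                               and update the recovery journal locks held for that page.
 * @page: The block map page to update.
 * @data_vio: The data_vio which owns the journal entry and tree slot for this update.
 * @pbn: The physical block number to map to.
 * @mapping_state: The mapping state of the new entry.
 * @recovery_lock: A pointer to the page's current recovery journal lock; may be updated to the
 *                 sequence number of the data_vio's journal entry.
 */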
void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
			       physical_block_number_t pbn,
			       enum block_mapping_state mapping_state,
			       sequence_number_t *recovery_lock)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct block_map *block_map = zone->block_map;
	struct recovery_journal *journal = block_map->journal;
	sequence_number_t old_locked, new_locked;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	/* Encode the new mapping. */
	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
		vdo_pack_block_map_entry(pbn, mapping_state);

	/* Adjust references on the recovery journal blocks. */
	old_locked = *recovery_lock;
	new_locked = data_vio->recovery_sequence_number;

	if ((old_locked == 0) || (old_locked > new_locked)) {
		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
							     VDO_ZONE_TYPE_LOGICAL,
							     zone->zone_number);

		if (old_locked > 0) {
			vdo_release_recovery_journal_block_reference(journal, old_locked,
								     VDO_ZONE_TYPE_LOGICAL,
								     zone->zone_number);
		}

		*recovery_lock = new_locked;
	}

	/*
	 * The journal lock for this entry has effectively been transferred from the data_vio
	 * to the page: the page now holds a block reference no newer than new_locked, and the
	 * journal is reaped in order, so that reference also protects new_locked. Release the
	 * data_vio's per-entry lock.
	 */
	vdo_release_journal_entry_lock(journal, new_locked);
	data_vio->recovery_sequence_number = 0;
}

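/* This callback is registered in vdo_put_mapped_block(). */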
static void put_mapping_in_fetched_page(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion->parent);
	sequence_number_t old_lock;
	struct vdo_page_completion *vpc;
	struct page_info *info;
	int result;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	info = vpc->info;
	old_lock = info->recovery_lock;
	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
				  data_vio, data_vio->new_mapped.pbn,
				  data_vio->new_mapped.state, &info->recovery_lock);
	set_info_state(info, PS_DIRTY);
	add_to_dirty_lists(info->cache->zone, &info->state_entry,
			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
	finish_processing_page(completion, VDO_SUCCESS);
}

/* Read a stored block mapping into a data_vio. */
void vdo_get_mapped_block(struct data_vio *data_vio)
{
	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
		/*
		 * We know that the block map page for this LBN has not been allocated, so the
		 * block must be unmapped.
		 */
		clear_mapped_location(data_vio);
		continue_data_vio(data_vio);
		return;
	}

	fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
}

/* Update a stored block mapping to reflect a data_vio's new mapping. */
void vdo_put_mapped_block(struct data_vio *data_vio)
{
	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
}

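/**
 * vdo_get_block_map_statistics() - Sum the page cache statistics from all block map zones.
 * @map: The block map to query.
 *
 * Each zone's counters are read with READ_ONCE() since they are updated on other threads.
 *
 * Return: The aggregated statistics.
 */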
struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
{
	zone_count_t zone = 0;
	struct block_map_statistics totals;

	memset(&totals, 0, sizeof(struct block_map_statistics));
	for (zone = 0; zone < map->zone_count; zone++) {
		const struct block_map_statistics *stats =
			&(map->zones[zone].page_cache.stats);

		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
		totals.clean_pages += READ_ONCE(stats->clean_pages);
		totals.free_pages += READ_ONCE(stats->free_pages);
		totals.failed_pages += READ_ONCE(stats->failed_pages);
		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
		totals.read_count += READ_ONCE(stats->read_count);
		totals.write_count += READ_ONCE(stats->write_count);
		totals.failed_reads += READ_ONCE(stats->failed_reads);
		totals.failed_writes += READ_ONCE(stats->failed_writes);
		totals.reclaimed += READ_ONCE(stats->reclaimed);
		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
		totals.discard_required += READ_ONCE(stats->discard_required);
		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
		totals.fetch_required += READ_ONCE(stats->fetch_required);
		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
		totals.pages_saved += READ_ONCE(stats->pages_saved);
		totals.flush_count += READ_ONCE(stats->flush_count);
	}

	return totals;
}