// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "vio.h"

#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/kernel.h>
#include <linux/ratelimit.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "constants.h"
#include "io-submitter.h"
#include "vdo.h"

/* A vio_pool is a collection of preallocated vios. */
struct vio_pool {
	/* The number of objects managed by the pool */
	size_t size;
	/* The list of objects which are available */
	struct list_head available;
	/* The queue of requestors waiting for objects from the pool */
	struct vdo_wait_queue waiting;
	/* The number of objects currently in use */
	size_t busy_count;
	/* The list of objects which are in use */
	struct list_head busy;
	/* The ID of the thread on which this pool may be used */
	thread_id_t thread_id;
	/* The buffer backing the pool's vios */
	char *buffer;
	/* The pool entries */
	struct pooled_vio vios[];
};

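/*
 * Map a vio's bio back to the VDO physical block number it addresses,
 * restoring the geometry offset that is removed when a bio is prepared
 * for submission.
 */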
physical_block_number_t pbn_from_vio_bio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;
	physical_block_number_t pbn = bio->bi_iter.bi_sector / VDO_SECTORS_PER_BLOCK;

	return ((pbn == VDO_GEOMETRY_BLOCK_LOCATION) ? pbn : pbn + vdo->geometry.bio_offset);
}

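/* Allocate a bio with enough inline bio_vecs to map @size blocks of data. */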
static int create_multi_block_bio(block_count_t size, struct bio **bio_ptr)
{
	struct bio *bio = NULL;
	int result;

	result = vdo_allocate_extended(struct bio, size + 1, struct bio_vec,
				       "bio", &bio);
	if (result != VDO_SUCCESS)
		return result;

	*bio_ptr = bio;
	return VDO_SUCCESS;
}

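/* Create a bio sized for a single block of I/O. */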
int vdo_create_bio(struct bio **bio_ptr)
{
	return create_multi_block_bio(1, bio_ptr);
}

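/* Uninitialize and free a VDO-allocated bio, if there is one. */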
void vdo_free_bio(struct bio *bio)
{
	if (bio == NULL)
		return;

	bio_uninit(bio);
	vdo_free(vdo_forget(bio));
}

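/*
 * Initialize a caller-provided vio for metadata I/O: allocate its bio and
 * set its type, priority, parent, and data buffer.
 */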
int allocate_vio_components(struct vdo *vdo, enum vio_type vio_type,
			    enum vio_priority priority, void *parent,
			    unsigned int block_count, char *data, struct vio *vio)
{
	struct bio *bio;
	int result;

	result = VDO_ASSERT(block_count <= MAX_BLOCKS_PER_VIO,
			    "block count %u does not exceed maximum %u", block_count,
			    MAX_BLOCKS_PER_VIO);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(((vio_type != VIO_TYPE_UNINITIALIZED) && (vio_type != VIO_TYPE_DATA)),
			    "%d is a metadata type", vio_type);
	if (result != VDO_SUCCESS)
		return result;

	result = create_multi_block_bio(block_count, &bio);
	if (result != VDO_SUCCESS)
		return result;

	initialize_vio(vio, bio, block_count, vio_type, priority, vdo);
	vio->completion.parent = parent;
	vio->data = data;
	return VDO_SUCCESS;
}

/**
 * create_multi_block_metadata_vio() - Create a metadata vio.
 * @vdo: The vdo on which the vio will operate.
 * @vio_type: The type of vio to create.
 * @priority: The relative priority to assign to the vio.
 * @parent: The parent of the vio.
 * @block_count: The size of the vio in blocks.
 * @data: The buffer.
 * @vio_ptr: A pointer to hold the new vio.
 *
 * Return: VDO_SUCCESS or an error.
 */
int create_multi_block_metadata_vio(struct vdo *vdo, enum vio_type vio_type,
				    enum vio_priority priority, void *parent,
				    unsigned int block_count, char *data,
				    struct vio **vio_ptr)
{
	struct vio *vio;
	int result;

	BUILD_BUG_ON(sizeof(struct vio) > 256);

	/*
	 * Metadata vios should use direct allocation and not use the buffer pool, which is
	 * reserved for submissions from the linux block layer.
	 */
	result = vdo_allocate(1, struct vio, __func__, &vio);
	if (result != VDO_SUCCESS) {
		vdo_log_error("metadata vio allocation failure %d", result);
		return result;
	}

	result = allocate_vio_components(vdo, vio_type, priority, parent, block_count,
					 data, vio);
	if (result != VDO_SUCCESS) {
		vdo_free(vio);
		return result;
	}

	*vio_ptr = vio;
	return VDO_SUCCESS;
}

/**
 * free_vio_components() - Free the components of a vio embedded in a larger structure.
 * @vio: The vio to destroy
 */
void free_vio_components(struct vio *vio)
{
	if (vio == NULL)
		return;

	BUG_ON(is_data_vio(vio));
	vdo_free_bio(vdo_forget(vio->bio));
}

/**
 * free_vio() - Destroy a vio.
 * @vio: The vio to destroy.
 */
void free_vio(struct vio *vio)
{
	free_vio_components(vio);
	vdo_free(vio);
}

/* Set bio properties for a VDO read or write. */
void vdo_set_bio_properties(struct bio *bio, struct vio *vio, bio_end_io_t callback,
			    blk_opf_t bi_opf, physical_block_number_t pbn)
{
	struct vdo *vdo = vio->completion.vdo;
	struct device_config *config = vdo->device_config;

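	/*
	 * Translate the pbn to its location on the underlying device and pick the bio
	 * submission zone that will handle it.
	 */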
	pbn -= vdo->geometry.bio_offset;
	vio->bio_zone = ((pbn / config->thread_counts.bio_rotation_interval) %
			 config->thread_counts.bio_threads);

	bio->bi_private = vio;
	bio->bi_end_io = callback;
	bio->bi_opf = bi_opf;
	bio->bi_iter.bi_sector = pbn * VDO_SECTORS_PER_BLOCK;
}

/*
 * Prepares the bio to perform IO with the specified buffer. May only be used on a VDO-allocated
 * bio, as it assumes the bio wraps a 4k buffer that is 4k aligned, but there does not have to be a
 * vio associated with the bio.
 */
int vio_reset_bio(struct vio *vio, char *data, bio_end_io_t callback,
		  blk_opf_t bi_opf, physical_block_number_t pbn)
{
	int bvec_count, offset, len, i;
	struct bio *bio = vio->bio;

	bio_reset(bio, bio->bi_bdev, bi_opf);
	vdo_set_bio_properties(bio, vio, callback, bi_opf, pbn);
	if (data == NULL)
		return VDO_SUCCESS;

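	/* Attach the data buffer to the bio, one page-sized segment at a time. */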
	bio->bi_ioprio = 0;
	bio->bi_io_vec = bio->bi_inline_vecs;
	bio->bi_max_vecs = vio->block_count + 1;
	len = VDO_BLOCK_SIZE * vio->block_count;
	offset = offset_in_page(data);
	bvec_count = DIV_ROUND_UP(offset + len, PAGE_SIZE);

	/*
	 * If we knew that data was always on one page, or contiguous pages, we wouldn't need the
	 * loop. But if we're using vmalloc, it's not impossible that the data is in different
	 * pages that can't be merged in bio_add_page...
	 */
	for (i = 0; (i < bvec_count) && (len > 0); i++) {
		struct page *page;
		int bytes_added;
		int bytes = PAGE_SIZE - offset;

		if (bytes > len)
			bytes = len;

		page = is_vmalloc_addr(data) ? vmalloc_to_page(data) : virt_to_page(data);
		bytes_added = bio_add_page(bio, page, bytes, offset);

		if (bytes_added != bytes) {
			return vdo_log_error_strerror(VDO_BIO_CREATION_FAILED,
						      "Could only add %i bytes to bio",
						      bytes_added);
		}

		data += bytes;
		len -= bytes;
		offset = 0;
	}

	return VDO_SUCCESS;
}

/**
 * update_vio_error_stats() - Update per-vio error stats and log the error.
 * @vio: The vio which got an error.
 * @format: The format of the message to log (a printf style format).
 */
void update_vio_error_stats(struct vio *vio, const char *format, ...)
{
	static DEFINE_RATELIMIT_STATE(error_limiter, DEFAULT_RATELIMIT_INTERVAL,
				      DEFAULT_RATELIMIT_BURST);
	va_list args;
	int priority;
	struct vdo *vdo = vio->completion.vdo;

	switch (vio->completion.result) {
	case VDO_READ_ONLY:
		atomic64_inc(&vdo->stats.read_only_error_count);
		return;

	case VDO_NO_SPACE:
		atomic64_inc(&vdo->stats.no_space_error_count);
		priority = VDO_LOG_DEBUG;
		break;

	default:
		priority = VDO_LOG_ERR;
	}

	if (!__ratelimit(&error_limiter))
		return;

	va_start(args, format);
	vdo_vlog_strerror(priority, vio->completion.result, VDO_LOGGING_MODULE_NAME,
			  format, args);
	va_end(args);
}

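/* Update the error statistics and log a description of a failed metadata I/O. */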
void vio_record_metadata_io_error(struct vio *vio)
{
	const char *description;
	physical_block_number_t pbn = pbn_from_vio_bio(vio->bio);

	if (bio_op(vio->bio) == REQ_OP_READ) {
		description = "read";
	} else if ((vio->bio->bi_opf & REQ_PREFLUSH) == REQ_PREFLUSH) {
		description = (((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) ?
			       "write+preflush+fua" :
			       "write+preflush");
	} else if ((vio->bio->bi_opf & REQ_FUA) == REQ_FUA) {
		description = "write+fua";
	} else {
		description = "write";
	}

	update_vio_error_stats(vio,
			       "Completing %s vio of type %u for physical block %llu with error",
			       description, vio->type, (unsigned long long) pbn);
}

/**
 * make_vio_pool() - Create a new vio pool.
 * @vdo: The vdo.
 * @pool_size: The number of vios in the pool.
 * @thread_id: The ID of the thread using this pool.
 * @vio_type: The type of vios in the pool.
 * @priority: The priority with which vios from the pool should be enqueued.
 * @context: The context that each entry will have.
 * @pool_ptr: The resulting pool.
 *
 * Return: A success or error code.
 */
int make_vio_pool(struct vdo *vdo, size_t pool_size, thread_id_t thread_id,
		  enum vio_type vio_type, enum vio_priority priority, void *context,
		  struct vio_pool **pool_ptr)
{
	struct vio_pool *pool;
	char *ptr;
	int result;

	result = vdo_allocate_extended(struct vio_pool, pool_size, struct pooled_vio,
				       __func__, &pool);
	if (result != VDO_SUCCESS)
		return result;

	pool->thread_id = thread_id;
	INIT_LIST_HEAD(&pool->available);
	INIT_LIST_HEAD(&pool->busy);

	result = vdo_allocate(pool_size * VDO_BLOCK_SIZE, char,
			      "VIO pool buffer", &pool->buffer);
	if (result != VDO_SUCCESS) {
		free_vio_pool(pool);
		return result;
	}

	ptr = pool->buffer;
	for (pool->size = 0; pool->size < pool_size; pool->size++, ptr += VDO_BLOCK_SIZE) {
		struct pooled_vio *pooled = &pool->vios[pool->size];

		result = allocate_vio_components(vdo, vio_type, priority, NULL, 1, ptr,
						 &pooled->vio);
		if (result != VDO_SUCCESS) {
			free_vio_pool(pool);
			return result;
		}

		pooled->context = context;
		list_add_tail(&pooled->pool_entry, &pool->available);
	}

	*pool_ptr = pool;
	return VDO_SUCCESS;
}

/**
 * free_vio_pool() - Destroy a vio pool.
 * @pool: The pool to free.
 */
void free_vio_pool(struct vio_pool *pool)
{
	struct pooled_vio *pooled, *tmp;

	if (pool == NULL)
		return;

	/* Remove all available vios from the object pool. */
	VDO_ASSERT_LOG_ONLY(!vdo_waitq_has_waiters(&pool->waiting),
			    "VIO pool must not have any waiters when being freed");
	VDO_ASSERT_LOG_ONLY((pool->busy_count == 0),
			    "VIO pool must not have %zu busy entries when being freed",
			    pool->busy_count);
	VDO_ASSERT_LOG_ONLY(list_empty(&pool->busy),
			    "VIO pool must not have busy entries when being freed");

	list_for_each_entry_safe(pooled, tmp, &pool->available, pool_entry) {
		list_del(&pooled->pool_entry);
		free_vio_components(&pooled->vio);
		pool->size--;
	}

	VDO_ASSERT_LOG_ONLY(pool->size == 0,
			    "VIO pool must not have missing entries when being freed");

	vdo_free(vdo_forget(pool->buffer));
	vdo_free(pool);
}

/**
 * is_vio_pool_busy() - Check whether a vio pool has outstanding entries.
 * @pool: The pool to check.
 *
 * Return: true if the pool is busy.
 */
bool is_vio_pool_busy(struct vio_pool *pool)
{
	return (pool->busy_count != 0);
}

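/*
 * A sketch of typical pool usage; the callback name and the I/O helper below
 * are illustrative assumptions, not part of this file:
 *
 *	static void on_pooled_vio(struct vdo_waiter *waiter, void *context)
 *	{
 *		struct pooled_vio *pooled = context;
 *
 *		submit_metadata_io(&pooled->vio);
 *	}
 *
 *	waiter->callback = on_pooled_vio;
 *	acquire_vio_from_pool(pool, waiter);
 *
 * The entry is handed back with return_vio_to_pool() when the caller is done
 * with it. All of these calls must be made on the pool's thread.
 */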
/**
 * acquire_vio_from_pool() - Acquire a vio and buffer from the pool (asynchronous).
 * @pool: The vio pool.
 * @waiter: Object that is requesting a vio.
 */
void acquire_vio_from_pool(struct vio_pool *pool, struct vdo_waiter *waiter)
{
	struct pooled_vio *pooled;

	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "acquire from active vio_pool called from correct thread");

	if (list_empty(&pool->available)) {
		vdo_waitq_enqueue_waiter(&pool->waiting, waiter);
		return;
	}

	pooled = list_first_entry(&pool->available, struct pooled_vio, pool_entry);
	pool->busy_count++;
	list_move_tail(&pooled->pool_entry, &pool->busy);
	(*waiter->callback)(waiter, pooled);
}

/**
 * return_vio_to_pool() - Return a vio to the pool.
 * @pool: The vio pool.
 * @vio: The pooled vio to return.
 */
void return_vio_to_pool(struct vio_pool *pool, struct pooled_vio *vio)
{
	VDO_ASSERT_LOG_ONLY((pool->thread_id == vdo_get_callback_thread_id()),
			    "vio pool entry returned on same thread as it was acquired");

	vio->vio.completion.error_handler = NULL;
	vio->vio.completion.parent = NULL;
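	/* If any waiter is queued, hand the vio directly to it; the entry stays busy. */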
	if (vdo_waitq_has_waiters(&pool->waiting)) {
		vdo_waitq_notify_next_waiter(&pool->waiting, NULL, vio);
		return;
	}

	list_move_tail(&vio->pool_entry, &pool->available);
	--pool->busy_count;
}

/*
 * Various counting functions for statistics.
 * These are used for bios coming into VDO, as well as bios generated by VDO.
 */
void vdo_count_bios(struct atomic_bio_stats *bio_stats, struct bio *bio)
{
	if (((bio->bi_opf & REQ_PREFLUSH) != 0) && (bio->bi_iter.bi_size == 0)) {
		atomic64_inc(&bio_stats->empty_flush);
		atomic64_inc(&bio_stats->flush);
		return;
	}

	switch (bio_op(bio)) {
	case REQ_OP_WRITE:
		atomic64_inc(&bio_stats->write);
		break;
	case REQ_OP_READ:
		atomic64_inc(&bio_stats->read);
		break;
	case REQ_OP_DISCARD:
		atomic64_inc(&bio_stats->discard);
		break;
	/*
	 * All other operations are filtered out in dmvdo.c, or not created by VDO, so
	 * shouldn't exist.
	 */
	default:
		VDO_ASSERT_LOG_ONLY(0, "Bio operation %d not a write, read, discard, or empty flush",
				    bio_op(bio));
	}

	if ((bio->bi_opf & REQ_PREFLUSH) != 0)
		atomic64_inc(&bio_stats->flush);
	if (bio->bi_opf & REQ_FUA)
		atomic64_inc(&bio_stats->fua);
}

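/* Attribute a completed bio to the appropriate per-type completion counters. */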
static void count_all_bios_completed(struct vio *vio, struct bio *bio)
{
	struct atomic_statistics *stats = &vio->completion.vdo->stats;

	if (is_data_vio(vio)) {
		vdo_count_bios(&stats->bios_out_completed, bio);
		return;
	}

	vdo_count_bios(&stats->bios_meta_completed, bio);
	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
		vdo_count_bios(&stats->bios_journal_completed, bio);
	else if (vio->type == VIO_TYPE_BLOCK_MAP)
		vdo_count_bios(&stats->bios_page_cache_completed, bio);
}

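/* Count a completed bio in both the global and the per-type statistics. */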
void vdo_count_completed_bios(struct bio *bio)
{
	struct vio *vio = (struct vio *) bio->bi_private;

	atomic64_inc(&vio->completion.vdo->stats.bios_completed);
	count_all_bios_completed(vio, bio);
}