// SPDX-License-Identifier: MIT or GPL-2.0-only

#include <config.h>

#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <poll.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/stat.h>
#include <sched.h>
#include <pthread.h>
#include <getopt.h>
#include <stdarg.h>
#include <errno.h>
#include <error.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>
#include <linux/fs.h>

#include "ublksrv.h"
#include "ublksrv_utils.h"
#include "ublksrv_aio.h"

#define UBLKSRV_TGT_TYPE_DEMO 0

static bool use_aio = false;
static int backing_fd = -1;

static struct ublksrv_aio_ctx *aio_ctx = NULL;
static pthread_t io_thread;
struct demo_queue_info {
	const struct ublksrv_dev *dev;
	const struct ublksrv_queue *q;
	int qid;

	pthread_t thread;
};

static struct ublksrv_ctrl_dev *this_ctrl_dev;
static const struct ublksrv_dev *this_dev;

static pthread_mutex_t jbuf_lock;
static char jbuf[4096];

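/*
 * Handle SIGTERM/SIGINT: report the queue's STOPPING state and ask the
 * control device to stop, which lets ublksrv_process_io() return in the
 * queue threads so the daemon can exit.
 */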
static void sig_handler(int sig)
{
	const struct ublksrv_queue *q = ublksrv_get_queue(this_dev, 0);
	unsigned state = ublksrv_queue_state(q);

	fprintf(stderr, "got signal %d, stopping %d\n", sig,
			state & UBLKSRV_QUEUE_STOPPING);
	ublksrv_ctrl_stop_dev(this_ctrl_dev);
}

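/*
 * Translate DISCARD/WRITE_ZEROES into the matching fallocate() mode and
 * prepare an async fallocate SQE against the backing file.
 */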
static void queue_fallocate_async(struct io_uring_sqe *sqe,
		struct ublksrv_aio *req)
{
	__u16 ublk_op = ublksrv_get_op(&req->io);
	__u32 flags = ublksrv_get_flags(&req->io);
	__u32 mode = FALLOC_FL_KEEP_SIZE;

	/* follow logic of linux kernel loop */
	if (ublk_op == UBLK_IO_OP_DISCARD) {
		mode |= FALLOC_FL_PUNCH_HOLE;
	} else if (ublk_op == UBLK_IO_OP_WRITE_ZEROES) {
		if (flags & UBLK_IO_F_NOUNMAP)
			mode |= FALLOC_FL_ZERO_RANGE;
		else
			mode |= FALLOC_FL_PUNCH_HOLE;
	} else {
		mode |= FALLOC_FL_ZERO_RANGE;
	}
	io_uring_prep_fallocate(sqe, req->fd, mode, req->io.start_sector << 9,
			req->io.nr_sectors << 9);
}

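/*
 * io_uring-based submitter: turn one ublk request into an SQE on the
 * context's private ring. The request pointer is stashed in user_data
 * so reap_uring() can match the CQE back to the request.
 */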
int async_io_submitter(struct ublksrv_aio_ctx *ctx,
		struct ublksrv_aio *req)
{
	struct io_uring *ring = (struct io_uring *)
		ublksrv_aio_get_ctx_data(ctx);
	const struct ublksrv_io_desc *iod = &req->io;
	unsigned op = ublksrv_get_op(iod);
	struct io_uring_sqe *sqe;

	/* validate before grabbing an SQE, so a bad request can't leak one */
	if (op == -1 || req->fd < 0) {
		fprintf(stderr, "%s: wrong op %d, fd %d, id %x\n", __func__, op,
				req->fd, req->id);
		return -EINVAL;
	}

	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		fprintf(stderr, "%s: uring run out of sqe\n", __func__);
		return -ENOMEM;
	}

	io_uring_sqe_set_data(sqe, req);
	switch (op) {
	case UBLK_IO_OP_DISCARD:
	case UBLK_IO_OP_WRITE_ZEROES:
		queue_fallocate_async(sqe, req);
		break;
	case UBLK_IO_OP_FLUSH:
		io_uring_prep_fsync(sqe, req->fd, IORING_FSYNC_DATASYNC);
		break;
	case UBLK_IO_OP_READ:
		io_uring_prep_read(sqe, req->fd, (void *)iod->addr,
				iod->nr_sectors << 9, iod->start_sector << 9);
		break;
	case UBLK_IO_OP_WRITE:
		io_uring_prep_write(sqe, req->fd, (void *)iod->addr,
				iod->nr_sectors << 9, iod->start_sector << 9);
		break;
	default:
		fprintf(stderr, "%s: wrong op %d, fd %d, id %x\n", __func__,
				op, req->fd, req->id);
		return -EINVAL;
	}

	return 0;
}

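/*
 * Synchronous submitter: handle one ublk request directly with
 * pread()/pwrite()/fdatasync()/fallocate(). The result is stored in
 * req->res and the positive return marks the request completed inline.
 */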
int sync_io_submitter(struct ublksrv_aio_ctx *ctx,
		struct ublksrv_aio *req)
{
	const struct ublksrv_io_desc *iod = &req->io;
	unsigned ublk_op = ublksrv_get_op(iod);
	void *buf = (void *)iod->addr;
	unsigned len = iod->nr_sectors << 9;
	unsigned long long offset = iod->start_sector << 9;
	int mode = FALLOC_FL_KEEP_SIZE;
	int ret;

	switch (ublk_op) {
	case UBLK_IO_OP_READ:
		ret = pread(req->fd, buf, len, offset);
		break;
	case UBLK_IO_OP_WRITE:
		ret = pwrite(req->fd, buf, len, offset);
		break;
	case UBLK_IO_OP_FLUSH:
		ret = fdatasync(req->fd);
		break;
	case UBLK_IO_OP_WRITE_ZEROES:
		mode |= FALLOC_FL_ZERO_RANGE;
		/* fallthrough */
	case UBLK_IO_OP_DISCARD:
		ret = fallocate(req->fd, mode, offset, len);
		break;
	default:
		fprintf(stderr, "%s: wrong op %d, fd %d, id %x\n", __func__,
				ublk_op, req->fd, req->id);
		return -EINVAL;
	}

	req->res = ret;
	return 1;
}

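/*
 * Worker for the epoll-based context: without a backing file behave as
 * a null target and complete the request immediately, otherwise defer
 * to the synchronous submitter.
 */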
static int io_submit_worker(struct ublksrv_aio_ctx *ctx,
		struct ublksrv_aio *req)
{
	/* simulate null target */
	if (req->fd < 0)
		req->res = req->io.nr_sectors << 9;
	else
		return sync_io_submitter(ctx, req);

	return 1;
}

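/*
 * Arm a one-shot POLLIN poll on the context eventfd so the uring loop
 * wakes up when new requests are queued; user_data 0 distinguishes this
 * SQE from real I/O requests.
 */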
static int queue_event(struct ublksrv_aio_ctx *ctx)
{
	struct io_uring *ring = (struct io_uring *)
		ublksrv_aio_get_ctx_data(ctx);
	struct io_uring_sqe *sqe;
	int ctx_efd = ublksrv_aio_get_efd(ctx);

	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		fprintf(stderr, "%s: uring run out of sqe\n", __func__);
		return -1;
	}

	io_uring_prep_poll_add(sqe, ctx_efd, POLLIN);
	io_uring_sqe_set_data64(sqe, 0);

	return 0;
}

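/*
 * Drain completions from the ring: CQEs carrying a request pointer are
 * resubmitted on -EAGAIN or moved to the completion list, while a zero
 * user_data CQE means the eventfd poll fired.
 */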
static int reap_uring(struct ublksrv_aio_ctx *ctx, struct aio_list *list,
		int *got_efd)
{
	struct io_uring *r = (struct io_uring *)ublksrv_aio_get_ctx_data(ctx);
	struct io_uring_cqe *cqe;
	unsigned head;
	int count = 0;

	io_uring_for_each_cqe(r, head, cqe) {
		if (cqe->user_data) {
			struct ublksrv_aio *req = (struct ublksrv_aio *)
				cqe->user_data;

			if (cqe->res == -EAGAIN)
				async_io_submitter(ctx, req);
			else {
				req->res = cqe->res;
				aio_list_add(list, req);
			}
		} else {
			if (cqe->res < 0)
				fprintf(stderr, "eventfd result %d\n",
						cqe->res);
			*got_efd = 1;
		}
		count += 1;
	}
	io_uring_cq_advance(r, count);

	return count;
}

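/*
 * Thread body of the io_uring-backed aio context: one ring, sized for
 * all queues, with the context eventfd polled through the same ring so
 * the loop blocks in io_uring_submit_and_wait() until either new
 * requests arrive or backing I/O completes.
 */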
static void *demo_event_uring_io_handler_fn(void *data)
{
	struct ublksrv_aio_ctx *ctx = (struct ublksrv_aio_ctx *)data;
	const struct ublksrv_dev *dev = ublksrv_aio_get_dev(ctx);
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	unsigned dev_id = info->dev_id;
	struct io_uring ring;
	unsigned qd;
	int ret;
	int ctx_efd = ublksrv_aio_get_efd(ctx);

	/* one ring is shared by all queues, so size it for the whole device */
	qd = info->queue_depth * info->nr_hw_queues * 2;

	ret = io_uring_queue_init(qd, &ring, 0);
	if (ret) {
		fprintf(stderr, "ublk dev %d fails to init io_uring\n", dev_id);
		return NULL;
	}
	ret = io_uring_register_eventfd(&ring, ctx_efd);
	if (ret) {
		fprintf(stderr, "ublk dev %d fails to register eventfd\n",
				dev_id);
		return NULL;
	}

	ublksrv_aio_set_ctx_data(ctx, (void *)&ring);

	fprintf(stdout, "ublk dev %d aio(io_uring submitter) context started tid %d\n",
			dev_id, ublksrv_gettid());

	queue_event(ctx);
	io_uring_submit_and_wait(&ring, 0);

	while (!ublksrv_aio_ctx_dead(ctx)) {
		struct aio_list compl;
		int got_efd = 0;

		aio_list_init(&compl);
		ublksrv_aio_submit_worker(ctx, async_io_submitter, &compl);

		reap_uring(ctx, &compl, &got_efd);
		ublksrv_aio_complete_worker(ctx, &compl);

		/* the eventfd poll is one-shot, so re-arm it once it fired */
		if (got_efd)
			queue_event(ctx);
		io_uring_submit_and_wait(&ring, 1);
	}

	return NULL;
}

#define EPOLL_NR_EVENTS 1
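/*
 * Thread body of the sync-submitter aio context: sleep in epoll_wait()
 * on the context eventfd and run the submit/complete workers whenever
 * the queue threads hand over new requests.
 */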
static void *demo_event_real_io_handler_fn(void *data)
{
	struct ublksrv_aio_ctx *ctx = (struct ublksrv_aio_ctx *)data;
	const struct ublksrv_dev *dev = ublksrv_aio_get_dev(ctx);
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	unsigned dev_id = info->dev_id;
	struct epoll_event events[EPOLL_NR_EVENTS];
	int epoll_fd = epoll_create(EPOLL_NR_EVENTS);
	struct epoll_event read_event;
	int ctx_efd = ublksrv_aio_get_efd(ctx);

	if (epoll_fd < 0) {
		fprintf(stderr, "ublk dev %d create epoll fd failed\n", dev_id);
		return NULL;
	}

	fprintf(stdout, "ublk dev %d aio context(sync io submitter) started tid %d\n",
			dev_id, ublksrv_gettid());

	read_event.events = EPOLLIN;
	read_event.data.fd = ctx_efd;
	(void)epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ctx_efd, &read_event);

	while (!ublksrv_aio_ctx_dead(ctx)) {
		struct aio_list compl;

		aio_list_init(&compl);

		ublksrv_aio_submit_worker(ctx, io_submit_worker, &compl);

		ublksrv_aio_complete_worker(ctx, &compl);

		epoll_wait(epoll_fd, events, EPOLL_NR_EVENTS, -1);
	}

	return NULL;
}

/*
 * io handler for each ublkdev's queue
 *
 * This only shows how to build a ublksrv target's io handling, so that
 * callers can apply these APIs in their own thread context to implement
 * a ublk block device.
 */
static void *demo_event_io_handler_fn(void *data)
{
	struct demo_queue_info *info = (struct demo_queue_info *)data;
	const struct ublksrv_dev *dev = info->dev;
	const struct ublksrv_ctrl_dev_info *dinfo =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	unsigned dev_id = dinfo->dev_id;
	unsigned short q_id = info->qid;
	const struct ublksrv_queue *q;

	pthread_mutex_lock(&jbuf_lock);
	ublksrv_json_write_queue_info(ublksrv_get_ctrl_dev(dev), jbuf, sizeof jbuf,
			q_id, ublksrv_gettid());
	pthread_mutex_unlock(&jbuf_lock);

	q = ublksrv_queue_init(dev, q_id, info);
	if (!q) {
		fprintf(stderr, "ublk dev %d queue %d init queue failed\n",
				dinfo->dev_id, q_id);
		return NULL;
	}
	info->q = q;

	fprintf(stdout, "tid %d: ublk dev %d queue %d started\n", ublksrv_gettid(),
			dev_id, q->q_id);
	do {
		if (ublksrv_process_io(q) < 0)
			break;
	} while (1);

	fprintf(stdout, "ublk dev %d queue %d exited\n", dev_id, q->q_id);
	ublksrv_queue_deinit(q);
	return NULL;
}

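/*
 * Publish the basic device parameters (512-byte logical/4K physical
 * block size, sizes derived from the device info) via the JSON buffer
 * and the control device.
 */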
static void demo_event_set_parameters(struct ublksrv_ctrl_dev *cdev,
		const struct ublksrv_dev *dev)
{
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(cdev);
	struct ublk_params p = {
		.types = UBLK_PARAM_TYPE_BASIC,
		.basic = {
			.logical_bs_shift	= 9,
			.physical_bs_shift	= 12,
			.io_opt_shift		= 12,
			.io_min_shift		= 9,
			.max_sectors		= info->max_io_buf_bytes >> 9,
			.dev_sectors		= dev->tgt.dev_size >> 9,
		},
	};
	int ret;

	pthread_mutex_lock(&jbuf_lock);
	ublksrv_json_write_params(&p, jbuf, sizeof jbuf);
	pthread_mutex_unlock(&jbuf_lock);

	ret = ublksrv_ctrl_set_params(cdev, &p);
	if (ret)
		fprintf(stderr, "dev %d set basic parameter failed %d\n",
				info->dev_id, ret);
}

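/*
 * Device bring-up: initialize the ublksrv device and the shared aio
 * context, spawn the aio thread and one thread per hardware queue, set
 * parameters, start the device, then wait for the queue threads and
 * shut the aio context down.
 */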
static int demo_event_io_handler(struct ublksrv_ctrl_dev *ctrl_dev)
{
	const struct ublksrv_ctrl_dev_info *dinfo =
		ublksrv_ctrl_get_dev_info(ctrl_dev);
	int dev_id = dinfo->dev_id;
	int ret, i;
	const struct ublksrv_dev *dev;
	struct demo_queue_info *info_array;
	void *thread_ret;

	info_array = (struct demo_queue_info *)
		calloc(dinfo->nr_hw_queues, sizeof(struct demo_queue_info));
	if (!info_array)
		return -ENOMEM;

	dev = ublksrv_dev_init(ctrl_dev);
	if (!dev) {
		free(info_array);
		return -ENOMEM;
	}
	this_dev = dev;

	aio_ctx = ublksrv_aio_ctx_init(dev, 0);
	if (!aio_ctx) {
		fprintf(stderr, "dev %d call ublk_aio_ctx_init failed\n", dev_id);
		ret = -ENOMEM;
		goto fail;
	}

	if (!use_aio)
		pthread_create(&io_thread, NULL, demo_event_real_io_handler_fn,
				aio_ctx);
	else
		pthread_create(&io_thread, NULL, demo_event_uring_io_handler_fn,
				aio_ctx);
	for (i = 0; i < dinfo->nr_hw_queues; i++) {
		info_array[i].dev = dev;
		info_array[i].qid = i;

		pthread_create(&info_array[i].thread, NULL,
				demo_event_io_handler_fn,
				&info_array[i]);
	}

	demo_event_set_parameters(ctrl_dev, dev);

	/* everything is fine now, start us */
	ret = ublksrv_ctrl_start_dev(ctrl_dev, getpid());
	if (ret < 0)
		goto fail;

	ublksrv_ctrl_get_info(ctrl_dev);
	ublksrv_ctrl_dump(ctrl_dev, jbuf);

	/* wait until we are terminated */
	for (i = 0; i < dinfo->nr_hw_queues; i++) {
		pthread_join(info_array[i].thread, &thread_ret);
	}
	ublksrv_aio_ctx_shutdown(aio_ctx);
	pthread_join(io_thread, &thread_ret);
	ublksrv_aio_ctx_deinit(aio_ctx);

 fail:
	ublksrv_dev_deinit(dev);

	free(info_array);

	return ret;
}

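/* Retrieve the queue affinity from the driver, then run the device. */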
static int ublksrv_start_daemon(struct ublksrv_ctrl_dev *ctrl_dev)
{
	if (ublksrv_ctrl_get_affinity(ctrl_dev) < 0)
		return -1;

	return demo_event_io_handler(ctrl_dev);
}

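/*
 * Target init callback: size the device from the backing file or block
 * device when one is given, otherwise expose a 250G null device, and
 * record device/target info in the JSON buffer.
 */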
static int demo_init_tgt(struct ublksrv_dev *dev, int type, int argc,
		char *argv[])
{
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	struct ublksrv_tgt_base_json tgt_json = {
		.type = type,
	};
	struct stat st;

	strcpy(tgt_json.name, "demo_event");

	if (type != UBLKSRV_TGT_TYPE_DEMO)
		return -1;

	if (backing_fd >= 0) {
		unsigned long long bytes;

		if (fstat(backing_fd, &st) < 0)
			return -1;
		if (S_ISBLK(st.st_mode)) {
			if (ioctl(backing_fd, BLKGETSIZE64, &bytes) != 0)
				return -1;
		} else if (S_ISREG(st.st_mode)) {
			bytes = st.st_size;
		} else {
			bytes = 0;
		}

		tgt->dev_size = bytes;
	} else {
		tgt->dev_size = 250ULL * 1024 * 1024 * 1024;
	}

	tgt_json.dev_size = tgt->dev_size;
	tgt->tgt_ring_depth = info->queue_depth;
	tgt->nr_fds = 0;

	ublksrv_json_write_dev_info(ublksrv_get_ctrl_dev(dev), jbuf, sizeof jbuf);
	ublksrv_json_write_target_base_info(jbuf, sizeof jbuf, &tgt_json);

	return 0;
}

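/*
 * ->handle_io_async() runs in the queue thread: copy the io descriptor
 * into an aio request tagged with (queue id, tag) and queue it to the
 * aio context; the result comes back through ->handle_event().
 */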
static int demo_handle_io_async(const struct ublksrv_queue *q,
		const struct ublk_io_data *data)
{
	struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0);

	if (!req)
		return -ENOMEM;

	req->io = *data->iod;
	req->fd = backing_fd;
	req->id = ublksrv_aio_pid_tag(q->q_id, data->tag);
	ublksrv_aio_submit_req(aio_ctx, q, req);

	return 0;
}

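/* ->handle_event(): complete requests finished by the aio context thread. */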
static void demo_handle_event(const struct ublksrv_queue *q)
{
	ublksrv_aio_handle_event(aio_ctx, q);
}

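/* The demo target type registered with libublksrv. */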
static const struct ublksrv_tgt_type demo_event_tgt_type = {
	.type	= UBLKSRV_TGT_TYPE_DEMO,
	.name	= "demo_event",
	.init_tgt = demo_init_tgt,
	.handle_io_async = demo_handle_io_async,
	.handle_event = demo_handle_event,
};

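/*
 * Set up a control device with an eventfd-capable aio context
 * (UBLKSRV_F_NEED_EVENTFD), add it, run the daemon until a signal stops
 * the device, then delete and deinit it.
 */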
int main(int argc, char *argv[])
{
	static const struct option longopts[] = {
		{ "need_get_data", 0, NULL, 'g' },
		{ "backing_file", 1, NULL, 'f' },
		{ "use_aio", 0, NULL, 'a' },
		{ NULL }
	};
	struct ublksrv_dev_data data = {
		.dev_id = -1,
		.max_io_buf_bytes = DEF_BUF_SIZE,
		.nr_hw_queues = DEF_NR_HW_QUEUES,
		.queue_depth = DEF_QD,
		.tgt_type = "demo_event",
		.tgt_ops = &demo_event_tgt_type,
		.flags = 0,
	};
	struct ublksrv_ctrl_dev *dev;
	int ret, opt;

	while ((opt = getopt_long(argc, argv, "f:ga",
				  longopts, NULL)) != -1) {
		switch (opt) {
		case 'g':
			data.flags |= UBLK_F_NEED_GET_DATA;
			break;
		case 'f':
			backing_fd = open(optarg, O_RDWR | O_DIRECT);
			if (backing_fd < 0)
				backing_fd = -1;
			break;
		case 'a':
			use_aio = true;
			break;
		}
	}

	/* the io_uring submitter only makes sense with a backing file */
	if (backing_fd < 0)
		use_aio = false;

	if (signal(SIGTERM, sig_handler) == SIG_ERR)
		error(EXIT_FAILURE, errno, "signal");
	if (signal(SIGINT, sig_handler) == SIG_ERR)
		error(EXIT_FAILURE, errno, "signal");

	pthread_mutex_init(&jbuf_lock, NULL);

	data.ublksrv_flags = UBLKSRV_F_NEED_EVENTFD;
	dev = ublksrv_ctrl_init(&data);
	if (!dev)
		error(EXIT_FAILURE, ENODEV, "ublksrv_ctrl_init");
	/* ugly, but the signal handler needs this_ctrl_dev */
	this_ctrl_dev = dev;

	ret = ublksrv_ctrl_add_dev(dev);
	if (ret < 0) {
		error(0, -ret, "can't add dev %d", data.dev_id);
		goto fail;
	}

	ret = ublksrv_start_daemon(dev);
	if (ret < 0) {
		error(0, -ret, "can't start daemon");
		goto fail_del_dev;
	}

	ublksrv_ctrl_del_dev(dev);
	ublksrv_ctrl_deinit(dev);
	exit(EXIT_SUCCESS);

 fail_del_dev:
	ublksrv_ctrl_del_dev(dev);
 fail:
	ublksrv_ctrl_deinit(dev);

	exit(EXIT_FAILURE);
}