/* xref: /aosp_15_r20/external/ublksrv/demo_event.c (revision 94c4a1e103eb1715230460aab379dff275992c20) */
// SPDX-License-Identifier: MIT or GPL-2.0-only

#include <config.h>

#include <stdlib.h>
#include <stdio.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sched.h>
#include <pthread.h>
#include <getopt.h>
#include <stdarg.h>
#include <errno.h>
#include <error.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>
/* for signal(), open()/O_DIRECT, fstat()/S_ISBLK and BLKGETSIZE64 used below */
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <linux/fs.h>
#include <stdbool.h>

#include "ublksrv.h"
#include "ublksrv_utils.h"
#include "ublksrv_aio.h"

#define UBLKSRV_TGT_TYPE_DEMO  0

static bool use_aio = false;
static int backing_fd = -1;

static struct ublksrv_aio_ctx *aio_ctx = NULL;
static pthread_t io_thread;
struct demo_queue_info {
	const struct ublksrv_dev *dev;
	const struct ublksrv_queue *q;
	int qid;

	pthread_t thread;
};

static struct ublksrv_ctrl_dev *this_ctrl_dev;
static const struct ublksrv_dev *this_dev;

static pthread_mutex_t jbuf_lock;
static char jbuf[4096];

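/*
 * SIGINT/SIGTERM handler: print the queue's STOPPING state and ask the
 * control device to stop, which lets the per-queue threads wind down.
 */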
static void sig_handler(int sig)
{
	const struct ublksrv_queue *q = ublksrv_get_queue(this_dev, 0);
	unsigned state = ublksrv_queue_state(q);

	fprintf(stderr, "got signal %d, stopping %d\n", sig,
			state & UBLKSRV_QUEUE_STOPPING);
	ublksrv_ctrl_stop_dev(this_ctrl_dev);
}

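/*
 * Map a discard/write-zeroes request onto an fallocate SQE, picking the
 * fallocate mode the same way the kernel loop driver does.
 */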
static void queue_fallocate_async(struct io_uring_sqe *sqe,
		struct ublksrv_aio *req)
{
	__u16 ublk_op = ublksrv_get_op(&req->io);
	__u32 flags = ublksrv_get_flags(&req->io);
	__u32 mode = FALLOC_FL_KEEP_SIZE;

	/* follow logic of linux kernel loop */
	if (ublk_op == UBLK_IO_OP_DISCARD) {
		mode |= FALLOC_FL_PUNCH_HOLE;
	} else if (ublk_op == UBLK_IO_OP_WRITE_ZEROES) {
		if (flags & UBLK_IO_F_NOUNMAP)
			mode |= FALLOC_FL_ZERO_RANGE;
		else
			mode |= FALLOC_FL_PUNCH_HOLE;
	} else {
		mode |= FALLOC_FL_ZERO_RANGE;
	}
	io_uring_prep_fallocate(sqe, req->fd, mode, req->io.start_sector << 9,
			req->io.nr_sectors << 9);
}

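/*
 * Submitter used by the io_uring worker thread: turn one ublksrv_aio
 * request into an SQE (read/write/fsync/fallocate) with the request as
 * its user_data, to be submitted later by the caller.
 */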
int async_io_submitter(struct ublksrv_aio_ctx *ctx,
		struct ublksrv_aio *req)
{
	struct io_uring *ring = (struct io_uring *)
		ublksrv_aio_get_ctx_data(ctx);
	const struct ublksrv_io_desc *iod = &req->io;
	unsigned op = ublksrv_get_op(iod);
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		fprintf(stderr, "%s: uring run out of sqe\n", __func__);
		return -ENOMEM;
	}

	if (op == -1 || req->fd < 0) {
		fprintf(stderr, "%s: wrong op %d, fd %d, id %x\n", __func__, op,
				req->fd, req->id);
		return -EINVAL;
	}

	io_uring_sqe_set_data(sqe, req);
	switch (op) {
	case UBLK_IO_OP_DISCARD:
	case UBLK_IO_OP_WRITE_ZEROES:
		queue_fallocate_async(sqe, req);
		break;
	case UBLK_IO_OP_FLUSH:
		io_uring_prep_fsync(sqe, req->fd, IORING_FSYNC_DATASYNC);
		break;
	case UBLK_IO_OP_READ:
		io_uring_prep_read(sqe, req->fd, (void *)iod->addr,
				iod->nr_sectors << 9, iod->start_sector << 9);
		break;
	case UBLK_IO_OP_WRITE:
		io_uring_prep_write(sqe, req->fd, (void *)iod->addr,
				iod->nr_sectors << 9, iod->start_sector << 9);
		break;
	default:
		fprintf(stderr, "%s: wrong op %d, fd %d, id %x\n", __func__,
				op, req->fd, req->id);
		return -EINVAL;
	}

	return 0;
}

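/*
 * Blocking submitter used by the epoll worker thread: serve the request
 * directly with pread/pwrite/fdatasync/fallocate and return 1 so the
 * caller knows the request has already completed.
 */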
int sync_io_submitter(struct ublksrv_aio_ctx *ctx,
		struct ublksrv_aio *req)
{
	const struct ublksrv_io_desc *iod = &req->io;
	unsigned ublk_op = ublksrv_get_op(iod);
	void *buf = (void *)iod->addr;
	unsigned len = iod->nr_sectors << 9;
	unsigned long long offset = iod->start_sector << 9;
	int mode = FALLOC_FL_KEEP_SIZE;
	int ret;

	switch (ublk_op) {
	case UBLK_IO_OP_READ:
		ret = pread(req->fd, buf, len, offset);
		break;
	case UBLK_IO_OP_WRITE:
		ret = pwrite(req->fd, buf, len, offset);
		break;
	case UBLK_IO_OP_FLUSH:
		ret = fdatasync(req->fd);
		break;
	case UBLK_IO_OP_WRITE_ZEROES:
		mode |= FALLOC_FL_ZERO_RANGE;
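		/* fall through: discard and write-zeroes both end up in fallocate() */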
	case UBLK_IO_OP_DISCARD:
		ret = fallocate(req->fd, mode, offset, len);
		break;
	default:
		fprintf(stderr, "%s: wrong op %d, fd %d, id %x\n", __func__,
				ublk_op, req->fd, req->id);
		return -EINVAL;
	}

	req->res = ret;
	return 1;
}

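/*
 * Worker callback for the epoll-based context: without a backing file the
 * request is completed immediately (null target), otherwise it is handed
 * to the synchronous submitter.
 */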
static int io_submit_worker(struct ublksrv_aio_ctx *ctx,
		struct ublksrv_aio *req)
{
	/* simulate null target */
	if (req->fd < 0)
		req->res = req->io.nr_sectors << 9;
	else
		return sync_io_submitter(ctx, req);

	return 1;
}

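/*
 * Arm a one-shot POLLIN poll on the aio context's eventfd; user_data 0
 * distinguishes this CQE from real IO completions in reap_uring().
 */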
static int queue_event(struct ublksrv_aio_ctx *ctx)
{
	struct io_uring *ring = (struct io_uring *)
		ublksrv_aio_get_ctx_data(ctx);
	struct io_uring_sqe *sqe;
	int ctx_efd = ublksrv_aio_get_efd(ctx);

	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		fprintf(stderr, "%s: uring run out of sqe\n", __func__);
		return -1;
	}

	io_uring_prep_poll_add(sqe, ctx_efd, POLLIN);
	io_uring_sqe_set_data64(sqe, 0);

	return 0;
}

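/*
 * Drain the CQ ring: CQEs carrying a request pointer complete (or, on
 * -EAGAIN, resubmit) that request, while the user_data==0 CQE signals
 * that the eventfd poll fired.
 */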
static int reap_uring(struct ublksrv_aio_ctx *ctx, struct aio_list *list,
		int *got_efd)
{
	struct io_uring *r = (struct io_uring *)ublksrv_aio_get_ctx_data(ctx);
	struct io_uring_cqe *cqe;
	unsigned head;
	int count = 0;

	io_uring_for_each_cqe(r, head, cqe) {
		if (cqe->user_data) {
			struct ublksrv_aio *req = (struct ublksrv_aio *)
				cqe->user_data;

			if (cqe->res == -EAGAIN)
				async_io_submitter(ctx, req);
			else {
				req->res = cqe->res;
				aio_list_add(list, req);
			}
		} else {
			if (cqe->res < 0)
				fprintf(stderr, "eventfd result %d\n",
						cqe->res);
			*got_efd = 1;
		}
		count += 1;
	}
	io_uring_cq_advance(r, count);

	return count;
}

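/*
 * io_uring-based aio worker thread: registers the context eventfd with a
 * private ring, then loops submitting newly queued requests, reaping
 * completions and re-arming the eventfd poll until the context is shut down.
 */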
static void *demo_event_uring_io_handler_fn(void *data)
{
	struct ublksrv_aio_ctx *ctx = (struct ublksrv_aio_ctx *)data;
	const struct ublksrv_dev *dev = ublksrv_aio_get_dev(ctx);
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	unsigned dev_id = info->dev_id;
	struct io_uring ring;
	unsigned qd;
	int ret;
	int ctx_efd = ublksrv_aio_get_efd(ctx);

	qd = info->queue_depth * info->nr_hw_queues * 2;

	io_uring_queue_init(qd, &ring, 0);
	ret = io_uring_register_eventfd(&ring, ctx_efd);
	if (ret) {
		fprintf(stdout, "ublk dev %d fails to register eventfd\n",
			dev_id);
		return NULL;
	}

	ublksrv_aio_set_ctx_data(ctx, (void *)&ring);

	fprintf(stdout, "ublk dev %d aio(io_uring submitter) context started tid %d\n",
			dev_id, ublksrv_gettid());

	queue_event(ctx);
	io_uring_submit_and_wait(&ring, 0);

	while (!ublksrv_aio_ctx_dead(ctx)) {
		struct aio_list compl;
		int got_efd = 0;

		aio_list_init(&compl);
		ublksrv_aio_submit_worker(ctx, async_io_submitter, &compl);

		reap_uring(ctx, &compl, &got_efd);
		ublksrv_aio_complete_worker(ctx, &compl);

		if (got_efd)
			queue_event(ctx);
		io_uring_submit_and_wait(&ring, 1);
	}

	return NULL;
}

#define EPOLL_NR_EVENTS 1
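/*
 * epoll-based aio worker thread: blocks on the context eventfd and serves
 * queued requests synchronously via io_submit_worker().
 */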
static void *demo_event_real_io_handler_fn(void *data)
{
	struct ublksrv_aio_ctx *ctx = (struct ublksrv_aio_ctx *)data;
	const struct ublksrv_dev *dev = ublksrv_aio_get_dev(ctx);
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));

	unsigned dev_id = info->dev_id;
	struct epoll_event events[EPOLL_NR_EVENTS];
	int epoll_fd = epoll_create(EPOLL_NR_EVENTS);
	struct epoll_event read_event;
	int ctx_efd = ublksrv_aio_get_efd(ctx);

	if (epoll_fd < 0) {
		fprintf(stderr, "ublk dev %d create epoll fd failed\n", dev_id);
		return NULL;
	}

	fprintf(stdout, "ublk dev %d aio context(sync io submitter) started tid %d\n",
			dev_id, ublksrv_gettid());

	read_event.events = EPOLLIN;
	read_event.data.fd = ctx_efd;
	(void)epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ctx_efd, &read_event);

	while (!ublksrv_aio_ctx_dead(ctx)) {
		struct aio_list compl;

		aio_list_init(&compl);

		ublksrv_aio_submit_worker(ctx, io_submit_worker, &compl);

		ublksrv_aio_complete_worker(ctx, &compl);

		epoll_wait(epoll_fd, events, EPOLL_NR_EVENTS, -1);
	}

	return NULL;
}

/*
 * IO handler for each ublk device queue.
 *
 * Shows how to build a ublksrv target's IO handling, so callers can apply
 * these APIs in their own thread context to create one ublk block device.
 */
static void *demo_event_io_handler_fn(void *data)
{
	struct demo_queue_info *info = (struct demo_queue_info *)data;
	const struct ublksrv_dev *dev = info->dev;
	const struct ublksrv_ctrl_dev_info *dinfo =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	unsigned dev_id = dinfo->dev_id;
	unsigned short q_id = info->qid;
	const struct ublksrv_queue *q;

	pthread_mutex_lock(&jbuf_lock);
	ublksrv_json_write_queue_info(ublksrv_get_ctrl_dev(dev), jbuf, sizeof jbuf,
			q_id, ublksrv_gettid());
	pthread_mutex_unlock(&jbuf_lock);

	q = ublksrv_queue_init(dev, q_id, info);
	if (!q) {
		fprintf(stderr, "ublk dev %d queue %d init queue failed\n",
				dinfo->dev_id, q_id);
		return NULL;
	}
	info->q = q;

	fprintf(stdout, "tid %d: ublk dev %d queue %d started\n", ublksrv_gettid(),
			dev_id, q->q_id);
	do {
		if (ublksrv_process_io(q) < 0)
			break;
	} while (1);

	fprintf(stdout, "ublk dev %d queue %d exited\n", dev_id, q->q_id);
	ublksrv_queue_deinit(q);
	return NULL;
}

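/*
 * Record the basic device parameters (512B logical/4KB physical blocks,
 * size taken from the target) in the JSON buffer and push them to the
 * driver before the device is started.
 */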
static void demo_event_set_parameters(struct ublksrv_ctrl_dev *cdev,
		const struct ublksrv_dev *dev)
{
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(cdev);
	struct ublk_params p = {
		.types = UBLK_PARAM_TYPE_BASIC,
		.basic = {
			.logical_bs_shift	= 9,
			.physical_bs_shift	= 12,
			.io_opt_shift		= 12,
			.io_min_shift		= 9,
			.max_sectors		= info->max_io_buf_bytes >> 9,
			.dev_sectors		= dev->tgt.dev_size >> 9,
		},
	};
	int ret;

	pthread_mutex_lock(&jbuf_lock);
	ublksrv_json_write_params(&p, jbuf, sizeof jbuf);
	pthread_mutex_unlock(&jbuf_lock);

	ret = ublksrv_ctrl_set_params(cdev, &p);
	if (ret)
		fprintf(stderr, "dev %d set basic parameter failed %d\n",
				info->dev_id, ret);
}

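/*
 * Per-device setup: initialize the ublksrv device and the shared aio
 * context, spawn the aio worker thread and one handler thread per hardware
 * queue, set parameters, start the device, then wait for everything to exit.
 */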
static int demo_event_io_handler(struct ublksrv_ctrl_dev *ctrl_dev)
{
	const struct ublksrv_ctrl_dev_info *dinfo =
		ublksrv_ctrl_get_dev_info(ctrl_dev);
	int dev_id = dinfo->dev_id;
	int ret, i;
	const struct ublksrv_dev *dev;
	struct demo_queue_info *info_array;
	void *thread_ret;

	info_array = (struct demo_queue_info *)
		calloc(sizeof(struct demo_queue_info), dinfo->nr_hw_queues);
	if (!info_array)
		return -ENOMEM;

	dev = ublksrv_dev_init(ctrl_dev);
	if (!dev) {
		free(info_array);
		return -ENOMEM;
	}
	this_dev = dev;

	aio_ctx = ublksrv_aio_ctx_init(dev, 0);
	if (!aio_ctx) {
		fprintf(stderr, "dev %d call ublk_aio_ctx_init failed\n", dev_id);
		ret = -ENOMEM;
		goto fail;
	}

	if (!use_aio)
		pthread_create(&io_thread, NULL, demo_event_real_io_handler_fn,
				aio_ctx);
	else
		pthread_create(&io_thread, NULL, demo_event_uring_io_handler_fn,
				aio_ctx);
	for (i = 0; i < dinfo->nr_hw_queues; i++) {
		info_array[i].dev = dev;
		info_array[i].qid = i;

		pthread_create(&info_array[i].thread, NULL,
				demo_event_io_handler_fn,
				&info_array[i]);
	}

	demo_event_set_parameters(ctrl_dev, dev);

	/* everything is fine now, start us */
	ret = ublksrv_ctrl_start_dev(ctrl_dev, getpid());
	if (ret < 0)
		goto fail;

	ublksrv_ctrl_get_info(ctrl_dev);
	ublksrv_ctrl_dump(ctrl_dev, jbuf);

	/* wait until we are terminated */
	for (i = 0; i < dinfo->nr_hw_queues; i++) {
		pthread_join(info_array[i].thread, &thread_ret);
	}
	ublksrv_aio_ctx_shutdown(aio_ctx);
	pthread_join(io_thread, &thread_ret);
	ublksrv_aio_ctx_deinit(aio_ctx);

fail:
	ublksrv_dev_deinit(dev);

	free(info_array);

	return ret;
}

static int ublksrv_start_daemon(struct ublksrv_ctrl_dev *ctrl_dev)
{
	if (ublksrv_ctrl_get_affinity(ctrl_dev) < 0)
		return -1;

	return demo_event_io_handler(ctrl_dev);
}

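/*
 * Target init: size the device from the backing file or block device when
 * one is given, otherwise expose a 250GB null target, and record the
 * device/target info in the JSON buffer.
 */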
static int demo_init_tgt(struct ublksrv_dev *dev, int type, int argc,
		char *argv[])
{
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	struct ublksrv_tgt_base_json tgt_json = {
		.type = type,
	};
	struct stat st;

	strcpy(tgt_json.name, "demo_event");

	if (type != UBLKSRV_TGT_TYPE_DEMO)
		return -1;

	if (backing_fd > 0) {
		unsigned long long bytes;

		fstat(backing_fd, &st);
		if (S_ISBLK(st.st_mode)) {
			if (ioctl(backing_fd, BLKGETSIZE64, &bytes) != 0)
				return -1;
		} else if (S_ISREG(st.st_mode)) {
			bytes = st.st_size;
		} else {
			bytes = 0;
		}

		tgt->dev_size = bytes;
	} else {
		tgt->dev_size = 250UL * 1024 * 1024 * 1024;
	}

	tgt_json.dev_size = tgt->dev_size;
	tgt->tgt_ring_depth = info->queue_depth;
	tgt->nr_fds = 0;

	ublksrv_json_write_dev_info(ublksrv_get_ctrl_dev(dev), jbuf, sizeof jbuf);
	ublksrv_json_write_target_base_info(jbuf, sizeof jbuf, &tgt_json);

	return 0;
}

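/*
 * ->handle_io_async: wrap the io descriptor in a ublksrv_aio request and
 * queue it to the aio context; the result is delivered back to the queue
 * later through ->handle_event.
 */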
static int demo_handle_io_async(const struct ublksrv_queue *q,
		const struct ublk_io_data *data)
{
	struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0);

	req->io = *data->iod;
	req->fd = backing_fd;
	req->id = ublksrv_aio_pid_tag(q->q_id, data->tag);
	ublksrv_aio_submit_req(aio_ctx, q, req);

	return 0;
}

static void demo_handle_event(const struct ublksrv_queue *q)
{
	ublksrv_aio_handle_event(aio_ctx, q);
}

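/*
 * Target type registration; ->handle_event is needed because completions
 * arrive from the aio worker thread (see UBLKSRV_F_NEED_EVENTFD in main()).
 */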
static const struct ublksrv_tgt_type demo_event_tgt_type = {
	.type	= UBLKSRV_TGT_TYPE_DEMO,
	.name	= "demo_event",
	.init_tgt = demo_init_tgt,
	.handle_io_async = demo_handle_io_async,
	.handle_event = demo_handle_event,
};

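/*
 * Options: -f/--backing_file <path> serve a regular file or block device,
 * -a/--use_aio submit via io_uring (requires a backing file),
 * -g/--need_get_data set UBLK_F_NEED_GET_DATA.
 */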
int main(int argc, char *argv[])
{
	static const struct option longopts[] = {
		{ "need_get_data",	0,	NULL, 'g' },
		{ "backing_file",	1,	NULL, 'f' },
		{ "use_aio",		0,	NULL, 'a' },
		{ NULL }
	};
	struct ublksrv_dev_data data = {
		.dev_id = -1,
		.max_io_buf_bytes = DEF_BUF_SIZE,
		.nr_hw_queues = DEF_NR_HW_QUEUES,
		.queue_depth = DEF_QD,
		.tgt_type = "demo_event",
		.tgt_ops = &demo_event_tgt_type,
		.flags = 0,
	};
	struct ublksrv_ctrl_dev *dev;
	int ret, opt;

	while ((opt = getopt_long(argc, argv, "f:ga",
				  longopts, NULL)) != -1) {
		switch (opt) {
		case 'g':
			data.flags |= UBLK_F_NEED_GET_DATA;
			break;
		case 'f':
			backing_fd = open(optarg, O_RDWR | O_DIRECT);
			if (backing_fd < 0)
				backing_fd = -1;
			break;
		case 'a':
			use_aio = true;
			break;
		}
	}

	if (backing_fd < 0)
		use_aio = false;

	if (signal(SIGTERM, sig_handler) == SIG_ERR)
		error(EXIT_FAILURE, errno, "signal");
	if (signal(SIGINT, sig_handler) == SIG_ERR)
		error(EXIT_FAILURE, errno, "signal");

	pthread_mutex_init(&jbuf_lock, NULL);

	data.ublksrv_flags = UBLKSRV_F_NEED_EVENTFD;
	dev = ublksrv_ctrl_init(&data);
	if (!dev)
		error(EXIT_FAILURE, ENODEV, "ublksrv_ctrl_init");
	/* ugly, but the signal handler needs this_ctrl_dev */
	this_ctrl_dev = dev;

	ret = ublksrv_ctrl_add_dev(dev);
	if (ret < 0) {
		error(0, -ret, "can't add dev %d", data.dev_id);
		goto fail;
	}

	ret = ublksrv_start_daemon(dev);
	if (ret < 0) {
		error(0, -ret, "can't start daemon");
		goto fail_del_dev;
	}

	ublksrv_ctrl_del_dev(dev);
	ublksrv_ctrl_deinit(dev);
	exit(EXIT_SUCCESS);

 fail_del_dev:
	ublksrv_ctrl_del_dev(dev);
 fail:
	ublksrv_ctrl_deinit(dev);

	exit(EXIT_FAILURE);
}