xref: /aosp_15_r20/external/ublksrv/nbd/tgt_nbd.cpp (revision 94c4a1e103eb1715230460aab379dff275992c20)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <config.h>
4 #include <vector>
5 #include "ublksrv_tgt.h"
6 #include "ublksrv_tgt_endian.h"
7 #include "cliserv.h"
8 #include "nbd.h"
9 
10 //#define NBD_DEBUG_HANDSHAKE 1
11 //#define NBD_DEBUG_IO 1
12 //#define NBD_DEBUG_CQE 1
13 
14 #ifdef NBD_DEBUG_IO
15 #define NBD_IO_DBG  ublk_err
16 #else
17 #define NBD_IO_DBG(...)
18 #endif
19 
20 #ifdef NBD_DEBUG_HANDSHAKE
21 #define NBD_HS_DBG  ublk_err
22 #else
23 #define NBD_HS_DBG(...)
24 #endif
25 
26 #define nbd_err	ublk_err
27 
28 #define NBD_MAX_NAME	512
29 
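/*
 * Internal pseudo op codes (not part of the NBD wire protocol) carried in
 * sqe->user_data: NBD_OP_READ_REQ marks the send of a READ request and
 * NBD_OP_READ_REPLY marks the recv of a reply header, so their cqes are
 * not confused with the recv of READ payload (UBLK_IO_OP_READ).
 */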
30 #define NBD_OP_READ_REQ  0x80
31 #define NBD_OP_READ_REPLY  0x81
32 
33 struct nbd_tgt_data {
34 	bool unix_sock;
35 	bool use_send_zc;
36 };
37 
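/*
 * Build-time fallback for liburing versions without send-zc support;
 * send_zc itself is rejected in nbd_init_tgt() when the feature is missing.
 */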
38 #ifndef HAVE_LIBURING_SEND_ZC
39 #define io_uring_prep_sendmsg_zc io_uring_prep_sendmsg
40 #define IORING_CQE_F_NOTIF (1U << 3)
41 #endif
42 
43 struct nbd_queue_data {
44 	unsigned short in_flight_ios;
45 
46 	unsigned short recv_started:1;
47 	unsigned short use_send_zc:1;
48 	unsigned short use_unix_sock:1;
49 	unsigned short need_handle_recv:1;
50 	unsigned short send_sqe_chain_busy:1;
51 
52 	unsigned int chained_send_ios;
53 
54 	/*
55 	 * When the current chain is busy, stage incoming send ios
56 	 * into this queue (next_chain). After the current chain
57 	 * is consumed, submit all send ios in 'next_chain' as
58 	 * one whole batch.
59 	 */
60 	std::vector <const struct ublk_io_data *> next_chain;
61 
62 	struct io_uring_sqe *last_send_sqe;
63 	struct nbd_reply reply;
64 	struct io_uring_cqe recv_cqe;
65 };
66 
67 struct nbd_io_data {
68 	unsigned int cmd_cookie;
69 	unsigned int done;	//for handling partial recv
70 };
71 
72 static inline struct nbd_queue_data *
73 nbd_get_queue_data(const struct ublksrv_queue *q)
74 {
75 	return (struct nbd_queue_data *)q->private_data;
76 }
77 
78 static inline struct nbd_io_data *
79 io_tgt_to_nbd_data(const struct ublk_io_tgt *io)
80 {
81 	return (struct nbd_io_data *)(io + 1);
82 }
83 
84 static int req_to_nbd_cmd_type(const struct ublksrv_io_desc *iod)
85 {
86 	switch (ublksrv_get_op(iod)) {
87 	case UBLK_IO_OP_DISCARD:
88 		return NBD_CMD_TRIM;
89 	case UBLK_IO_OP_FLUSH:
90 		return NBD_CMD_FLUSH;
91 	case UBLK_IO_OP_WRITE:
92 		return NBD_CMD_WRITE;
93 	case UBLK_IO_OP_READ:
94 		return NBD_CMD_READ;
95 	default:
96 		return -1;
97 	}
98 }
99 
100 static inline bool is_recv_io(const struct ublksrv_queue *q,
101 		const struct ublk_io_data *data)
102 {
103 	return data->tag >= q->q_depth;
104 }
105 
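/*
 * The 64-bit nbd handle packs a per-io cookie into the upper 32 bits and
 * the ublk unique tag (queue id + tag) into the lower 32 bits, so a reply
 * can be routed back to its request and stale replies can be detected.
 */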
106 #define NBD_COOKIE_BITS 32
107 static inline u64 nbd_cmd_handle(const struct ublksrv_queue *q,
108 		const struct ublk_io_data *data,
109 		const struct nbd_io_data *nbd_data)
110 {
111 	u64 cookie = nbd_data->cmd_cookie;
112 
113 	return (cookie << NBD_COOKIE_BITS) | ublk_unique_tag(q->q_id, data->tag);
114 }
115 
116 static inline u32 nbd_handle_to_cookie(u64 handle)
117 {
118 	return (u32)(handle >> NBD_COOKIE_BITS);
119 }
120 
121 static inline u32 nbd_handle_to_tag(u64 handle)
122 {
123 	return (u32)handle;
124 }
125 
126 static inline void __nbd_build_req(const struct ublksrv_queue *q,
127 		const struct ublk_io_data *data,
128 		const struct nbd_io_data *nbd_data,
129 		u32 type, struct nbd_request *req)
130 {
131 	u32 nbd_cmd_flags = 0;
132 	u64 handle;
133 
134 	if (data->iod->op_flags & UBLK_IO_F_FUA)
135 		nbd_cmd_flags |= NBD_CMD_FLAG_FUA;
136 
137 	req->type = htonl(type | nbd_cmd_flags);
138 
139 	if (type != NBD_CMD_FLUSH) {
140 		req->from = cpu_to_be64((u64)data->iod->start_sector << 9);
141 		req->len = htonl(data->iod->nr_sectors << 9);
142 	}
143 
144 	handle = nbd_cmd_handle(q, data, nbd_data);
145 	memcpy(req->handle, &handle, sizeof(handle));
146 }
147 
148 static int nbd_queue_req(const struct ublksrv_queue *q,
149 		const struct ublk_io_data *data,
150 		const struct nbd_request *req, const struct msghdr *msg)
151 {
152 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
153 	const struct ublksrv_io_desc *iod = data->iod;
154 	struct io_uring_sqe *sqe = io_uring_get_sqe(q->ring_ptr);
155 	unsigned ublk_op = ublksrv_get_op(iod);
156 	unsigned msg_flags = MSG_NOSIGNAL;
157 
158 	if (!sqe) {
159 		nbd_err("%s: get sqe failed, tag %d op %d\n",
160 				__func__, data->tag, ublk_op);
161 		return -ENOMEM;
162 	}
163 
164 	/*
165 	 * Always set WAITALL, so io_uring will handle retry in case of
166 	 * short send, see the link below:
167 	 *
168 	 * https://lore.kernel.org/io-uring/[email protected]/
169 	 *
170 	 * note: It was added for recv* in 5.18 and send* in 5.19.
171 	 */
172 	msg_flags |= MSG_WAITALL;
173 
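	/*
	 * q->q_id + 1 refers to this queue's nbd socket (tgt->fds[q->q_id + 1],
	 * set up in nbd_setup_tgt()); IOSQE_FIXED_FILE below treats it as a
	 * registered-file index.
	 */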
174 	if (ublk_op != UBLK_IO_OP_WRITE) {
175 		io_uring_prep_send(sqe, q->q_id + 1, req,
176 				sizeof(*req), msg_flags);
177 	} else {
178 		if (q_data->use_send_zc)
179 			io_uring_prep_sendmsg_zc(sqe, q->q_id + 1, msg,
180 				msg_flags);
181 		else
182 			io_uring_prep_sendmsg(sqe, q->q_id + 1, msg,
183 				msg_flags);
184 	}
185 
186 	if (ublk_op == UBLK_IO_OP_READ)
187 		ublk_op = NBD_OP_READ_REQ;
188 
189 	/*
190 	 * The encoded nr_sectors should only be used for validating the write req
191 	 * when its cqe completes, since the iod data isn't available at that time
192 	 * because the request can have been reused.
193 	 */
194 	sqe->user_data = build_user_data(data->tag, ublk_op, ublk_op ==
195 			UBLK_IO_OP_WRITE ? data->iod->nr_sectors : 0, 1);
196 	io_uring_sqe_set_flags(sqe, /*IOSQE_CQE_SKIP_SUCCESS |*/
197 			IOSQE_FIXED_FILE | IOSQE_IO_LINK);
198 	q_data->last_send_sqe = sqe;
199 	q_data->chained_send_ios += 1;
200 
201 	NBD_IO_DBG("%s: queue io op %d(%llu %x %llx) ios(%u %u)"
202 			" (qid %d tag %u, cmd_op %u target: %d, user_data %llx)\n",
203 		__func__, ublk_op, data->iod->start_sector,
204 		data->iod->nr_sectors, sqe->addr,
205 		q_data->in_flight_ios, q_data->chained_send_ios,
206 		q->q_id, data->tag, ublk_op, 1, sqe->user_data);
207 
208 	return 1;
209 }
210 
211 static co_io_job __nbd_handle_io_async(const struct ublksrv_queue *q,
212 		const struct ublk_io_data *data, struct ublk_io_tgt *io)
213 {
214 	int ret = -EIO;
215 	struct nbd_request req = {.magic = htonl(NBD_REQUEST_MAGIC),};
216 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
217 	struct nbd_io_data *nbd_data = io_tgt_to_nbd_data(io);
218 	int type = req_to_nbd_cmd_type(data->iod);
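	/*
	 * iov[0] carries the nbd_request header and iov[1] the io payload;
	 * nbd_queue_req() uses the msghdr only for WRITE, so header and data
	 * go out in a single (optionally zero-copy) sendmsg.
	 */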
219 	struct iovec iov[2] = {
220 		[0] = {
221 			.iov_base = (void *)&req,
222 			.iov_len = sizeof(req),
223 		},
224 		[1] = {
225 			.iov_base = (void *)data->iod->addr,
226 			.iov_len = data->iod->nr_sectors << 9,
227 		},
228 	};
229 	struct msghdr msg = {
230 		.msg_iov = iov,
231 		.msg_iovlen = 2,
232 	};
233 
234 	if (type == -1)
235 		goto fail;
236 
237 	nbd_data->cmd_cookie += 1;
238 
239 	__nbd_build_req(q, data, nbd_data, type, &req);
240 	q_data->in_flight_ios += 1;
241 
242 	nbd_data->done = 0;
243 
244 again:
245 	ret = nbd_queue_req(q, data, &req, &msg);
246 	if (ret < 0)
247 		goto fail;
248 
249 	co_await__suspend_always(data->tag);
250 	if (io->tgt_io_cqe->res == -EAGAIN)
251 		goto again;
252 	ret = io->tgt_io_cqe->res;
253 fail:
254 	if (ret < 0)
255 		nbd_err("%s: err %d\n", __func__, ret);
256 	else
257 		ret += nbd_data->done;
258 	ublksrv_complete_io(q, data->tag, ret);
259 	q_data->in_flight_ios -= 1;
260 	NBD_IO_DBG("%s: tag %d res %d\n", __func__, data->tag, ret);
261 
262 	co_return;
263 }
264 
265 static int nbd_handle_recv_reply(const struct ublksrv_queue *q,
266 		struct nbd_io_data *nbd_data,
267 		const struct io_uring_cqe *cqe,
268 		const struct ublk_io_data **io_data)
269 {
270 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
271 	const struct ublk_io_data *data;
272 	struct ublk_io_tgt *io;
273 	u64 handle;
274 	int tag, hwq;
275 	unsigned ublk_op;
276 	int ret = -EINVAL;
277 
278 	if (cqe->res < 0) {
279 		nbd_err("%s %d: reply cqe %d\n", __func__,
280 				__LINE__, cqe->res);
281 		ret = cqe->res;
282 		goto fail;
283 	} else if (cqe->res == 0 && !nbd_data->done) {
284 		//return 0;
285 		nbd_err("%s %d: zero reply cqe %d %llx\n", __func__,
286 				__LINE__, cqe->res, cqe->user_data);
287 	}
288 
289 	if (ntohl(q_data->reply.magic) != NBD_REPLY_MAGIC) {
290 		nbd_err("%s %d: reply bad magic %x res %d\n",
291 				__func__, __LINE__,
292 				ntohl(q_data->reply.magic), cqe->res);
293 		ret = -EPROTO;
294 		goto fail;
295 	}
296 
297 	if (cqe->res + nbd_data->done != sizeof(struct nbd_reply)) {
298 		nbd_err("%s %d: bad reply cqe %d %llx, done %u\n",
299 				__func__, __LINE__,
300 				cqe->res, cqe->user_data,
301 				nbd_data->done);
302 	}
303 	ublk_assert(cqe->res + nbd_data->done == sizeof(struct nbd_reply));
304 
305 	memcpy(&handle, q_data->reply.handle, sizeof(handle));
306 	tag = nbd_handle_to_tag(handle);
307 	hwq = ublk_unique_tag_to_hwq(tag);
308 	tag = ublk_unique_tag_to_tag(tag);
309 
310 	if (tag >= q->q_depth) {
311 		nbd_err("%s %d: tag is too big %d\n", __func__,
312 				__LINE__, tag);
313 		goto fail;
314 	}
315 
316 	if (hwq != q->q_id) {
317 		nbd_err("%s %d: unexpected hwq %d\n", __func__,
318 				__LINE__, hwq);
319 		goto fail;
320 	}
321 
322 	data = ublksrv_queue_get_io_data(q, tag);
323 	io = __ublk_get_io_tgt_data(data);
324 	nbd_data = io_tgt_to_nbd_data(io);
325 	if (nbd_data->cmd_cookie != nbd_handle_to_cookie(handle)) {
326 		nbd_err("%s %d: cookie not match tag %d: %x %lx\n",
327 				__func__, __LINE__, data->tag,
328 				nbd_data->cmd_cookie, handle);
329 		goto fail;
330 	}
331 
332 	ublk_op = ublksrv_get_op(data->iod);
333 	if (ublk_op == UBLK_IO_OP_READ) {
334 		*io_data = data;
335 		return 1;
336 	} else {
337 		int err = ntohl(q_data->reply.error);
338 		struct io_uring_cqe fake_cqe;
339 
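		/*
		 * Non-READ commands carry no payload, so complete the request
		 * right here by resuming its coroutine with a synthesized cqe.
		 */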
340 		NBD_IO_DBG("%s: got write reply, tag %d res %d\n",
341 					__func__, data->tag, err);
342 
343 		if (err) {
344 			fake_cqe.res = -EIO;
345 		} else {
346 			if (ublk_op == UBLK_IO_OP_WRITE)
347 				fake_cqe.res = data->iod->nr_sectors << 9;
348 			else
349 				fake_cqe.res = 0;
350 		}
351 
352 		io->tgt_io_cqe = &fake_cqe;
353 		io->co.resume();
354 		return 0;
355 	}
356 fail:
357 	return ret;
358 }
359 
360 static void __nbd_resume_read_req(const struct ublk_io_data *data,
361 		const struct io_uring_cqe *cqe, unsigned done)
362 {
363 	struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
364 	struct nbd_io_data *nbd_data = io_tgt_to_nbd_data(io);
365 
366 	nbd_data->done = done;
367 	io->tgt_io_cqe = cqe;
368 	io->co.resume();
369 }
370 
371 /* recv completion drives the whole IO flow */
372 static inline int nbd_start_recv(const struct ublksrv_queue *q,
373 		struct nbd_io_data *nbd_data, void *buf, int len,
374 		bool reply, unsigned done)
375 {
376 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
377 	struct io_uring_sqe *sqe = io_uring_get_sqe(q->ring_ptr);
378 	unsigned int op = reply ? NBD_OP_READ_REPLY : UBLK_IO_OP_READ;
379 	unsigned int tag = q->q_depth;	//recv always uses this extra tag
380 
381 	if (!sqe) {
382 		nbd_err("%s: get sqe failed, len %d reply %d done %d\n",
383 				__func__, len, reply, done);
384 		return -ENOMEM;
385 	}
386 
387 	nbd_data->done = done;
388 	io_uring_prep_recv(sqe, q->q_id + 1, (char *)buf + done, len - done, MSG_WAITALL);
389 	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
390 
391 	/* bit63 marks us as tgt io */
392 	sqe->user_data = build_user_data(tag, op, 0, 1);
393 
394 	ublk_assert(q_data->in_flight_ios);
395 	NBD_IO_DBG("%s: q_inflight %d queue recv %s"
396 				"(qid %d tag %u, target: %d, user_data %llx)\n",
397 			__func__, q_data->in_flight_ios, reply ? "reply" : "io",
398 			q->q_id, tag, 1, sqe->user_data);
399 
400 	return 0;
401 }
402 
403 /*
404  * Submit recv worker for reading nbd reply or read io data
405  *
406  * return value:
407  *
408  * 0 : queued via io_uring
409  * len : all data already read synchronously (equal to len)
410  * < 0 : failure
411  */
412 static int nbd_do_recv(const struct ublksrv_queue *q,
413 		struct nbd_io_data *nbd_data, int fd,
414 		void *buf, unsigned len)
415 {
416 	unsigned msg_flags = MSG_DONTWAIT | MSG_WAITALL;
417 	int i = 0, done = 0;
418 	const int loops = len < 512 ? 16 : 32;
419 	int ret;
420 
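	/*
	 * Try a bounded number of non-blocking recv() calls first and only
	 * fall back to an io_uring recv when the data isn't fully available;
	 * len < 512 means the small fixed-size reply header is being read.
	 */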
421 	while (i++ < loops && done < len) {
422 		ret = recv(fd, (char *)buf + done, len - done, msg_flags);
423 		if (ret > 0)
424 			done += ret;
425 
426 		if (!done)
427 			break;
428 	}
429 	if (done == len)
430 		return done;
431 
432 	NBD_IO_DBG("%s: sync(non-blocking) recv %d(%s)/%d/%u\n",
433 			__func__, ret, strerror(errno), done, len);
434 	ret = nbd_start_recv(q, nbd_data, buf, len, len < 512, done);
435 
436 	return ret;
437 }
438 
439 /*
440  * Every request will be responded with one reply, and we complete the
441  * request after the reply is received.
442  *
443  * Read requests are a bit special, since the returned data is received
444  * together with the reply, so read IO data has to be handled here too.
445  */
446 static co_io_job __nbd_handle_recv(const struct ublksrv_queue *q,
447 		const struct ublk_io_data *data, struct ublk_io_tgt *io)
448 {
449 	struct nbd_io_data *nbd_data = io_tgt_to_nbd_data(io);
450 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
451 	int fd = q->dev->tgt.fds[q->q_id + 1];
452 	unsigned int len;
453 	u64 cqe_buf[2] = {0};
454 	struct io_uring_cqe *fake_cqe = (struct io_uring_cqe *)cqe_buf;
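	/*
	 * fake_cqe (res = 0) is used to resume reply/data handling directly
	 * when nbd_do_recv() already read everything synchronously, so no
	 * io_uring completion needs to be awaited.
	 */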
455 
456 	q_data->recv_started = 1;
457 
458 	while (q_data->in_flight_ios > 0) {
459 		const struct ublk_io_data *io_data = NULL;
460 		int ret;
461 read_reply:
462 		ret = nbd_do_recv(q, nbd_data, fd, &q_data->reply,
463 				sizeof(q_data->reply));
464 		if (ret == sizeof(q_data->reply)) {
465 			nbd_data->done = ret;
466 			fake_cqe->res = 0;
467 			io->tgt_io_cqe = fake_cqe;
468 			goto handle_recv;
469 		} else if (ret < 0)
470 			break;
471 
472 		co_await__suspend_always(data->tag);
473 		if (io->tgt_io_cqe->res == -EAGAIN)
474 			goto read_reply;
475 
476 handle_recv:
477 		ret = nbd_handle_recv_reply(q, nbd_data, io->tgt_io_cqe, &io_data);
478 		if (ret < 0)
479 			break;
480 		if (!ret)
481 			continue;
482 read_io:
483 		ublk_assert(io_data != NULL);
484 
485 		len = io_data->iod->nr_sectors << 9;
486 		ret = nbd_do_recv(q, nbd_data, fd, (void *)io_data->iod->addr, len);
487 		if (ret == len) {
488 			nbd_data->done = ret;
489 			fake_cqe->res = 0;
490 			io->tgt_io_cqe = fake_cqe;
491 			goto handle_read_io;
492 		} else if (ret < 0)
493 			break;
494 
495 		/* still wait on recv coroutine context */
496 		co_await__suspend_always(data->tag);
497 
498 		ret = io->tgt_io_cqe->res;
499 		if (ret == -EAGAIN)
500 			goto read_io;
501 
502 handle_read_io:
503 		__nbd_resume_read_req(io_data, io->tgt_io_cqe, nbd_data->done);
504 	}
505 	q_data->recv_started = 0;
506 	co_return;
507 }
508 
509 static int nbd_handle_io_async(const struct ublksrv_queue *q,
510 		const struct ublk_io_data *data)
511 {
512 	struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
513 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
514 
515 	/*
516 	 * Put the io into next_chain and submit it after
517 	 * the current send chain becomes idle.
518 	 */
519 	if (q_data->send_sqe_chain_busy)
520 		q_data->next_chain.push_back(data);
521 	else
522 		io->co = __nbd_handle_io_async(q, data, io);
523 
524 	return 0;
525 }
526 
527 /*
528  * Don't touch @data because the ublk io request it points to may have been
529  * completed before this send cqe is handled; ublk io request completion
530  * is triggered by the reply received from the nbd server.
531  */
532 static void nbd_send_req_done(const struct ublksrv_queue *q,
533 		const struct ublk_io_data *data,
534 		const struct io_uring_cqe *cqe)
535 {
536 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
537 	unsigned ublk_op = user_data_to_op(cqe->user_data);
538 	int tag = user_data_to_tag(cqe->user_data);
539 	unsigned int nr_sects = user_data_to_tgt_data(cqe->user_data);
540 	unsigned total;
541 
542 	/* nothing to do for send_zc notification */
543 	if (cqe->flags & IORING_CQE_F_NOTIF)
544 		return;
545 
546 	ublk_assert(q_data->chained_send_ios);
547 	if (!--q_data->chained_send_ios) {
548 		if (q_data->send_sqe_chain_busy)
549 			q_data->send_sqe_chain_busy = 0;
550 	}
551 
552 	/*
553 	 * In case of failure there is no way yet to tell the recv work
554 	 * how to handle the request; so far just warn about it, and the
555 	 * nbd server may send an error reply.
556 	 */
557 	if (cqe->res < 0)
558 		nbd_err("%s: tag %d cqe fail %d %llx\n",
559 				__func__, tag, cqe->res, cqe->user_data);
560 
561 	/*
562 	 * We have set MSG_WAITALL, so short send shouldn't be possible,
563 	 * but just warn in case of io_uring regression
564 	 */
565 	if (ublk_op == UBLK_IO_OP_WRITE)
566 		total = sizeof(nbd_request) + (nr_sects << 9);
567 	else
568 		total = sizeof(nbd_request);
569 	if (cqe->res < total)
570 		nbd_err("%s: short send/receive tag %d op %d %llx, len %u written %u cqe flags %x\n",
571 				__func__, tag, ublk_op, cqe->user_data,
572 				total, cqe->res, cqe->flags);
573 }
574 
575 static void nbd_tgt_io_done(const struct ublksrv_queue *q,
576 		const struct ublk_io_data *data,
577 		const struct io_uring_cqe *cqe)
578 {
579 	int tag = user_data_to_tag(cqe->user_data);
580 
581 	ublk_assert(tag == data->tag);
582 #if NBD_DEBUG_CQE == 1
583 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
584 	nbd_err("%s: tag %d queue(ios %u %u) cqe(res %d flags %x user data %llx)\n",
585 			__func__, tag,
586 			q_data->in_flight_ios, q_data->chained_send_ios,
587 			cqe->res, cqe->flags, cqe->user_data);
588 #endif
589 
590 	/* both reply and read io is done in recv io coroutine */
591 	if (is_recv_io(q, data)) {
592 		struct nbd_queue_data *q_data = nbd_get_queue_data(q);
593 
594 		/*
595 		 * Delay recv data handling into nbd_handle_io_bg(), so
596 		 * any recv sqe won't cut into the send sqe chain.
597 		 *
598 		 * So far, recv is strictly serialized, so saving
599 		 * this single cqe works; in the future, if recv
600 		 * becomes batched, this will have to be fixed.
601 		 */
602 		q_data->recv_cqe = *cqe;
603 		q_data->need_handle_recv = 1;
604 		return;
605 	}
606 
607 	nbd_send_req_done(q, data, cqe);
608 }
609 
610 static void nbd_handle_send_bg(const struct ublksrv_queue *q,
611 		struct nbd_queue_data *q_data)
612 {
613 	if (!q_data->send_sqe_chain_busy) {
614 		std::vector<const struct ublk_io_data *> &ios =
615 			q_data->next_chain;
616 
617 		for (auto it = ios.cbegin(); it != ios.cend(); ++it) {
618 			auto data = *it;
619 			struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
620 
621 			ublk_assert(data->tag < q->q_depth);
622 			io->co = __nbd_handle_io_async(q, data, io);
623 		}
624 
625 		ios.clear();
626 
627 		if (q_data->chained_send_ios && !q_data->send_sqe_chain_busy)
628 			q_data->send_sqe_chain_busy = 1;
629 	}
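	/* drop IOSQE_IO_LINK from the tail sqe so the send chain is terminated */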
630 	if (q_data->last_send_sqe) {
631 		q_data->last_send_sqe->flags &= ~IOSQE_IO_LINK;
632 		q_data->last_send_sqe = NULL;
633 	}
634 }
635 
636 static void nbd_handle_recv_bg(const struct ublksrv_queue *q,
637 		struct nbd_queue_data *q_data)
638 {
639 	if (q_data->in_flight_ios && !q_data->recv_started) {
640 		const struct ublk_io_data *data =
641 			ublksrv_queue_get_io_data(q, q->q_depth);
642 		struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
643 
644 		ublk_assert(data->tag == q->q_depth);
645 
646 		io->co = __nbd_handle_recv(q, data, io);
647 	}
648 
649 	/* reply or read io data is coming */
650 	if (q_data->need_handle_recv) {
651 		const struct ublk_io_data *data =
652 			ublksrv_queue_get_io_data(q, q->q_depth);
653 		struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
654 
655 		ublk_assert(data->tag == q->q_depth);
656 
657 		io->tgt_io_cqe = &q_data->recv_cqe;
658 		io->co.resume();
659 		q_data->need_handle_recv = 0;
660 	}
661 }
662 
663 static void __nbd_handle_io_bg(const struct ublksrv_queue *q,
664 		struct nbd_queue_data *q_data)
665 {
666 	nbd_handle_send_bg(q, q_data);
667 
668 	/* stop queueing sends now since we need to recv now */
669 	if (q_data->chained_send_ios && !q_data->send_sqe_chain_busy)
670 		q_data->send_sqe_chain_busy = 1;
671 
672 	/*
673 	 * A recv SQE can't cut into the send SQE chain, so it has to be
674 	 * issued here, after the send SQE chain is built.
675 	 *
676 	 * Also, queuing ublk io commands may allocate sqes too.
677 	 */
678 	nbd_handle_recv_bg(q, q_data);
679 }
680 
681 /*
682  * The initial send requests should go into the same send sqe batch; until
683  * this batch is done, all new send requests are staged into next_chain,
684  * which is flushed after the current chain is completed.
685  *
686  * Also, recv work is always started after send requests are queued, because
687  * a recv sqe may cut into the send sqe chain, and the ublk io cmd sqe may
688  * cut into the send sqe chain too.
689  *
690  * This is why nbd_handle_recv_bg() always follows nbd_handle_send_bg().
691  */
692 static void nbd_handle_io_bg(const struct ublksrv_queue *q, int nr_queued_io)
693 {
694 	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
695 
696 	NBD_IO_DBG("%s: pending ios %d/%d chain_busy %d next_chain %ld recv(%d) sqes %u\n",
697 				__func__, q_data->in_flight_ios,
698 				q_data->chained_send_ios,
699 				q_data->send_sqe_chain_busy,
700 				q_data->next_chain.size(),
701 				q_data->recv_started,
702 				nr_queued_io);
703 
704 	__nbd_handle_io_bg(q, q_data);
705 
706 	/*
707 	 * An io can be completed in recv work since we do sync recv, so
708 	 * it could be completed before the send sqe's cqe is returned.
709 	 *
710 	 * When this happens, simply clear chain busy, so that we can
711 	 * queue more requests.
712 	 */
713 	if (!q_data->in_flight_ios && q_data->send_sqe_chain_busy) {
714 		/* all inflight ios are done, so it is safe to send request */
715 		q_data->send_sqe_chain_busy = 0;
716 
717 		if (!q_data->next_chain.empty())
718 			__nbd_handle_io_bg(q, q_data);
719 	}
720 
721 	if (!q_data->recv_started && !q_data->send_sqe_chain_busy &&
722 			!q_data->next_chain.empty())
723 		nbd_err("%s: hang risk: pending ios %d/%d\n",
724 				__func__, q_data->in_flight_ios,
725 				q_data->chained_send_ios);
726 }
727 
728 static void nbd_usage_for_add(void)
729 {
730 	printf("           nbd: --host=$HOST [--port=$PORT] | --unix=$UNIX_PATH\n");
731 }
732 
733 static int nbd_init_queue(const struct ublksrv_queue *q,
734 		void **queue_data_ptr)
735 {
736 	struct nbd_queue_data *data =
737 		(struct nbd_queue_data *)calloc(sizeof(*data), 1);
738 	struct nbd_tgt_data *ddata = (struct nbd_tgt_data*)q->dev->tgt.tgt_data;
739 
740 	if (!data)
741 		return -ENOMEM;
742 
743 	data->next_chain.clear();
744 	data->use_send_zc = ddata->unix_sock ? false : ddata->use_send_zc;
745 	data->use_unix_sock = ddata->unix_sock;
746 	data->recv_started = 0;
747 	//nbd_err("%s send zc %d\n", __func__, data->use_send_zc);
748 
749 	*queue_data_ptr = (void *)data;
750 	return 0;
751 }
752 
753 static void nbd_deinit_queue(const struct ublksrv_queue *q)
754 {
755 	struct nbd_queue_data *data = nbd_get_queue_data(q);
756 
757 	free(data);
758 }
759 
760 static void nbd_deinit_tgt(const struct ublksrv_dev *dev)
761 {
762 	const struct ublksrv_tgt_info *tgt = &dev->tgt;
763 	const struct ublksrv_ctrl_dev_info *info =
764 		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
765 	int i;
766 
767 	free(tgt->tgt_data);
768 
769 	for (i = 0; i < info->nr_hw_queues; i++) {
770 		int fd = tgt->fds[i + 1];
771 
772 		shutdown(fd, SHUT_RDWR);
773 		close(fd);
774 	}
775 }
776 
777 static int nbd_setup_tgt(struct ublksrv_dev *dev, int type, bool recovery,
778 		const char *jbuf, uint16_t *flags)
779 {
780 	struct ublksrv_tgt_info *tgt = &dev->tgt;
781 	const struct ublksrv_ctrl_dev_info *info =
782 		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
783 	int i;
784 	struct nbd_tgt_data *data = (struct nbd_tgt_data *)dev->tgt.tgt_data;
785 
786 	const char *port = NBD_DEFAULT_PORT;
787 	uint16_t needed_flags = 0;
788 	uint32_t cflags = NBD_FLAG_C_FIXED_NEWSTYLE;
789 
790 	char host_name[NBD_MAX_NAME] = {0};
791 	char exp_name[NBD_MAX_NAME] = {0};
792 	char unix_path[NBD_MAX_NAME] = {0};
793 	u64 size64 = 0;
794 	bool can_opt_go = true;
795 
796 	/* todo: support tls */
797 	char *certfile = NULL;
798 	char *keyfile = NULL;
799 	char *cacertfile = NULL;
800 	char *tlshostname = NULL;
801 	bool tls = false;
802 
803 	long send_zc = 0;
804 
805 
806 	if (info->flags & UBLK_F_USER_COPY)
807 		return -EINVAL;
808 
809 	ublk_assert(jbuf);
810 	ublk_assert(type == UBLKSRV_TGT_TYPE_NBD);
811 	ublk_assert(!recovery || info->state == UBLK_S_DEV_QUIESCED);
812 
813 	ublksrv_json_read_target_str_info(jbuf, NBD_MAX_NAME, "host",
814 			host_name);
815 	ublksrv_json_read_target_str_info(jbuf, NBD_MAX_NAME, "unix",
816 			unix_path);
817 	ublksrv_json_read_target_str_info(jbuf, NBD_MAX_NAME, "export_name",
818 			exp_name);
819 	ublksrv_json_read_target_ulong_info(jbuf, "send_zc", &send_zc);
820 
821 	NBD_HS_DBG("%s: host %s unix %s exp_name %s send_zc %ld\n", __func__,
822 			host_name, unix_path, exp_name, send_zc);
823 	for (i = 0; i < info->nr_hw_queues; i++) {
824 		int sock;
825 		unsigned int opts = 0;
826 
827 		if (strlen(unix_path))
828 			sock = openunix(unix_path);
829 		else
830 			sock = opennet(host_name, port, false);
831 
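		/* fixed-newstyle handshake: negotiate() returns export size and flags */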
832 		if (sock >= 0)
833 			negotiate(&sock, &size64, flags, exp_name,
834 					needed_flags, cflags, opts, certfile,
835 					keyfile, cacertfile, tlshostname, tls,
836 					can_opt_go);
837 		else
838 			ublk_err("%s: open socket failed %d\n", __func__, sock);
839 
840 		tgt->fds[i + 1] = sock;
841 		NBD_HS_DBG("%s:qid %d %s-%s size %luMB flags %x sock %d\n",
842 				__func__, i, host_name, port,
843 				size64 >> 20, *flags, sock);
844 	}
845 
846 	tgt->dev_size = size64;
847 
848 	/*
849 	 * one extra slot for receiving reply & read io, so
850 	 * the preferred queue depth should be 127 or 255,
851 	 * so that half of the SQ memory consumption can be saved,
852 	 * especially since we use IORING_SETUP_SQE128
853 	 */
854 	tgt->tgt_ring_depth = info->queue_depth + 1;
855 	tgt->nr_fds = info->nr_hw_queues;
856 	tgt->extra_ios = 1;	//one extra slot for receiving nbd reply
857 	data->unix_sock = strlen(unix_path) > 0 ? true : false;
858 	data->use_send_zc = !!send_zc;
859 
860 	tgt->io_data_size = sizeof(struct ublk_io_tgt) +
861 		sizeof(struct nbd_io_data);
862 
863 	ublksrv_dev_set_cq_depth(dev, 2 * tgt->tgt_ring_depth);
864 
865 	return 0;
866 }
867 
868 static void nbd_parse_flags(struct ublk_params *p, uint16_t flags, uint32_t bs)
869 {
870 	__u32 attrs = 0;
871 
872 	NBD_HS_DBG("%s: negotiated flags %x\n", __func__, flags);
873 
874 	if (flags & NBD_FLAG_READ_ONLY)
875 		attrs |= UBLK_ATTR_READ_ONLY;
876 	if (flags & NBD_FLAG_SEND_FLUSH) {
877 		if (flags & NBD_FLAG_SEND_FUA)
878 			attrs |= UBLK_ATTR_FUA;
879 		else
880 			attrs |= UBLK_ATTR_VOLATILE_CACHE;
881 	}
882 
883 	p->basic.attrs |= attrs;
884 
885 	if (flags & NBD_FLAG_SEND_TRIM) {
886 		p->discard.discard_granularity = bs;
887 		p->discard.max_discard_sectors = UINT_MAX >> 9;
888 		p->discard.max_discard_segments	= 1;
889 		p->types |= UBLK_PARAM_TYPE_DISCARD;
890 	}
891 }
892 
893 static int nbd_init_tgt(struct ublksrv_dev *dev, int type, int argc,
894 		char *argv[])
895 {
896 	int send_zc = 0;
897 	int read_only = 0;
898 	static const struct option nbd_longopts[] = {
899 		{ "host",	required_argument, 0, 0},
900 		{ "unix",	required_argument, 0, 0},
901 		{ "export_name",	required_argument, 0, 0},
902 		{ "send_zc",  0,  &send_zc, 1},
903 		{ "read_only",  0,  &read_only, 1},
904 		{ NULL }
905 	};
906 	struct ublksrv_tgt_info *tgt = &dev->tgt;
907 	const struct ublksrv_ctrl_dev_info *info =
908 		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
909 	int jbuf_size;
910 	char *jbuf = ublksrv_tgt_return_json_buf(dev, &jbuf_size);
911 	struct ublksrv_tgt_base_json tgt_json = {
912 		.type = type,
913 	};
914 	int opt;
915 	int option_index = 0;
916 	unsigned char bs_shift = 9;
917 	const char *host_name = NULL;
918 	const char *unix_path = NULL;
919 	const char *exp_name = NULL;
920 	uint16_t flags = 0;
921 	int ret;
922 
923 	strcpy(tgt_json.name, "nbd");
924 
925 	if (type != UBLKSRV_TGT_TYPE_NBD)
926 		return -1;
927 
928 	while ((opt = getopt_long(argc, argv, "-:f:",
929 				  nbd_longopts, &option_index)) != -1) {
930 		if (opt < 0)
931 			break;
932 		if (opt > 0)
933 			continue;
934 
935 		if (!strcmp(nbd_longopts[option_index].name, "host"))
936 		      host_name = optarg;
937 		if (!strcmp(nbd_longopts[option_index].name, "unix"))
938 		      unix_path = optarg;
939 		if (!strcmp(nbd_longopts[option_index].name, "export_name"))
940 			exp_name = optarg;
941 	}
942 
943 #ifndef HAVE_LIBURING_SEND_ZC
944 	if (send_zc)
945 		return -EINVAL;
946 #endif
947 
948 	ublk_json_write_dev_info(dev, &jbuf, &jbuf_size);
949 	ublk_json_write_tgt_str(dev, &jbuf, &jbuf_size, "host", host_name);
950 	ublk_json_write_tgt_str(dev, &jbuf, &jbuf_size, "unix", unix_path);
951 	ublk_json_write_tgt_str(dev, &jbuf, &jbuf_size, "export_name", exp_name);
952 	ublk_json_write_tgt_long(dev, &jbuf, &jbuf_size, "send_zc", send_zc);
953 
954 	tgt->tgt_data = calloc(sizeof(struct nbd_tgt_data), 1);
955 
956 	ret = nbd_setup_tgt(dev, type, false, jbuf, &flags);
957 	if (ret)
958 		return ret;
959 
960 	tgt_json.dev_size = tgt->dev_size;
961 	ublk_json_write_target_base(dev, &jbuf, &jbuf_size, &tgt_json);
962 
963 	struct ublk_params p = {
964 		.types = UBLK_PARAM_TYPE_BASIC,
965 		.basic = {
966 			.attrs = read_only ? UBLK_ATTR_READ_ONLY : 0U,
967 			.logical_bs_shift	= bs_shift,
968 			.physical_bs_shift	= 12,
969 			.io_opt_shift		= 12,
970 			.io_min_shift		= bs_shift,
971 			.max_sectors		= info->max_io_buf_bytes >> 9,
972 			.dev_sectors		= tgt->dev_size >> 9,
973 		},
974 	};
975 
976 	nbd_parse_flags(&p, flags, 1U << bs_shift);
977 	ublk_json_write_params(dev, &jbuf, &jbuf_size, &p);
978 
979 	return 0;
980 }
981 
982 static int nbd_recovery_tgt(struct ublksrv_dev *dev, int type)
983 {
984 	const struct ublksrv_ctrl_dev *cdev = ublksrv_get_ctrl_dev(dev);
985 	const char *jbuf = ublksrv_ctrl_get_recovery_jbuf(cdev);
986 	uint16_t flags = 0;
987 
988 	dev->tgt.tgt_data = calloc(sizeof(struct nbd_tgt_data), 1);
989 
990 	return nbd_setup_tgt(dev, type, true, jbuf, &flags);
991 }
992 
993 struct ublksrv_tgt_type  nbd_tgt_type = {
994 	.handle_io_async = nbd_handle_io_async,
995 	.tgt_io_done = nbd_tgt_io_done,
996 	.handle_io_background = nbd_handle_io_bg,
997 	.usage_for_add	=  nbd_usage_for_add,
998 	.init_tgt = nbd_init_tgt,
999 	.deinit_tgt = nbd_deinit_tgt,
1000 	.type	= UBLKSRV_TGT_TYPE_NBD,
1001 	.name	=  "nbd",
1002 	.recovery_tgt = nbd_recovery_tgt,
1003 	.init_queue = nbd_init_queue,
1004 	.deinit_queue = nbd_deinit_queue,
1005 };
1006 
1007 static void tgt_nbd_init() __attribute__((constructor));
1008 
1009 static void tgt_nbd_init(void)
1010 {
1011 	ublksrv_register_tgt_type(&nbd_tgt_type);
1012 }
1013 
1014