// SPDX-License-Identifier: GPL-2.0

#include <config.h>
#include <vector>
#include "ublksrv_tgt.h"
#include "ublksrv_tgt_endian.h"
#include "cliserv.h"
#include "nbd.h"

//#define NBD_DEBUG_HANDSHAKE 1
//#define NBD_DEBUG_IO 1
//#define NBD_DEBUG_CQE 1

#ifdef NBD_DEBUG_IO
#define NBD_IO_DBG ublk_err
#else
#define NBD_IO_DBG(...)
#endif

#ifdef NBD_DEBUG_HANDSHAKE
#define NBD_HS_DBG ublk_err
#else
#define NBD_HS_DBG(...)
#endif

#define nbd_err ublk_err

#define NBD_MAX_NAME 512

#define NBD_OP_READ_REQ 0x80
#define NBD_OP_READ_REPLY 0x81

struct nbd_tgt_data {
	bool unix_sock;
	bool use_send_zc;
};

#ifndef HAVE_LIBURING_SEND_ZC
#define io_uring_prep_sendmsg_zc io_uring_prep_sendmsg
#define IORING_CQE_F_NOTIF (1U << 3)
#endif
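
/*
 * Per-queue state: counts of in-flight and chained send ios, flags that
 * drive the send/recv state machine, the next_chain staging area, plus
 * the reply buffer and the saved recv cqe used by the recv coroutine.
 */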
struct nbd_queue_data {
	unsigned short in_flight_ios;

	unsigned short recv_started:1;
	unsigned short use_send_zc:1;
	unsigned short use_unix_sock:1;
	unsigned short need_handle_recv:1;
	unsigned short send_sqe_chain_busy:1;

	unsigned int chained_send_ios;

	/*
	 * When the current chain is busy, stage new send ios into
	 * this queue (next_chain). After the current chain is
	 * consumed, submit all send ios in 'next_chain' as one
	 * whole batch.
	 */
	std::vector <const struct ublk_io_data *> next_chain;

	struct io_uring_sqe *last_send_sqe;
	struct nbd_reply reply;
	struct io_uring_cqe recv_cqe;
};

struct nbd_io_data {
	unsigned int cmd_cookie;
	unsigned int done;	//for handling partial recv
};

static inline struct nbd_queue_data *
nbd_get_queue_data(const struct ublksrv_queue *q)
{
	return (struct nbd_queue_data *)q->private_data;
}

static inline struct nbd_io_data *
io_tgt_to_nbd_data(const struct ublk_io_tgt *io)
{
	return (struct nbd_io_data *)(io + 1);
}

static int req_to_nbd_cmd_type(const struct ublksrv_io_desc *iod)
{
	switch (ublksrv_get_op(iod)) {
	case UBLK_IO_OP_DISCARD:
		return NBD_CMD_TRIM;
	case UBLK_IO_OP_FLUSH:
		return NBD_CMD_FLUSH;
	case UBLK_IO_OP_WRITE:
		return NBD_CMD_WRITE;
	case UBLK_IO_OP_READ:
		return NBD_CMD_READ;
	default:
		return -1;
	}
}

static inline bool is_recv_io(const struct ublksrv_queue *q,
		const struct ublk_io_data *data)
{
	return data->tag >= q->q_depth;
}
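
/*
 * The 64-bit nbd handle carries the per-io cookie in its upper 32 bits
 * and the ublk unique tag (queue id + tag) in its lower 32 bits, so a
 * reply can be matched back to its request, and a stale reply for a
 * reused tag is detected via the cookie.
 */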
#define NBD_COOKIE_BITS 32
static inline u64 nbd_cmd_handle(const struct ublksrv_queue *q,
		const struct ublk_io_data *data,
		const struct nbd_io_data *nbd_data)
{
	u64 cookie = nbd_data->cmd_cookie;

	return (cookie << NBD_COOKIE_BITS) | ublk_unique_tag(q->q_id, data->tag);
}

static inline u32 nbd_handle_to_cookie(u64 handle)
{
	return (u32)(handle >> NBD_COOKIE_BITS);
}

static inline u32 nbd_handle_to_tag(u64 handle)
{
	return (u32)handle;
}
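
/*
 * Fill the on-wire nbd_request for this ublk io: all fields are encoded
 * in network byte order as required by the NBD protocol, and the handle
 * is the cookie/tag value built by nbd_cmd_handle().
 */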
static inline void __nbd_build_req(const struct ublksrv_queue *q,
		const struct ublk_io_data *data,
		const struct nbd_io_data *nbd_data,
		u32 type, struct nbd_request *req)
{
	u32 nbd_cmd_flags = 0;
	u64 handle;

	if (data->iod->op_flags & UBLK_IO_F_FUA)
		nbd_cmd_flags |= NBD_CMD_FLAG_FUA;

	req->type = htonl(type | nbd_cmd_flags);

	if (type != NBD_CMD_FLUSH) {
		req->from = cpu_to_be64((u64)data->iod->start_sector << 9);
		req->len = htonl(data->iod->nr_sectors << 9);
	}

	handle = nbd_cmd_handle(q, data, nbd_data);
	memcpy(req->handle, &handle, sizeof(handle));
}
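
/*
 * Queue one send SQE for this request: a plain send() of the request
 * header for non-write ops, or a sendmsg()/sendmsg_zc() of header plus
 * payload for writes. The SQE is linked (IOSQE_IO_LINK) into the current
 * send chain so requests go out in submission order.
 * Returns 1 when the SQE has been queued, -ENOMEM if no SQE is available.
 */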
static int nbd_queue_req(const struct ublksrv_queue *q,
		const struct ublk_io_data *data,
		const struct nbd_request *req, const struct msghdr *msg)
{
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	const struct ublksrv_io_desc *iod = data->iod;
	struct io_uring_sqe *sqe = io_uring_get_sqe(q->ring_ptr);
	unsigned ublk_op = ublksrv_get_op(iod);
	unsigned msg_flags = MSG_NOSIGNAL;

	if (!sqe) {
		nbd_err("%s: get sqe failed, tag %d op %d\n",
				__func__, data->tag, ublk_op);
		return -ENOMEM;
	}

	/*
	 * Always set MSG_WAITALL, so io_uring will handle the retry in case
	 * of a short send, see the link below:
	 *
	 * https://lore.kernel.org/io-uring/[email protected]/
	 *
	 * note: it was added for recv* in 5.18 and send* in 5.19.
	 */
	msg_flags |= MSG_WAITALL;

	if (ublk_op != UBLK_IO_OP_WRITE) {
		io_uring_prep_send(sqe, q->q_id + 1, req,
				sizeof(*req), msg_flags);
	} else {
		if (q_data->use_send_zc)
			io_uring_prep_sendmsg_zc(sqe, q->q_id + 1, msg,
					msg_flags);
		else
			io_uring_prep_sendmsg(sqe, q->q_id + 1, msg,
					msg_flags);
	}

	if (ublk_op == UBLK_IO_OP_READ)
		ublk_op = NBD_OP_READ_REQ;

	/*
	 * The encoded nr_sectors should only be used for validating the
	 * write req when its cqe is completed, since the iod data isn't
	 * available at that time because the request may have been reused.
	 */
	sqe->user_data = build_user_data(data->tag, ublk_op, ublk_op ==
			UBLK_IO_OP_WRITE ? data->iod->nr_sectors : 0, 1);
	io_uring_sqe_set_flags(sqe, /*IOSQE_CQE_SKIP_SUCCESS |*/
			IOSQE_FIXED_FILE | IOSQE_IO_LINK);
	q_data->last_send_sqe = sqe;
	q_data->chained_send_ios += 1;

	NBD_IO_DBG("%s: queue io op %d(%llu %x %llx) ios(%u %u)"
			" (qid %d tag %u, cmd_op %u target: %d, user_data %llx)\n",
			__func__, ublk_op, data->iod->start_sector,
			data->iod->nr_sectors, sqe->addr,
			q_data->in_flight_ios, q_data->chained_send_ios,
			q->q_id, data->tag, ublk_op, 1, sqe->user_data);

	return 1;
}
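
/*
 * Per-io coroutine for one ublk request: build the nbd request, queue
 * its send SQE, then suspend. The recv side resumes it with the reply
 * result (writes) or the read payload result (reads); -EAGAIN causes
 * the request to be queued again, otherwise the ublk io is completed
 * with the result.
 */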
static co_io_job __nbd_handle_io_async(const struct ublksrv_queue *q,
		const struct ublk_io_data *data, struct ublk_io_tgt *io)
{
	int ret = -EIO;
	struct nbd_request req = {.magic = htonl(NBD_REQUEST_MAGIC),};
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	struct nbd_io_data *nbd_data = io_tgt_to_nbd_data(io);
	int type = req_to_nbd_cmd_type(data->iod);
	struct iovec iov[2] = {
		[0] = {
			.iov_base = (void *)&req,
			.iov_len = sizeof(req),
		},
		[1] = {
			.iov_base = (void *)data->iod->addr,
			.iov_len = data->iod->nr_sectors << 9,
		},
	};
	struct msghdr msg = {
		.msg_iov = iov,
		.msg_iovlen = 2,
	};

	if (type == -1)
		goto fail;

	nbd_data->cmd_cookie += 1;

	__nbd_build_req(q, data, nbd_data, type, &req);
	q_data->in_flight_ios += 1;

	nbd_data->done = 0;

again:
	ret = nbd_queue_req(q, data, &req, &msg);
	if (ret < 0)
		goto fail;

	co_await__suspend_always(data->tag);
	if (io->tgt_io_cqe->res == -EAGAIN)
		goto again;
	ret = io->tgt_io_cqe->res;
fail:
	if (ret < 0)
		nbd_err("%s: err %d\n", __func__, ret);
	else
		ret += nbd_data->done;
	ublksrv_complete_io(q, data->tag, ret);
	q_data->in_flight_ios -= 1;
	NBD_IO_DBG("%s: tag %d res %d\n", __func__, data->tag, ret);

	co_return;
}
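
/*
 * Handle one nbd_reply that has just been received into q_data->reply.
 *
 * Return values:
 *   < 0 : protocol or transport error
 *   0   : the reply completed a non-read request; its io coroutine has
 *         been resumed here
 *   1   : the reply belongs to a read request; *io_data is set and the
 *         caller must go on to receive the read payload
 */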
static int nbd_handle_recv_reply(const struct ublksrv_queue *q,
		struct nbd_io_data *nbd_data,
		const struct io_uring_cqe *cqe,
		const struct ublk_io_data **io_data)
{
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	const struct ublk_io_data *data;
	struct ublk_io_tgt *io;
	u64 handle;
	int tag, hwq;
	unsigned ublk_op;
	int ret = -EINVAL;

	if (cqe->res < 0) {
		nbd_err("%s %d: reply cqe %d\n", __func__,
				__LINE__, cqe->res);
		ret = cqe->res;
		goto fail;
	} else if (cqe->res == 0 && !nbd_data->done) {
		//return 0;
		nbd_err("%s %d: zero reply cqe %d %llx\n", __func__,
				__LINE__, cqe->res, cqe->user_data);
	}

	if (ntohl(q_data->reply.magic) != NBD_REPLY_MAGIC) {
		nbd_err("%s %d: reply bad magic %x res %d\n",
				__func__, __LINE__,
				ntohl(q_data->reply.magic), cqe->res);
		ret = -EPROTO;
		goto fail;
	}

	if (cqe->res + nbd_data->done != sizeof(struct nbd_reply)) {
		nbd_err("%s %d: bad reply cqe %d %llx, done %u\n",
				__func__, __LINE__,
				cqe->res, cqe->user_data,
				nbd_data->done);
	}
	ublk_assert(cqe->res + nbd_data->done == sizeof(struct nbd_reply));

	memcpy(&handle, q_data->reply.handle, sizeof(handle));
	tag = nbd_handle_to_tag(handle);
	hwq = ublk_unique_tag_to_hwq(tag);
	tag = ublk_unique_tag_to_tag(tag);

	if (tag >= q->q_depth) {
		nbd_err("%s %d: tag is too big %d\n", __func__,
				__LINE__, tag);
		goto fail;
	}

	if (hwq != q->q_id) {
		nbd_err("%s %d: unexpected hwq %d\n", __func__,
				__LINE__, hwq);
		goto fail;
	}

	data = ublksrv_queue_get_io_data(q, tag);
	io = __ublk_get_io_tgt_data(data);
	nbd_data = io_tgt_to_nbd_data(io);
	if (nbd_data->cmd_cookie != nbd_handle_to_cookie(handle)) {
		nbd_err("%s %d: cookie mismatch tag %d: %x %lx\n",
				__func__, __LINE__, data->tag,
				nbd_data->cmd_cookie, handle);
		goto fail;
	}

	ublk_op = ublksrv_get_op(data->iod);
	if (ublk_op == UBLK_IO_OP_READ) {
		*io_data = data;
		return 1;
	} else {
		int err = ntohl(q_data->reply.error);
		struct io_uring_cqe fake_cqe;

		NBD_IO_DBG("%s: got write reply, tag %d res %d\n",
				__func__, data->tag, err);

		if (err) {
			fake_cqe.res = -EIO;
		} else {
			if (ublk_op == UBLK_IO_OP_WRITE)
				fake_cqe.res = data->iod->nr_sectors << 9;
			else
				fake_cqe.res = 0;
		}

		io->tgt_io_cqe = &fake_cqe;
		io->co.resume();
		return 0;
	}
fail:
	return ret;
}

static void __nbd_resume_read_req(const struct ublk_io_data *data,
		const struct io_uring_cqe *cqe, unsigned done)
{
	struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
	struct nbd_io_data *nbd_data = io_tgt_to_nbd_data(io);

	nbd_data->done = done;
	io->tgt_io_cqe = cqe;
	io->co.resume();
}

/* recv completion drives the whole IO flow */
static inline int nbd_start_recv(const struct ublksrv_queue *q,
		struct nbd_io_data *nbd_data, void *buf, int len,
		bool reply, unsigned done)
{
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	struct io_uring_sqe *sqe = io_uring_get_sqe(q->ring_ptr);
	unsigned int op = reply ? NBD_OP_READ_REPLY : UBLK_IO_OP_READ;
	unsigned int tag = q->q_depth;	//recv always uses this extra tag

	if (!sqe) {
		nbd_err("%s: get sqe failed, len %d reply %d done %d\n",
				__func__, len, reply, done);
		return -ENOMEM;
	}

	nbd_data->done = done;
	io_uring_prep_recv(sqe, q->q_id + 1, (char *)buf + done, len - done, MSG_WAITALL);
	io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);

	/* bit63 marks us as tgt io */
	sqe->user_data = build_user_data(tag, op, 0, 1);

	ublk_assert(q_data->in_flight_ios);
	NBD_IO_DBG("%s: q_inflight %d queue recv %s"
			"(qid %d tag %u, target: %d, user_data %llx)\n",
			__func__, q_data->in_flight_ios, reply ? "reply" : "io",
			q->q_id, tag, 1, sqe->user_data);

	return 0;
}

/*
 * Submit recv worker for reading the nbd reply or read io data
 *
 * return value:
 *
 * 0   : queued via io_uring
 * len : data already read synchronously (always the full len)
 * < 0 : failure
 */
static int nbd_do_recv(const struct ublksrv_queue *q,
		struct nbd_io_data *nbd_data, int fd,
		void *buf, unsigned len)
{
	unsigned msg_flags = MSG_DONTWAIT | MSG_WAITALL;
	int i = 0, done = 0;
	const int loops = len < 512 ? 16 : 32;
	int ret;
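
	/*
	 * Fast path: try a bounded number of non-blocking recv() calls
	 * first; only if the whole buffer cannot be read synchronously do
	 * we fall back to queueing an io_uring recv SQE and suspending.
	 */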
	while (i++ < loops && done < len) {
		ret = recv(fd, (char *)buf + done, len - done, msg_flags);
		if (ret > 0)
			done += ret;

		if (!done)
			break;
	}
	if (done == len)
		return done;

	NBD_IO_DBG("%s: sync(non-blocking) recv %d(%s)/%d/%u\n",
			__func__, ret, strerror(errno), done, len);
	ret = nbd_start_recv(q, nbd_data, buf, len, len < 512, done);

	return ret;
}

/*
 * Every request will be responded to with one reply, and we complete the
 * request after the reply is received.
 *
 * Read requests are a bit special, since the data is returned together
 * with the reply, so we have to handle the read IO data here too.
 */
static co_io_job __nbd_handle_recv(const struct ublksrv_queue *q,
		const struct ublk_io_data *data, struct ublk_io_tgt *io)
{
	struct nbd_io_data *nbd_data = io_tgt_to_nbd_data(io);
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	int fd = q->dev->tgt.fds[q->q_id + 1];
	unsigned int len;
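	/*
	 * When data has already been read synchronously by nbd_do_recv(),
	 * there is no real CQE to wait for; fake_cqe (res == 0) lets the
	 * common handling path below be reused in that case.
	 */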
	u64 cqe_buf[2] = {0};
	struct io_uring_cqe *fake_cqe = (struct io_uring_cqe *)cqe_buf;

	q_data->recv_started = 1;

	while (q_data->in_flight_ios > 0) {
		const struct ublk_io_data *io_data = NULL;
		int ret;
read_reply:
		ret = nbd_do_recv(q, nbd_data, fd, &q_data->reply,
				sizeof(q_data->reply));
		if (ret == sizeof(q_data->reply)) {
			nbd_data->done = ret;
			fake_cqe->res = 0;
			io->tgt_io_cqe = fake_cqe;
			goto handle_recv;
		} else if (ret < 0)
			break;

		co_await__suspend_always(data->tag);
		if (io->tgt_io_cqe->res == -EAGAIN)
			goto read_reply;

handle_recv:
		ret = nbd_handle_recv_reply(q, nbd_data, io->tgt_io_cqe, &io_data);
		if (ret < 0)
			break;
		if (!ret)
			continue;
read_io:
		ublk_assert(io_data != NULL);

		len = io_data->iod->nr_sectors << 9;
		ret = nbd_do_recv(q, nbd_data, fd, (void *)io_data->iod->addr, len);
		if (ret == len) {
			nbd_data->done = ret;
			fake_cqe->res = 0;
			io->tgt_io_cqe = fake_cqe;
			goto handle_read_io;
		} else if (ret < 0)
			break;

		/* still wait on recv coroutine context */
		co_await__suspend_always(data->tag);

		ret = io->tgt_io_cqe->res;
		if (ret == -EAGAIN)
			goto read_io;

handle_read_io:
		__nbd_resume_read_req(io_data, io->tgt_io_cqe, nbd_data->done);
	}
	q_data->recv_started = 0;
	co_return;
}

static int nbd_handle_io_async(const struct ublksrv_queue *q,
		const struct ublk_io_data *data)
{
	struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);

	/*
	 * Put the io in the queue and submit it after
	 * the current chain becomes idle.
	 */
	if (q_data->send_sqe_chain_busy)
		q_data->next_chain.push_back(data);
	else
		io->co = __nbd_handle_io_async(q, data, io);

	return 0;
}

/*
 * Don't touch @data because the pointed-to ublk io request may have been
 * completed before this send cqe is handled. And ublk io request completion
 * is triggered by the reply received from the nbd server.
 */
static void nbd_send_req_done(const struct ublksrv_queue *q,
		const struct ublk_io_data *data,
		const struct io_uring_cqe *cqe)
{
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	unsigned ublk_op = user_data_to_op(cqe->user_data);
	int tag = user_data_to_tag(cqe->user_data);
	unsigned int nr_sects = user_data_to_tgt_data(cqe->user_data);
	unsigned total;

	/* nothing to do for send_zc notification */
	if (cqe->flags & IORING_CQE_F_NOTIF)
		return;

	ublk_assert(q_data->chained_send_ios);
	if (!--q_data->chained_send_ios) {
		if (q_data->send_sqe_chain_busy)
			q_data->send_sqe_chain_busy = 0;
	}

	/*
	 * In case of send failure, how do we tell the recv work to handle
	 * the request? So far just warn about it; the nbd server may send
	 * an error reply.
	 */
	if (cqe->res < 0)
		nbd_err("%s: tag %d cqe fail %d %llx\n",
				__func__, tag, cqe->res, cqe->user_data);

	/*
	 * We have set MSG_WAITALL, so a short send shouldn't be possible,
	 * but warn just in case of an io_uring regression.
	 */
	if (ublk_op == UBLK_IO_OP_WRITE)
		total = sizeof(nbd_request) + (nr_sects << 9);
	else
		total = sizeof(nbd_request);
	if (cqe->res < total)
		nbd_err("%s: short send/receive tag %d op %d %llx, len %u written %u cqe flags %x\n",
				__func__, tag, ublk_op, cqe->user_data,
				total, cqe->res, cqe->flags);
}

static void nbd_tgt_io_done(const struct ublksrv_queue *q,
		const struct ublk_io_data *data,
		const struct io_uring_cqe *cqe)
{
	int tag = user_data_to_tag(cqe->user_data);

	ublk_assert(tag == data->tag);
#if NBD_DEBUG_CQE == 1
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);
	nbd_err("%s: tag %d queue(ios %u %u) cqe(res %d flags %x user data %llx)\n",
			__func__, tag,
			q_data->in_flight_ios, q_data->chained_send_ios,
			cqe->res, cqe->flags, cqe->user_data);
#endif

	/* both reply and read io are handled in the recv io coroutine */
	if (is_recv_io(q, data)) {
		struct nbd_queue_data *q_data = nbd_get_queue_data(q);

		/*
		 * Delay recv data handling into nbd_handle_io_bg(), so
		 * no recv sqe can cut into the send sqe chain.
		 *
		 * So far, recv is strictly serialized, so saving
		 * this single cqe works; in the future, if recv
		 * becomes batched, this has to be fixed.
		 */
		q_data->recv_cqe = *cqe;
		q_data->need_handle_recv = 1;
		return;
	}

	nbd_send_req_done(q, data, cqe);
}
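
/*
 * Flush any ios staged in next_chain by starting their coroutines (which
 * queue the send SQEs), mark the new chain busy, then clear IOSQE_IO_LINK
 * on the last send SQE so the chain is properly terminated.
 */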
static void nbd_handle_send_bg(const struct ublksrv_queue *q,
		struct nbd_queue_data *q_data)
{
	if (!q_data->send_sqe_chain_busy) {
		std::vector<const struct ublk_io_data *> &ios =
			q_data->next_chain;

		for (auto it = ios.cbegin(); it != ios.cend(); ++it) {
			auto data = *it;
			struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);

			ublk_assert(data->tag < q->q_depth);
			io->co = __nbd_handle_io_async(q, data, io);
		}

		ios.clear();

		if (q_data->chained_send_ios && !q_data->send_sqe_chain_busy)
			q_data->send_sqe_chain_busy = 1;
	}
	if (q_data->last_send_sqe) {
		q_data->last_send_sqe->flags &= ~IOSQE_IO_LINK;
		q_data->last_send_sqe = NULL;
	}
}
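
/*
 * Start the per-queue recv coroutine (it uses the extra io slot whose
 * tag equals q_depth) when ios are in flight and recv isn't running yet,
 * and resume it with the recv cqe saved by nbd_tgt_io_done().
 */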
static void nbd_handle_recv_bg(const struct ublksrv_queue *q,
		struct nbd_queue_data *q_data)
{
	if (q_data->in_flight_ios && !q_data->recv_started) {
		const struct ublk_io_data *data =
			ublksrv_queue_get_io_data(q, q->q_depth);
		struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);

		ublk_assert(data->tag == q->q_depth);

		io->co = __nbd_handle_recv(q, data, io);
	}

	/* reply or read io data is coming */
	if (q_data->need_handle_recv) {
		const struct ublk_io_data *data =
			ublksrv_queue_get_io_data(q, q->q_depth);
		struct ublk_io_tgt *io = __ublk_get_io_tgt_data(data);

		ublk_assert(data->tag == q->q_depth);

		io->tgt_io_cqe = &q_data->recv_cqe;
		io->co.resume();
		q_data->need_handle_recv = 0;
	}
}

static void __nbd_handle_io_bg(const struct ublksrv_queue *q,
		struct nbd_queue_data *q_data)
{
	nbd_handle_send_bg(q, q_data);

	/* stop queueing sends now since we need to recv now */
	if (q_data->chained_send_ios && !q_data->send_sqe_chain_busy)
		q_data->send_sqe_chain_busy = 1;

	/*
	 * A recv SQE can't cut into the send SQE chain, so it has to be
	 * queued here, after the send SQE chain is built.
	 *
	 * Also queueing a ublk io command may allocate an sqe too.
	 */
	nbd_handle_recv_bg(q, q_data);
}

/*
 * The initial send requests should go into the same send sqe chain; until
 * this chain is done, all new send requests are staged into next_chain,
 * which will be flushed after the current chain is completed.
 *
 * Also recv work is always started after send requests are queued, because
 * the recv sqe may cut into the send sqe chain, and the ublk io cmd sqe may
 * cut into the send sqe chain too.
 *
 * This is why nbd_handle_recv_bg() always follows nbd_handle_send_bg().
 */
static void nbd_handle_io_bg(const struct ublksrv_queue *q, int nr_queued_io)
{
	struct nbd_queue_data *q_data = nbd_get_queue_data(q);

	NBD_IO_DBG("%s: pending ios %d/%d chain_busy %d next_chain %ld recv(%d) sqes %u\n",
			__func__, q_data->in_flight_ios,
			q_data->chained_send_ios,
			q_data->send_sqe_chain_busy,
			q_data->next_chain.size(),
			q_data->recv_started,
			nr_queued_io);

	__nbd_handle_io_bg(q, q_data);

	/*
	 * An io can be completed in the recv work since we do sync recv, so
	 * it could be completed before the send sqe's cqe is returned.
	 *
	 * When this happens, simply clear chain busy, so that we can
	 * queue more requests.
	 */
	if (!q_data->in_flight_ios && q_data->send_sqe_chain_busy) {
		/* all inflight ios are done, so it is safe to send requests */
		q_data->send_sqe_chain_busy = 0;

		if (!q_data->next_chain.empty())
			__nbd_handle_io_bg(q, q_data);
	}

	if (!q_data->recv_started && !q_data->send_sqe_chain_busy &&
			!q_data->next_chain.empty())
		nbd_err("%s: hang risk: pending ios %d/%d\n",
				__func__, q_data->in_flight_ios,
				q_data->chained_send_ios);
}

static void nbd_usage_for_add(void)
{
	printf(" nbd: --host=$HOST [--port=$PORT] | --unix=$UNIX_PATH\n");
}

static int nbd_init_queue(const struct ublksrv_queue *q,
		void **queue_data_ptr)
{
	struct nbd_queue_data *data =
		(struct nbd_queue_data *)calloc(sizeof(*data), 1);
	struct nbd_tgt_data *ddata = (struct nbd_tgt_data *)q->dev->tgt.tgt_data;

	if (!data)
		return -ENOMEM;

	data->next_chain.clear();
	data->use_send_zc = ddata->unix_sock ? false : ddata->use_send_zc;
	data->use_unix_sock = ddata->unix_sock;
	data->recv_started = 0;
	//nbd_err("%s send zc %d\n", __func__, data->use_send_zc);

	*queue_data_ptr = (void *)data;
	return 0;
}

static void nbd_deinit_queue(const struct ublksrv_queue *q)
{
	struct nbd_queue_data *data = nbd_get_queue_data(q);

	free(data);
}

static void nbd_deinit_tgt(const struct ublksrv_dev *dev)
{
	const struct ublksrv_tgt_info *tgt = &dev->tgt;
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	int i;

	free(tgt->tgt_data);

	for (i = 0; i < info->nr_hw_queues; i++) {
		int fd = tgt->fds[i + 1];

		shutdown(fd, SHUT_RDWR);
		close(fd);
	}
}
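
/*
 * Connect and run the NBD handshake once per hardware queue, so every
 * queue gets its own socket stored in tgt->fds[qid + 1]. The negotiated
 * export size and transmission flags are returned to the caller.
 */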
static int nbd_setup_tgt(struct ublksrv_dev *dev, int type, bool recovery,
		const char *jbuf, uint16_t *flags)
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	int i;
	struct nbd_tgt_data *data = (struct nbd_tgt_data *)dev->tgt.tgt_data;

	const char *port = NBD_DEFAULT_PORT;
	uint16_t needed_flags = 0;
	uint32_t cflags = NBD_FLAG_C_FIXED_NEWSTYLE;

	char host_name[NBD_MAX_NAME] = {0};
	char exp_name[NBD_MAX_NAME] = {0};
	char unix_path[NBD_MAX_NAME] = {0};
	u64 size64 = 0;
	bool can_opt_go = true;

	/* todo: support tls */
	char *certfile = NULL;
	char *keyfile = NULL;
	char *cacertfile = NULL;
	char *tlshostname = NULL;
	bool tls = false;

	long send_zc = 0;

	if (info->flags & UBLK_F_USER_COPY)
		return -EINVAL;

	ublk_assert(jbuf);
	ublk_assert(type == UBLKSRV_TGT_TYPE_NBD);
	ublk_assert(!recovery || info->state == UBLK_S_DEV_QUIESCED);

	ublksrv_json_read_target_str_info(jbuf, NBD_MAX_NAME, "host",
			host_name);
	ublksrv_json_read_target_str_info(jbuf, NBD_MAX_NAME, "unix",
			unix_path);
	ublksrv_json_read_target_str_info(jbuf, NBD_MAX_NAME, "export_name",
			exp_name);
	ublksrv_json_read_target_ulong_info(jbuf, "send_zc", &send_zc);

	NBD_HS_DBG("%s: host %s unix %s exp_name %s send_zc %ld\n", __func__,
			host_name, unix_path, exp_name, send_zc);
	for (i = 0; i < info->nr_hw_queues; i++) {
		int sock;
		unsigned int opts = 0;

		if (strlen(unix_path))
			sock = openunix(unix_path);
		else
			sock = opennet(host_name, port, false);

		if (sock >= 0)
			negotiate(&sock, &size64, flags, exp_name,
					needed_flags, cflags, opts, certfile,
					keyfile, cacertfile, tlshostname, tls,
					can_opt_go);
		else
			ublk_err("%s: open socket failed %d\n", __func__, sock);

		tgt->fds[i + 1] = sock;
		NBD_HS_DBG("%s: qid %d %s-%s size %luMB flags %x sock %d\n",
				__func__, i, host_name, port,
				size64 >> 20, *flags, sock);
	}

	tgt->dev_size = size64;

	/*
	 * One extra slot is reserved for receiving the reply & read io, so
	 * the preferred queue depth is 127 or 255: then half of the SQ
	 * memory consumption can be saved, especially since we use
	 * IORING_SETUP_SQE128.
	 */
	tgt->tgt_ring_depth = info->queue_depth + 1;
	tgt->nr_fds = info->nr_hw_queues;
	tgt->extra_ios = 1;	//one extra slot for receiving the nbd reply
	data->unix_sock = strlen(unix_path) > 0 ? true : false;
	data->use_send_zc = !!send_zc;

	tgt->io_data_size = sizeof(struct ublk_io_tgt) +
		sizeof(struct nbd_io_data);

	ublksrv_dev_set_cq_depth(dev, 2 * tgt->tgt_ring_depth);

	return 0;
}
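
/*
 * Map the negotiated NBD transmission flags onto ublk device attributes
 * and parameters (read-only, volatile cache/FUA, discard).
 */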
static void nbd_parse_flags(struct ublk_params *p, uint16_t flags, uint32_t bs)
{
	__u32 attrs = 0;

	NBD_HS_DBG("%s: negotiated flags %x\n", __func__, flags);

	if (flags & NBD_FLAG_READ_ONLY)
		attrs |= UBLK_ATTR_READ_ONLY;
	if (flags & NBD_FLAG_SEND_FLUSH) {
		if (flags & NBD_FLAG_SEND_FUA)
			attrs |= UBLK_ATTR_FUA;
		else
			attrs |= UBLK_ATTR_VOLATILE_CACHE;
	}

	p->basic.attrs |= attrs;

	if (flags & NBD_FLAG_SEND_TRIM) {
		p->discard.discard_granularity = bs;
		p->discard.max_discard_sectors = UINT_MAX >> 9;
		p->discard.max_discard_segments = 1;
		p->types |= UBLK_PARAM_TYPE_DISCARD;
	}
}

static int nbd_init_tgt(struct ublksrv_dev *dev, int type, int argc,
		char *argv[])
{
	int send_zc = 0;
	int read_only = 0;
	static const struct option nbd_longopts[] = {
		{ "host", required_argument, 0, 0},
		{ "unix", required_argument, 0, 0},
		{ "export_name", required_argument, 0, 0},
		{ "send_zc", 0, &send_zc, 1},
		{ "read_only", 0, &read_only, 1},
		{ NULL }
	};
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	const struct ublksrv_ctrl_dev_info *info =
		ublksrv_ctrl_get_dev_info(ublksrv_get_ctrl_dev(dev));
	int jbuf_size;
	char *jbuf = ublksrv_tgt_return_json_buf(dev, &jbuf_size);
	struct ublksrv_tgt_base_json tgt_json = {
		.type = type,
	};
	int opt;
	int option_index = 0;
	unsigned char bs_shift = 9;
	const char *host_name = NULL;
	const char *unix_path = NULL;
	const char *exp_name = NULL;
	uint16_t flags = 0;
	int ret;

	strcpy(tgt_json.name, "nbd");

	if (type != UBLKSRV_TGT_TYPE_NBD)
		return -1;

	while ((opt = getopt_long(argc, argv, "-:f:",
				  nbd_longopts, &option_index)) != -1) {
		if (opt < 0)
			break;
		if (opt > 0)
			continue;

		if (!strcmp(nbd_longopts[option_index].name, "host"))
			host_name = optarg;
		if (!strcmp(nbd_longopts[option_index].name, "unix"))
			unix_path = optarg;
		if (!strcmp(nbd_longopts[option_index].name, "export_name"))
			exp_name = optarg;
	}

#ifndef HAVE_LIBURING_SEND_ZC
	if (send_zc)
		return -EINVAL;
#endif

	ublk_json_write_dev_info(dev, &jbuf, &jbuf_size);
	ublk_json_write_tgt_str(dev, &jbuf, &jbuf_size, "host", host_name);
	ublk_json_write_tgt_str(dev, &jbuf, &jbuf_size, "unix", unix_path);
	ublk_json_write_tgt_str(dev, &jbuf, &jbuf_size, "export_name", exp_name);
	ublk_json_write_tgt_long(dev, &jbuf, &jbuf_size, "send_zc", send_zc);

	tgt->tgt_data = calloc(sizeof(struct nbd_tgt_data), 1);

	ret = nbd_setup_tgt(dev, type, false, jbuf, &flags);
	if (ret)
		return ret;

	tgt_json.dev_size = tgt->dev_size;
	ublk_json_write_target_base(dev, &jbuf, &jbuf_size, &tgt_json);

	struct ublk_params p = {
		.types = UBLK_PARAM_TYPE_BASIC,
		.basic = {
			.attrs = read_only ? UBLK_ATTR_READ_ONLY : 0U,
			.logical_bs_shift = bs_shift,
			.physical_bs_shift = 12,
			.io_opt_shift = 12,
			.io_min_shift = bs_shift,
			.max_sectors = info->max_io_buf_bytes >> 9,
			.dev_sectors = tgt->dev_size >> 9,
		},
	};

	nbd_parse_flags(&p, flags, 1U << bs_shift);
	ublk_json_write_params(dev, &jbuf, &jbuf_size, &p);

	return 0;
}

static int nbd_recovery_tgt(struct ublksrv_dev *dev, int type)
{
	const struct ublksrv_ctrl_dev *cdev = ublksrv_get_ctrl_dev(dev);
	const char *jbuf = ublksrv_ctrl_get_recovery_jbuf(cdev);
	uint16_t flags = 0;

	dev->tgt.tgt_data = calloc(sizeof(struct nbd_tgt_data), 1);

	return nbd_setup_tgt(dev, type, true, jbuf, &flags);
}

struct ublksrv_tgt_type nbd_tgt_type = {
	.handle_io_async = nbd_handle_io_async,
	.tgt_io_done = nbd_tgt_io_done,
	.handle_io_background = nbd_handle_io_bg,
	.usage_for_add = nbd_usage_for_add,
	.init_tgt = nbd_init_tgt,
	.deinit_tgt = nbd_deinit_tgt,
	.type = UBLKSRV_TGT_TYPE_NBD,
	.name = "nbd",
	.recovery_tgt = nbd_recovery_tgt,
	.init_queue = nbd_init_queue,
	.deinit_queue = nbd_deinit_queue,
};

static void tgt_nbd_init() __attribute__((constructor));

static void tgt_nbd_init(void)
{
	ublksrv_register_tgt_type(&nbd_tgt_type);
}