/* SPDX-License-Identifier: MIT */
#define _POSIX_C_SOURCE 200112L

#include "lib.h"
#include "syscall.h"
#include "liburing.h"
#include "int_flags.h"
#include "liburing/compat.h"
#include "liburing/io_uring.h"

/*
 * Returns true if we're not using an SQ thread (thus nobody submits but us),
 * or if IORING_SQ_NEED_WAKEUP is set, in which case the submit thread must
 * be explicitly awakened. For the latter case, we set the thread wakeup flag.
 */
static inline bool sq_ring_needs_enter(struct io_uring *ring, unsigned *flags)
{
        if (!(ring->flags & IORING_SETUP_SQPOLL))
                return true;

        /*
         * Ensure the kernel can see the store to the SQ tail before we read
         * the flags.
         */
        io_uring_smp_mb();

        if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
                           IORING_SQ_NEED_WAKEUP)) {
                *flags |= IORING_ENTER_SQ_WAKEUP;
                return true;
        }

        return false;
}
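/*
 * Illustrative application-side sketch, not part of the library (assumes a
 * ring set up with IORING_SETUP_SQPOLL, which may require elevated
 * privileges on older kernels). With SQPOLL, io_uring_submit() normally
 * returns without entering the kernel; the syscall is only made when the
 * poll thread has gone idle and set IORING_SQ_NEED_WAKEUP, which is exactly
 * the condition sq_ring_needs_enter() checks for:
 *
 *      struct io_uring ring;
 *      struct io_uring_sqe *sqe;
 *
 *      io_uring_queue_init(8, &ring, IORING_SETUP_SQPOLL);
 *      sqe = io_uring_get_sqe(&ring);
 *      io_uring_prep_nop(sqe);
 *      io_uring_submit(&ring);         // may skip io_uring_enter() entirely
 */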

static inline bool cq_ring_needs_flush(struct io_uring *ring)
{
        return IO_URING_READ_ONCE(*ring->sq.kflags) &
                                (IORING_SQ_CQ_OVERFLOW | IORING_SQ_TASKRUN);
}

static inline bool cq_ring_needs_enter(struct io_uring *ring)
{
        return (ring->flags & IORING_SETUP_IOPOLL) || cq_ring_needs_flush(ring);
}

struct get_data {
        unsigned submit;
        unsigned wait_nr;
        unsigned get_flags;
        int sz;
        void *arg;
};

static int _io_uring_get_cqe(struct io_uring *ring,
                             struct io_uring_cqe **cqe_ptr,
                             struct get_data *data)
{
        struct io_uring_cqe *cqe = NULL;
        bool looped = false;
        int err;

        do {
                bool need_enter = false;
                unsigned flags = 0;
                unsigned nr_available;
                int ret;

                err = __io_uring_peek_cqe(ring, &cqe, &nr_available);
                if (err)
                        break;
                if (!cqe && !data->wait_nr && !data->submit) {
                        /*
                         * If we already looped once, we already entered
                         * the kernel. Since there's nothing to submit or
                         * wait for, don't keep retrying.
                         */
                        if (looped || !cq_ring_needs_enter(ring)) {
                                err = -EAGAIN;
                                break;
                        }
                        need_enter = true;
                }
                if (data->wait_nr > nr_available || need_enter) {
                        flags = IORING_ENTER_GETEVENTS | data->get_flags;
                        need_enter = true;
                }
                if (data->submit && sq_ring_needs_enter(ring, &flags))
                        need_enter = true;
                if (!need_enter)
                        break;

                if (ring->int_flags & INT_FLAG_REG_RING)
                        flags |= IORING_ENTER_REGISTERED_RING;
                ret = ____sys_io_uring_enter2(ring->enter_ring_fd, data->submit,
                                              data->wait_nr, flags, data->arg,
                                              data->sz);
                if (ret < 0) {
                        err = ret;
                        break;
                }

                data->submit -= ret;
                if (cqe)
                        break;
                looped = true;
        } while (1);

        *cqe_ptr = cqe;
        return err;
}

int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
                       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
        struct get_data data = {
                .submit = submit,
                .wait_nr = wait_nr,
                .get_flags = 0,
                .sz = _NSIG / 8,
                .arg = sigmask,
        };

        return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

/*
 * Fill in an array of IO completions up to count, if any are available.
 * Returns the number of IO completions filled.
 */
unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
                                 struct io_uring_cqe **cqes, unsigned count)
{
        unsigned ready;
        bool overflow_checked = false;
        int shift = 0;

        if (ring->flags & IORING_SETUP_CQE32)
                shift = 1;

again:
        ready = io_uring_cq_ready(ring);
        if (ready) {
                unsigned head = *ring->cq.khead;
                unsigned mask = *ring->cq.kring_mask;
                unsigned last;
                int i = 0;

                count = count > ready ? ready : count;
                last = head + count;
                for (; head != last; head++, i++)
                        cqes[i] = &ring->cq.cqes[(head & mask) << shift];

                return count;
        }

        if (overflow_checked)
                goto done;

        if (cq_ring_needs_flush(ring)) {
                int flags = IORING_ENTER_GETEVENTS;

                if (ring->int_flags & INT_FLAG_REG_RING)
                        flags |= IORING_ENTER_REGISTERED_RING;
                ____sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
                overflow_checked = true;
                goto again;
        }

done:
        return 0;
}
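/*
 * Illustrative usage sketch, not part of the library: drain up to a fixed
 * batch of completions without blocking, then mark them all as seen in one
 * go with io_uring_cq_advance(). handle_cqe() is a hypothetical callback:
 *
 *      struct io_uring_cqe *cqes[16];
 *      unsigned i, nr;
 *
 *      nr = io_uring_peek_batch_cqe(&ring, cqes, 16);
 *      for (i = 0; i < nr; i++)
 *              handle_cqe(cqes[i]);
 *      io_uring_cq_advance(&ring, nr);
 */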

/*
 * Sync internal state with kernel ring state on the SQ side. Returns the
 * number of pending items in the SQ ring, for the shared ring.
 */
int __io_uring_flush_sq(struct io_uring *ring)
{
        struct io_uring_sq *sq = &ring->sq;
        const unsigned mask = *sq->kring_mask;
        unsigned ktail = *sq->ktail;
        unsigned to_submit = sq->sqe_tail - sq->sqe_head;

        if (!to_submit)
                goto out;

        /*
         * Fill in sqes that we have queued up, adding them to the kernel ring
         */
        do {
                sq->array[ktail & mask] = sq->sqe_head & mask;
                ktail++;
                sq->sqe_head++;
        } while (--to_submit);

        /*
         * Ensure that the kernel sees the SQE updates before it sees the tail
         * update.
         */
        io_uring_smp_store_release(sq->ktail, ktail);
out:
        /*
         * This _may_ look problematic, as we're not supposed to be reading
         * SQ->head without acquire semantics. When we're in SQPOLL mode, the
         * kernel submitter could be updating this right now. For non-SQPOLL,
         * the task itself does it, and there's no potential race. But even for
         * SQPOLL, the load is going to be potentially out-of-date the very
         * instant it's done, regardless of whether or not it's done
         * atomically. Worst case, we're going to be over-estimating what
         * we can submit. The point is, we need to be able to deal with this
         * situation regardless of any perceived atomicity.
         */
        return ktail - *sq->khead;
}

/*
 * If we have kernel support for IORING_ENTER_EXT_ARG, then we can use that
 * more efficiently than queueing an internal timeout command.
 */
static int io_uring_wait_cqes_new(struct io_uring *ring,
                                  struct io_uring_cqe **cqe_ptr,
                                  unsigned wait_nr,
                                  struct __kernel_timespec *ts,
                                  sigset_t *sigmask)
{
        struct io_uring_getevents_arg arg = {
                .sigmask = (unsigned long) sigmask,
                .sigmask_sz = _NSIG / 8,
                .ts = (unsigned long) ts
        };
        struct get_data data = {
                .wait_nr = wait_nr,
                .get_flags = IORING_ENTER_EXT_ARG,
                .sz = sizeof(arg),
                .arg = &arg
        };

        return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

/*
 * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
 * that an sqe is used internally to handle the timeout. If the kernel doesn't
 * support IORING_FEAT_EXT_ARG, applications using this function must never
 * set sqe->user_data to LIBURING_UDATA_TIMEOUT!
 *
 * For kernels without IORING_FEAT_EXT_ARG (5.10 and older), if 'ts' is
 * specified, the application need not call io_uring_submit() before
 * calling this function, as we will do that on its behalf. From this it also
 * follows that this function isn't safe to use for applications that split SQ
 * and CQ handling between two threads and expect that to work without
 * synchronization, as this function manipulates both the SQ and CQ side.
 *
 * For kernels with IORING_FEAT_EXT_ARG, no implicit submission is done and
 * hence this function is safe to use for applications that split SQ and CQ
 * handling between two threads.
 */
static int __io_uring_submit_timeout(struct io_uring *ring, unsigned wait_nr,
                                     struct __kernel_timespec *ts)
{
        struct io_uring_sqe *sqe;
        int ret;

        /*
         * If the SQ ring is full, we may need to submit IO first
         */
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
                ret = io_uring_submit(ring);
                if (ret < 0)
                        return ret;
                sqe = io_uring_get_sqe(ring);
                if (!sqe)
                        return -EAGAIN;
        }
        io_uring_prep_timeout(sqe, ts, wait_nr, 0);
        sqe->user_data = LIBURING_UDATA_TIMEOUT;
        return __io_uring_flush_sq(ring);
}

int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
                       unsigned wait_nr, struct __kernel_timespec *ts,
                       sigset_t *sigmask)
{
        int to_submit = 0;

        if (ts) {
                if (ring->features & IORING_FEAT_EXT_ARG)
                        return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr,
                                                      ts, sigmask);
                to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
                if (to_submit < 0)
                        return to_submit;
        }

        return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
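/*
 * Illustrative usage sketch, not part of the library: wait up to one second
 * for at least one completion. A return of 0 means *cqe_ptr is valid, while
 * -ETIME indicates the timeout expired first:
 *
 *      struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
 *      struct io_uring_cqe *cqe;
 *      int ret;
 *
 *      ret = io_uring_wait_cqes(&ring, &cqe, 1, &ts, NULL);
 *      if (!ret) {
 *              // inspect cqe->res and cqe->user_data here
 *              io_uring_cqe_seen(&ring, cqe);
 *      }
 */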

int io_uring_submit_and_wait_timeout(struct io_uring *ring,
                                     struct io_uring_cqe **cqe_ptr,
                                     unsigned wait_nr,
                                     struct __kernel_timespec *ts,
                                     sigset_t *sigmask)
{
        int to_submit;

        if (ts) {
                if (ring->features & IORING_FEAT_EXT_ARG) {
                        struct io_uring_getevents_arg arg = {
                                .sigmask = (unsigned long) sigmask,
                                .sigmask_sz = _NSIG / 8,
                                .ts = (unsigned long) ts
                        };
                        struct get_data data = {
                                .submit = __io_uring_flush_sq(ring),
                                .wait_nr = wait_nr,
                                .get_flags = IORING_ENTER_EXT_ARG,
                                .sz = sizeof(arg),
                                .arg = &arg
                        };

                        return _io_uring_get_cqe(ring, cqe_ptr, &data);
                }
                to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
                if (to_submit < 0)
                        return to_submit;
        } else
                to_submit = __io_uring_flush_sq(ring);

        return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
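/*
 * Illustrative usage sketch, not part of the library: flush anything queued
 * in the SQ ring and wait for up to eight completions, but give up after
 * 100ms. On this code path, a 0 return leaves the first available completion
 * in *cqe_ptr, while -ETIME indicates the timeout expired:
 *
 *      struct __kernel_timespec ts = { .tv_sec = 0, .tv_nsec = 100000000 };
 *      struct io_uring_cqe *cqe;
 *      int ret;
 *
 *      ret = io_uring_submit_and_wait_timeout(&ring, &cqe, 8, &ts, NULL);
 *      if (!ret && cqe) {
 *              // reap this and any further CQEs with the peek/seen helpers
 *      }
 */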

/*
 * See io_uring_wait_cqes() - this function is the same, it just always uses
 * '1' as the wait_nr.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
                              struct io_uring_cqe **cqe_ptr,
                              struct __kernel_timespec *ts)
{
        return io_uring_wait_cqes(ring, cqe_ptr, 1, ts, NULL);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
                             unsigned wait_nr)
{
        unsigned flags;
        int ret;

        flags = 0;
        if (sq_ring_needs_enter(ring, &flags) || wait_nr) {
                if (wait_nr || (ring->flags & IORING_SETUP_IOPOLL))
                        flags |= IORING_ENTER_GETEVENTS;
                if (ring->int_flags & INT_FLAG_REG_RING)
                        flags |= IORING_ENTER_REGISTERED_RING;

                ret = ____sys_io_uring_enter(ring->enter_ring_fd, submitted,
                                             wait_nr, flags, NULL);
        } else
                ret = submitted;

        return ret;
}

static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
        return __io_uring_submit(ring, __io_uring_flush_sq(ring), wait_nr);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit(struct io_uring *ring)
{
        return __io_uring_submit_and_wait(ring, 0);
}
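/*
 * Illustrative usage sketch, not part of the library: queue a single NOP and
 * submit it. The return value is the number of SQEs handed to the kernel
 * (1 here), so short submissions can be detected and retried:
 *
 *      struct io_uring_sqe *sqe;
 *      int ret;
 *
 *      sqe = io_uring_get_sqe(&ring);
 *      if (sqe) {
 *              io_uring_prep_nop(sqe);
 *              io_uring_sqe_set_data(sqe, (void *) 0x1234);
 *              ret = io_uring_submit(&ring);
 *      }
 */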

/*
 * Like io_uring_submit(), but allows waiting for events as well.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
        return __io_uring_submit_and_wait(ring, wait_nr);
}
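/*
 * Illustrative usage sketch, not part of the library: submit whatever has
 * been queued and block until at least one completion is posted, replacing
 * a separate io_uring_submit() + io_uring_wait_cqe() pair with one syscall:
 *
 *      struct io_uring_cqe *cqe;
 *      int ret;
 *
 *      ret = io_uring_submit_and_wait(&ring, 1);
 *      if (ret >= 0 && !io_uring_peek_cqe(&ring, &cqe)) {
 *              // cqe->res holds the operation's result
 *              io_uring_cqe_seen(&ring, cqe);
 *      }
 */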

#ifdef LIBURING_INTERNAL
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
{
        return _io_uring_get_sqe(ring);
}
#endif

int __io_uring_sqring_wait(struct io_uring *ring)
{
        int flags = IORING_ENTER_SQ_WAIT;

        if (ring->int_flags & INT_FLAG_REG_RING)
                flags |= IORING_ENTER_REGISTERED_RING;

        return ____sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
}
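/*
 * Illustrative usage sketch, not part of the library (assumes an SQPOLL ring
 * on a kernel that reports IORING_FEAT_SQPOLL_NONFIXED): when the SQ ring is
 * full, io_uring_get_sqe() returns NULL; the public io_uring_sqring_wait()
 * helper, which funnels into __io_uring_sqring_wait() above, blocks until
 * the SQ poll thread has consumed entries and space is available again:
 *
 *      struct io_uring_sqe *sqe;
 *
 *      while (!(sqe = io_uring_get_sqe(&ring))) {
 *              if (io_uring_sqring_wait(&ring) < 0)
 *                      break;  // give up and handle the error
 *      }
 */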