xref: /aosp_15_r20/external/liburing/src/queue.c (revision 25da2bea747f3a93b4c30fd9708b0618ef55a0e6)
/* SPDX-License-Identifier: MIT */
#define _POSIX_C_SOURCE 200112L

#include "lib.h"
#include "syscall.h"
#include "liburing.h"
#include "int_flags.h"
#include "liburing/compat.h"
#include "liburing/io_uring.h"

/*
 * Returns true if we're not using an SQ thread (thus nobody submits but us),
 * or if IORING_SQ_NEED_WAKEUP is set, meaning the submit thread must be
 * explicitly awakened. For the latter case, we set the thread wakeup flag.
 */
static inline bool sq_ring_needs_enter(struct io_uring *ring, unsigned *flags)
{
	if (!(ring->flags & IORING_SETUP_SQPOLL))
		return true;

	/*
	 * Ensure the kernel can see the store to the SQ tail before we read
	 * the flags.
	 */
	io_uring_smp_mb();

	if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
			   IORING_SQ_NEED_WAKEUP)) {
		*flags |= IORING_ENTER_SQ_WAKEUP;
		return true;
	}

	return false;
}
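
/*
 * Usage sketch for the SQPOLL path checked above (not part of the library
 * proper): with IORING_SETUP_SQPOLL the kernel submission thread picks up new
 * SQEs on its own, so io_uring_submit() only needs to enter the kernel once
 * that thread has gone idle and set IORING_SQ_NEED_WAKEUP. A minimal,
 * hypothetical setup, error handling omitted:
 */
static void example_sqpoll_submit(void)
{
	struct io_uring ring;
	struct io_uring_params p = { };
	struct io_uring_sqe *sqe;

	p.flags = IORING_SETUP_SQPOLL;
	p.sq_thread_idle = 2000;	/* ms of idle before the SQ thread sleeps */
	io_uring_queue_init_params(8, &ring, &p);

	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_nop(sqe);
	/* usually no syscall; wakes the SQ thread only if it went to sleep */
	io_uring_submit(&ring);

	io_uring_queue_exit(&ring);
}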

static inline bool cq_ring_needs_flush(struct io_uring *ring)
{
	return IO_URING_READ_ONCE(*ring->sq.kflags) &
				 (IORING_SQ_CQ_OVERFLOW | IORING_SQ_TASKRUN);
}

static inline bool cq_ring_needs_enter(struct io_uring *ring)
{
	return (ring->flags & IORING_SETUP_IOPOLL) || cq_ring_needs_flush(ring);
}

struct get_data {
	unsigned submit;
	unsigned wait_nr;
	unsigned get_flags;
	int sz;
	void *arg;
};

static int _io_uring_get_cqe(struct io_uring *ring,
			     struct io_uring_cqe **cqe_ptr,
			     struct get_data *data)
{
	struct io_uring_cqe *cqe = NULL;
	bool looped = false;
	int err;

	do {
		bool need_enter = false;
		unsigned flags = 0;
		unsigned nr_available;
		int ret;

		err = __io_uring_peek_cqe(ring, &cqe, &nr_available);
		if (err)
			break;
		if (!cqe && !data->wait_nr && !data->submit) {
			/*
			 * If we already looped once, we already entered
			 * the kernel. Since there's nothing to submit or
			 * wait for, don't keep retrying.
			 */
			if (looped || !cq_ring_needs_enter(ring)) {
				err = -EAGAIN;
				break;
			}
			need_enter = true;
		}
		if (data->wait_nr > nr_available || need_enter) {
			flags = IORING_ENTER_GETEVENTS | data->get_flags;
			need_enter = true;
		}
		if (data->submit && sq_ring_needs_enter(ring, &flags))
			need_enter = true;
		if (!need_enter)
			break;

		if (ring->int_flags & INT_FLAG_REG_RING)
			flags |= IORING_ENTER_REGISTERED_RING;
		ret = ____sys_io_uring_enter2(ring->enter_ring_fd, data->submit,
					      data->wait_nr, flags, data->arg,
					      data->sz);
		if (ret < 0) {
			err = ret;
			break;
		}

		data->submit -= ret;
		if (cqe)
			break;
		looped = true;
	} while (1);

	*cqe_ptr = cqe;
	return err;
}

int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
	struct get_data data = {
		.submit		= submit,
		.wait_nr	= wait_nr,
		.get_flags	= 0,
		.sz		= _NSIG / 8,
		.arg		= sigmask,
	};

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

/*
 * Fill in an array of IO completions up to count, if any are available.
 * Returns the number of IO completions filled.
 */
unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
				 struct io_uring_cqe **cqes, unsigned count)
{
	unsigned ready;
	bool overflow_checked = false;
	int shift = 0;

	if (ring->flags & IORING_SETUP_CQE32)
		shift = 1;

again:
	ready = io_uring_cq_ready(ring);
	if (ready) {
		unsigned head = *ring->cq.khead;
		unsigned mask = *ring->cq.kring_mask;
		unsigned last;
		int i = 0;

		count = count > ready ? ready : count;
		last = head + count;
		for (; head != last; head++, i++)
			cqes[i] = &ring->cq.cqes[(head & mask) << shift];

		return count;
	}

	if (overflow_checked)
		goto done;

	if (cq_ring_needs_flush(ring)) {
		int flags = IORING_ENTER_GETEVENTS;

		if (ring->int_flags & INT_FLAG_REG_RING)
			flags |= IORING_ENTER_REGISTERED_RING;
		____sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
		overflow_checked = true;
		goto again;
	}

done:
	return 0;
}
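
/*
 * Usage sketch for the batched peek above (hypothetical helper, not part of
 * the library proper): grab whatever completions are already available
 * without blocking, process them, then advance the CQ head once for the
 * whole batch instead of marking each CQE seen individually.
 */
static void example_drain_ready_cqes(struct io_uring *ring)
{
	struct io_uring_cqe *cqes[16];
	unsigned i, n;

	n = io_uring_peek_batch_cqe(ring, cqes, 16);
	for (i = 0; i < n; i++) {
		/* inspect cqes[i]->res and cqes[i]->user_data here */
	}
	/* mark all n completions as seen in one step */
	io_uring_cq_advance(ring, n);
}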

/*
 * Sync internal state with kernel ring state on the SQ side. Returns the
 * number of pending items in the SQ ring, for the shared ring.
 */
int __io_uring_flush_sq(struct io_uring *ring)
{
	struct io_uring_sq *sq = &ring->sq;
	const unsigned mask = *sq->kring_mask;
	unsigned ktail = *sq->ktail;
	unsigned to_submit = sq->sqe_tail - sq->sqe_head;

	if (!to_submit)
		goto out;

	/*
	 * Fill in sqes that we have queued up, adding them to the kernel ring
	 */
	do {
		sq->array[ktail & mask] = sq->sqe_head & mask;
		ktail++;
		sq->sqe_head++;
	} while (--to_submit);

	/*
	 * Ensure that the kernel sees the SQE updates before it sees the tail
	 * update.
	 */
	io_uring_smp_store_release(sq->ktail, ktail);
out:
	/*
	 * This _may_ look problematic, as we're not supposed to be reading
	 * SQ->head without acquire semantics. When we're in SQPOLL mode, the
	 * kernel submitter could be updating this right now. For non-SQPOLL,
	 * the task itself does it, and there's no potential race. But even for
	 * SQPOLL, the load is going to be potentially out-of-date the very
	 * instant it's done, regardless of whether or not it's done
	 * atomically. Worst case, we're going to be over-estimating what
	 * we can submit. The point is, we need to be able to deal with this
	 * situation regardless of any perceived atomicity.
	 */
	return ktail - *sq->khead;
}

/*
 * If we have kernel support for IORING_ENTER_EXT_ARG, then we can use that
 * more efficiently than queueing an internal timeout command.
 */
static int io_uring_wait_cqes_new(struct io_uring *ring,
				  struct io_uring_cqe **cqe_ptr,
				  unsigned wait_nr,
				  struct __kernel_timespec *ts,
				  sigset_t *sigmask)
{
	struct io_uring_getevents_arg arg = {
		.sigmask	= (unsigned long) sigmask,
		.sigmask_sz	= _NSIG / 8,
		.ts		= (unsigned long) ts
	};
	struct get_data data = {
		.wait_nr	= wait_nr,
		.get_flags	= IORING_ENTER_EXT_ARG,
		.sz		= sizeof(arg),
		.arg		= &arg
	};

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

/*
 * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
 * that an sqe is used internally to handle the timeout. If the kernel doesn't
 * support IORING_FEAT_EXT_ARG, applications using this function must never
 * set sqe->user_data to LIBURING_UDATA_TIMEOUT!
 *
 * For kernels without IORING_FEAT_EXT_ARG (5.10 and older), if 'ts' is
 * specified, the application need not call io_uring_submit() before
 * calling this function, as we will do that on its behalf. From this it also
 * follows that this function isn't safe to use for applications that split SQ
 * and CQ handling between two threads and expect that to work without
 * synchronization, as this function manipulates both the SQ and CQ side.
 *
 * For kernels with IORING_FEAT_EXT_ARG, no implicit submission is done and
 * hence this function is safe to use for applications that split SQ and CQ
 * handling between two threads.
 */
static int __io_uring_submit_timeout(struct io_uring *ring, unsigned wait_nr,
				     struct __kernel_timespec *ts)
{
	struct io_uring_sqe *sqe;
	int ret;

	/*
	 * If the SQ ring is full, we may need to submit IO first
	 */
	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		ret = io_uring_submit(ring);
		if (ret < 0)
			return ret;
		sqe = io_uring_get_sqe(ring);
		if (!sqe)
			return -EAGAIN;
	}
	io_uring_prep_timeout(sqe, ts, wait_nr, 0);
	sqe->user_data = LIBURING_UDATA_TIMEOUT;
	return __io_uring_flush_sq(ring);
}

int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned wait_nr, struct __kernel_timespec *ts,
		       sigset_t *sigmask)
{
	int to_submit = 0;

	if (ts) {
		if (ring->features & IORING_FEAT_EXT_ARG)
			return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr,
							ts, sigmask);
		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
		if (to_submit < 0)
			return to_submit;
	}

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
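
/*
 * Usage sketch for io_uring_wait_cqes() above (hypothetical helper, not part
 * of the library proper): wait up to one second for at least one completion.
 * On kernels with IORING_FEAT_EXT_ARG no implicit submit happens; on older
 * kernels an internal timeout SQE may be posted, so the caller must not use
 * LIBURING_UDATA_TIMEOUT as its own user_data.
 */
static int example_wait_one_second(struct io_uring *ring,
				   struct io_uring_cqe **cqe)
{
	struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };

	/* 0 with *cqe set on success, -ETIME if the timeout expired */
	return io_uring_wait_cqes(ring, cqe, 1, &ts, NULL);
}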

int io_uring_submit_and_wait_timeout(struct io_uring *ring,
				     struct io_uring_cqe **cqe_ptr,
				     unsigned wait_nr,
				     struct __kernel_timespec *ts,
				     sigset_t *sigmask)
{
	int to_submit;

	if (ts) {
		if (ring->features & IORING_FEAT_EXT_ARG) {
			struct io_uring_getevents_arg arg = {
				.sigmask	= (unsigned long) sigmask,
				.sigmask_sz	= _NSIG / 8,
				.ts		= (unsigned long) ts
			};
			struct get_data data = {
				.submit		= __io_uring_flush_sq(ring),
				.wait_nr	= wait_nr,
				.get_flags	= IORING_ENTER_EXT_ARG,
				.sz		= sizeof(arg),
				.arg		= &arg
			};

			return _io_uring_get_cqe(ring, cqe_ptr, &data);
		}
		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
		if (to_submit < 0)
			return to_submit;
	} else
		to_submit = __io_uring_flush_sq(ring);

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}

/*
 * See io_uring_wait_cqes() - this function is the same, it just always uses
 * '1' as the wait_nr.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
			      struct io_uring_cqe **cqe_ptr,
			      struct __kernel_timespec *ts)
{
	return io_uring_wait_cqes(ring, cqe_ptr, 1, ts, NULL);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
			     unsigned wait_nr)
{
	unsigned flags;
	int ret;

	flags = 0;
	if (sq_ring_needs_enter(ring, &flags) || wait_nr) {
		if (wait_nr || (ring->flags & IORING_SETUP_IOPOLL))
			flags |= IORING_ENTER_GETEVENTS;
		if (ring->int_flags & INT_FLAG_REG_RING)
			flags |= IORING_ENTER_REGISTERED_RING;

		ret = ____sys_io_uring_enter(ring->enter_ring_fd, submitted,
						wait_nr, flags, NULL);
	} else
		ret = submitted;

	return ret;
}

static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit(ring, __io_uring_flush_sq(ring), wait_nr);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit(struct io_uring *ring)
{
	return __io_uring_submit_and_wait(ring, 0);
}
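
/*
 * Usage sketch for io_uring_submit() above (hypothetical helper, not part of
 * the library proper): queue a single NOP, submit it, and reap the
 * completion. Error handling omitted.
 */
static void example_submit_nop(struct io_uring *ring)
{
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_nop(sqe);
	sqe->user_data = 0x1234;

	io_uring_submit(ring);		/* returns the number of SQEs submitted */
	io_uring_wait_cqe(ring, &cqe);	/* block until the completion arrives */
	io_uring_cqe_seen(ring, cqe);	/* advance the CQ head past this CQE */
}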

/*
 * Like io_uring_submit(), but allows waiting for events as well.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit_and_wait(ring, wait_nr);
}
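
/*
 * Usage sketch for io_uring_submit_and_wait() above (hypothetical helper, not
 * part of the library proper): submit a batch of SQEs and wait for all of
 * their completions with a single io_uring_enter() call. Error handling
 * omitted.
 */
static void example_submit_batch_and_wait(struct io_uring *ring, unsigned nr)
{
	unsigned i;

	for (i = 0; i < nr; i++) {
		struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

		io_uring_prep_nop(sqe);
		sqe->user_data = i;
	}
	/* submits nr SQEs and waits until nr completions have been posted */
	io_uring_submit_and_wait(ring, nr);
}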

#ifdef LIBURING_INTERNAL
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
{
	return _io_uring_get_sqe(ring);
}
#endif

int __io_uring_sqring_wait(struct io_uring *ring)
{
	int flags = IORING_ENTER_SQ_WAIT;

	if (ring->int_flags & INT_FLAG_REG_RING)
		flags |= IORING_ENTER_REGISTERED_RING;

	return ____sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
}