1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Simple test case using the socket op
4 */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14
15 #include "liburing.h"
16 #include "helpers.h"
17
/* Payload for the datagram; it is sent including its terminating NUL. */
static char str[] = "This is a test of send and recv over io_uring!";

/* Usable size of the receive buffer (one extra byte is reserved on top). */
#define MAX_MSG	128

/* The receiver binds to PORT; the sender connects to HOST:PORT. */
#define PORT	10202
#define HOST	"127.0.0.1"

/* Set by do_send() when the socket op completes with -EINVAL. */
static int no_socket;
26
recv_prep(struct io_uring * ring,struct iovec * iov,int * sock,int registerfiles)27 static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
28 int registerfiles)
29 {
30 struct sockaddr_in saddr;
31 struct io_uring_sqe *sqe;
32 int sockfd, ret, val, use_fd;
33
34 memset(&saddr, 0, sizeof(saddr));
35 saddr.sin_family = AF_INET;
36 saddr.sin_addr.s_addr = htonl(INADDR_ANY);
37 saddr.sin_port = htons(PORT);
38
39 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
40 if (sockfd < 0) {
41 perror("socket");
42 return 1;
43 }
44
45 val = 1;
46 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
47
48 ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
49 if (ret < 0) {
50 perror("bind");
51 goto err;
52 }
53
54 if (registerfiles) {
55 ret = io_uring_register_files(ring, &sockfd, 1);
56 if (ret) {
57 fprintf(stderr, "file reg failed\n");
58 goto err;
59 }
60 use_fd = 0;
61 } else {
62 use_fd = sockfd;
63 }
64
65 sqe = io_uring_get_sqe(ring);
66 io_uring_prep_recv(sqe, use_fd, iov->iov_base, iov->iov_len, 0);
67 if (registerfiles)
68 sqe->flags |= IOSQE_FIXED_FILE;
69 sqe->user_data = 2;
70
71 ret = io_uring_submit(ring);
72 if (ret <= 0) {
73 fprintf(stderr, "submit failed: %d\n", ret);
74 goto err;
75 }
76
77 *sock = sockfd;
78 return 0;
79 err:
80 close(sockfd);
81 return 1;
82 }
83
do_recv(struct io_uring * ring,struct iovec * iov)84 static int do_recv(struct io_uring *ring, struct iovec *iov)
85 {
86 struct io_uring_cqe *cqe;
87 int ret;
88
89 ret = io_uring_wait_cqe(ring, &cqe);
90 if (ret) {
91 fprintf(stdout, "wait_cqe: %d\n", ret);
92 goto err;
93 }
94 if (cqe->res == -EINVAL) {
95 fprintf(stdout, "recv not supported, skipping\n");
96 return 0;
97 }
98 if (cqe->res < 0) {
99 fprintf(stderr, "failed cqe: %d\n", cqe->res);
100 goto err;
101 }
102
103 if (cqe->res -1 != strlen(str)) {
104 fprintf(stderr, "got wrong length: %d/%d\n", cqe->res,
105 (int) strlen(str) + 1);
106 goto err;
107 }
108
109 if (strcmp(str, iov->iov_base)) {
110 fprintf(stderr, "string mismatch\n");
111 goto err;
112 }
113
114 return 0;
115 err:
116 return 1;
117 }
118
/* Arguments handed from test() to the receiver thread (recv_fn). */
struct recv_data {
	/* Locked by test() before the thread starts; recv_fn() unlocks it. */
	pthread_mutex_t mutex;
	/* Non-zero: create the receiver's ring with IORING_SETUP_SQPOLL. */
	int use_sqthread;
	/* Non-zero: register the receive socket as a fixed file. */
	int registerfiles;
};
124
recv_fn(void * data)125 static void *recv_fn(void *data)
126 {
127 struct recv_data *rd = data;
128 char buf[MAX_MSG + 1];
129 struct iovec iov = {
130 .iov_base = buf,
131 .iov_len = sizeof(buf) - 1,
132 };
133 struct io_uring_params p = { };
134 struct io_uring ring;
135 int ret, sock;
136
137 if (rd->use_sqthread)
138 p.flags = IORING_SETUP_SQPOLL;
139 ret = t_create_ring_params(1, &ring, &p);
140 if (ret == T_SETUP_SKIP) {
141 pthread_mutex_unlock(&rd->mutex);
142 ret = 0;
143 goto err;
144 } else if (ret < 0) {
145 pthread_mutex_unlock(&rd->mutex);
146 goto err;
147 }
148
149 if (rd->use_sqthread && !rd->registerfiles) {
150 if (!(p.features & IORING_FEAT_SQPOLL_NONFIXED)) {
151 fprintf(stdout, "Non-registered SQPOLL not available, skipping\n");
152 pthread_mutex_unlock(&rd->mutex);
153 goto err;
154 }
155 }
156
157 ret = recv_prep(&ring, &iov, &sock, rd->registerfiles);
158 if (ret) {
159 fprintf(stderr, "recv_prep failed: %d\n", ret);
160 goto err;
161 }
162 pthread_mutex_unlock(&rd->mutex);
163 ret = do_recv(&ring, &iov);
164
165 close(sock);
166 io_uring_queue_exit(&ring);
167 err:
168 return (void *)(intptr_t)ret;
169 }
170
fallback_send(struct io_uring * ring,struct sockaddr_in * saddr)171 static int fallback_send(struct io_uring *ring, struct sockaddr_in *saddr)
172 {
173 struct iovec iov = {
174 .iov_base = str,
175 .iov_len = sizeof(str),
176 };
177 struct io_uring_cqe *cqe;
178 struct io_uring_sqe *sqe;
179 int sockfd, ret;
180
181 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
182 if (sockfd < 0) {
183 perror("socket");
184 return 1;
185 }
186
187 ret = connect(sockfd, (struct sockaddr *)saddr, sizeof(*saddr));
188 if (ret < 0) {
189 perror("connect");
190 return 1;
191 }
192
193 sqe = io_uring_get_sqe(ring);
194 io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
195 sqe->user_data = 1;
196
197 ret = io_uring_submit(ring);
198 if (ret <= 0) {
199 fprintf(stderr, "submit failed: %d\n", ret);
200 goto err;
201 }
202
203 ret = io_uring_wait_cqe(ring, &cqe);
204 if (cqe->res == -EINVAL) {
205 fprintf(stdout, "send not supported, skipping\n");
206 close(sockfd);
207 return 0;
208 }
209 if (cqe->res != iov.iov_len) {
210 fprintf(stderr, "failed cqe: %d\n", cqe->res);
211 goto err;
212 }
213
214 close(sockfd);
215 return 0;
216 err:
217 close(sockfd);
218 return 1;
219 }
220
do_send(int socket_direct,int alloc)221 static int do_send(int socket_direct, int alloc)
222 {
223 struct sockaddr_in saddr;
224 struct iovec iov = {
225 .iov_base = str,
226 .iov_len = sizeof(str),
227 };
228 struct io_uring ring;
229 struct io_uring_cqe *cqe;
230 struct io_uring_sqe *sqe;
231 int sockfd, ret, fd = -1;
232
233 ret = io_uring_queue_init(1, &ring, 0);
234 if (ret) {
235 fprintf(stderr, "queue init failed: %d\n", ret);
236 return 1;
237 }
238
239 if (socket_direct) {
240 ret = io_uring_register_files(&ring, &fd, 1);
241 if (ret) {
242 fprintf(stderr, "file register %d\n", ret);
243 return 1;
244 }
245 }
246
247 memset(&saddr, 0, sizeof(saddr));
248 saddr.sin_family = AF_INET;
249 saddr.sin_port = htons(PORT);
250 inet_pton(AF_INET, HOST, &saddr.sin_addr);
251
252 sqe = io_uring_get_sqe(&ring);
253 if (socket_direct) {
254 unsigned file_index = 0;
255 if (alloc)
256 file_index = IORING_FILE_INDEX_ALLOC - 1;
257 io_uring_prep_socket_direct(sqe, AF_INET, SOCK_DGRAM, 0,
258 file_index, 0);
259 } else {
260 io_uring_prep_socket(sqe, AF_INET, SOCK_DGRAM, 0, 0);
261 }
262 ret = io_uring_submit(&ring);
263 if (ret != 1) {
264 fprintf(stderr, "socket submit: %d\n", ret);
265 return 1;
266 }
267 ret = io_uring_wait_cqe(&ring, &cqe);
268 if (ret) {
269 fprintf(stderr, "wait_cqe: %d\n", ret);
270 return 1;
271 }
272 if (cqe->res < 0) {
273 if (cqe->res == -EINVAL) {
274 fprintf(stdout, "No socket support, skipping\n");
275 no_socket = 1;
276 io_uring_cqe_seen(&ring, cqe);
277 return fallback_send(&ring, &saddr);
278 }
279
280 fprintf(stderr, "socket res: %d\n", ret);
281 return 1;
282 }
283
284 sockfd = cqe->res;
285 if (socket_direct && !alloc)
286 sockfd = 0;
287 io_uring_cqe_seen(&ring, cqe);
288
289 sqe = io_uring_get_sqe(&ring);
290 io_uring_prep_connect(sqe, sockfd, (struct sockaddr *) &saddr,
291 sizeof(saddr));
292 if (socket_direct)
293 sqe->flags |= IOSQE_FIXED_FILE;
294 ret = io_uring_submit(&ring);
295 if (ret != 1) {
296 fprintf(stderr, "connect submit: %d\n", ret);
297 return 1;
298 }
299 ret = io_uring_wait_cqe(&ring, &cqe);
300 if (ret) {
301 fprintf(stderr, "wait_cqe: %d\n", ret);
302 return 1;
303 }
304 if (cqe->res < 0) {
305 fprintf(stderr, "connect res: %d\n", cqe->res);
306 return 1;
307 }
308 io_uring_cqe_seen(&ring, cqe);
309
310 sqe = io_uring_get_sqe(&ring);
311 io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
312 sqe->user_data = 1;
313 if (socket_direct)
314 sqe->flags |= IOSQE_FIXED_FILE;
315
316 ret = io_uring_submit(&ring);
317 if (ret <= 0) {
318 fprintf(stderr, "submit failed: %d\n", ret);
319 goto err;
320 }
321
322 ret = io_uring_wait_cqe(&ring, &cqe);
323 if (cqe->res == -EINVAL) {
324 fprintf(stdout, "send not supported, skipping\n");
325 close(sockfd);
326 return 0;
327 }
328 if (cqe->res != iov.iov_len) {
329 fprintf(stderr, "failed cqe: %d\n", cqe->res);
330 goto err;
331 }
332
333 close(sockfd);
334 return 0;
335 err:
336 close(sockfd);
337 return 1;
338 }
339
test(int use_sqthread,int regfiles,int socket_direct,int alloc)340 static int test(int use_sqthread, int regfiles, int socket_direct, int alloc)
341 {
342 pthread_mutexattr_t attr;
343 pthread_t recv_thread;
344 struct recv_data rd;
345 int ret;
346 void *retval;
347
348 pthread_mutexattr_init(&attr);
349 pthread_mutexattr_setpshared(&attr, 1);
350 pthread_mutex_init(&rd.mutex, &attr);
351 pthread_mutex_lock(&rd.mutex);
352 rd.use_sqthread = use_sqthread;
353 rd.registerfiles = regfiles;
354
355 ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
356 if (ret) {
357 fprintf(stderr, "Thread create failed: %d\n", ret);
358 pthread_mutex_unlock(&rd.mutex);
359 return 1;
360 }
361
362 pthread_mutex_lock(&rd.mutex);
363 do_send(socket_direct, alloc);
364 pthread_join(recv_thread, &retval);
365 return (intptr_t)retval;
366 }
367
main(int argc,char * argv[])368 int main(int argc, char *argv[])
369 {
370 int ret;
371
372 if (argc > 1)
373 return 0;
374
375 ret = test(0, 0, 0, 0);
376 if (ret) {
377 fprintf(stderr, "test sqthread=0 failed\n");
378 return ret;
379 }
380 if (no_socket)
381 return 0;
382
383 ret = test(1, 1, 0, 0);
384 if (ret) {
385 fprintf(stderr, "test sqthread=1 reg=1 failed\n");
386 return ret;
387 }
388
389 ret = test(1, 0, 0, 0);
390 if (ret) {
391 fprintf(stderr, "test sqthread=1 reg=0 failed\n");
392 return ret;
393 }
394
395 ret = test(0, 0, 1, 0);
396 if (ret) {
397 fprintf(stderr, "test sqthread=0 direct=1 failed\n");
398 return ret;
399 }
400
401 ret = test(0, 0, 1, 1);
402 if (ret) {
403 fprintf(stderr, "test sqthread=0 direct=alloc failed\n");
404 return ret;
405 }
406
407 return 0;
408 }
409