1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Test MSG_WAITALL with datagram sockets, with a send splice into two.
4 */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14
15 #include "liburing.h"
16 #include "helpers.h"
17
18 #define MAX_MSG 128
19
20 #define PORT 10201
21 #define HOST "127.0.0.1"
22
recv_prep(struct io_uring * ring,struct iovec * iov,int * sock,int use_recvmsg)23 static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
24 int use_recvmsg)
25 {
26 struct sockaddr_in saddr;
27 struct io_uring_sqe *sqe;
28 int sockfd, ret, val;
29 struct msghdr msg = { };
30
31 memset(&saddr, 0, sizeof(saddr));
32 saddr.sin_family = AF_INET;
33 saddr.sin_addr.s_addr = htonl(INADDR_ANY);
34 saddr.sin_port = htons(PORT);
35
36 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
37 if (sockfd < 0) {
38 perror("socket");
39 return 1;
40 }
41
42 val = 1;
43 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
44
45 ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
46 if (ret < 0) {
47 perror("bind");
48 goto err;
49 }
50
51 sqe = io_uring_get_sqe(ring);
52 if (!use_recvmsg) {
53 io_uring_prep_recv(sqe, sockfd, iov->iov_base, iov->iov_len,
54 MSG_WAITALL);
55 } else {
56 msg.msg_namelen = sizeof(struct sockaddr_in);
57 msg.msg_iov = iov;
58 msg.msg_iovlen = 1;
59 io_uring_prep_recvmsg(sqe, sockfd, &msg, MSG_WAITALL);
60 }
61
62 sqe->user_data = 2;
63
64 ret = io_uring_submit(ring);
65 if (ret <= 0) {
66 fprintf(stderr, "submit failed: %d\n", ret);
67 goto err;
68 }
69
70 *sock = sockfd;
71 return 0;
72 err:
73 close(sockfd);
74 return 1;
75 }
76
do_recv(struct io_uring * ring)77 static int do_recv(struct io_uring *ring)
78 {
79 struct io_uring_cqe *cqe;
80 int ret;
81
82 ret = io_uring_wait_cqe(ring, &cqe);
83 if (ret) {
84 fprintf(stdout, "wait_cqe: %d\n", ret);
85 goto err;
86 }
87 if (cqe->res == -EINVAL) {
88 fprintf(stdout, "recv not supported, skipping\n");
89 return 0;
90 }
91 if (cqe->res < 0) {
92 fprintf(stderr, "failed cqe: %d\n", cqe->res);
93 goto err;
94 }
95 if (cqe->res != MAX_MSG * sizeof(int) / 2) {
96 fprintf(stderr, "got wrong length: %d\n", cqe->res);
97 goto err;
98 }
99
100 io_uring_cqe_seen(ring, cqe);
101 return 0;
102 err:
103 return 1;
104 }
105
106 struct recv_data {
107 pthread_mutex_t mutex;
108 int use_recvmsg;
109 };
110
recv_fn(void * data)111 static void *recv_fn(void *data)
112 {
113 struct recv_data *rd = data;
114 int buf[MAX_MSG];
115 struct iovec iov = {
116 .iov_base = buf,
117 .iov_len = sizeof(buf),
118 };
119 struct io_uring_params p = { };
120 struct io_uring ring;
121 int ret, sock;
122
123 ret = t_create_ring_params(1, &ring, &p);
124 if (ret == T_SETUP_SKIP) {
125 pthread_mutex_unlock(&rd->mutex);
126 ret = 0;
127 goto err;
128 } else if (ret < 0) {
129 pthread_mutex_unlock(&rd->mutex);
130 goto err;
131 }
132
133 ret = recv_prep(&ring, &iov, &sock, rd->use_recvmsg);
134 if (ret) {
135 fprintf(stderr, "recv_prep failed: %d\n", ret);
136 goto err;
137 }
138 pthread_mutex_unlock(&rd->mutex);
139 ret = do_recv(&ring);
140 close(sock);
141 io_uring_queue_exit(&ring);
142 err:
143 return (void *)(intptr_t)ret;
144 }
145
do_send(void)146 static int do_send(void)
147 {
148 struct sockaddr_in saddr;
149 struct io_uring ring;
150 struct io_uring_cqe *cqe;
151 struct io_uring_sqe *sqe;
152 int sockfd, ret, i;
153 struct iovec iov;
154 int *buf;
155
156 ret = io_uring_queue_init(2, &ring, 0);
157 if (ret) {
158 fprintf(stderr, "queue init failed: %d\n", ret);
159 return 1;
160 }
161
162 buf = malloc(MAX_MSG * sizeof(int));
163 for (i = 0; i < MAX_MSG; i++)
164 buf[i] = i;
165
166 memset(&saddr, 0, sizeof(saddr));
167 saddr.sin_family = AF_INET;
168 saddr.sin_port = htons(PORT);
169 inet_pton(AF_INET, HOST, &saddr.sin_addr);
170
171 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
172 if (sockfd < 0) {
173 perror("socket");
174 return 1;
175 }
176
177 ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
178 if (ret < 0) {
179 perror("connect");
180 return 1;
181 }
182
183 iov.iov_base = buf;
184 iov.iov_len = MAX_MSG * sizeof(int) / 2;
185 for (i = 0; i < 2; i++) {
186 sqe = io_uring_get_sqe(&ring);
187 io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
188 sqe->user_data = 1;
189
190 ret = io_uring_submit(&ring);
191 if (ret <= 0) {
192 fprintf(stderr, "submit failed: %d\n", ret);
193 goto err;
194 }
195 usleep(10000);
196 iov.iov_base += iov.iov_len;
197 }
198
199 for (i = 0; i < 2; i++) {
200 ret = io_uring_wait_cqe(&ring, &cqe);
201 if (cqe->res == -EINVAL) {
202 fprintf(stdout, "send not supported, skipping\n");
203 close(sockfd);
204 return 0;
205 }
206 if (cqe->res != iov.iov_len) {
207 fprintf(stderr, "failed cqe: %d\n", cqe->res);
208 goto err;
209 }
210 io_uring_cqe_seen(&ring, cqe);
211 }
212
213 close(sockfd);
214 return 0;
215 err:
216 close(sockfd);
217 return 1;
218 }
219
test(int use_recvmsg)220 static int test(int use_recvmsg)
221 {
222 pthread_mutexattr_t attr;
223 pthread_t recv_thread;
224 struct recv_data rd;
225 int ret;
226 void *retval;
227
228 pthread_mutexattr_init(&attr);
229 pthread_mutexattr_setpshared(&attr, 1);
230 pthread_mutex_init(&rd.mutex, &attr);
231 pthread_mutex_lock(&rd.mutex);
232 rd.use_recvmsg = use_recvmsg;
233
234 ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
235 if (ret) {
236 fprintf(stderr, "Thread create failed: %d\n", ret);
237 pthread_mutex_unlock(&rd.mutex);
238 return 1;
239 }
240
241 pthread_mutex_lock(&rd.mutex);
242 do_send();
243 pthread_join(recv_thread, &retval);
244 return (intptr_t)retval;
245 }
246
main(int argc,char * argv[])247 int main(int argc, char *argv[])
248 {
249 int ret;
250
251 if (argc > 1)
252 return 0;
253
254 ret = test(0);
255 if (ret) {
256 fprintf(stderr, "test recv failed\n");
257 return ret;
258 }
259
260 ret = test(1);
261 if (ret) {
262 fprintf(stderr, "test recvmsg failed\n");
263 return ret;
264 }
265
266 return 0;
267 }
268