xref: /aosp_15_r20/external/liburing/test/recv-msgall.c (revision 25da2bea747f3a93b4c30fd9708b0618ef55a0e6)
1 /* SPDX-License-Identifier: MIT */
2 /*
 * Test MSG_WAITALL with datagram sockets, with a send split into two.
4  */
5 #include <errno.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <unistd.h>
10 #include <arpa/inet.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <pthread.h>
14 
15 #include "liburing.h"
16 #include "helpers.h"
17 
/* Number of ints in the message buffer used by both sender and receiver. */
#define MAX_MSG	128

/* UDP endpoint: the receiver binds PORT, the sender connects to HOST:PORT. */
#define PORT	10201
#define HOST	"127.0.0.1"
22 
recv_prep(struct io_uring * ring,struct iovec * iov,int * sock,int use_recvmsg)23 static int recv_prep(struct io_uring *ring, struct iovec *iov, int *sock,
24 		     int use_recvmsg)
25 {
26 	struct sockaddr_in saddr;
27 	struct io_uring_sqe *sqe;
28 	int sockfd, ret, val;
29 	struct msghdr msg = { };
30 
31 	memset(&saddr, 0, sizeof(saddr));
32 	saddr.sin_family = AF_INET;
33 	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
34 	saddr.sin_port = htons(PORT);
35 
36 	sockfd = socket(AF_INET, SOCK_DGRAM, 0);
37 	if (sockfd < 0) {
38 		perror("socket");
39 		return 1;
40 	}
41 
42 	val = 1;
43 	setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
44 
45 	ret = bind(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
46 	if (ret < 0) {
47 		perror("bind");
48 		goto err;
49 	}
50 
51 	sqe = io_uring_get_sqe(ring);
52 	if (!use_recvmsg) {
53 		io_uring_prep_recv(sqe, sockfd, iov->iov_base, iov->iov_len,
54 					MSG_WAITALL);
55 	} else {
56 		msg.msg_namelen = sizeof(struct sockaddr_in);
57 		msg.msg_iov = iov;
58 		msg.msg_iovlen = 1;
59 		io_uring_prep_recvmsg(sqe, sockfd, &msg, MSG_WAITALL);
60 	}
61 
62 	sqe->user_data = 2;
63 
64 	ret = io_uring_submit(ring);
65 	if (ret <= 0) {
66 		fprintf(stderr, "submit failed: %d\n", ret);
67 		goto err;
68 	}
69 
70 	*sock = sockfd;
71 	return 0;
72 err:
73 	close(sockfd);
74 	return 1;
75 }
76 
do_recv(struct io_uring * ring)77 static int do_recv(struct io_uring *ring)
78 {
79 	struct io_uring_cqe *cqe;
80 	int ret;
81 
82 	ret = io_uring_wait_cqe(ring, &cqe);
83 	if (ret) {
84 		fprintf(stdout, "wait_cqe: %d\n", ret);
85 		goto err;
86 	}
87 	if (cqe->res == -EINVAL) {
88 		fprintf(stdout, "recv not supported, skipping\n");
89 		return 0;
90 	}
91 	if (cqe->res < 0) {
92 		fprintf(stderr, "failed cqe: %d\n", cqe->res);
93 		goto err;
94 	}
95 	if (cqe->res != MAX_MSG * sizeof(int) / 2) {
96 		fprintf(stderr, "got wrong length: %d\n", cqe->res);
97 		goto err;
98 	}
99 
100 	io_uring_cqe_seen(ring, cqe);
101 	return 0;
102 err:
103 	return 1;
104 }
105 
/*
 * Arguments handed from test() to the receiver thread.
 */
struct recv_data {
	/*
	 * Locked by test() before the thread is created; unlocked by
	 * recv_fn() to let the sender proceed.
	 */
	pthread_mutex_t mutex;
	/* Forwarded to recv_prep(): non-zero selects recvmsg, zero recv. */
	int use_recvmsg;
};
110 
recv_fn(void * data)111 static void *recv_fn(void *data)
112 {
113 	struct recv_data *rd = data;
114 	int buf[MAX_MSG];
115 	struct iovec iov = {
116 		.iov_base = buf,
117 		.iov_len = sizeof(buf),
118 	};
119 	struct io_uring_params p = { };
120 	struct io_uring ring;
121 	int ret, sock;
122 
123 	ret = t_create_ring_params(1, &ring, &p);
124 	if (ret == T_SETUP_SKIP) {
125 		pthread_mutex_unlock(&rd->mutex);
126 		ret = 0;
127 		goto err;
128 	} else if (ret < 0) {
129 		pthread_mutex_unlock(&rd->mutex);
130 		goto err;
131 	}
132 
133 	ret = recv_prep(&ring, &iov, &sock, rd->use_recvmsg);
134 	if (ret) {
135 		fprintf(stderr, "recv_prep failed: %d\n", ret);
136 		goto err;
137 	}
138 	pthread_mutex_unlock(&rd->mutex);
139 	ret = do_recv(&ring);
140 	close(sock);
141 	io_uring_queue_exit(&ring);
142 err:
143 	return (void *)(intptr_t)ret;
144 }
145 
do_send(void)146 static int do_send(void)
147 {
148 	struct sockaddr_in saddr;
149 	struct io_uring ring;
150 	struct io_uring_cqe *cqe;
151 	struct io_uring_sqe *sqe;
152 	int sockfd, ret, i;
153 	struct iovec iov;
154 	int *buf;
155 
156 	ret = io_uring_queue_init(2, &ring, 0);
157 	if (ret) {
158 		fprintf(stderr, "queue init failed: %d\n", ret);
159 		return 1;
160 	}
161 
162 	buf = malloc(MAX_MSG * sizeof(int));
163 	for (i = 0; i < MAX_MSG; i++)
164 		buf[i] = i;
165 
166 	memset(&saddr, 0, sizeof(saddr));
167 	saddr.sin_family = AF_INET;
168 	saddr.sin_port = htons(PORT);
169 	inet_pton(AF_INET, HOST, &saddr.sin_addr);
170 
171 	sockfd = socket(AF_INET, SOCK_DGRAM, 0);
172 	if (sockfd < 0) {
173 		perror("socket");
174 		return 1;
175 	}
176 
177 	ret = connect(sockfd, (struct sockaddr *)&saddr, sizeof(saddr));
178 	if (ret < 0) {
179 		perror("connect");
180 		return 1;
181 	}
182 
183 	iov.iov_base = buf;
184 	iov.iov_len = MAX_MSG * sizeof(int) / 2;
185 	for (i = 0; i < 2; i++) {
186 		sqe = io_uring_get_sqe(&ring);
187 		io_uring_prep_send(sqe, sockfd, iov.iov_base, iov.iov_len, 0);
188 		sqe->user_data = 1;
189 
190 		ret = io_uring_submit(&ring);
191 		if (ret <= 0) {
192 			fprintf(stderr, "submit failed: %d\n", ret);
193 			goto err;
194 		}
195 		usleep(10000);
196 		iov.iov_base += iov.iov_len;
197 	}
198 
199 	for (i = 0; i < 2; i++) {
200 		ret = io_uring_wait_cqe(&ring, &cqe);
201 		if (cqe->res == -EINVAL) {
202 			fprintf(stdout, "send not supported, skipping\n");
203 			close(sockfd);
204 			return 0;
205 		}
206 		if (cqe->res != iov.iov_len) {
207 			fprintf(stderr, "failed cqe: %d\n", cqe->res);
208 			goto err;
209 		}
210 		io_uring_cqe_seen(&ring, cqe);
211 	}
212 
213 	close(sockfd);
214 	return 0;
215 err:
216 	close(sockfd);
217 	return 1;
218 }
219 
test(int use_recvmsg)220 static int test(int use_recvmsg)
221 {
222 	pthread_mutexattr_t attr;
223 	pthread_t recv_thread;
224 	struct recv_data rd;
225 	int ret;
226 	void *retval;
227 
228 	pthread_mutexattr_init(&attr);
229 	pthread_mutexattr_setpshared(&attr, 1);
230 	pthread_mutex_init(&rd.mutex, &attr);
231 	pthread_mutex_lock(&rd.mutex);
232 	rd.use_recvmsg = use_recvmsg;
233 
234 	ret = pthread_create(&recv_thread, NULL, recv_fn, &rd);
235 	if (ret) {
236 		fprintf(stderr, "Thread create failed: %d\n", ret);
237 		pthread_mutex_unlock(&rd.mutex);
238 		return 1;
239 	}
240 
241 	pthread_mutex_lock(&rd.mutex);
242 	do_send();
243 	pthread_join(recv_thread, &retval);
244 	return (intptr_t)retval;
245 }
246 
/*
 * Entry point: runs the recv variant, then the recvmsg variant.
 *
 * Exits with 0 immediately when any command-line argument is given.
 * Otherwise returns the first non-zero result from test(), or 0 when
 * both variants pass.
 */
int main(int argc, char *argv[])
{
	/* Failure message per variant, indexed by the use_recvmsg flag. */
	static const char * const fail_msg[] = {
		"test recv failed\n",
		"test recvmsg failed\n",
	};
	int use_recvmsg;

	if (argc > 1)
		return 0;

	for (use_recvmsg = 0; use_recvmsg <= 1; use_recvmsg++) {
		int rc = test(use_recvmsg);

		if (rc) {
			fputs(fail_msg[use_recvmsg], stderr);
			return rc;
		}
	}

	return 0;
}
268