1 /* SPDX-License-Identifier: MIT */
2 /*
3 * Description: test io_uring fpos handling
4 *
5 */
6 #include <errno.h>
7 #include <stdio.h>
8 #include <unistd.h>
9 #include <stdlib.h>
10 #include <string.h>
11 #include <fcntl.h>
12 #include <assert.h>
13
14 #include "helpers.h"
15 #include "liburing.h"
16
17 #define FILE_SIZE 5000
18 #define QUEUE_SIZE 2048
19
create_file(const char * file,size_t size)20 static void create_file(const char *file, size_t size)
21 {
22 ssize_t ret;
23 char *buf;
24 size_t idx;
25 int fd;
26
27 buf = t_malloc(size);
28 for (idx = 0; idx < size; ++idx) {
29 /* write 0 or 1 */
30 buf[idx] = (unsigned char)(idx & 0x01);
31 }
32
33 fd = open(file, O_WRONLY | O_CREAT, 0644);
34 assert(fd >= 0);
35
36 ret = write(fd, buf, size);
37 fsync(fd);
38 close(fd);
39 free(buf);
40 assert(ret == size);
41 }
42
test_read(struct io_uring * ring,bool async,int blocksize)43 static int test_read(struct io_uring *ring, bool async, int blocksize)
44 {
45 int ret, fd, i;
46 bool done = false;
47 struct io_uring_sqe *sqe;
48 struct io_uring_cqe *cqe;
49 loff_t current, expected = 0;
50 int count_ok;
51 int count_0 = 0, count_1 = 0;
52 unsigned char buff[QUEUE_SIZE * blocksize];
53 unsigned char reordered[QUEUE_SIZE * blocksize];
54
55 create_file(".test_fpos_read", FILE_SIZE);
56 fd = open(".test_fpos_read", O_RDONLY);
57 unlink(".test_fpos_read");
58 assert(fd >= 0);
59
60 while (!done) {
61 for (i = 0; i < QUEUE_SIZE; ++i) {
62 sqe = io_uring_get_sqe(ring);
63 if (!sqe) {
64 fprintf(stderr, "no sqe\n");
65 return -1;
66 }
67 io_uring_prep_read(sqe, fd,
68 buff + i * blocksize,
69 blocksize, -1);
70 sqe->user_data = i;
71 if (async)
72 sqe->flags |= IOSQE_ASYNC;
73 if (i != QUEUE_SIZE - 1)
74 sqe->flags |= IOSQE_IO_LINK;
75 }
76 ret = io_uring_submit_and_wait(ring, QUEUE_SIZE);
77 if (ret != QUEUE_SIZE) {
78 fprintf(stderr, "submit failed: %d\n", ret);
79 return 1;
80 }
81 count_ok = 0;
82 for (i = 0; i < QUEUE_SIZE; ++i) {
83 int res;
84
85 ret = io_uring_peek_cqe(ring, &cqe);
86 if (ret) {
87 fprintf(stderr, "peek failed: %d\n", ret);
88 return ret;
89 }
90 assert(cqe->user_data < QUEUE_SIZE);
91 memcpy(reordered + count_ok,
92 buff + cqe->user_data * blocksize, blocksize);
93 res = cqe->res;
94 io_uring_cqe_seen(ring, cqe);
95 if (res == 0) {
96 done = true;
97 } else if (res == -ECANCELED) {
98 /* cancelled, probably ok */
99 } else if (res < 0 || res > blocksize) {
100 fprintf(stderr, "bad read: %d\n", res);
101 return -1;
102 } else {
103 expected += res;
104 count_ok += res;
105 }
106 }
107 ret = 0;
108 for (i = 0; i < count_ok; i++) {
109 if (reordered[i] == 1) {
110 count_1++;
111 } else if (reordered[i] == 0) {
112 count_0++;
113 } else {
114 fprintf(stderr, "odd read %d\n",
115 (int)reordered[i]);
116 ret = -1;
117 break;
118 }
119 }
120 if (labs(count_1 - count_0) > 1) {
121 fprintf(stderr, "inconsistent reads, got 0s:%d 1s:%d\n",
122 count_0, count_1);
123 ret = -1;
124 }
125 current = lseek(fd, 0, SEEK_CUR);
126 if (current != expected) {
127 fprintf(stderr, "f_pos incorrect, expected %ld have %ld\n",
128 (long) expected, (long) current);
129 ret = -1;
130 }
131 if (ret)
132 return ret;
133 }
134 return 0;
135 }
136
137
test_write(struct io_uring * ring,bool async,int blocksize)138 static int test_write(struct io_uring *ring, bool async, int blocksize)
139 {
140 int ret, fd, i;
141 struct io_uring_sqe *sqe;
142 struct io_uring_cqe *cqe;
143 bool fail = false;
144 loff_t current;
145 char data[blocksize+1];
146 char readbuff[QUEUE_SIZE*blocksize+1];
147
148 fd = open(".test_fpos_write", O_RDWR | O_CREAT, 0644);
149 unlink(".test_fpos_write");
150 assert(fd >= 0);
151
152 for (i = 0; i < blocksize; i++)
153 data[i] = 'A' + i;
154
155 data[blocksize] = '\0';
156
157 for (i = 0; i < QUEUE_SIZE; ++i) {
158 sqe = io_uring_get_sqe(ring);
159 if (!sqe) {
160 fprintf(stderr, "no sqe\n");
161 return -1;
162 }
163 io_uring_prep_write(sqe, fd, data + (i % blocksize), 1, -1);
164 sqe->user_data = 1;
165 if (async)
166 sqe->flags |= IOSQE_ASYNC;
167 if (i != QUEUE_SIZE - 1)
168 sqe->flags |= IOSQE_IO_LINK;
169 }
170 ret = io_uring_submit_and_wait(ring, QUEUE_SIZE);
171 if (ret != QUEUE_SIZE) {
172 fprintf(stderr, "submit failed: %d\n", ret);
173 return 1;
174 }
175 for (i = 0; i < QUEUE_SIZE; ++i) {
176 int res;
177
178 ret = io_uring_peek_cqe(ring, &cqe);
179 res = cqe->res;
180 if (ret) {
181 fprintf(stderr, "peek failed: %d\n", ret);
182 return ret;
183 }
184 io_uring_cqe_seen(ring, cqe);
185 if (!fail && res != 1) {
186 fprintf(stderr, "bad result %d\n", res);
187 fail = true;
188 }
189 }
190 current = lseek(fd, 0, SEEK_CUR);
191 if (current != QUEUE_SIZE) {
192 fprintf(stderr, "f_pos incorrect, expected %ld have %d\n",
193 (long) current, QUEUE_SIZE);
194 fail = true;
195 }
196 current = lseek(fd, 0, SEEK_SET);
197 if (current != 0) {
198 perror("seek to start");
199 return -1;
200 }
201 ret = read(fd, readbuff, QUEUE_SIZE);
202 if (ret != QUEUE_SIZE) {
203 fprintf(stderr, "did not write enough: %d\n", ret);
204 return -1;
205 }
206 i = 0;
207 while (i < QUEUE_SIZE - blocksize) {
208 if (strncmp(readbuff + i, data, blocksize)) {
209 char bad[QUEUE_SIZE+1];
210
211 memcpy(bad, readbuff + i, blocksize);
212 bad[blocksize] = '\0';
213 fprintf(stderr, "unexpected data %s\n", bad);
214 fail = true;
215 }
216 i += blocksize;
217 }
218
219 return fail ? -1 : 0;
220 }
221
main(int argc,char * argv[])222 int main(int argc, char *argv[])
223 {
224 struct io_uring ring;
225 int ret;
226
227 if (argc > 1)
228 return 0;
229
230 ret = io_uring_queue_init(QUEUE_SIZE, &ring, 0);
231 if (ret) {
232 fprintf(stderr, "ring setup failed\n");
233 return 1;
234 }
235
236 for (int test = 0; test < 8; test++) {
237 int async = test & 0x01;
238 int write = test & 0x02;
239 int blocksize = test & 0x04 ? 1 : 7;
240
241 ret = write
242 ? test_write(&ring, !!async, blocksize)
243 : test_read(&ring, !!async, blocksize);
244 if (ret) {
245 fprintf(stderr, "failed %s async=%d blocksize=%d\n",
246 write ? "write" : "read",
247 async, blocksize);
248 return -1;
249 }
250 }
251 return 0;
252 }
253