xref: /aosp_15_r20/external/zstd/programs/fileio_asyncio.h (revision 01826a4963a0d8a59bc3812d29bdf0fb76416722)
1*01826a49SYabin Cui /*
2*01826a49SYabin Cui  * Copyright (c) Meta Platforms, Inc. and affiliates.
3*01826a49SYabin Cui  * All rights reserved.
4*01826a49SYabin Cui  *
5*01826a49SYabin Cui  * This source code is licensed under both the BSD-style license (found in the
6*01826a49SYabin Cui  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7*01826a49SYabin Cui  * in the COPYING file in the root directory of this source tree).
8*01826a49SYabin Cui  * You may select, at your option, one of the above-listed licenses.
9*01826a49SYabin Cui  */
10*01826a49SYabin Cui 
11*01826a49SYabin Cui  /*
12*01826a49SYabin Cui   * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
13*01826a49SYabin Cui   * Current implementation relies on having one thread that reads and one that
14*01826a49SYabin Cui   * writes.
15*01826a49SYabin Cui   * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
16*01826a49SYabin Cui   * are performed serially by the appropriate worker thread.
17*01826a49SYabin Cui   * Most systems exposes better primitives to perform asynchronous IO, such as
18*01826a49SYabin Cui   * io_uring on newer linux systems. The API is built in such a way that in the
19*01826a49SYabin Cui   * future we could replace the threads with better solutions when available.
20*01826a49SYabin Cui   */
21*01826a49SYabin Cui 
22*01826a49SYabin Cui #ifndef ZSTD_FILEIO_ASYNCIO_H
23*01826a49SYabin Cui #define ZSTD_FILEIO_ASYNCIO_H
24*01826a49SYabin Cui 
25*01826a49SYabin Cui #if defined (__cplusplus)
26*01826a49SYabin Cui extern "C" {
27*01826a49SYabin Cui #endif
28*01826a49SYabin Cui 
29*01826a49SYabin Cui #include "../lib/common/mem.h"     /* U32, U64 */
30*01826a49SYabin Cui #include "fileio_types.h"
31*01826a49SYabin Cui #include "platform.h"
32*01826a49SYabin Cui #include "util.h"
33*01826a49SYabin Cui #include "../lib/common/pool.h"
34*01826a49SYabin Cui #include "../lib/common/threading.h"
35*01826a49SYabin Cui 
36*01826a49SYabin Cui #define MAX_IO_JOBS          (10)
37*01826a49SYabin Cui 
38*01826a49SYabin Cui typedef struct {
39*01826a49SYabin Cui     /* These struct fields should be set only on creation and not changed afterwards */
40*01826a49SYabin Cui     POOL_ctx* threadPool;
41*01826a49SYabin Cui     int threadPoolActive;
42*01826a49SYabin Cui     int totalIoJobs;
43*01826a49SYabin Cui     const FIO_prefs_t* prefs;
44*01826a49SYabin Cui     POOL_function poolFunction;
45*01826a49SYabin Cui 
46*01826a49SYabin Cui     /* Controls the file we currently write to, make changes only by using provided utility functions */
47*01826a49SYabin Cui     FILE* file;
48*01826a49SYabin Cui 
49*01826a49SYabin Cui     /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
50*01826a49SYabin Cui      * only be mutated after locking the mutex */
51*01826a49SYabin Cui     ZSTD_pthread_mutex_t ioJobsMutex;
52*01826a49SYabin Cui     void* availableJobs[MAX_IO_JOBS];
53*01826a49SYabin Cui     int availableJobsCount;
54*01826a49SYabin Cui     size_t jobBufferSize;
55*01826a49SYabin Cui } IOPoolCtx_t;
56*01826a49SYabin Cui 
57*01826a49SYabin Cui typedef struct {
58*01826a49SYabin Cui     IOPoolCtx_t base;
59*01826a49SYabin Cui 
60*01826a49SYabin Cui     /* State regarding the currently read file */
61*01826a49SYabin Cui     int reachedEof;
62*01826a49SYabin Cui     U64 nextReadOffset;
63*01826a49SYabin Cui     U64 waitingOnOffset;
64*01826a49SYabin Cui 
65*01826a49SYabin Cui     /* We may hold an IOJob object as needed if we actively expose its buffer. */
66*01826a49SYabin Cui     void *currentJobHeld;
67*01826a49SYabin Cui 
68*01826a49SYabin Cui     /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
69*01826a49SYabin Cui      * the first of them. Shouldn't be accessed from outside ot utility functions. */
70*01826a49SYabin Cui     U8 *coalesceBuffer;
71*01826a49SYabin Cui 
72*01826a49SYabin Cui     /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
73*01826a49SYabin Cui      * change when consuming / refilling buffer. */
74*01826a49SYabin Cui     U8 *srcBuffer;
75*01826a49SYabin Cui     size_t srcBufferLoaded;
76*01826a49SYabin Cui 
77*01826a49SYabin Cui     /* We need to know what tasks completed so we can use their buffers when their time comes.
78*01826a49SYabin Cui      * Should only be accessed after locking base.ioJobsMutex . */
79*01826a49SYabin Cui     void* completedJobs[MAX_IO_JOBS];
80*01826a49SYabin Cui     int completedJobsCount;
81*01826a49SYabin Cui     ZSTD_pthread_cond_t jobCompletedCond;
82*01826a49SYabin Cui } ReadPoolCtx_t;
83*01826a49SYabin Cui 
84*01826a49SYabin Cui typedef struct {
85*01826a49SYabin Cui     IOPoolCtx_t base;
86*01826a49SYabin Cui     unsigned storedSkips;
87*01826a49SYabin Cui } WritePoolCtx_t;
88*01826a49SYabin Cui 
89*01826a49SYabin Cui typedef struct {
90*01826a49SYabin Cui     /* These fields are automatically set and shouldn't be changed by non WritePool code. */
91*01826a49SYabin Cui     void *ctx;
92*01826a49SYabin Cui     FILE* file;
93*01826a49SYabin Cui     void *buffer;
94*01826a49SYabin Cui     size_t bufferSize;
95*01826a49SYabin Cui 
96*01826a49SYabin Cui     /* This field should be changed before a job is queued for execution and should contain the number
97*01826a49SYabin Cui      * of bytes to write from the buffer. */
98*01826a49SYabin Cui     size_t usedBufferSize;
99*01826a49SYabin Cui     U64 offset;
100*01826a49SYabin Cui } IOJob_t;
101*01826a49SYabin Cui 
102*01826a49SYabin Cui /* AIO_supported:
103*01826a49SYabin Cui  * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
104*01826a49SYabin Cui int AIO_supported(void);
105*01826a49SYabin Cui 
106*01826a49SYabin Cui 
107*01826a49SYabin Cui /* AIO_WritePool_releaseIoJob:
108*01826a49SYabin Cui  * Releases an acquired job back to the pool. Doesn't execute the job. */
109*01826a49SYabin Cui void AIO_WritePool_releaseIoJob(IOJob_t *job);
110*01826a49SYabin Cui 
111*01826a49SYabin Cui /* AIO_WritePool_acquireJob:
112*01826a49SYabin Cui  * Returns an available write job to be used for a future write. */
113*01826a49SYabin Cui IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
114*01826a49SYabin Cui 
115*01826a49SYabin Cui /* AIO_WritePool_enqueueAndReacquireWriteJob:
116*01826a49SYabin Cui  * Enqueues a write job for execution and acquires a new one.
117*01826a49SYabin Cui  * After execution `job`'s pointed value would change to the newly acquired job.
118*01826a49SYabin Cui  * Make sure to set `usedBufferSize` to the wanted length before call.
119*01826a49SYabin Cui  * The queued job shouldn't be used directly after queueing it. */
120*01826a49SYabin Cui void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
121*01826a49SYabin Cui 
122*01826a49SYabin Cui /* AIO_WritePool_sparseWriteEnd:
123*01826a49SYabin Cui  * Ends sparse writes to the current file.
124*01826a49SYabin Cui  * Blocks on completion of all current write jobs before executing. */
125*01826a49SYabin Cui void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
126*01826a49SYabin Cui 
127*01826a49SYabin Cui /* AIO_WritePool_setFile:
128*01826a49SYabin Cui  * Sets the destination file for future writes in the pool.
129*01826a49SYabin Cui  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
130*01826a49SYabin Cui  * Also requires ending of sparse write if a previous file was used in sparse mode. */
131*01826a49SYabin Cui void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
132*01826a49SYabin Cui 
133*01826a49SYabin Cui /* AIO_WritePool_getFile:
134*01826a49SYabin Cui  * Returns the file the writePool is currently set to write to. */
135*01826a49SYabin Cui FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
136*01826a49SYabin Cui 
137*01826a49SYabin Cui /* AIO_WritePool_closeFile:
138*01826a49SYabin Cui  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
139*01826a49SYabin Cui  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
140*01826a49SYabin Cui int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
141*01826a49SYabin Cui 
142*01826a49SYabin Cui /* AIO_WritePool_create:
143*01826a49SYabin Cui  * Allocates and sets and a new write pool including its included jobs.
144*01826a49SYabin Cui  * bufferSize should be set to the maximal buffer we want to write to at a time. */
145*01826a49SYabin Cui WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
146*01826a49SYabin Cui 
147*01826a49SYabin Cui /* AIO_WritePool_free:
148*01826a49SYabin Cui  * Frees and releases a writePool and its resources. Closes destination file. */
149*01826a49SYabin Cui void AIO_WritePool_free(WritePoolCtx_t* ctx);
150*01826a49SYabin Cui 
151*01826a49SYabin Cui /* AIO_WritePool_setAsync:
152*01826a49SYabin Cui  * Allows (de)activating async mode, to be used when the expected overhead
153*01826a49SYabin Cui  * of asyncio costs more than the expected gains. */
154*01826a49SYabin Cui void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);
155*01826a49SYabin Cui 
156*01826a49SYabin Cui /* AIO_ReadPool_create:
157*01826a49SYabin Cui  * Allocates and sets and a new readPool including its included jobs.
158*01826a49SYabin Cui  * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
159*01826a49SYabin Cui  * as our basic read size. */
160*01826a49SYabin Cui ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
161*01826a49SYabin Cui 
162*01826a49SYabin Cui /* AIO_ReadPool_free:
163*01826a49SYabin Cui  * Frees and releases a readPool and its resources. Closes source file. */
164*01826a49SYabin Cui void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
165*01826a49SYabin Cui 
166*01826a49SYabin Cui /* AIO_ReadPool_setAsync:
167*01826a49SYabin Cui  * Allows (de)activating async mode, to be used when the expected overhead
168*01826a49SYabin Cui  * of asyncio costs more than the expected gains. */
169*01826a49SYabin Cui void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);
170*01826a49SYabin Cui 
171*01826a49SYabin Cui /* AIO_ReadPool_consumeBytes:
172*01826a49SYabin Cui  * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
173*01826a49SYabin Cui void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
174*01826a49SYabin Cui 
175*01826a49SYabin Cui /* AIO_ReadPool_fillBuffer:
176*01826a49SYabin Cui  * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
177*01826a49SYabin Cui  * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
178*01826a49SYabin Cui  * Return value is the number of bytes added to the buffer.
179*01826a49SYabin Cui  * Note that srcBuffer might have up to 2 times bufferSize bytes. */
180*01826a49SYabin Cui size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
181*01826a49SYabin Cui 
182*01826a49SYabin Cui /* AIO_ReadPool_consumeAndRefill:
183*01826a49SYabin Cui  * Consumes the current buffer and refills it with bufferSize bytes. */
184*01826a49SYabin Cui size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
185*01826a49SYabin Cui 
186*01826a49SYabin Cui /* AIO_ReadPool_setFile:
187*01826a49SYabin Cui  * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
188*01826a49SYabin Cui  * Waits for all current enqueued tasks to complete if a previous file was set. */
189*01826a49SYabin Cui void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
190*01826a49SYabin Cui 
191*01826a49SYabin Cui /* AIO_ReadPool_getFile:
192*01826a49SYabin Cui  * Returns the current file set for the read pool. */
193*01826a49SYabin Cui FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
194*01826a49SYabin Cui 
195*01826a49SYabin Cui /* AIO_ReadPool_closeFile:
196*01826a49SYabin Cui  * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
197*01826a49SYabin Cui int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
198*01826a49SYabin Cui 
199*01826a49SYabin Cui #if defined (__cplusplus)
200*01826a49SYabin Cui }
201*01826a49SYabin Cui #endif
202*01826a49SYabin Cui 
203*01826a49SYabin Cui #endif /* ZSTD_FILEIO_ASYNCIO_H */
204