1*01826a49SYabin Cui /* 2*01826a49SYabin Cui * Copyright (c) Meta Platforms, Inc. and affiliates. 3*01826a49SYabin Cui * All rights reserved. 4*01826a49SYabin Cui * 5*01826a49SYabin Cui * This source code is licensed under both the BSD-style license (found in the 6*01826a49SYabin Cui * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7*01826a49SYabin Cui * in the COPYING file in the root directory of this source tree). 8*01826a49SYabin Cui * You may select, at your option, one of the above-listed licenses. 9*01826a49SYabin Cui */ 10*01826a49SYabin Cui 11*01826a49SYabin Cui /* 12*01826a49SYabin Cui * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously. 13*01826a49SYabin Cui * Current implementation relies on having one thread that reads and one that 14*01826a49SYabin Cui * writes. 15*01826a49SYabin Cui * Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but 16*01826a49SYabin Cui * are performed serially by the appropriate worker thread. 17*01826a49SYabin Cui * Most systems exposes better primitives to perform asynchronous IO, such as 18*01826a49SYabin Cui * io_uring on newer linux systems. The API is built in such a way that in the 19*01826a49SYabin Cui * future we could replace the threads with better solutions when available. 20*01826a49SYabin Cui */ 21*01826a49SYabin Cui 22*01826a49SYabin Cui #ifndef ZSTD_FILEIO_ASYNCIO_H 23*01826a49SYabin Cui #define ZSTD_FILEIO_ASYNCIO_H 24*01826a49SYabin Cui 25*01826a49SYabin Cui #if defined (__cplusplus) 26*01826a49SYabin Cui extern "C" { 27*01826a49SYabin Cui #endif 28*01826a49SYabin Cui 29*01826a49SYabin Cui #include "../lib/common/mem.h" /* U32, U64 */ 30*01826a49SYabin Cui #include "fileio_types.h" 31*01826a49SYabin Cui #include "platform.h" 32*01826a49SYabin Cui #include "util.h" 33*01826a49SYabin Cui #include "../lib/common/pool.h" 34*01826a49SYabin Cui #include "../lib/common/threading.h" 35*01826a49SYabin Cui 36*01826a49SYabin Cui #define MAX_IO_JOBS (10) 37*01826a49SYabin Cui 38*01826a49SYabin Cui typedef struct { 39*01826a49SYabin Cui /* These struct fields should be set only on creation and not changed afterwards */ 40*01826a49SYabin Cui POOL_ctx* threadPool; 41*01826a49SYabin Cui int threadPoolActive; 42*01826a49SYabin Cui int totalIoJobs; 43*01826a49SYabin Cui const FIO_prefs_t* prefs; 44*01826a49SYabin Cui POOL_function poolFunction; 45*01826a49SYabin Cui 46*01826a49SYabin Cui /* Controls the file we currently write to, make changes only by using provided utility functions */ 47*01826a49SYabin Cui FILE* file; 48*01826a49SYabin Cui 49*01826a49SYabin Cui /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should 50*01826a49SYabin Cui * only be mutated after locking the mutex */ 51*01826a49SYabin Cui ZSTD_pthread_mutex_t ioJobsMutex; 52*01826a49SYabin Cui void* availableJobs[MAX_IO_JOBS]; 53*01826a49SYabin Cui int availableJobsCount; 54*01826a49SYabin Cui size_t jobBufferSize; 55*01826a49SYabin Cui } IOPoolCtx_t; 56*01826a49SYabin Cui 57*01826a49SYabin Cui typedef struct { 58*01826a49SYabin Cui IOPoolCtx_t base; 59*01826a49SYabin Cui 60*01826a49SYabin Cui /* State regarding the currently read file */ 61*01826a49SYabin Cui int reachedEof; 62*01826a49SYabin Cui U64 nextReadOffset; 63*01826a49SYabin Cui U64 waitingOnOffset; 64*01826a49SYabin Cui 65*01826a49SYabin Cui /* We may hold an IOJob object as needed if we actively expose its buffer. */ 66*01826a49SYabin Cui void *currentJobHeld; 67*01826a49SYabin Cui 68*01826a49SYabin Cui /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in 69*01826a49SYabin Cui * the first of them. Shouldn't be accessed from outside ot utility functions. */ 70*01826a49SYabin Cui U8 *coalesceBuffer; 71*01826a49SYabin Cui 72*01826a49SYabin Cui /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might 73*01826a49SYabin Cui * change when consuming / refilling buffer. */ 74*01826a49SYabin Cui U8 *srcBuffer; 75*01826a49SYabin Cui size_t srcBufferLoaded; 76*01826a49SYabin Cui 77*01826a49SYabin Cui /* We need to know what tasks completed so we can use their buffers when their time comes. 78*01826a49SYabin Cui * Should only be accessed after locking base.ioJobsMutex . */ 79*01826a49SYabin Cui void* completedJobs[MAX_IO_JOBS]; 80*01826a49SYabin Cui int completedJobsCount; 81*01826a49SYabin Cui ZSTD_pthread_cond_t jobCompletedCond; 82*01826a49SYabin Cui } ReadPoolCtx_t; 83*01826a49SYabin Cui 84*01826a49SYabin Cui typedef struct { 85*01826a49SYabin Cui IOPoolCtx_t base; 86*01826a49SYabin Cui unsigned storedSkips; 87*01826a49SYabin Cui } WritePoolCtx_t; 88*01826a49SYabin Cui 89*01826a49SYabin Cui typedef struct { 90*01826a49SYabin Cui /* These fields are automatically set and shouldn't be changed by non WritePool code. */ 91*01826a49SYabin Cui void *ctx; 92*01826a49SYabin Cui FILE* file; 93*01826a49SYabin Cui void *buffer; 94*01826a49SYabin Cui size_t bufferSize; 95*01826a49SYabin Cui 96*01826a49SYabin Cui /* This field should be changed before a job is queued for execution and should contain the number 97*01826a49SYabin Cui * of bytes to write from the buffer. */ 98*01826a49SYabin Cui size_t usedBufferSize; 99*01826a49SYabin Cui U64 offset; 100*01826a49SYabin Cui } IOJob_t; 101*01826a49SYabin Cui 102*01826a49SYabin Cui /* AIO_supported: 103*01826a49SYabin Cui * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */ 104*01826a49SYabin Cui int AIO_supported(void); 105*01826a49SYabin Cui 106*01826a49SYabin Cui 107*01826a49SYabin Cui /* AIO_WritePool_releaseIoJob: 108*01826a49SYabin Cui * Releases an acquired job back to the pool. Doesn't execute the job. */ 109*01826a49SYabin Cui void AIO_WritePool_releaseIoJob(IOJob_t *job); 110*01826a49SYabin Cui 111*01826a49SYabin Cui /* AIO_WritePool_acquireJob: 112*01826a49SYabin Cui * Returns an available write job to be used for a future write. */ 113*01826a49SYabin Cui IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx); 114*01826a49SYabin Cui 115*01826a49SYabin Cui /* AIO_WritePool_enqueueAndReacquireWriteJob: 116*01826a49SYabin Cui * Enqueues a write job for execution and acquires a new one. 117*01826a49SYabin Cui * After execution `job`'s pointed value would change to the newly acquired job. 118*01826a49SYabin Cui * Make sure to set `usedBufferSize` to the wanted length before call. 119*01826a49SYabin Cui * The queued job shouldn't be used directly after queueing it. */ 120*01826a49SYabin Cui void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job); 121*01826a49SYabin Cui 122*01826a49SYabin Cui /* AIO_WritePool_sparseWriteEnd: 123*01826a49SYabin Cui * Ends sparse writes to the current file. 124*01826a49SYabin Cui * Blocks on completion of all current write jobs before executing. */ 125*01826a49SYabin Cui void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx); 126*01826a49SYabin Cui 127*01826a49SYabin Cui /* AIO_WritePool_setFile: 128*01826a49SYabin Cui * Sets the destination file for future writes in the pool. 129*01826a49SYabin Cui * Requires completion of all queues write jobs and release of all otherwise acquired jobs. 130*01826a49SYabin Cui * Also requires ending of sparse write if a previous file was used in sparse mode. */ 131*01826a49SYabin Cui void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file); 132*01826a49SYabin Cui 133*01826a49SYabin Cui /* AIO_WritePool_getFile: 134*01826a49SYabin Cui * Returns the file the writePool is currently set to write to. */ 135*01826a49SYabin Cui FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx); 136*01826a49SYabin Cui 137*01826a49SYabin Cui /* AIO_WritePool_closeFile: 138*01826a49SYabin Cui * Ends sparse write and closes the writePool's current file and sets the file to NULL. 139*01826a49SYabin Cui * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ 140*01826a49SYabin Cui int AIO_WritePool_closeFile(WritePoolCtx_t *ctx); 141*01826a49SYabin Cui 142*01826a49SYabin Cui /* AIO_WritePool_create: 143*01826a49SYabin Cui * Allocates and sets and a new write pool including its included jobs. 144*01826a49SYabin Cui * bufferSize should be set to the maximal buffer we want to write to at a time. */ 145*01826a49SYabin Cui WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize); 146*01826a49SYabin Cui 147*01826a49SYabin Cui /* AIO_WritePool_free: 148*01826a49SYabin Cui * Frees and releases a writePool and its resources. Closes destination file. */ 149*01826a49SYabin Cui void AIO_WritePool_free(WritePoolCtx_t* ctx); 150*01826a49SYabin Cui 151*01826a49SYabin Cui /* AIO_WritePool_setAsync: 152*01826a49SYabin Cui * Allows (de)activating async mode, to be used when the expected overhead 153*01826a49SYabin Cui * of asyncio costs more than the expected gains. */ 154*01826a49SYabin Cui void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async); 155*01826a49SYabin Cui 156*01826a49SYabin Cui /* AIO_ReadPool_create: 157*01826a49SYabin Cui * Allocates and sets and a new readPool including its included jobs. 158*01826a49SYabin Cui * bufferSize should be set to the maximal buffer we want to read at a time, will also be used 159*01826a49SYabin Cui * as our basic read size. */ 160*01826a49SYabin Cui ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize); 161*01826a49SYabin Cui 162*01826a49SYabin Cui /* AIO_ReadPool_free: 163*01826a49SYabin Cui * Frees and releases a readPool and its resources. Closes source file. */ 164*01826a49SYabin Cui void AIO_ReadPool_free(ReadPoolCtx_t* ctx); 165*01826a49SYabin Cui 166*01826a49SYabin Cui /* AIO_ReadPool_setAsync: 167*01826a49SYabin Cui * Allows (de)activating async mode, to be used when the expected overhead 168*01826a49SYabin Cui * of asyncio costs more than the expected gains. */ 169*01826a49SYabin Cui void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async); 170*01826a49SYabin Cui 171*01826a49SYabin Cui /* AIO_ReadPool_consumeBytes: 172*01826a49SYabin Cui * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ 173*01826a49SYabin Cui void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n); 174*01826a49SYabin Cui 175*01826a49SYabin Cui /* AIO_ReadPool_fillBuffer: 176*01826a49SYabin Cui * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize). 177*01826a49SYabin Cui * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file. 178*01826a49SYabin Cui * Return value is the number of bytes added to the buffer. 179*01826a49SYabin Cui * Note that srcBuffer might have up to 2 times bufferSize bytes. */ 180*01826a49SYabin Cui size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n); 181*01826a49SYabin Cui 182*01826a49SYabin Cui /* AIO_ReadPool_consumeAndRefill: 183*01826a49SYabin Cui * Consumes the current buffer and refills it with bufferSize bytes. */ 184*01826a49SYabin Cui size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx); 185*01826a49SYabin Cui 186*01826a49SYabin Cui /* AIO_ReadPool_setFile: 187*01826a49SYabin Cui * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. 188*01826a49SYabin Cui * Waits for all current enqueued tasks to complete if a previous file was set. */ 189*01826a49SYabin Cui void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file); 190*01826a49SYabin Cui 191*01826a49SYabin Cui /* AIO_ReadPool_getFile: 192*01826a49SYabin Cui * Returns the current file set for the read pool. */ 193*01826a49SYabin Cui FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx); 194*01826a49SYabin Cui 195*01826a49SYabin Cui /* AIO_ReadPool_closeFile: 196*01826a49SYabin Cui * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ 197*01826a49SYabin Cui int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx); 198*01826a49SYabin Cui 199*01826a49SYabin Cui #if defined (__cplusplus) 200*01826a49SYabin Cui } 201*01826a49SYabin Cui #endif 202*01826a49SYabin Cui 203*01826a49SYabin Cui #endif /* ZSTD_FILEIO_ASYNCIO_H */ 204