/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

/*
 * FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
 * The current implementation relies on having one thread that reads and one that
 * writes.
 * Each IO pool supports up to `MAX_IO_JOBS` jobs that can be enqueued for work,
 * but they are performed serially by the appropriate worker thread.
 * Most systems expose better primitives to perform asynchronous IO, such as
 * io_uring on newer Linux systems. The API is built in such a way that in the
 * future we could replace the threads with better solutions when available.
 */

#ifndef ZSTD_FILEIO_ASYNCIO_H
#define ZSTD_FILEIO_ASYNCIO_H

#if defined (__cplusplus)
extern "C" {
#endif

#include "../lib/common/mem.h"     /* U32, U64 */
#include "fileio_types.h"
#include "platform.h"
#include "util.h"
#include "../lib/common/pool.h"
#include "../lib/common/threading.h"

#define MAX_IO_JOBS (10)

typedef struct {
    /* These struct fields should be set only on creation and not changed afterwards */
    POOL_ctx* threadPool;
    int threadPoolActive;
    int totalIoJobs;
    const FIO_prefs_t* prefs;
    POOL_function poolFunction;

    /* Controls the file we currently write to; change it only through the provided utility functions */
    FILE* file;

    /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
     * only be mutated after locking the mutex */
    ZSTD_pthread_mutex_t ioJobsMutex;
    void* availableJobs[MAX_IO_JOBS];
    int availableJobsCount;
    size_t jobBufferSize;
} IOPoolCtx_t;

typedef struct {
    IOPoolCtx_t base;

    /* State regarding the currently read file */
    int reachedEof;
    U64 nextReadOffset;
    U64 waitingOnOffset;

    /* We may hold an IOJob object as needed if we actively expose its buffer. */
    void *currentJobHeld;

    /* The coalesce buffer is used to join two buffers in cases where we need to read more bytes than
     * are left in the first of them. Shouldn't be accessed from outside of utility functions. */
    U8 *coalesceBuffer;

    /* The read buffer can be used by consumer code; take care when copying this pointer aside, as it
     * might change when the buffer is consumed / refilled. */
    U8 *srcBuffer;
    size_t srcBufferLoaded;

    /* We need to know which tasks have completed so we can use their buffers when their time comes.
     * Should only be accessed after locking base.ioJobsMutex. */
    void* completedJobs[MAX_IO_JOBS];
    int completedJobsCount;
    ZSTD_pthread_cond_t jobCompletedCond;
} ReadPoolCtx_t;

typedef struct {
    IOPoolCtx_t base;
    unsigned storedSkips;
} WritePoolCtx_t;

typedef struct {
    /* These fields are automatically set and shouldn't be changed by non-WritePool code. */
    void *ctx;
    FILE* file;
    void *buffer;
    size_t bufferSize;

    /* This field should be changed before a job is queued for execution and should contain the number
     * of bytes to write from the buffer. */
    size_t usedBufferSize;
    U64 offset;
} IOJob_t;
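
/* Example: a minimal write flow (an illustrative sketch, not a normative part of this API).
 * `prefs`, `dstFile`, `bufferSize`, `data` and `dataSize` are hypothetical caller-provided values,
 * and error handling is omitted; see the declarations below for the exact contracts.
 *
 *     WritePoolCtx_t* wPool = AIO_WritePool_create(prefs, bufferSize);
 *     AIO_WritePool_setFile(wPool, dstFile);
 *     IOJob_t* job = AIO_WritePool_acquireJob(wPool);
 *     memcpy(job->buffer, data, dataSize);              // dataSize must not exceed job->bufferSize
 *     job->usedBufferSize = dataSize;                   // number of bytes to write, set before queueing
 *     AIO_WritePool_enqueueAndReacquireWriteJob(&job);  // queues the write; `job` now points to a fresh job
 *     AIO_WritePool_releaseIoJob(job);                  // release the unused job before closing the file
 *     AIO_WritePool_closeFile(wPool);                   // ends sparse writes (if any) and closes dstFile
 *     AIO_WritePool_free(wPool);
 */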
/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void);


/* AIO_WritePool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job);

/* AIO_WritePool_acquireJob:
 * Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);

/* AIO_WritePool_enqueueAndReacquireWriteJob:
 * Enqueues a write job for execution and acquires a new one.
 * After execution, `job`'s pointed value will change to the newly acquired job.
 * Make sure to set `usedBufferSize` to the wanted length before the call.
 * The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);

/* AIO_WritePool_sparseWriteEnd:
 * Ends sparse writes to the current file.
 * Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);

/* AIO_WritePool_setFile:
 * Sets the destination file for future writes in the pool.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs.
 * Also requires ending sparse writes if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);

/* AIO_WritePool_getFile:
 * Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);

/* AIO_WritePool_closeFile:
 * Ends sparse writes, closes the writePool's current file, and sets the file to NULL.
 * Requires completion of all queued write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);

/* AIO_WritePool_create:
 * Allocates and initializes a new write pool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to write at a time. */
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_WritePool_free:
 * Frees and releases a writePool and its resources. Closes the destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);

/* AIO_WritePool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);

/* AIO_ReadPool_create:
 * Allocates and initializes a new readPool, including its jobs.
 * bufferSize should be set to the maximal buffer we want to read at a time; it will also be used
 * as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes the source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);

/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_fillBuffer:
 * Makes sure the buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
 * Returns when srcBuffer has at least n bytes loaded or when the end of the file has been reached.
 * The return value is the number of bytes added to the buffer.
 * Note that srcBuffer might hold up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);

/* AIO_ReadPool_setFile:
 * Sets the source file for future reads in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all currently enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);

/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);

/* AIO_ReadPool_closeFile:
 * Closes the currently set file. Waits for all currently enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
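
/* Example: a minimal read flow (an illustrative sketch, not a normative part of this API).
 * `prefs`, `srcFile`, `bufferSize`, `chunkSize` and `process()` are hypothetical caller-provided
 * values and helpers, and error handling is omitted; see the declarations above for the exact contracts.
 *
 *     ReadPoolCtx_t* rPool = AIO_ReadPool_create(prefs, bufferSize);
 *     AIO_ReadPool_setFile(rPool, srcFile);             // starts reading ahead immediately
 *     while (1) {
 *         AIO_ReadPool_fillBuffer(rPool, chunkSize);    // chunkSize <= bufferSize
 *         if (rPool->srcBufferLoaded == 0) break;       // end of file, nothing left to process
 *         {   size_t const consumed = process(rPool->srcBuffer, rPool->srcBufferLoaded);
 *             AIO_ReadPool_consumeBytes(rPool, consumed);   // drop consumed bytes from the front
 *         }
 *     }
 *     AIO_ReadPool_closeFile(rPool);
 *     AIO_ReadPool_free(rPool);
 */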

#if defined (__cplusplus)
}
#endif

#endif /* ZSTD_FILEIO_ASYNCIO_H */