1 /* 2 * Copyright (c) Meta Platforms, Inc. and affiliates. 3 * All rights reserved. 4 * 5 * This source code is licensed under both the BSD-style license (found in the 6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7 * in the COPYING file in the root directory of this source tree). 8 */ 9 #pragma once 10 11 #include "ErrorHolder.h" 12 #include "Logging.h" 13 #include "Options.h" 14 #include "utils/Buffer.h" 15 #include "utils/Range.h" 16 #include "utils/ResourcePool.h" 17 #include "utils/ThreadPool.h" 18 #include "utils/WorkQueue.h" 19 #define ZSTD_STATIC_LINKING_ONLY 20 #define ZSTD_DISABLE_DEPRECATE_WARNINGS /* No deprecation warnings, pzstd itself is deprecated 21 * and uses deprecated functions 22 */ 23 #include "zstd.h" 24 #undef ZSTD_STATIC_LINKING_ONLY 25 26 #include <cstddef> 27 #include <cstdint> 28 #include <memory> 29 30 namespace pzstd { 31 /** 32 * Runs pzstd with `options` and returns the number of bytes written. 33 * An error occurred if `errorHandler.hasError()`. 34 * 35 * @param options The pzstd options to use for (de)compression 36 * @returns 0 upon success and non-zero on failure. 37 */ 38 int pzstdMain(const Options& options); 39 40 class SharedState { 41 public: SharedState(const Options & options)42 SharedState(const Options& options) : log(options.verbosity) { 43 if (!options.decompress) { 44 auto parameters = options.determineParameters(); 45 cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ 46 [this, parameters]() -> ZSTD_CStream* { 47 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream"); 48 auto zcs = ZSTD_createCStream(); 49 if (zcs) { 50 auto err = ZSTD_initCStream_advanced( 51 zcs, nullptr, 0, parameters, 0); 52 if (ZSTD_isError(err)) { 53 ZSTD_freeCStream(zcs); 54 return nullptr; 55 } 56 } 57 return zcs; 58 }, 59 [](ZSTD_CStream *zcs) { 60 ZSTD_freeCStream(zcs); 61 }}); 62 } else { 63 dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ 64 [this]() -> ZSTD_DStream* { 65 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream"); 66 auto zds = ZSTD_createDStream(); 67 if (zds) { 68 auto err = ZSTD_initDStream(zds); 69 if (ZSTD_isError(err)) { 70 ZSTD_freeDStream(zds); 71 return nullptr; 72 } 73 } 74 return zds; 75 }, 76 [](ZSTD_DStream *zds) { 77 ZSTD_freeDStream(zds); 78 }}); 79 } 80 } 81 ~SharedState()82 ~SharedState() { 83 // The resource pools have references to this, so destroy them first. 84 cStreamPool.reset(); 85 dStreamPool.reset(); 86 } 87 88 Logger log; 89 ErrorHolder errorHolder; 90 std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; 91 std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; 92 }; 93 94 /** 95 * Streams input from `fd`, breaks input up into chunks, and compresses each 96 * chunk independently. Output of each chunk gets streamed to a queue, and 97 * the output queues get put into `chunks` in order. 98 * 99 * @param state The shared state 100 * @param chunks Each compression jobs output queue gets `pushed()` here 101 * as soon as it is available 102 * @param executor The thread pool to run compression jobs in 103 * @param fd The input file descriptor 104 * @param size The size of the input file if known, 0 otherwise 105 * @param numThreads The number of threads in the thread pool 106 * @param parameters The zstd parameters to use for compression 107 * @returns The number of bytes read from the file 108 */ 109 std::uint64_t asyncCompressChunks( 110 SharedState& state, 111 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, 112 ThreadPool& executor, 113 FILE* fd, 114 std::uintmax_t size, 115 std::size_t numThreads, 116 ZSTD_parameters parameters); 117 118 /** 119 * Streams input from `fd`. If pzstd headers are available it breaks the input 120 * up into independent frames. It sends each frame to an independent 121 * decompression job. Output of each frame gets streamed to a queue, and 122 * the output queues get put into `frames` in order. 123 * 124 * @param state The shared state 125 * @param frames Each decompression jobs output queue gets `pushed()` here 126 * as soon as it is available 127 * @param executor The thread pool to run compression jobs in 128 * @param fd The input file descriptor 129 * @returns The number of bytes read from the file 130 */ 131 std::uint64_t asyncDecompressFrames( 132 SharedState& state, 133 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, 134 ThreadPool& executor, 135 FILE* fd); 136 137 /** 138 * Streams input in from each queue in `outs` in order, and writes the data to 139 * `outputFd`. 140 * 141 * @param state The shared state 142 * @param outs A queue of output queues, one for each 143 * (de)compression job. 144 * @param outputFd The file descriptor to write to 145 * @param decompress Are we decompressing? 146 * @returns The number of bytes written 147 */ 148 std::uint64_t writeFile( 149 SharedState& state, 150 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, 151 FILE* outputFd, 152 bool decompress); 153 } 154