xref: /aosp_15_r20/system/core/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h (revision 00c7fec1bb09f3284aad6a6f96d2f63dfc3650ad)
1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include <linux/types.h>
18 #include <stdint.h>
19 #include <stdlib.h>
20 #include <sys/mman.h>
21 #include <sys/resource.h>
22 #include <sys/time.h>
23 #include <unistd.h>
24 
25 #include <condition_variable>
26 #include <cstring>
27 #include <future>
28 #include <iostream>
29 #include <limits>
30 #include <mutex>
31 #include <ostream>
32 #include <string>
33 #include <thread>
34 #include <unordered_map>
35 #include <unordered_set>
36 #include <vector>
37 
38 #include <android-base/file.h>
39 #include <android-base/logging.h>
40 #include <android-base/stringprintf.h>
41 #include <android-base/unique_fd.h>
42 #include <ext4_utils/ext4_utils.h>
43 #include <libdm/dm.h>
44 #include <libsnapshot/cow_reader.h>
45 #include <libsnapshot/cow_writer.h>
46 #include <snapuserd/block_server.h>
47 #include <snapuserd/snapuserd_buffer.h>
48 #include <snapuserd/snapuserd_kernel.h>
49 #include <storage_literals/storage_literals.h>
50 #include <system/thread_defs.h>
51 #include "snapuserd_readahead.h"
52 #include "snapuserd_verify.h"
53 
54 namespace android {
55 namespace snapshot {
56 
57 using android::base::unique_fd;
58 using namespace std::chrono_literals;
59 using namespace android::storage_literals;
60 
61 static constexpr size_t PAYLOAD_BUFFER_SZ = (1UL << 20);
62 static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ);
63 
64 static constexpr int kNumWorkerThreads = 4;
65 
66 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
67 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
68 
69 enum class MERGE_IO_TRANSITION {
70     INVALID,
71     MERGE_READY,
72     MERGE_BEGIN,
73     MERGE_FAILED,
74     MERGE_COMPLETE,
75     IO_TERMINATED,
76     READ_AHEAD_FAILURE
77 };
78 
79 class MergeWorker;
80 class ReadWorker;
81 
82 enum class MERGE_GROUP_STATE {
83     GROUP_MERGE_PENDING,
84     GROUP_MERGE_RA_READY,
85     GROUP_MERGE_IN_PROGRESS,
86     GROUP_MERGE_COMPLETED,
87     GROUP_MERGE_FAILED,
88     GROUP_INVALID,
89 };
90 
91 struct MergeGroupState {
92     MERGE_GROUP_STATE merge_state_;
93     // Ref count I/O when group state
94     // is in "GROUP_MERGE_PENDING"
95     size_t num_ios_in_progress;
96     std::mutex m_lock;
97     std::condition_variable m_cv;
98 
MergeGroupStateMergeGroupState99     MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
100         : merge_state_(state), num_ios_in_progress(n_ios) {}
101 };
102 
103 class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
104   public:
105     SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
106                     std::string base_path_merge, std::shared_ptr<IBlockServerOpener> opener,
107                     int num_workers, bool use_iouring, bool perform_verification, bool o_direct,
108                     uint32_t cow_op_merge_size);
109     bool InitCowDevice();
110     bool Start();
111 
GetControlDevicePath()112     const std::string& GetControlDevicePath() { return control_device_; }
GetMiscName()113     const std::string& GetMiscName() { return misc_name_; }
114     uint64_t GetNumSectors() const;
IsAttached()115     const bool& IsAttached() const { return attached_; }
AttachControlDevice()116     void AttachControlDevice() { attached_ = true; }
117 
118     bool CheckMergeCompletionStatus();
119     bool CommitMerge(int num_merge_ops);
120 
CloseFds()121     void CloseFds() { cow_fd_ = {}; }
122     void FreeResources();
123 
124     bool InitializeWorkers();
125     std::unique_ptr<CowReader> CloneReaderForWorker();
GetSharedPtr()126     std::shared_ptr<SnapshotHandler> GetSharedPtr() { return shared_from_this(); }
127 
GetChunkVec()128     std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }
129 
compare(std::pair<sector_t,const CowOperation * > p1,std::pair<sector_t,const CowOperation * > p2)130     static bool compare(std::pair<sector_t, const CowOperation*> p1,
131                         std::pair<sector_t, const CowOperation*> p2) {
132         return p1.first < p2.first;
133     }
134 
135     void UnmapBufferRegion();
136     bool MmapMetadata();
137 
138     // Read-ahead related functions
GetMappedAddr()139     void* GetMappedAddr() { return mapped_addr_; }
140     void PrepareReadAhead();
GetReadAheadMap()141     std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
142 
143     // State transitions for merge
144     void InitiateMerge();
145     void MonitorMerge();
146     void WakeupMonitorMergeThread();
147     void WaitForMergeComplete();
148     bool WaitForMergeBegin();
149     void RaThreadStarted();
150     void WaitForRaThreadToStart();
151     void NotifyRAForMergeReady();
152     bool WaitForMergeReady();
153     void MergeFailed();
154     bool IsIOTerminated();
155     void MergeCompleted();
156     void NotifyIOTerminated();
157     bool ReadAheadIOCompleted(bool sync);
158     void ReadAheadIOFailed();
159 
ShouldReconstructDataFromCow()160     bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
FinishReconstructDataFromCow()161     void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
162     void MarkMergeComplete();
163     // Return the snapshot status
164     std::string GetMergeStatus();
165 
166     // RA related functions
167     uint64_t GetBufferMetadataOffset();
168     size_t GetBufferMetadataSize();
169     size_t GetBufferDataOffset();
170     size_t GetBufferDataSize();
171 
172     // Total number of blocks to be merged in a given read-ahead buffer region
SetMergedBlockCountForNextCommit(int x)173     void SetMergedBlockCountForNextCommit(int x) { total_ra_blocks_merged_ = x; }
GetTotalBlocksToMerge()174     int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
MergeInitiated()175     bool MergeInitiated() { return merge_initiated_; }
MergeMonitored()176     bool MergeMonitored() { return merge_monitored_; }
GetMergePercentage()177     double GetMergePercentage() { return merge_completion_percentage_; }
178 
179     // Merge Block State Transitions
180     void SetMergeCompleted(size_t block_index);
181     void SetMergeInProgress(size_t block_index);
182     void SetMergeFailed(size_t block_index);
183     void NotifyIOCompletion(uint64_t new_block);
184     bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
185     MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
186 
187     bool IsIouringSupported();
188     bool CheckPartitionVerification();
GetBufferLock()189     std::mutex& GetBufferLock() { return buffer_lock_; }
190 
191   private:
192     bool ReadMetadata();
ChunkToSector(chunk_t chunk)193     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
SectorToChunk(sector_t sector)194     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
IsBlockAligned(uint64_t read_size)195     bool IsBlockAligned(uint64_t read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
196     struct BufferState* GetBufferState();
197     void UpdateMergeCompletionPercentage();
198 
199     // COW device
200     std::string cow_device_;
201     // Source device
202     std::string backing_store_device_;
203     // dm-user control device
204     std::string control_device_;
205     std::string misc_name_;
206     // Base device for merging
207     std::string base_path_merge_;
208 
209     unique_fd cow_fd_;
210 
211     std::unique_ptr<CowReader> reader_;
212 
213     // chunk_vec stores the pseudo mapping of sector
214     // to COW operations.
215     std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
216 
217     std::mutex lock_;
218     std::condition_variable cv;
219 
220     // Lock the buffer used for snapshot-merge
221     std::mutex buffer_lock_;
222 
223     void* mapped_addr_;
224     size_t total_mapped_addr_length_;
225 
226     std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
227     // Read-ahead related
228     bool populate_data_from_cow_ = false;
229     bool ra_thread_ = false;
230     bool ra_thread_started_ = false;
231     int total_ra_blocks_merged_ = 0;
232     MERGE_IO_TRANSITION io_state_ = MERGE_IO_TRANSITION::INVALID;
233     std::unique_ptr<ReadAhead> read_ahead_thread_;
234     std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
235 
236     // user-space-merging
237     std::unordered_map<uint64_t, int> block_to_ra_index_;
238 
239     // Merge Block state
240     std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
241 
242     std::unique_ptr<MergeWorker> merge_thread_;
243     double merge_completion_percentage_;
244 
245     bool merge_initiated_ = false;
246     bool merge_monitored_ = false;
247     bool attached_ = false;
248     bool is_io_uring_enabled_ = false;
249     bool scratch_space_ = false;
250     int num_worker_threads_ = kNumWorkerThreads;
251     bool perform_verification_ = true;
252     bool resume_merge_ = false;
253     bool merge_complete_ = false;
254     bool o_direct_ = false;
255     uint32_t cow_op_merge_size_ = 0;
256     std::unique_ptr<UpdateVerify> update_verify_;
257     std::shared_ptr<IBlockServerOpener> block_server_opener_;
258 };
259 
260 std::ostream& operator<<(std::ostream& os, MERGE_IO_TRANSITION value);
261 static_assert(sizeof(off_t) == sizeof(uint64_t));
262 
263 }  // namespace snapshot
264 }  // namespace android
265