xref: /aosp_15_r20/system/core/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp (revision 00c7fec1bb09f3284aad6a6f96d2f63dfc3650ad)
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "snapuserd_core.h"
18 
19 #include <android-base/chrono_utils.h>
20 #include <android-base/properties.h>
21 #include <android-base/scopeguard.h>
22 #include <android-base/strings.h>
23 #include <snapuserd/dm_user_block_server.h>
24 
25 #include "merge_worker.h"
26 #include "read_worker.h"
27 #include "utility.h"
28 
29 namespace android {
30 namespace snapshot {
31 
32 using namespace android;
33 using namespace android::dm;
34 using android::base::unique_fd;
35 
SnapshotHandler(std::string misc_name,std::string cow_device,std::string backing_device,std::string base_path_merge,std::shared_ptr<IBlockServerOpener> opener,int num_worker_threads,bool use_iouring,bool perform_verification,bool o_direct,uint32_t cow_op_merge_size)36 SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
37                                  std::string backing_device, std::string base_path_merge,
38                                  std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads,
39                                  bool use_iouring, bool perform_verification, bool o_direct,
40                                  uint32_t cow_op_merge_size) {
41     misc_name_ = std::move(misc_name);
42     cow_device_ = std::move(cow_device);
43     backing_store_device_ = std::move(backing_device);
44     block_server_opener_ = std::move(opener);
45     base_path_merge_ = std::move(base_path_merge);
46     num_worker_threads_ = num_worker_threads;
47     is_io_uring_enabled_ = use_iouring;
48     perform_verification_ = perform_verification;
49     o_direct_ = o_direct;
50     cow_op_merge_size_ = cow_op_merge_size;
51 }
52 
InitializeWorkers()53 bool SnapshotHandler::InitializeWorkers() {
54     for (int i = 0; i < num_worker_threads_; i++) {
55         auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, misc_name_,
56                                                base_path_merge_, GetSharedPtr(),
57                                                block_server_opener_, o_direct_);
58         if (!wt->Init()) {
59             SNAP_LOG(ERROR) << "Thread initialization failed";
60             return false;
61         }
62 
63         worker_threads_.push_back(std::move(wt));
64     }
65     merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
66                                                   GetSharedPtr(), cow_op_merge_size_);
67 
68     read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
69                                                      GetSharedPtr(), cow_op_merge_size_);
70 
71     update_verify_ = std::make_unique<UpdateVerify>(misc_name_);
72 
73     return true;
74 }
75 
CloneReaderForWorker()76 std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
77     return reader_->CloneCowReader();
78 }
79 
UpdateMergeCompletionPercentage()80 void SnapshotHandler::UpdateMergeCompletionPercentage() {
81     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
82     merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
83 
84     SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
85                     << " num_merge_ops: " << ch->num_merge_ops
86                     << " total-ops: " << reader_->get_num_total_data_ops();
87 
88     if (ch->num_merge_ops == reader_->get_num_total_data_ops()) {
89         MarkMergeComplete();
90     }
91 }
92 
CommitMerge(int num_merge_ops)93 bool SnapshotHandler::CommitMerge(int num_merge_ops) {
94     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
95     ch->num_merge_ops += num_merge_ops;
96 
97     if (scratch_space_) {
98         if (ra_thread_) {
99             struct BufferState* ra_state = GetBufferState();
100             ra_state->read_ahead_state = kCowReadAheadInProgress;
101         }
102 
103         int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
104         if (ret < 0) {
105             SNAP_PLOG(ERROR) << "msync header failed: " << ret;
106             return false;
107         }
108     } else {
109         reader_->UpdateMergeOpsCompleted(num_merge_ops);
110         const auto& header = reader_->GetHeader();
111 
112         if (lseek(cow_fd_.get(), 0, SEEK_SET) < 0) {
113             SNAP_PLOG(ERROR) << "lseek failed";
114             return false;
115         }
116 
117         if (!android::base::WriteFully(cow_fd_, &header, header.prefix.header_size)) {
118             SNAP_PLOG(ERROR) << "Write to header failed";
119             return false;
120         }
121 
122         if (fsync(cow_fd_.get()) < 0) {
123             SNAP_PLOG(ERROR) << "fsync failed";
124             return false;
125         }
126     }
127 
128     // Update the merge completion - this is used by update engine
129     // to track the completion. No need to take a lock. It is ok
130     // even if there is a miss on reading a latest updated value.
131     // Subsequent polling will eventually converge to completion.
132     UpdateMergeCompletionPercentage();
133 
134     return true;
135 }
136 
PrepareReadAhead()137 void SnapshotHandler::PrepareReadAhead() {
138     struct BufferState* ra_state = GetBufferState();
139     // Check if the data has to be re-constructed from COW device
140     if (ra_state->read_ahead_state == kCowReadAheadDone) {
141         populate_data_from_cow_ = true;
142     } else {
143         populate_data_from_cow_ = false;
144     }
145 
146     NotifyRAForMergeReady();
147 }
148 
CheckMergeCompletionStatus()149 bool SnapshotHandler::CheckMergeCompletionStatus() {
150     if (!merge_initiated_) {
151         SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
152                        << reader_->get_num_total_data_ops();
153         return false;
154     }
155 
156     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
157 
158     SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
159                    << " Total-data-ops: " << reader_->get_num_total_data_ops();
160     return true;
161 }
162 
ReadMetadata()163 bool SnapshotHandler::ReadMetadata() {
164     reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE, true);
165     CowOptions options;
166 
167     SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
168 
169     if (!reader_->Parse(cow_fd_)) {
170         SNAP_LOG(ERROR) << "Failed to parse";
171         return false;
172     }
173 
174     const auto& header = reader_->GetHeader();
175     if (!(header.block_size == BLOCK_SZ)) {
176         SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
177         return false;
178     }
179 
180     SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
181     if (header.num_merge_ops) {
182         resume_merge_ = true;
183         SNAP_LOG(INFO) << "Resume Snapshot-merge";
184     }
185 
186     if (!MmapMetadata()) {
187         SNAP_LOG(ERROR) << "mmap failed";
188         return false;
189     }
190 
191     UpdateMergeCompletionPercentage();
192 
193     // Initialize the iterator for reading metadata
194     std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetOpIter(true);
195 
196     int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
197     int ra_index = 0;
198 
199     size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
200 
201     while (!cowop_iter->AtEnd()) {
202         const CowOperation* cow_op = cowop_iter->Get();
203 
204         if (cow_op->type() == kCowCopyOp) {
205             copy_ops += 1;
206         } else if (cow_op->type() == kCowReplaceOp) {
207             replace_ops += 1;
208         } else if (cow_op->type() == kCowZeroOp) {
209             zero_ops += 1;
210         } else if (cow_op->type() == kCowXorOp) {
211             xor_ops += 1;
212         }
213 
214         chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
215 
216         if (IsOrderedOp(*cow_op)) {
217             ra_thread_ = true;
218             block_to_ra_index_[cow_op->new_block] = ra_index;
219             num_ra_ops_per_iter -= 1;
220 
221             if ((ra_index + 1) - merge_blk_state_.size() == 1) {
222                 std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
223                         MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
224 
225                 merge_blk_state_.push_back(std::move(blk_state));
226             }
227 
228             // Move to next RA block
229             if (num_ra_ops_per_iter == 0) {
230                 num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
231                 ra_index += 1;
232             }
233         }
234         cowop_iter->Next();
235     }
236 
237     chunk_vec_.shrink_to_fit();
238 
239     // Sort the vector based on sectors as we need this during un-aligned access
240     std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
241 
242     PrepareReadAhead();
243 
244     SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
245                    << " Total-data-ops: " << reader_->get_num_total_data_ops()
246                    << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
247                    << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
248                    << " Xor-ops: " << xor_ops;
249 
250     return true;
251 }
252 
MmapMetadata()253 bool SnapshotHandler::MmapMetadata() {
254     const auto& header = reader_->GetHeader();
255 
256     total_mapped_addr_length_ = header.prefix.header_size + BUFFER_REGION_DEFAULT_SIZE;
257 
258     if (header.prefix.major_version >= 2 && header.buffer_size > 0) {
259         scratch_space_ = true;
260     }
261 
262     if (scratch_space_) {
263         mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
264                             cow_fd_.get(), 0);
265     } else {
266         mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
267                             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
268         struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
269         ch->num_merge_ops = header.num_merge_ops;
270     }
271 
272     if (mapped_addr_ == MAP_FAILED) {
273         SNAP_LOG(ERROR) << "mmap metadata failed";
274         return false;
275     }
276 
277     return true;
278 }
279 
UnmapBufferRegion()280 void SnapshotHandler::UnmapBufferRegion() {
281     int ret = munmap(mapped_addr_, total_mapped_addr_length_);
282     if (ret < 0) {
283         SNAP_PLOG(ERROR) << "munmap failed";
284     }
285 }
286 
InitCowDevice()287 bool SnapshotHandler::InitCowDevice() {
288     cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
289     if (cow_fd_ < 0) {
290         SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
291         return false;
292     }
293 
294     return ReadMetadata();
295 }
296 
297 /*
298  * Entry point to launch threads
299  */
Start()300 bool SnapshotHandler::Start() {
301     std::vector<std::future<bool>> threads;
302     std::future<bool> ra_thread_status;
303 
304     if (ra_thread_) {
305         ra_thread_status =
306                 std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
307         // If this is a merge-resume path, wait until RA thread is fully up as
308         // the data has to be re-constructed from the scratch space.
309         if (resume_merge_ && ShouldReconstructDataFromCow()) {
310             WaitForRaThreadToStart();
311         }
312     }
313 
314     // Launch worker threads
315     for (int i = 0; i < worker_threads_.size(); i++) {
316         threads.emplace_back(
317                 std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
318     }
319 
320     std::future<bool> merge_thread =
321             std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
322 
323     // Now that the worker threads are up, scan the partitions.
324     // If the snapshot-merge is being resumed, there is no need to scan as the
325     // current slot is already marked as boot complete.
326     if (perform_verification_ && !resume_merge_) {
327         update_verify_->VerifyUpdatePartition();
328     }
329 
330     bool ret = true;
331     for (auto& t : threads) {
332         ret = t.get() && ret;
333     }
334 
335     // Worker threads are terminated by this point - this can only happen:
336     //
337     // 1: If dm-user device is destroyed
338     // 2: We had an I/O failure when reading root partitions
339     //
340     // In case (1), this would be a graceful shutdown. In this case, merge
341     // thread and RA thread should have already terminated by this point. We will be
342     // destroying the dm-user device only _after_ merge is completed.
343     //
344     // In case (2), if merge thread had started, then it will be
345     // continuing to merge; however, since we had an I/O failure and the
346     // I/O on root partitions are no longer served, we will terminate the
347     // merge
348 
349     NotifyIOTerminated();
350 
351     bool read_ahead_retval = false;
352 
353     SNAP_LOG(INFO) << "Snapshot I/O terminated. Waiting for merge thread....";
354     bool merge_thread_status = merge_thread.get();
355 
356     if (ra_thread_) {
357         read_ahead_retval = ra_thread_status.get();
358     }
359 
360     SNAP_LOG(INFO) << "Worker threads terminated with ret: " << ret
361                    << " Merge-thread with ret: " << merge_thread_status
362                    << " RA-thread with ret: " << read_ahead_retval;
363     return ret;
364 }
365 
GetBufferMetadataOffset()366 uint64_t SnapshotHandler::GetBufferMetadataOffset() {
367     const auto& header = reader_->GetHeader();
368 
369     return (header.prefix.header_size + sizeof(BufferState));
370 }
371 
372 /*
373  * Metadata for read-ahead is 16 bytes. For a 2 MB region, we will
374  * end up with 8k (2 PAGE) worth of metadata. Thus, a 2MB buffer
375  * region is split into:
376  *
377  * 1: 8k metadata
378  * 2: Scratch space
379  *
380  */
GetBufferMetadataSize()381 size_t SnapshotHandler::GetBufferMetadataSize() {
382     const auto& header = reader_->GetHeader();
383     size_t buffer_size = header.buffer_size;
384 
385     // If there is no scratch space, then just use the
386     // anonymous memory
387     if (buffer_size == 0) {
388         buffer_size = BUFFER_REGION_DEFAULT_SIZE;
389     }
390 
391     return ((buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ);
392 }
393 
GetBufferDataOffset()394 size_t SnapshotHandler::GetBufferDataOffset() {
395     const auto& header = reader_->GetHeader();
396 
397     return (header.prefix.header_size + GetBufferMetadataSize());
398 }
399 
400 /*
401  * (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
402  */
GetBufferDataSize()403 size_t SnapshotHandler::GetBufferDataSize() {
404     const auto& header = reader_->GetHeader();
405     size_t buffer_size = header.buffer_size;
406 
407     // If there is no scratch space, then just use the
408     // anonymous memory
409     if (buffer_size == 0) {
410         buffer_size = BUFFER_REGION_DEFAULT_SIZE;
411     }
412 
413     return (buffer_size - GetBufferMetadataSize());
414 }
415 
GetBufferState()416 struct BufferState* SnapshotHandler::GetBufferState() {
417     const auto& header = reader_->GetHeader();
418 
419     struct BufferState* ra_state =
420             reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.prefix.header_size);
421     return ra_state;
422 }
423 
IsIouringSupported()424 bool SnapshotHandler::IsIouringSupported() {
425     if (!KernelSupportsIoUring()) {
426         return false;
427     }
428 
429     // During selinux init transition, libsnapshot will propagate the
430     // status of io_uring enablement. As properties are not initialized,
431     // we cannot query system property.
432     if (is_io_uring_enabled_) {
433         return true;
434     }
435 
436     // Finally check the system property
437     return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
438 }
439 
CheckPartitionVerification()440 bool SnapshotHandler::CheckPartitionVerification() {
441     return update_verify_->CheckPartitionVerification();
442 }
443 
FreeResources()444 void SnapshotHandler::FreeResources() {
445     worker_threads_.clear();
446     read_ahead_thread_ = nullptr;
447     merge_thread_ = nullptr;
448 }
449 
GetNumSectors() const450 uint64_t SnapshotHandler::GetNumSectors() const {
451     unique_fd fd(TEMP_FAILURE_RETRY(open(base_path_merge_.c_str(), O_RDONLY | O_CLOEXEC)));
452     if (fd < 0) {
453         SNAP_LOG(ERROR) << "Cannot open base path: " << base_path_merge_;
454         return false;
455     }
456 
457     uint64_t dev_sz = get_block_device_size(fd.get());
458     if (!dev_sz) {
459         SNAP_LOG(ERROR) << "Failed to find block device size: " << base_path_merge_;
460         return false;
461     }
462 
463     return dev_sz / SECTOR_SIZE;
464 }
465 
466 }  // namespace snapshot
467 }  // namespace android
468