/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <libsnapshot/cow_format.h>
#include <pthread.h>

#include <android-base/properties.h>

#include "merge_worker.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
                         const std::string& base_path_merge,
                         std::shared_ptr<SnapshotHandler> snapuserd, uint32_t cow_op_merge_size)
    : Worker(cow_device, misc_name, base_path_merge, snapuserd),
      cow_op_merge_size_(cow_op_merge_size) {}

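// Batch together COW ops whose target blocks are contiguous on the base
// device so that they can be merged with a single linear write.
//
// When replace_zero_vec is null, only ordered (copy/XOR) ops are considered
// and scanning stops at the first non-ordered op; otherwise replace and zero
// ops are collected into replace_zero_vec. *source_offset is set to the byte
// offset of the first block in the batch, *pending_ops bounds the batch size
// (further capped by ro.virtual_ab.cow_op_merge_size when set), and the
// return value is the number of consecutive blocks found (0 if none).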
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                              std::vector<const CowOperation*>* replace_zero_vec) {
    int num_ops = *pending_ops;
    // 0 indicates ro.virtual_ab.cow_op_merge_size was not set in the build
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(cow_op_merge_size_, static_cast<uint32_t>(*pending_ops));
    }

    int nr_consecutive = 0;
    bool checkOrderedOp = (replace_zero_vec == nullptr);
    size_t num_blocks = 1;

    do {
        if (!cowop_iter_->AtEnd() && num_ops) {
            const CowOperation* cow_op = cowop_iter_->Get();
            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                break;
            }

            *source_offset = static_cast<uint64_t>(cow_op->new_block) * BLOCK_SZ;
            if (!checkOrderedOp) {
                replace_zero_vec->push_back(cow_op);
                if (cow_op->type() == kCowReplaceOp) {
                    // Get the number of blocks this op has compressed
                    num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ);
                }
            }

            cowop_iter_->Next();
            num_ops -= num_blocks;
            nr_consecutive = num_blocks;

            while (!cowop_iter_->AtEnd() && num_ops) {
                const CowOperation* op = cowop_iter_->Get();
                if (checkOrderedOp && !IsOrderedOp(*op)) {
                    break;
                }

                uint64_t next_offset = static_cast<uint64_t>(op->new_block) * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
                }

                if (!checkOrderedOp) {
                    if (op->type() == kCowReplaceOp) {
                        num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ);
                        if (num_ops < num_blocks) {
                            break;
                        }
                    } else {
                        // zero op
                        num_blocks = 1;
                    }
                    replace_zero_vec->push_back(op);
                }

                nr_consecutive += num_blocks;
                num_ops -= num_blocks;
                cowop_iter_->Next();
            }
        }
    } while (0);

    return nr_consecutive;
}

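// Merge replace and zero ops. These ops have no ordering dependency, so each
// batch of contiguous blocks is staged in the bufsink (replace data read from
// the COW, or zero-filled blocks) and written directly to the base device,
// with progress committed to the COW roughly once per PAYLOAD_BUFFER_SZ of
// merged data.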
bool MergeWorker::MergeReplaceZeroOps() {
    // Flush after merging 1MB. Since all ops are independent and there is no
    // dependency between COW ops, we flush the data and commit the number of
    // ops merged to the COW block device. If there is a crash, we will end up
    // replaying some of the COW ops which were already merged; that is ok.
    //
    // Increasing this beyond 1MB may help to improve merge times; however, on
    // devices with low memory it can be problematic when multiple merge
    // threads run in parallel.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ);
    int num_ops_merged = 0;

    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";

    while (!cowop_iter_->AtEnd()) {
        int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
        std::vector<const CowOperation*> replace_zero_vec;
        uint64_t source_offset;

        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
        if (linear_blocks == 0) {
            // Merge complete
            CHECK(cowop_iter_->AtEnd());
            break;
        }

        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            if (cow_op->type() == kCowReplaceOp) {
                size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
                void* buffer = bufsink_.AcquireBuffer(buffer_size);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                    return false;
                }
                // Read the entire compressed buffer spanning multiple blocks
                if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
                    SNAP_LOG(ERROR) << "Failed to read COW in merge";
                    return false;
                }
            } else {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
                    return false;
                }
                CHECK(cow_op->type() == kCowZeroOp);
                memset(buffer, 0, BLOCK_SZ);
            }
        }

        size_t io_size = linear_blocks * BLOCK_SZ;

        // Merge - Write the contents back to base device
        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
                                            io_size, source_offset));
        if (ret < 0 || ret != io_size) {
            SNAP_LOG(ERROR)
                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
                    << " at offset: " << source_offset << " io_size: " << io_size;
            return false;
        }

        num_ops_merged += replace_zero_vec.size();

        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
                return false;
            }

            // Track the merge completion
            if (!snapuserd_->CommitMerge(num_ops_merged)) {
                SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
                return false;
            }

            num_ops_merged = 0;
        }

        bufsink_.ResetBufferOffset();

        if (snapuserd_->IsIOTerminated()) {
            SNAP_LOG(ERROR) << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting "
                               "down merge";
            return false;
        }
    }

    // Any left over ops not flushed yet.
    if (num_ops_merged) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
            return false;
        }

        if (!snapuserd_->CommitMerge(num_ops_merged)) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        num_ops_merged = 0;
    }

    return true;
}

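// Merge ordered (copy/XOR) ops using io_uring. The data to be written has
// already been staged in the shared read-ahead buffer by the RA thread; for
// each merge window this thread queues linked write SQEs against the base
// device, follows them with an fsync, reaps the completions, and commits the
// merged blocks to the COW before notifying the RA thread.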
bool MergeWorker::MergeOrderedOpsAsync() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            SNAP_LOG(ERROR) << "Failed waiting for merge to begin";
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;
        blocks_merged_in_group_ = 0;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;
                blocks_merged_in_group_ += linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                // These flags are important - we need to make sure that the
                // blocks are linked and written in the same order as they were
                // populated, because of overlapping block writes.
                //
                // If there were no dependencies, we could optimize this further
                // by allowing parallel writes; but for now, just link all the
                // SQ entries.
                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is the last set of COW ops to be merged in this
                // batch, we need to sync the merged data. We will try to grab
                // an SQE entry and queue an FSYNC command; IOSQE_IO_DRAIN
                // additionally makes sure the fsync runs only after all the
                // I/O operations queued in the ring have completed.
                //
                // If there is no space in the ring, we will flush later by
                // calling the fsync() system call explicitly.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // very unlikely but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the IO for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "Merge: io_uring_submit failed:"
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                bool status = true;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
                    // these error codes are not truly I/O errors, and we could
                    // retry them by re-populating the SQE entries and
                    // re-submitting the I/O request. However, we don't do that
                    // now; instead we fall back to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << strerror(-ret);
                        status = false;
                        break;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
                        status = false;
                        break;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                if (!status) {
                    return false;
                }

                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

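// Synchronous variant of the ordered-op merge: the same per-window handshake
// with the RA thread, but writes to the base device are issued with pwrite()
// and flushed with fsync() instead of going through io_uring.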
bool MergeWorker::MergeOrderedOps() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
            if (linear_blocks == 0) {
                break;
            }

            size_t io_size = (linear_blocks * BLOCK_SZ);
            // Write to the base device. Data is already in the RA buffer. Note
            // that XOR ops are already handled by the RA thread. We just write
            // the contents out.
            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
                                                (char*)read_ahead_buffer + offset, io_size,
                                                source_offset));
            if (ret < 0 || ret != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(ra_block_index_);
                return false;
            }

            offset += io_size;
            num_ops -= linear_blocks;
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

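// Drive the io_uring based merge of ordered ops. On failure, rewind the COW op
// iterator over the ops that were batched for the failed window so that the
// synchronous fallback can retry them.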
bool MergeWorker::AsyncMerge() {
    if (!MergeOrderedOpsAsync()) {
        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
        // Reset the iter so that we retry the merge
        while (blocks_merged_in_group_ && !cowop_iter_->AtBegin()) {
            cowop_iter_->Prev();
            blocks_merged_in_group_ -= 1;
        }

        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
    return true;
}

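// Synchronous fallback path: merge ordered ops with plain pwrite()/fsync().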
bool MergeWorker::SyncMerge() {
    if (!MergeOrderedOps()) {
        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOps completed";
    return true;
}

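// Top-level merge sequencing: ordered (copy/XOR) ops are merged first, via
// io_uring when supported and with a synchronous retry on failure; the
// remaining replace/zero ops are merged afterwards. On failure the device
// keeps running off the snapshots and the merge is retried on the next boot.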
bool MergeWorker::Merge() {
    cowop_iter_ = reader_->GetOpIter(true);

    bool retry = false;
    bool ordered_ops_merge_status;

    // Start Async Merge
    if (merge_async_) {
        ordered_ops_merge_status = AsyncMerge();
        if (!ordered_ops_merge_status) {
            FinalizeIouring();
            retry = true;
            merge_async_ = false;
        }
    }

    // Check if we need to fall back and retry the merge.
    //
    // If the device doesn't support async merge (e.g. devices
    // with 4.x kernels), we enter here directly.
    const bool sync_merge_required = (retry || !merge_async_);

    if (sync_merge_required) {
        ordered_ops_merge_status = SyncMerge();
        if (!ordered_ops_merge_status) {
            // Merge failed. Device will continue to be mounted
            // off snapshots; merge will be retried during
            // next reboot
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps()) {
        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
        snapuserd_->MergeFailed();
        return false;
    }

    snapuserd_->MergeCompleted();

    return true;
}

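// Set up the io_uring used by the async merge path. If io_uring is not
// supported on this device, merge_async_ stays false and the merge falls back
// to the synchronous path.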
bool MergeWorker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void MergeWorker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

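// Thread entry point: wait for the merge to be initiated, move the thread to
// background priority and the background cpuset, initialize the buffer sink
// and io_uring, and run the merge to completion.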
bool MergeWorker::Run() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";

    pthread_setname_np(pthread_self(), "MergeWorker");

    if (!snapuserd_->WaitForMergeBegin()) {
        return true;
    }
    auto merge_thread_priority = android::base::GetUintProperty<uint32_t>(
            "ro.virtual_ab.merge_thread_priority", ANDROID_PRIORITY_BACKGROUND);

    if (!SetThreadPriority(merge_thread_priority)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to MergeWorker thread";
    }

    SNAP_LOG(INFO) << "Merge starting..";

    bufsink_.Initialize(PAYLOAD_BUFFER_SZ);

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}

}  // namespace snapshot
}  // namespace android