/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <libsnapshot/cow_format.h>
#include <pthread.h>

#include <android-base/properties.h>

#include "merge_worker.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
                         const std::string& base_path_merge,
                         std::shared_ptr<SnapshotHandler> snapuserd, uint32_t cow_op_merge_size)
    : Worker(cow_device, misc_name, base_path_merge, snapuserd),
      cow_op_merge_size_(cow_op_merge_size) {}

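// Batch together as many COW ops as possible whose destination blocks are
// contiguous, so the whole run can be merged with a single linear write.
// When |replace_zero_vec| is null, only ordered (copy/XOR) ops staged by the
// read-ahead thread are considered; otherwise replace and zero ops are
// collected into the vector. Returns the number of consecutive blocks batched.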
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                              std::vector<const CowOperation*>* replace_zero_vec) {
    int num_ops = *pending_ops;
    // 0 indicates ro.virtual_ab.cow_op_merge_size was not set in the build
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(cow_op_merge_size_, static_cast<uint32_t>(*pending_ops));
    }

    int nr_consecutive = 0;
    bool checkOrderedOp = (replace_zero_vec == nullptr);
    size_t num_blocks = 1;

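    // The do/while(0) wrapper exists only so that `break` can abandon the
    // batch early once a non-matching or non-contiguous op is found.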
    do {
        if (!cowop_iter_->AtEnd() && num_ops) {
            const CowOperation* cow_op = cowop_iter_->Get();
            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                break;
            }

            *source_offset = static_cast<uint64_t>(cow_op->new_block) * BLOCK_SZ;
            if (!checkOrderedOp) {
                replace_zero_vec->push_back(cow_op);
                if (cow_op->type() == kCowReplaceOp) {
                    // Get the number of blocks this op has compressed
                    num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ);
                }
            }

            cowop_iter_->Next();
            num_ops -= num_blocks;
            nr_consecutive = num_blocks;

            while (!cowop_iter_->AtEnd() && num_ops) {
                const CowOperation* op = cowop_iter_->Get();
                if (checkOrderedOp && !IsOrderedOp(*op)) {
                    break;
                }

                uint64_t next_offset = static_cast<uint64_t>(op->new_block) * BLOCK_SZ;
                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                    break;
                }

                if (!checkOrderedOp) {
                    if (op->type() == kCowReplaceOp) {
                        num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ);
                        if (num_ops < num_blocks) {
                            break;
                        }
                    } else {
                        // zero op
                        num_blocks = 1;
                    }
                    replace_zero_vec->push_back(op);
                }

                nr_consecutive += num_blocks;
                num_ops -= num_blocks;
                cowop_iter_->Next();
            }
        }
    } while (0);

    return nr_consecutive;
}

bool MergeWorker::MergeReplaceZeroOps() {
    // Flush after merging 1MB. Since replace/zero ops are independent and have
    // no dependencies between them, we flush the merged data and commit the
    // number of ops merged to the COW block device. If there is a crash, we
    // may end up replaying some COW ops that were already merged; that is
    // harmless.
    //
    // Increasing this beyond 1MB may help improve merge times; however, on
    // devices with low memory it can be problematic when multiple merge
    // threads run in parallel.
    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ);
    int num_ops_merged = 0;

    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";

    while (!cowop_iter_->AtEnd()) {
        int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
        std::vector<const CowOperation*> replace_zero_vec;
        uint64_t source_offset;

        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
        if (linear_blocks == 0) {
            // Merge complete
            CHECK(cowop_iter_->AtEnd());
            break;
        }

        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
            const CowOperation* cow_op = replace_zero_vec[i];
            if (cow_op->type() == kCowReplaceOp) {
                size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
                void* buffer = bufsink_.AcquireBuffer(buffer_size);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceZeroOps";
                    return false;
                }
                // Read the entire compressed buffer spanning multiple blocks
                if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
                    SNAP_LOG(ERROR) << "Failed to read COW in merge";
                    return false;
                }
            } else {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceZeroOps";
                    return false;
                }
                CHECK(cow_op->type() == kCowZeroOp);
                memset(buffer, 0, BLOCK_SZ);
            }
        }

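        // At this point bufsink_ holds the payload for every op in
        // replace_zero_vec, laid out in destination order, so the entire
        // contiguous run can be written back with a single pwrite().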
        size_t io_size = linear_blocks * BLOCK_SZ;

        // Merge - Write the contents back to base device
        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
                                            io_size, source_offset));
        if (ret < 0 || ret != io_size) {
            SNAP_LOG(ERROR)
                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
                    << " at offset: " << source_offset << " io_size: " << io_size;
            return false;
        }

        num_ops_merged += replace_zero_vec.size();

        if (num_ops_merged >= total_ops_merged_per_commit) {
            // Flush the data
            if (fsync(base_path_merge_fd_.get()) < 0) {
                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
                return false;
            }

            // Track the merge completion
            if (!snapuserd_->CommitMerge(num_ops_merged)) {
                SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
                return false;
            }

            num_ops_merged = 0;
        }

        bufsink_.ResetBufferOffset();

        if (snapuserd_->IsIOTerminated()) {
            SNAP_LOG(ERROR) << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting "
                               "down merge";
            return false;
        }
    }

    // Any left over ops not flushed yet.
    if (num_ops_merged) {
        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
            return false;
        }

        if (!snapuserd_->CommitMerge(num_ops_merged)) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        num_ops_merged = 0;
    }

    return true;
}

bool MergeWorker::MergeOrderedOpsAsync() {
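    // The read-ahead (RA) thread stages the data for ordered (copy/XOR) ops in
    // the shared scratch buffer below; the merge thread only writes it back to
    // the base device.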
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            SNAP_LOG(ERROR) << "Failed waiting for merge to begin";
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();

        int pending_sqe = queue_depth_;
        int pending_ios_to_submit = 0;
        bool flush_required = false;
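        // blocks_merged_in_group_ tracks how many blocks this window has
        // consumed from the iterator, so that AsyncMerge() can rewind the
        // iterator if the async path fails and we fall back to synchronous I/O.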
        blocks_merged_in_group_ = 0;

        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

            if (linear_blocks != 0) {
                size_t io_size = (linear_blocks * BLOCK_SZ);

                // Get an SQE entry from the ring and populate the I/O variables
                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                if (!sqe) {
                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                    return false;
                }

                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
                                    (char*)read_ahead_buffer + offset, io_size, source_offset);

                offset += io_size;
                num_ops -= linear_blocks;
                blocks_merged_in_group_ += linear_blocks;

                pending_sqe -= 1;
                pending_ios_to_submit += 1;
                // These flags are important - we need to make sure that the
                // blocks are linked and written in the same order as they were
                // populated, because block writes may overlap.
                //
                // If there were no dependencies, we could optimize this further
                // by allowing parallel writes; for now, just link all the SQ
                // entries.
                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
            }

            // Ring is full or no more COW ops to be merged in this batch
            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                // If this is the last set of COW ops to be merged in this batch,
                // we need to sync the merged data. We will try to grab an SQE
                // entry and set the FSYNC command; additionally, make sure that
                // the fsync is done only after all the I/O operations queued in
                // the ring are completed, by setting IOSQE_IO_DRAIN.
                //
                // If there is no space in the ring, we will flush later by
                // explicitly calling the fsync() system call.
                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
                    if (pending_sqe != 0) {
                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                        if (!sqe) {
                            // very unlikely but let's continue and not fail the
                            // merge - we will flush it later
                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
                            flush_required = true;
                        } else {
                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
                            // Drain the queue before fsync
                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
                            pending_sqe -= 1;
                            flush_required = false;
                            pending_ios_to_submit += 1;
                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                        }
                    } else {
                        flush_required = true;
                    }
                }

                // Submit the IO for all the COW ops in a single syscall
                int ret = io_uring_submit(ring_.get());
                if (ret != pending_ios_to_submit) {
                    SNAP_PLOG(ERROR)
                            << "io_uring_submit failed during merge: "
                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                    return false;
                }

                int pending_ios_to_complete = pending_ios_to_submit;
                pending_ios_to_submit = 0;

                bool status = true;

                // Reap I/O completions
                while (pending_ios_to_complete) {
                    struct io_uring_cqe* cqe;

                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
                    // these error codes are not truly I/O errors; we can retry them
                    // by re-populating the SQE entries and submitting the I/O
                    // request back. However, we don't do that now; instead we
                    // will fallback to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                    if (ret) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << strerror(-ret);
                        status = false;
                        break;
                    }

                    if (cqe->res < 0) {
                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
                        status = false;
                        break;
                    }

                    io_uring_cqe_seen(ring_.get(), cqe);
                    pending_ios_to_complete -= 1;
                }

                if (!status) {
                    return false;
                }

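                // All outstanding I/Os have completed; reset the SQE budget for
                // the next submission batch.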
                pending_sqe = queue_depth_;
            }

            if (linear_blocks == 0) {
                break;
            }
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

bool MergeWorker::MergeOrderedOps() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    void* read_ahead_buffer =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    SNAP_LOG(INFO) << "MergeOrderedOps started....";

    while (!cowop_iter_->AtEnd()) {
        const CowOperation* cow_op = cowop_iter_->Get();
        if (!IsOrderedOp(*cow_op)) {
            break;
        }

        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
        // Wait for RA thread to notify that the merge window
        // is ready for merging.
        if (!snapuserd_->WaitForMergeBegin()) {
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        std::optional<std::lock_guard<std::mutex>> buffer_lock;
        // Acquire the buffer lock at this point so that RA thread
        // doesn't step into this buffer. See b/377819507
        buffer_lock.emplace(snapuserd_->GetBufferLock());

        snapuserd_->SetMergeInProgress(ra_block_index_);

        loff_t offset = 0;
        int num_ops = snapuserd_->GetTotalBlocksToMerge();
        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
        while (num_ops) {
            uint64_t source_offset;

            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
            if (linear_blocks == 0) {
                break;
            }

            size_t io_size = (linear_blocks * BLOCK_SZ);
            // Write to the base device. Data is already in the RA buffer. Note
            // that XOR ops are already handled by the RA thread; we just write
            // the contents out.
            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
                                                (char*)read_ahead_buffer + offset, io_size,
                                                source_offset));
            if (ret < 0 || ret != io_size) {
                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                << " at offset: " << source_offset << " io_size: " << io_size;
                snapuserd_->SetMergeFailed(ra_block_index_);
                return false;
            }

            offset += io_size;
            num_ops -= linear_blocks;
        }

        // Verify all ops are merged
        CHECK(num_ops == 0);

        // Flush the data
        if (fsync(base_path_merge_fd_.get()) < 0) {
            SNAP_LOG(ERROR) << " Failed to fsync merged data";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        // Merge is done and data is on disk. Update the COW Header about
        // the merge completion
        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
            snapuserd_->SetMergeFailed(ra_block_index_);
            return false;
        }

        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
        // Mark the block as merge complete
        snapuserd_->SetMergeCompleted(ra_block_index_);

        // Release the buffer lock
        buffer_lock.reset();

        // Notify RA thread that the merge thread is ready to merge the next
        // window
        snapuserd_->NotifyRAForMergeReady();

        // Get the next block
        ra_block_index_ += 1;
    }

    return true;
}

bool MergeWorker::AsyncMerge() {
    if (!MergeOrderedOpsAsync()) {
        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
        // Reset the iter so that we retry the merge
        while (blocks_merged_in_group_ && !cowop_iter_->AtBegin()) {
            cowop_iter_->Prev();
            blocks_merged_in_group_ -= 1;
        }

        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
    return true;
}

bool MergeWorker::SyncMerge() {
    if (!MergeOrderedOps()) {
        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
        return false;
    }

    SNAP_LOG(INFO) << "MergeOrderedOps completed";
    return true;
}

bool MergeWorker::Merge() {
    cowop_iter_ = reader_->GetOpIter(true);
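    // The merge below assumes the iterator yields ordered (copy/XOR) ops before
    // replace/zero ops: ordered ops are merged first, then the remaining
    // replace/zero ops.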

    bool retry = false;
    bool ordered_ops_merge_status;

    // Start Async Merge
    if (merge_async_) {
        ordered_ops_merge_status = AsyncMerge();
        if (!ordered_ops_merge_status) {
            FinalizeIouring();
            retry = true;
            merge_async_ = false;
        }
    }

    // Check if we need to fallback and retry the merge
    //
    // If the device doesn't support async merge, we
    // will directly enter here (aka devices with 4.x kernels)
    const bool sync_merge_required = (retry || !merge_async_);

    if (sync_merge_required) {
        ordered_ops_merge_status = SyncMerge();
        if (!ordered_ops_merge_status) {
            // Merge failed. Device will continue to be mounted
            // off snapshots; merge will be retried during
            // next reboot
            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
            snapuserd_->MergeFailed();
            return false;
        }
    }

    // Replace and Zero ops
    if (!MergeReplaceZeroOps()) {
        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
        snapuserd_->MergeFailed();
        return false;
    }

    snapuserd_->MergeCompleted();

    return true;
}

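// Set up the io_uring instance used by the async merge path. If io_uring is
// not supported, merge_async_ stays false and Merge() takes the synchronous
// path instead.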
bool MergeWorker::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    merge_async_ = true;

    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void MergeWorker::FinalizeIouring() {
    if (merge_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

bool MergeWorker::Run() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";

    pthread_setname_np(pthread_self(), "MergeWorker");

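    // If the merge never kicks off (for example, the handler is shutting down
    // before a merge was requested), there is nothing for this thread to do.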
    if (!snapuserd_->WaitForMergeBegin()) {
        return true;
    }
    auto merge_thread_priority = android::base::GetUintProperty<uint32_t>(
            "ro.virtual_ab.merge_thread_priority", ANDROID_PRIORITY_BACKGROUND);

    if (!SetThreadPriority(merge_thread_priority)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to MergeWorker thread";
    }

    SNAP_LOG(INFO) << "Merge starting..";

    bufsink_.Initialize(PAYLOAD_BUFFER_SZ);

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    InitializeIouring();

    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}

}  // namespace snapshot
}  // namespace android