1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "snapuserd_core.h"
18
19 #include <android-base/chrono_utils.h>
20 #include <android-base/properties.h>
21 #include <android-base/scopeguard.h>
22 #include <android-base/strings.h>
23 #include <snapuserd/dm_user_block_server.h>
24
25 #include "merge_worker.h"
26 #include "read_worker.h"
27 #include "utility.h"
28
29 namespace android {
30 namespace snapshot {
31
32 using namespace android;
33 using namespace android::dm;
34 using android::base::unique_fd;
35
SnapshotHandler(std::string misc_name,std::string cow_device,std::string backing_device,std::string base_path_merge,std::shared_ptr<IBlockServerOpener> opener,int num_worker_threads,bool use_iouring,bool perform_verification,bool o_direct,uint32_t cow_op_merge_size)36 SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
37 std::string backing_device, std::string base_path_merge,
38 std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads,
39 bool use_iouring, bool perform_verification, bool o_direct,
40 uint32_t cow_op_merge_size) {
41 misc_name_ = std::move(misc_name);
42 cow_device_ = std::move(cow_device);
43 backing_store_device_ = std::move(backing_device);
44 block_server_opener_ = std::move(opener);
45 base_path_merge_ = std::move(base_path_merge);
46 num_worker_threads_ = num_worker_threads;
47 is_io_uring_enabled_ = use_iouring;
48 perform_verification_ = perform_verification;
49 o_direct_ = o_direct;
50 cow_op_merge_size_ = cow_op_merge_size;
51 }
52
InitializeWorkers()53 bool SnapshotHandler::InitializeWorkers() {
54 for (int i = 0; i < num_worker_threads_; i++) {
55 auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, misc_name_,
56 base_path_merge_, GetSharedPtr(),
57 block_server_opener_, o_direct_);
58 if (!wt->Init()) {
59 SNAP_LOG(ERROR) << "Thread initialization failed";
60 return false;
61 }
62
63 worker_threads_.push_back(std::move(wt));
64 }
65 merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
66 GetSharedPtr(), cow_op_merge_size_);
67
68 read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
69 GetSharedPtr(), cow_op_merge_size_);
70
71 update_verify_ = std::make_unique<UpdateVerify>(misc_name_);
72
73 return true;
74 }
75
CloneReaderForWorker()76 std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
77 return reader_->CloneCowReader();
78 }
79
UpdateMergeCompletionPercentage()80 void SnapshotHandler::UpdateMergeCompletionPercentage() {
81 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
82 merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
83
84 SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
85 << " num_merge_ops: " << ch->num_merge_ops
86 << " total-ops: " << reader_->get_num_total_data_ops();
87
88 if (ch->num_merge_ops == reader_->get_num_total_data_ops()) {
89 MarkMergeComplete();
90 }
91 }
92
CommitMerge(int num_merge_ops)93 bool SnapshotHandler::CommitMerge(int num_merge_ops) {
94 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
95 ch->num_merge_ops += num_merge_ops;
96
97 if (scratch_space_) {
98 if (ra_thread_) {
99 struct BufferState* ra_state = GetBufferState();
100 ra_state->read_ahead_state = kCowReadAheadInProgress;
101 }
102
103 int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
104 if (ret < 0) {
105 SNAP_PLOG(ERROR) << "msync header failed: " << ret;
106 return false;
107 }
108 } else {
109 reader_->UpdateMergeOpsCompleted(num_merge_ops);
110 const auto& header = reader_->GetHeader();
111
112 if (lseek(cow_fd_.get(), 0, SEEK_SET) < 0) {
113 SNAP_PLOG(ERROR) << "lseek failed";
114 return false;
115 }
116
117 if (!android::base::WriteFully(cow_fd_, &header, header.prefix.header_size)) {
118 SNAP_PLOG(ERROR) << "Write to header failed";
119 return false;
120 }
121
122 if (fsync(cow_fd_.get()) < 0) {
123 SNAP_PLOG(ERROR) << "fsync failed";
124 return false;
125 }
126 }
127
128 // Update the merge completion - this is used by update engine
129 // to track the completion. No need to take a lock. It is ok
130 // even if there is a miss on reading a latest updated value.
131 // Subsequent polling will eventually converge to completion.
132 UpdateMergeCompletionPercentage();
133
134 return true;
135 }
136
PrepareReadAhead()137 void SnapshotHandler::PrepareReadAhead() {
138 struct BufferState* ra_state = GetBufferState();
139 // Check if the data has to be re-constructed from COW device
140 if (ra_state->read_ahead_state == kCowReadAheadDone) {
141 populate_data_from_cow_ = true;
142 } else {
143 populate_data_from_cow_ = false;
144 }
145
146 NotifyRAForMergeReady();
147 }
148
CheckMergeCompletionStatus()149 bool SnapshotHandler::CheckMergeCompletionStatus() {
150 if (!merge_initiated_) {
151 SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
152 << reader_->get_num_total_data_ops();
153 return false;
154 }
155
156 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
157
158 SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
159 << " Total-data-ops: " << reader_->get_num_total_data_ops();
160 return true;
161 }
162
ReadMetadata()163 bool SnapshotHandler::ReadMetadata() {
164 reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE, true);
165 CowOptions options;
166
167 SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
168
169 if (!reader_->Parse(cow_fd_)) {
170 SNAP_LOG(ERROR) << "Failed to parse";
171 return false;
172 }
173
174 const auto& header = reader_->GetHeader();
175 if (!(header.block_size == BLOCK_SZ)) {
176 SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
177 return false;
178 }
179
180 SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
181 if (header.num_merge_ops) {
182 resume_merge_ = true;
183 SNAP_LOG(INFO) << "Resume Snapshot-merge";
184 }
185
186 if (!MmapMetadata()) {
187 SNAP_LOG(ERROR) << "mmap failed";
188 return false;
189 }
190
191 UpdateMergeCompletionPercentage();
192
193 // Initialize the iterator for reading metadata
194 std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetOpIter(true);
195
196 int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
197 int ra_index = 0;
198
199 size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
200
201 while (!cowop_iter->AtEnd()) {
202 const CowOperation* cow_op = cowop_iter->Get();
203
204 if (cow_op->type() == kCowCopyOp) {
205 copy_ops += 1;
206 } else if (cow_op->type() == kCowReplaceOp) {
207 replace_ops += 1;
208 } else if (cow_op->type() == kCowZeroOp) {
209 zero_ops += 1;
210 } else if (cow_op->type() == kCowXorOp) {
211 xor_ops += 1;
212 }
213
214 chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
215
216 if (IsOrderedOp(*cow_op)) {
217 ra_thread_ = true;
218 block_to_ra_index_[cow_op->new_block] = ra_index;
219 num_ra_ops_per_iter -= 1;
220
221 if ((ra_index + 1) - merge_blk_state_.size() == 1) {
222 std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
223 MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
224
225 merge_blk_state_.push_back(std::move(blk_state));
226 }
227
228 // Move to next RA block
229 if (num_ra_ops_per_iter == 0) {
230 num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
231 ra_index += 1;
232 }
233 }
234 cowop_iter->Next();
235 }
236
237 chunk_vec_.shrink_to_fit();
238
239 // Sort the vector based on sectors as we need this during un-aligned access
240 std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
241
242 PrepareReadAhead();
243
244 SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
245 << " Total-data-ops: " << reader_->get_num_total_data_ops()
246 << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
247 << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
248 << " Xor-ops: " << xor_ops;
249
250 return true;
251 }
252
MmapMetadata()253 bool SnapshotHandler::MmapMetadata() {
254 const auto& header = reader_->GetHeader();
255
256 total_mapped_addr_length_ = header.prefix.header_size + BUFFER_REGION_DEFAULT_SIZE;
257
258 if (header.prefix.major_version >= 2 && header.buffer_size > 0) {
259 scratch_space_ = true;
260 }
261
262 if (scratch_space_) {
263 mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
264 cow_fd_.get(), 0);
265 } else {
266 mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
267 MAP_SHARED | MAP_ANONYMOUS, -1, 0);
268 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
269 ch->num_merge_ops = header.num_merge_ops;
270 }
271
272 if (mapped_addr_ == MAP_FAILED) {
273 SNAP_LOG(ERROR) << "mmap metadata failed";
274 return false;
275 }
276
277 return true;
278 }
279
UnmapBufferRegion()280 void SnapshotHandler::UnmapBufferRegion() {
281 int ret = munmap(mapped_addr_, total_mapped_addr_length_);
282 if (ret < 0) {
283 SNAP_PLOG(ERROR) << "munmap failed";
284 }
285 }
286
InitCowDevice()287 bool SnapshotHandler::InitCowDevice() {
288 cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
289 if (cow_fd_ < 0) {
290 SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
291 return false;
292 }
293
294 return ReadMetadata();
295 }
296
297 /*
298 * Entry point to launch threads
299 */
Start()300 bool SnapshotHandler::Start() {
301 std::vector<std::future<bool>> threads;
302 std::future<bool> ra_thread_status;
303
304 if (ra_thread_) {
305 ra_thread_status =
306 std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
307 // If this is a merge-resume path, wait until RA thread is fully up as
308 // the data has to be re-constructed from the scratch space.
309 if (resume_merge_ && ShouldReconstructDataFromCow()) {
310 WaitForRaThreadToStart();
311 }
312 }
313
314 // Launch worker threads
315 for (int i = 0; i < worker_threads_.size(); i++) {
316 threads.emplace_back(
317 std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
318 }
319
320 std::future<bool> merge_thread =
321 std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
322
323 // Now that the worker threads are up, scan the partitions.
324 // If the snapshot-merge is being resumed, there is no need to scan as the
325 // current slot is already marked as boot complete.
326 if (perform_verification_ && !resume_merge_) {
327 update_verify_->VerifyUpdatePartition();
328 }
329
330 bool ret = true;
331 for (auto& t : threads) {
332 ret = t.get() && ret;
333 }
334
335 // Worker threads are terminated by this point - this can only happen:
336 //
337 // 1: If dm-user device is destroyed
338 // 2: We had an I/O failure when reading root partitions
339 //
340 // In case (1), this would be a graceful shutdown. In this case, merge
341 // thread and RA thread should have already terminated by this point. We will be
342 // destroying the dm-user device only _after_ merge is completed.
343 //
344 // In case (2), if merge thread had started, then it will be
345 // continuing to merge; however, since we had an I/O failure and the
346 // I/O on root partitions are no longer served, we will terminate the
347 // merge
348
349 NotifyIOTerminated();
350
351 bool read_ahead_retval = false;
352
353 SNAP_LOG(INFO) << "Snapshot I/O terminated. Waiting for merge thread....";
354 bool merge_thread_status = merge_thread.get();
355
356 if (ra_thread_) {
357 read_ahead_retval = ra_thread_status.get();
358 }
359
360 SNAP_LOG(INFO) << "Worker threads terminated with ret: " << ret
361 << " Merge-thread with ret: " << merge_thread_status
362 << " RA-thread with ret: " << read_ahead_retval;
363 return ret;
364 }
365
GetBufferMetadataOffset()366 uint64_t SnapshotHandler::GetBufferMetadataOffset() {
367 const auto& header = reader_->GetHeader();
368
369 return (header.prefix.header_size + sizeof(BufferState));
370 }
371
372 /*
373 * Metadata for read-ahead is 16 bytes. For a 2 MB region, we will
374 * end up with 8k (2 PAGE) worth of metadata. Thus, a 2MB buffer
375 * region is split into:
376 *
377 * 1: 8k metadata
378 * 2: Scratch space
379 *
380 */
GetBufferMetadataSize()381 size_t SnapshotHandler::GetBufferMetadataSize() {
382 const auto& header = reader_->GetHeader();
383 size_t buffer_size = header.buffer_size;
384
385 // If there is no scratch space, then just use the
386 // anonymous memory
387 if (buffer_size == 0) {
388 buffer_size = BUFFER_REGION_DEFAULT_SIZE;
389 }
390
391 return ((buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ);
392 }
393
GetBufferDataOffset()394 size_t SnapshotHandler::GetBufferDataOffset() {
395 const auto& header = reader_->GetHeader();
396
397 return (header.prefix.header_size + GetBufferMetadataSize());
398 }
399
400 /*
401 * (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
402 */
GetBufferDataSize()403 size_t SnapshotHandler::GetBufferDataSize() {
404 const auto& header = reader_->GetHeader();
405 size_t buffer_size = header.buffer_size;
406
407 // If there is no scratch space, then just use the
408 // anonymous memory
409 if (buffer_size == 0) {
410 buffer_size = BUFFER_REGION_DEFAULT_SIZE;
411 }
412
413 return (buffer_size - GetBufferMetadataSize());
414 }
415
GetBufferState()416 struct BufferState* SnapshotHandler::GetBufferState() {
417 const auto& header = reader_->GetHeader();
418
419 struct BufferState* ra_state =
420 reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.prefix.header_size);
421 return ra_state;
422 }
423
IsIouringSupported()424 bool SnapshotHandler::IsIouringSupported() {
425 if (!KernelSupportsIoUring()) {
426 return false;
427 }
428
429 // During selinux init transition, libsnapshot will propagate the
430 // status of io_uring enablement. As properties are not initialized,
431 // we cannot query system property.
432 if (is_io_uring_enabled_) {
433 return true;
434 }
435
436 // Finally check the system property
437 return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
438 }
439
CheckPartitionVerification()440 bool SnapshotHandler::CheckPartitionVerification() {
441 return update_verify_->CheckPartitionVerification();
442 }
443
FreeResources()444 void SnapshotHandler::FreeResources() {
445 worker_threads_.clear();
446 read_ahead_thread_ = nullptr;
447 merge_thread_ = nullptr;
448 }
449
GetNumSectors() const450 uint64_t SnapshotHandler::GetNumSectors() const {
451 unique_fd fd(TEMP_FAILURE_RETRY(open(base_path_merge_.c_str(), O_RDONLY | O_CLOEXEC)));
452 if (fd < 0) {
453 SNAP_LOG(ERROR) << "Cannot open base path: " << base_path_merge_;
454 return false;
455 }
456
457 uint64_t dev_sz = get_block_device_size(fd.get());
458 if (!dev_sz) {
459 SNAP_LOG(ERROR) << "Failed to find block device size: " << base_path_merge_;
460 return false;
461 }
462
463 return dev_sz / SECTOR_SIZE;
464 }
465
466 } // namespace snapshot
467 } // namespace android
468