/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include <algorithm>
#include <limits>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"

namespace perfetto {

using Chunk = SharedMemoryABI::Chunk;

namespace {
static_assert(sizeof(BufferID) == sizeof(uint16_t),
              "The MaybeUnboundBufferID logic requires BufferID not to grow "
              "above uint16_t.");

MaybeUnboundBufferID MakeTargetBufferIdForReservation(uint16_t reservation_id) {
  // Reservation IDs are stored in the upper bits.
  PERFETTO_CHECK(reservation_id > 0);
  return static_cast<MaybeUnboundBufferID>(reservation_id) << 16;
}

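// A MaybeUnboundBufferID either holds a real BufferID in its lower 16 bits
// (upper bits zero) or a startup-buffer reservation ID shifted into the upper
// 16 bits, so the two value ranges can never collide.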
bool IsReservationTargetBufferId(MaybeUnboundBufferID buffer_id) {
  return (buffer_id >> 16) > 0;
}
}  // namespace

// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;
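// kPageDiv1 partitions each SMB page into a single chunk; GetNewChunk() below
// carries a TODO to make the page layout dynamic.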

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    ShmemMode mode,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
      shared_memory->start(), shared_memory->size(), mode, page_size,
      producer_endpoint, task_runner));
}

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateUnboundInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    ShmemMode mode) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(new SharedMemoryArbiterImpl(
      shared_memory->start(), shared_memory->size(), mode, page_size,
      /*producer_endpoint=*/nullptr, /*task_runner=*/nullptr));
}
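
// Typical startup-tracing flow (sketch): create an unbound arbiter over a
// locally allocated SMB, hand out writers via CreateStartupTraceWriter()
// against a reservation ID, then, once connected to the service, call
// BindToProducerEndpoint() followed by BindStartupTargetBuffer() to route the
// buffered commits to the real target buffer.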

SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
    void* start,
    size_t size,
    ShmemMode mode,
    size_t page_size,
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner)
    : producer_endpoint_(producer_endpoint),
      use_shmem_emulation_(mode == ShmemMode::kShmemEmulation),
      task_runner_(task_runner),
      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size, mode),
      active_writer_ids_(kMaxWriterID),
      fully_bound_(task_runner && producer_endpoint),
      was_always_bound_(fully_bound_),
      weak_ptr_factory_(this) {}

Chunk SharedMemoryArbiterImpl::GetNewChunk(
    const SharedMemoryABI::ChunkHeader& header,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  int stall_count = 0;
  unsigned stall_interval_us = 0;
  bool task_runner_runs_on_current_thread = false;
  static const unsigned kMaxStallIntervalUs = 100000;
  static const int kLogAfterNStalls = 3;
  static const int kFlushCommitsAfterEveryNStalls = 2;
  static const int kAssertAtNStalls = 200;
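  // Together these constants implement a bounded retry loop: log after 3
  // failed attempts, force a commit flush every 2 attempts when possible, and
  // crash after 200 attempts to surface a likely deadlock.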

  for (;;) {
    // TODO(primiano): Probably this lock is not really required and this code
    // could be rewritten leveraging only the Try* atomic operations in
    // SharedMemoryABI. But let's not be too adventurous for the moment.
    {
      std::unique_lock<std::mutex> scoped_lock(lock_);

      // If ever unbound, we do not support stalling. In theory, we could
      // support stalling for TraceWriters created after the arbiter and
      // startup buffer reservations were bound, but to avoid raciness between
      // the creation of startup writers and binding, we categorically forbid
      // kStall mode.
      PERFETTO_DCHECK(was_always_bound_ ||
                      buffer_exhausted_policy == BufferExhaustedPolicy::kDrop);

      task_runner_runs_on_current_thread =
          task_runner_ && task_runner_->RunsTasksOnCurrentThread();

      // If more than half of the SMB.size() is filled with completed chunks
      // for which we haven't notified the service yet (i.e. they are still
      // enqueued in |commit_data_req_|), force a synchronous
      // CommitDataRequest() even if we acquire a chunk, to reduce the
      // likelihood of stalling the writer.
      //
      // We can only do this if we're writing on the same thread that we access
      // the producer endpoint on, since we cannot notify the producer endpoint
      // to commit synchronously on a different thread. Attempting to flush
      // synchronously on another thread will lead to subtle bugs caused by
      // out-of-order commit requests (crbug.com/919187#c28).
      bool should_commit_synchronously =
          task_runner_runs_on_current_thread &&
          buffer_exhausted_policy == BufferExhaustedPolicy::kStall &&
          commit_data_req_ && bytes_pending_commit_ >= shmem_abi_.size() / 2;

      const size_t initial_page_idx = page_idx_;
      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
        bool is_new_page = false;

        // TODO(primiano): make the page layout dynamic.
        auto layout = SharedMemoryArbiterImpl::default_page_layout;

        if (shmem_abi_.is_page_free(page_idx_)) {
          is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
        }
        uint32_t free_chunks;
        if (is_new_page) {
          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
        } else {
          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
        }

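        // |free_chunks| is a bitmap with bit i set iff chunk i of the current
        // page is free; scan it from the lowest bit and try to acquire the
        // first free chunk found.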
        for (uint32_t chunk_idx = 0; free_chunks;
             chunk_idx++, free_chunks >>= 1) {
          if (!(free_chunks & 1))
            continue;
          // We found a free chunk.
          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
              page_idx_, chunk_idx, &header);
          if (!chunk.is_valid())
            continue;
          if (stall_count > kLogAfterNStalls) {
            PERFETTO_LOG("Recovered from stall after %d iterations",
                         stall_count);
          }

          if (should_commit_synchronously) {
            // We can't flush while holding the lock.
            scoped_lock.unlock();
            FlushPendingCommitDataRequests();
            return chunk;
          } else {
            return chunk;
          }
        }
      }
    }  // scoped_lock

    if (buffer_exhausted_policy == BufferExhaustedPolicy::kDrop) {
      PERFETTO_DLOG("Shared memory buffer exhausted, returning invalid Chunk!");
      return Chunk();
    }

    // Stalling is not supported if we were ever unbound (see earlier comment).
    PERFETTO_CHECK(was_always_bound_);

    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
    // Service).
    if (stall_count++ == kLogAfterNStalls) {
      PERFETTO_LOG("Shared memory buffer overrun! Stalling");
    }

    if (stall_count == kAssertAtNStalls) {
      Stats stats = GetStats();
      PERFETTO_FATAL(
          "Shared memory buffer max stall count exceeded; possible deadlock "
          "free=%zu bw=%zu br=%zu comp=%zu pages_free=%zu pages_err=%zu",
          stats.chunks_free, stats.chunks_being_written,
          stats.chunks_being_read, stats.chunks_complete, stats.pages_free,
          stats.pages_unexpected);
    }

    // If the IPC thread itself is stalled because the current process has
    // filled up the SMB, we need to make sure that the service can process and
    // purge the chunks written by our process, by flushing any pending commit
    // requests. Because other threads in our process can continue to
    // concurrently grab, fill and commit any chunks purged by the service, it
    // is possible that the SMB remains full and the IPC thread remains
    // stalled, needing to flush the concurrently queued up commits again. This
    // is particularly likely with the in-process perfetto service, where the
    // IPC thread is the service thread. To avoid remaining stalled forever in
    // such a situation, we attempt to flush periodically after every N stalls.
    if (stall_count % kFlushCommitsAfterEveryNStalls == 0 &&
        task_runner_runs_on_current_thread) {
      // TODO(primiano): sending the IPC synchronously is a temporary
      // workaround until the backpressure logic in probes_producer is sorted
      // out. Until then the risk is that we stall the message loop waiting for
      // the tracing service to consume the shared memory buffer (SMB) and, for
      // this reason, never run the task that tells the service to purge the
      // SMB. This must happen iff we are on the IPC thread: not doing this
      // will cause deadlocks, while doing it on the wrong thread causes
      // out-of-order data commits (crbug.com/919187#c28).
      FlushPendingCommitDataRequests();
    } else {
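      // Exponential backoff: each failed attempt multiplies the sleep interval
      // by roughly 8x, capped at kMaxStallIntervalUs (100 ms), so short stalls
      // stay cheap while long stalls don't spin the CPU.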
      base::SleepMicroseconds(stall_interval_us);
      stall_interval_us =
          std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
    }
  }
}

void SharedMemoryArbiterImpl::ReturnCompletedChunk(
    Chunk chunk,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  PERFETTO_DCHECK(chunk.is_valid());
  const WriterID writer_id = chunk.writer_id();
  UpdateCommitDataRequest(std::move(chunk), writer_id, target_buffer,
                          patch_list);
}

void SharedMemoryArbiterImpl::SendPatches(WriterID writer_id,
                                          MaybeUnboundBufferID target_buffer,
                                          PatchList* patch_list) {
  PERFETTO_DCHECK(!patch_list->empty() && patch_list->front().is_patched());
  UpdateCommitDataRequest(Chunk(), writer_id, target_buffer, patch_list);
}

void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
    Chunk chunk,
    WriterID writer_id,
    MaybeUnboundBufferID target_buffer,
    PatchList* patch_list) {
  // Note: chunk will be invalid if the call came from SendPatches().
  base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr;
  // The delay with which the flush will be posted.
  uint32_t flush_delay_ms = 0;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);

    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

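      // This is the first commit of a new batch. If batching is enabled
      // (|batch_commits_duration_ms_| > 0), arm a delayed flush below so that
      // chunks returned in the meantime are sent to the service in one IPC.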
      // Flushing the commit is only supported while we're |fully_bound_|. If
      // we aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_ && !delayed_flush_scheduled_) {
        weak_this = weak_ptr_factory_.GetWeakPtr();
        task_runner_to_post_delayed_callback_on = task_runner_;
        flush_delay_ms = batch_commits_duration_ms_;
        delayed_flush_scheduled_ = true;
      }
    }

    CommitDataRequest::ChunksToMove* ctm = nullptr;  // Set if chunk is valid.
    // If a valid chunk is specified, return it and attach it to the request.
    if (chunk.is_valid()) {
      PERFETTO_DCHECK(chunk.writer_id() == writer_id);
      uint8_t chunk_idx = chunk.chunk_idx();
      bytes_pending_commit_ += chunk.size();
      size_t page_idx;

      ctm = commit_data_req_->add_chunks_to_move();
      // If the chunk needs patching, it should not be marked as complete yet,
      // because this would indicate to the service that the producer will not
      // be writing to it anymore, while the producer might still apply patches
      // to the chunk later on. In particular, when re-reading (e.g. because of
      // periodic scraping) a completed chunk, the service expects the flags of
      // that chunk not to be removed between reads. So, let's say the producer
      // marked the chunk as complete here and the service then read it for the
      // first time. If the producer then fully patched the chunk, thus
      // removing the kChunkNeedsPatching flag, and the service re-read the
      // chunk after the patching, the service would be thrown off by the
      // removed flag.
      if (direct_patching_enabled_ &&
          (chunk.GetPacketCountAndFlags().second &
           SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) {
        page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first;
      } else {
        // If the chunk doesn't need patching, we can mark it as complete
        // immediately. This allows the service to read it in full while
        // scraping, which would not be the case if the chunk was left in a
        // kChunkBeingWritten state.
        page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
      }

      // DO NOT access |chunk| after this point, it has been std::move()-d
      // above.
      ctm->set_page(static_cast<uint32_t>(page_idx));
      ctm->set_chunk(chunk_idx);
      ctm->set_target_buffer(target_buffer);
    }

    // Process the completed patches for previous chunks from the |patch_list|.
    CommitDataRequest::ChunkToPatch* last_patch_req = nullptr;
    while (!patch_list->empty() && patch_list->front().is_patched()) {
      Patch curr_patch = patch_list->front();
      patch_list->pop_front();
      // Patches for the same chunk are contiguous in the |patch_list|. So, to
      // determine if there are any other patches that apply to the chunk that
      // is being patched, check if the next patch in the |patch_list| applies
      // to the same chunk.
      bool chunk_needs_more_patching =
          !patch_list->empty() &&
          patch_list->front().chunk_id == curr_patch.chunk_id;

      if (direct_patching_enabled_ &&
          TryDirectPatchLocked(writer_id, curr_patch,
                               chunk_needs_more_patching)) {
        continue;
      }

      // The chunk that this patch applies to has already been released to the
      // service, so it cannot be patched here. Add the patch to the commit
      // data request, so that it can be sent to the service and applied there.
      if (!last_patch_req ||
          last_patch_req->chunk_id() != curr_patch.chunk_id) {
        last_patch_req = commit_data_req_->add_chunks_to_patch();
        last_patch_req->set_writer_id(writer_id);
        last_patch_req->set_chunk_id(curr_patch.chunk_id);
        last_patch_req->set_target_buffer(target_buffer);
      }
      auto* patch = last_patch_req->add_patches();
      patch->set_offset(curr_patch.offset);
      patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size());
    }

    // Patches are enqueued in the |patch_list| in order and are notified to
    // the service when the chunk is returned. The only case when the current
    // patch list is incomplete is if there is an unpatched entry at the head
    // of the |patch_list| that belongs to the same ChunkID as the last one we
    // are about to send to the service.
    if (last_patch_req && !patch_list->empty() &&
        patch_list->front().chunk_id == last_patch_req->chunk_id()) {
      last_patch_req->set_has_more_patches(true);
    }

    // If the buffer is filling up or if we are given a patch for a chunk
    // that was already sent to the service, we don't want to wait for the next
    // delayed flush to happen and we flush immediately. Otherwise, if we
    // accumulate the patch and a crash occurs before the patch is sent, the
    // service will not know of the patch and won't be able to reconstruct the
    // trace.
    if (fully_bound_ &&
        (last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) {
      weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner_to_post_delayed_callback_on = task_runner_;
      flush_delay_ms = 0;
    }
  }  // scoped_lock(lock_)

  // We shouldn't post tasks while locked.
  // |task_runner_to_post_delayed_callback_on| remains valid after unlocking,
  // because |task_runner_| is never reset.
  if (task_runner_to_post_delayed_callback_on) {
    task_runner_to_post_delayed_callback_on->PostDelayedTask(
        [weak_this] {
          if (!weak_this)
            return;
          {
            std::lock_guard<std::mutex> scoped_lock(weak_this->lock_);
            // Clear |delayed_flush_scheduled_|, allowing the next call to
            // UpdateCommitDataRequest to start another batching period.
            weak_this->delayed_flush_scheduled_ = false;
          }
          weak_this->FlushPendingCommitDataRequests();
        },
        flush_delay_ms);
  }
}

bool SharedMemoryArbiterImpl::TryDirectPatchLocked(
    WriterID writer_id,
    const Patch& patch,
    bool chunk_needs_more_patching) {
  // Search the chunks that are being batched in |commit_data_req_| for a chunk
  // that needs patching and that matches the provided |writer_id| and
  // |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since
  // |commit_data_req_| is appended to at the end with newly-returned chunks,
  // and patches are more likely to apply to chunks that have been returned
  // recently.
  SharedMemoryABI::Chunk chunk;
  bool chunk_found = false;
  auto& chunks_to_move = commit_data_req_->chunks_to_move();
  for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend();
       ++ctm_it) {
    uint32_t header_bitmap = shmem_abi_.GetPageHeaderBitmap(ctm_it->page());
    auto chunk_state = shmem_abi_.GetChunkStateFromHeaderBitmap(
        header_bitmap, ctm_it->chunk());
    // Note: the subset of |commit_data_req_| chunks that still need patching
    // is also the subset of chunks that are still being written to. The rest
    // of the chunks in |commit_data_req_| do not need patching and have
    // already been marked as complete.
    if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
      continue;

    chunk = shmem_abi_.GetChunkUnchecked(ctm_it->page(), header_bitmap,
                                         ctm_it->chunk());
    if (chunk.writer_id() == writer_id &&
        chunk.header()->chunk_id.load(std::memory_order_relaxed) ==
            patch.chunk_id) {
      chunk_found = true;
      break;
    }
  }

  if (!chunk_found) {
    // The chunk has already been committed to the service and the patch cannot
    // be applied in the producer.
    return false;
  }

  // Apply the patch.
  size_t page_idx;
  uint8_t chunk_idx;
  std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk);
  PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) ==
                  SharedMemoryABI::ChunkState::kChunkBeingWritten);
  auto chunk_begin = chunk.payload_begin();
  uint8_t* ptr = chunk_begin + patch.offset;
  PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize);
  // DCHECK that we are writing into a zero-filled size field and not into
  // valid data. It relies on ScatteredStreamWriter::ReserveBytes() to
  // zero-fill reservations in debug builds.
  const char zero[SharedMemoryABI::kPacketHeaderSize]{};
  PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0);

  memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize);

  if (!chunk_needs_more_patching) {
    // Mark that the chunk doesn't need more patching and mark it as complete,
    // as the producer will not write to it anymore. This allows the service to
    // read the chunk in full while scraping, which would not be the case if
    // the chunk was left in a kChunkBeingWritten state.
    chunk.ClearNeedsPatchingFlag();
    shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
  }

  return true;
}

void SharedMemoryArbiterImpl::SetBatchCommitsDuration(
    uint32_t batch_commits_duration_ms) {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  batch_commits_duration_ms_ = batch_commits_duration_ms;
}

bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  if (!direct_patching_supported_by_service_) {
    return false;
  }

  return direct_patching_enabled_ = true;
}

void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  direct_patching_supported_by_service_ = true;
}

// This function is quite subtle. When making changes keep in mind these two
// challenges:
// 1) If the producer stalls and we happen to be on the |task_runner_| IPC
//    thread (or, for in-process cases, on the same thread where
//    TracingServiceImpl lives), the CommitData() call must be synchronous and
//    not posted, to avoid deadlocks.
// 2) When different threads hit this function, we must guarantee that we don't
//    accidentally make commits out of order. See commit 4e4fe8f56ef and
//    crbug.com/919187 for more context.
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
    std::function<void()> callback) {
  std::unique_ptr<CommitDataRequest> req;
  {
    std::unique_lock<std::mutex> scoped_lock(lock_);

    // Flushing is only supported while |fully_bound_|, and there may still be
    // unbound startup trace writers. If so, skip the commit for now - it'll be
    // done when |fully_bound_| is updated.
    if (!fully_bound_) {
      if (callback)
        pending_flush_callbacks_.push_back(callback);
      return;
    }

    // May be called by TraceWriterImpl on any thread.
    base::TaskRunner* task_runner = task_runner_;
    if (!task_runner->RunsTasksOnCurrentThread()) {
      // We shouldn't post a task while holding a lock. |task_runner| remains
      // valid after unlocking, because |task_runner_| is never reset.
      scoped_lock.unlock();

      auto weak_this = weak_ptr_factory_.GetWeakPtr();
      task_runner->PostTask([weak_this, callback] {
        if (weak_this)
          weak_this->FlushPendingCommitDataRequests(std::move(callback));
      });
      return;
    }

    // |commit_data_req_| could have become a nullptr, for example when a
    // forced sync flush happens in GetNewChunk().
    if (commit_data_req_) {
      // Make sure any placeholder buffer IDs from StartupWriters are replaced
      // before sending the request.
      bool all_placeholders_replaced =
          ReplaceCommitPlaceholderBufferIdsLocked();
      // We're |fully_bound_|, thus all writers are bound and all placeholders
      // should have been replaced.
      PERFETTO_DCHECK(all_placeholders_replaced);

      // In order to allow patching in the producer we delay the kChunkComplete
      // transition and keep batched chunks in the kChunkBeingWritten state.
      // Since we are about to notify the service of all batched chunks, it
      // will not be possible to apply any more patches to them and we need to
      // move them to kChunkComplete - otherwise the service won't look at
      // them.
      for (auto& ctm : *commit_data_req_->mutable_chunks_to_move()) {
        uint32_t header_bitmap = shmem_abi_.GetPageHeaderBitmap(ctm.page());
        auto chunk_state = shmem_abi_.GetChunkStateFromHeaderBitmap(
            header_bitmap, ctm.chunk());
        // Note: the subset of |commit_data_req_| chunks that still need
        // patching is also the subset of chunks that are still being written
        // to. The rest of the chunks in |commit_data_req_| do not need
        // patching and have already been marked as complete.
        if (chunk_state == SharedMemoryABI::kChunkBeingWritten) {
          auto chunk = shmem_abi_.GetChunkUnchecked(ctm.page(), header_bitmap,
                                                    ctm.chunk());
          shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
        }

        if (use_shmem_emulation_) {
          // When running in the emulation mode:
          // 1. serialize the chunk data to |ctm| as we won't modify the chunk
          //    anymore.
          // 2. free the chunk as the service won't be able to do this.
          auto chunk = shmem_abi_.GetChunkUnchecked(ctm.page(), header_bitmap,
                                                    ctm.chunk());
          PERFETTO_CHECK(chunk.is_valid());
          ctm.set_data(chunk.begin(), chunk.size());
          shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
        }
      }

      req = std::move(commit_data_req_);
      bytes_pending_commit_ = 0;
    }
  }  // scoped_lock

  if (req) {
    producer_endpoint_->CommitData(*req, callback);
  } else if (callback) {
    // If |req| was nullptr, it means that an enqueued deferred commit was
    // executed just before this. At this point send an empty commit request
    // to the service, just to linearize with it and give the guarantee to the
    // caller that the data has been flushed into the service.
    producer_endpoint_->CommitData(CommitDataRequest(), std::move(callback));
  }
}

bool SharedMemoryArbiterImpl::TryShutdown() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  did_shutdown_ = true;
  // Shutdown is safe if there are no active trace writers for this arbiter.
  return active_writer_ids_.IsEmpty();
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
    BufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  PERFETTO_CHECK(target_buffer > 0);
  return CreateTraceWriterInternal(target_buffer, buffer_exhausted_policy);
}

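// Startup trace writers target a buffer *reservation* rather than a real
// BufferID and always use BufferExhaustedPolicy::kDrop, since stalling is not
// supported for writers created before the arbiter is fully bound.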
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateStartupTraceWriter(
    uint16_t target_buffer_reservation_id) {
  return CreateTraceWriterInternal(
      MakeTargetBufferIdForReservation(target_buffer_reservation_id),
      BufferExhaustedPolicy::kDrop);
}

void SharedMemoryArbiterImpl::BindToProducerEndpoint(
    TracingService::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  PERFETTO_DCHECK(producer_endpoint && task_runner);
  PERFETTO_DCHECK(task_runner->RunsTasksOnCurrentThread());

  bool should_flush = false;
  std::function<void()> flush_callback;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    PERFETTO_CHECK(!fully_bound_);
    PERFETTO_CHECK(!producer_endpoint_ && !task_runner_);

    producer_endpoint_ = producer_endpoint;
    task_runner_ = task_runner;

    // Now that we're bound to a task runner, also reset the WeakPtrFactory to
    // it. Because this code runs on the task runner, the factory's weak
    // pointers will be valid on it.
    weak_ptr_factory_.Reset(this);

    // All writers registered so far should be startup trace writers, since
    // the producer cannot feasibly know the target buffer for any future
    // session yet.
    for (const auto& entry : pending_writers_) {
      PERFETTO_CHECK(IsReservationTargetBufferId(entry.second));
    }

    // If all buffer reservations are bound, we can flush pending commits.
    if (UpdateFullyBoundLocked()) {
      should_flush = true;
      flush_callback = TakePendingFlushCallbacksLocked();
    }
  }  // scoped_lock

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we
  // become |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

void SharedMemoryArbiterImpl::BindStartupTargetBuffer(
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  PERFETTO_DCHECK(target_buffer_id > 0);

  std::unique_lock<std::mutex> scoped_lock(lock_);

  // We should already be bound to an endpoint.
  PERFETTO_CHECK(producer_endpoint_);
  PERFETTO_CHECK(task_runner_);
  PERFETTO_CHECK(task_runner_->RunsTasksOnCurrentThread());

  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id, target_buffer_id);
}

void SharedMemoryArbiterImpl::AbortStartupTracingForReservation(
    uint16_t target_buffer_reservation_id) {
  std::unique_lock<std::mutex> scoped_lock(lock_);

  // If we are already bound to a task runner, we may need to flush after
  // aborting the session, and thus should be running on the arbiter's task
  // runner.
  if (task_runner_ && !task_runner_->RunsTasksOnCurrentThread()) {
    // We shouldn't post tasks while locked.
    auto* task_runner = task_runner_;
    scoped_lock.unlock();

    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner->PostTask([weak_this, target_buffer_reservation_id]() {
      if (!weak_this)
        return;
      weak_this->AbortStartupTracingForReservation(
          target_buffer_reservation_id);
    });
    return;
  }

  // Bind the target buffer reservation to an invalid buffer (ID 0), so that
  // existing commits, as well as future commits (of currently acquired
  // chunks), will be released as free by the service but otherwise ignored
  // (i.e. not copied into any valid target buffer).
  BindStartupTargetBufferImpl(std::move(scoped_lock),
                              target_buffer_reservation_id,
                              /*target_buffer_id=*/kInvalidBufferId);
}

void SharedMemoryArbiterImpl::BindStartupTargetBufferImpl(
    std::unique_lock<std::mutex> scoped_lock,
    uint16_t target_buffer_reservation_id,
    BufferID target_buffer_id) {
  // We should already be bound to an endpoint if the target buffer is valid.
  PERFETTO_DCHECK((producer_endpoint_ && task_runner_) ||
                  target_buffer_id == kInvalidBufferId);

  PERFETTO_DLOG("Binding startup target buffer reservation %" PRIu16
                " to buffer %" PRIu16,
                target_buffer_reservation_id, target_buffer_id);

  MaybeUnboundBufferID reserved_id =
      MakeTargetBufferIdForReservation(target_buffer_reservation_id);

  bool should_flush = false;
  std::function<void()> flush_callback;
  std::vector<std::pair<WriterID, BufferID>> writers_to_register;

  TargetBufferReservation& reservation =
      target_buffer_reservations_[reserved_id];
  PERFETTO_CHECK(!reservation.resolved);
  reservation.resolved = true;
  reservation.target_buffer = target_buffer_id;

  // Collect trace writers associated with the reservation.
  for (auto it = pending_writers_.begin(); it != pending_writers_.end();) {
    if (it->second == reserved_id) {
      // No need to register writers that have an invalid target buffer.
      if (target_buffer_id != kInvalidBufferId) {
        writers_to_register.push_back(
            std::make_pair(it->first, target_buffer_id));
      }
      it = pending_writers_.erase(it);
    } else {
      it++;
    }
  }

  // If all buffer reservations are bound, we can flush pending commits.
  if (UpdateFullyBoundLocked()) {
    should_flush = true;
    flush_callback = TakePendingFlushCallbacksLocked();
  }

  scoped_lock.unlock();

  // Register any newly bound trace writers with the service.
  for (const auto& writer_and_target_buffer : writers_to_register) {
    producer_endpoint_->RegisterTraceWriter(writer_and_target_buffer.first,
                                            writer_and_target_buffer.second);
  }

  // Attempt to flush any pending commits (and run pending flush callbacks). If
  // there are none, this will have no effect. If we ended up in a race that
  // changed |fully_bound_| back to false, the commit will happen once we
  // become |fully_bound_| again.
  if (should_flush)
    FlushPendingCommitDataRequests(flush_callback);
}

SharedMemoryArbiterImpl::Stats SharedMemoryArbiterImpl::GetStats() {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  Stats res;

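  // Walk every page's header bitmap and classify pages and chunks by state.
  // This feeds the diagnostic message printed when a stall in GetNewChunk()
  // turns fatal.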
  for (size_t page_idx = 0; page_idx < shmem_abi_.num_pages(); page_idx++) {
    uint32_t bitmap = shmem_abi_.page_header(page_idx)->header_bitmap.load(
        std::memory_order_relaxed);
    SharedMemoryABI::PageLayout layout =
        SharedMemoryABI::GetLayoutFromHeaderBitmap(bitmap);
    if (layout == SharedMemoryABI::kPageNotPartitioned) {
      res.pages_free++;
    } else if (layout == SharedMemoryABI::kPageDivReserved1 ||
               layout == SharedMemoryABI::kPageDivReserved2) {
      res.pages_unexpected++;
    }
    // Free and unexpected pages have zero chunks.
    const uint32_t num_chunks =
        SharedMemoryABI::GetNumChunksFromHeaderBitmap(bitmap);
    for (uint32_t i = 0; i < num_chunks; i++) {
      switch (SharedMemoryABI::GetChunkStateFromHeaderBitmap(bitmap, i)) {
        case SharedMemoryABI::kChunkFree:
          res.chunks_free++;
          break;
        case SharedMemoryABI::kChunkBeingWritten:
          res.chunks_being_written++;
          break;
        case SharedMemoryABI::kChunkBeingRead:
          res.chunks_being_read++;
          break;
        case SharedMemoryABI::kChunkComplete:
          res.chunks_complete++;
          break;
      }
    }
  }

  return res;
}

std::function<void()>
SharedMemoryArbiterImpl::TakePendingFlushCallbacksLocked() {
  if (pending_flush_callbacks_.empty())
    return std::function<void()>();

  std::vector<std::function<void()>> pending_flush_callbacks;
  pending_flush_callbacks.swap(pending_flush_callbacks_);
  // Capture the callback list into the lambda by copy.
  return [pending_flush_callbacks]() {
    for (auto& callback : pending_flush_callbacks)
      callback();
  };
}

void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
  base::TaskRunner* task_runner_to_commit_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    // If a commit_data_req_ exists it means that somebody else already posted
    // a FlushPendingCommitDataRequests() task.
    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());

      // Flushing the commit is only supported while we're |fully_bound_|. If
      // we aren't, we'll flush when |fully_bound_| is updated.
      if (fully_bound_)
        task_runner_to_commit_on = task_runner_;
    } else {
      // If there is another request queued and that also contains a reply
      // to a flush request, reply with the highest id.
      req_id = std::max(req_id, commit_data_req_->flush_request_id());
    }
    commit_data_req_->set_flush_request_id(req_id);
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_commit_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_commit_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_commit_on->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
    MaybeUnboundBufferID target_buffer,
    BufferExhaustedPolicy buffer_exhausted_policy) {
  WriterID id;
  base::TaskRunner* task_runner_to_register_on = nullptr;

  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    if (did_shutdown_)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    id = active_writer_ids_.Allocate();
    if (!id)
      return std::unique_ptr<TraceWriter>(new NullTraceWriter());

    PERFETTO_DCHECK(!pending_writers_.count(id));

    if (IsReservationTargetBufferId(target_buffer)) {
      // If the reservation is new, mark it as unbound in
      // |target_buffer_reservations_|. Otherwise, if the reservation was
      // already bound, choose the bound buffer ID now.
      auto it_and_inserted = target_buffer_reservations_.insert(
          {target_buffer, TargetBufferReservation()});
      if (it_and_inserted.first->second.resolved)
        target_buffer = it_and_inserted.first->second.target_buffer;
    }

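    // If the reservation is still unresolved after the lookup above, the
    // writer stays in |pending_writers_| until BindStartupTargetBuffer()
    // provides the real BufferID and registers it with the service.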
    if (IsReservationTargetBufferId(target_buffer)) {
      // The arbiter and/or startup buffer reservations are not bound yet, so
      // buffer the registration of the writer until after we're bound.
      pending_writers_[id] = target_buffer;

      // Mark the arbiter as not fully bound, since we now have at least one
      // unbound trace writer / target buffer reservation.
      fully_bound_ = false;
      was_always_bound_ = false;
    } else if (target_buffer != kInvalidBufferId) {
      // Trace writer is bound, so arbiter should be bound to an endpoint, too.
      PERFETTO_CHECK(producer_endpoint_ && task_runner_);
      task_runner_to_register_on = task_runner_;
    }

    // All trace writers must use kDrop policy if the arbiter ever becomes
    // unbound.
    bool uses_drop_policy =
        buffer_exhausted_policy == BufferExhaustedPolicy::kDrop;
    all_writers_have_drop_policy_ &= uses_drop_policy;
    PERFETTO_DCHECK(fully_bound_ || uses_drop_policy);
    PERFETTO_CHECK(fully_bound_ || all_writers_have_drop_policy_);
    PERFETTO_CHECK(was_always_bound_ || uses_drop_policy);
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner_to_register_on|
  // remains valid after unlocking, because |task_runner_| is never reset.
  if (task_runner_to_register_on) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_to_register_on->PostTask([weak_this, id, target_buffer] {
      if (weak_this)
        weak_this->producer_endpoint_->RegisterTraceWriter(id, target_buffer);
    });
  }

  return std::unique_ptr<TraceWriter>(
      new TraceWriterImpl(this, id, target_buffer, buffer_exhausted_policy));
}

void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
  base::TaskRunner* task_runner = nullptr;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    active_writer_ids_.Free(id);

    auto it = pending_writers_.find(id);
    if (it != pending_writers_.end()) {
      // Writer hasn't been bound yet and thus also not yet registered with the
      // service.
      pending_writers_.erase(it);
      return;
    }

    // A trace writer from an aborted session may be destroyed before the
    // arbiter is bound to a task runner. In that case, it was never registered
    // with the service.
    if (!task_runner_)
      return;

    // If `active_writer_ids_` is empty, `TryShutdown()` can return true
    // and `*this` can be deleted. Let's grab everything we need from `*this`
    // before releasing the lock.
    weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner = task_runner_;
  }  // scoped_lock

  // We shouldn't post tasks while locked. |task_runner| remains valid after
  // unlocking, because |task_runner_| is never reset.
  task_runner->PostTask([weak_this, id] {
    if (weak_this)
      weak_this->producer_endpoint_->UnregisterTraceWriter(id);
  });
}

bool SharedMemoryArbiterImpl::ReplaceCommitPlaceholderBufferIdsLocked() {
  if (!commit_data_req_)
    return true;

  bool all_placeholders_replaced = true;
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_move()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  for (auto& chunk : *commit_data_req_->mutable_chunks_to_patch()) {
    if (!IsReservationTargetBufferId(chunk.target_buffer()))
      continue;
    const auto it = target_buffer_reservations_.find(chunk.target_buffer());
    PERFETTO_DCHECK(it != target_buffer_reservations_.end());
    if (!it->second.resolved) {
      all_placeholders_replaced = false;
      continue;
    }
    chunk.set_target_buffer(it->second.target_buffer);
  }
  return all_placeholders_replaced;
}

bool SharedMemoryArbiterImpl::UpdateFullyBoundLocked() {
  if (!producer_endpoint_) {
    PERFETTO_DCHECK(!fully_bound_);
    return false;
  }
  // We're fully bound if all target buffer reservations have a valid
  // associated BufferID.
  fully_bound_ = std::none_of(
      target_buffer_reservations_.begin(), target_buffer_reservations_.end(),
      [](std::pair<MaybeUnboundBufferID, TargetBufferReservation> entry) {
        return !entry.second.resolved;
      });
  if (!fully_bound_)
    was_always_bound_ = false;
  return fully_bound_;
}

}  // namespace perfetto