1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <optional>
23 #include <vector>
24
25 #include "perfetto/base/build_config.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/base/time.h"
29 #include "perfetto/ext/base/hash.h"
30 #include "perfetto/ext/base/thread_checker.h"
31 #include "perfetto/ext/base/waitable_event.h"
32 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
33 #include "perfetto/ext/tracing/core/trace_packet.h"
34 #include "perfetto/ext/tracing/core/trace_stats.h"
35 #include "perfetto/ext/tracing/core/trace_writer.h"
36 #include "perfetto/ext/tracing/core/tracing_service.h"
37 #include "perfetto/tracing/buffer_exhausted_policy.h"
38 #include "perfetto/tracing/core/data_source_config.h"
39 #include "perfetto/tracing/core/tracing_service_state.h"
40 #include "perfetto/tracing/data_source.h"
41 #include "perfetto/tracing/internal/data_source_internal.h"
42 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
43 #include "perfetto/tracing/internal/tracing_backend_fake.h"
44 #include "perfetto/tracing/trace_writer_base.h"
45 #include "perfetto/tracing/tracing.h"
46 #include "perfetto/tracing/tracing_backend.h"
47 #include "src/tracing/core/null_trace_writer.h"
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49
50 #include "protos/perfetto/config/interceptor_config.gen.h"
51
52 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
53 #include <io.h> // For dup()
54 #else
55 #include <unistd.h> // For dup()
56 #endif
57
58 namespace perfetto {
59 namespace internal {
60
61 namespace {
62
63 using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
64
65 // A task runner which prevents calls to DataSource::Trace() while an operation
66 // is in progress. Used to guard against unexpected re-entrancy where the
67 // user-provided task runner implementation tries to enter a trace point under
68 // the hood.
69 class NonReentrantTaskRunner : public base::TaskRunner {
70 public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)71 NonReentrantTaskRunner(TracingMuxer* muxer,
72 std::unique_ptr<base::TaskRunner> task_runner)
73 : muxer_(muxer), task_runner_(std::move(task_runner)) {}
74
75 // base::TaskRunner implementation.
PostTask(std::function<void ()> task)76 void PostTask(std::function<void()> task) override {
77 CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
78 }
79
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)80 void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
81 CallWithGuard(
82 [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
83 }
84
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)85 void AddFileDescriptorWatch(base::PlatformHandle fd,
86 std::function<void()> callback) override {
87 CallWithGuard(
88 [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
89 }
90
RemoveFileDescriptorWatch(base::PlatformHandle fd)91 void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
92 CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
93 }
94
RunsTasksOnCurrentThread() const95 bool RunsTasksOnCurrentThread() const override {
96 bool result;
97 CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
98 return result;
99 }
100
101 private:
102 template <typename T>
CallWithGuard(T lambda) const103 void CallWithGuard(T lambda) const {
104 auto* root_tls = muxer_->GetOrCreateTracingTLS();
105 if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
106 lambda();
107 return;
108 }
109 ScopedReentrancyAnnotator scoped_annotator(*root_tls);
110 lambda();
111 }
112
113 TracingMuxer* const muxer_;
114 std::unique_ptr<base::TaskRunner> task_runner_;
115 };
116
117 class StopArgsImpl : public DataSourceBase::StopArgs {
118 public:
HandleStopAsynchronously() const119 std::function<void()> HandleStopAsynchronously() const override {
120 auto closure = std::move(async_stop_closure);
121 async_stop_closure = std::function<void()>();
122 return closure;
123 }
124
125 mutable std::function<void()> async_stop_closure;
126 };
127
128 class FlushArgsImpl : public DataSourceBase::FlushArgs {
129 public:
HandleFlushAsynchronously() const130 std::function<void()> HandleFlushAsynchronously() const override {
131 auto closure = std::move(async_flush_closure);
132 async_flush_closure = std::function<void()>();
133 return closure;
134 }
135
136 mutable std::function<void()> async_flush_closure;
137 };
138
// Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
// Value-initialized, i.e. starts out as nullptr.
static TracingMuxerImpl* g_prev_instance{};
141
142 template <typename RegisteredBackend>
143 struct CompareBackendByType {
BackendTypePriorityperfetto::internal::__anon6824a9f60111::CompareBackendByType144 static int BackendTypePriority(BackendType type) {
145 switch (type) {
146 case kSystemBackend:
147 return 0;
148 case kInProcessBackend:
149 return 1;
150 case kCustomBackend:
151 return 2;
152 // The UnspecifiedBackend has the highest priority so that
153 // TracingBackendFake is the last one on the backend lists.
154 case kUnspecifiedBackend:
155 break;
156 }
157 return 3;
158 }
operator ()perfetto::internal::__anon6824a9f60111::CompareBackendByType159 bool operator()(BackendType type, const RegisteredBackend& b) {
160 return BackendTypePriority(type) < BackendTypePriority(b.type);
161 }
162 };
163
164 } // namespace
165
166 // ----- Begin of TracingMuxerImpl::ProducerImpl
// Stores the owning muxer, the id of the backend this producer belongs to and
// the shared-memory settings later applied in OnTracingSetup(). No endpoint is
// attached here; see Initialize().
TracingMuxerImpl::ProducerImpl::ProducerImpl(
    TracingMuxerImpl* muxer,
    TracingBackendId backend_id,
    uint32_t shmem_batch_commits_duration_ms,
    bool shmem_direct_patching_enabled)
    : muxer_(muxer),
      backend_id_(backend_id),
      shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms),
      shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {}
176
// Clears |muxer_|. OnDisconnect() and the data source callbacks treat a null
// |muxer_| as "being destroyed" and bail out.
TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
  muxer_ = nullptr;
}
180
// Adopts |endpoint| as the service connection for this producer. Must be
// called while the producer is not connected. Bumps the connection id, records
// whether the endpoint reports a shared memory buffer, resets the last startup
// target buffer reservation and atomically publishes the endpoint in
// |service_|.
void TracingMuxerImpl::ProducerImpl::Initialize(
    std::unique_ptr<ProducerEndpoint> endpoint) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!connected_);
  // Every (re)initialization gets a fresh connection id.
  connection_id_.fetch_add(1, std::memory_order_relaxed);
  // Truthy when the endpoint already exposes a shared memory object.
  is_producer_provided_smb_ = endpoint->shared_memory();
  last_startup_target_buffer_reservation_ = 0;

  // Adopt the endpoint into a shared pointer so that we can safely share it
  // across threads that create trace writers. The custom deleter function
  // ensures that the endpoint is always destroyed on the muxer's thread. (Note
  // that |task_runner| is assumed to outlive tracing sessions on all threads.)
  auto* task_runner = muxer_->task_runner_.get();
  auto deleter = [task_runner](ProducerEndpoint* e) {
    // Delete inline when already on the task runner's thread, otherwise hop
    // to it.
    if (task_runner->RunsTasksOnCurrentThread()) {
      delete e;
      return;
    }
    task_runner->PostTask([e] { delete e; });
  };
  std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
  // This atomic store is needed because another thread might be concurrently
  // creating a trace writer using the previous (disconnected) |service_|. See
  // CreateTraceWriter().
  std::atomic_store(&service_, std::move(service));
  // Don't try to use the service here since it may not have connected yet. See
  // OnConnect().
}
209
OnConnect()210 void TracingMuxerImpl::ProducerImpl::OnConnect() {
211 PERFETTO_DLOG("Producer connected");
212 PERFETTO_DCHECK_THREAD(thread_checker_);
213 PERFETTO_DCHECK(!connected_);
214 if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) {
215 PERFETTO_ELOG(
216 "The service likely doesn't support producer-provided SMBs. Preventing "
217 "future attempts to use producer-provided SMB again with this "
218 "backend.");
219 producer_provided_smb_failed_ = true;
220 // Will call OnDisconnect() and cause a reconnect without producer-provided
221 // SMB.
222 service_->Disconnect();
223 return;
224 }
225 connected_ = true;
226 muxer_->UpdateDataSourcesOnAllBackends();
227 SendOnConnectTriggers();
228 }
229
// Called when the producer endpoint has disconnected. Marks the producer as
// disconnected, forgets which data sources were registered on it, disposes of
// the current connection and asks the muxer to handle the disconnection.
void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // If we're being destroyed, bail out.
  if (!muxer_)
    return;
  connected_ = false;
  // Active data sources for this producer will be stopped by
  // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
  // will have a different connection id (even before it has finished
  // connecting).
  registered_data_sources_.reset();
  DisposeConnection();

  // Try reconnecting the producer.
  muxer_->OnProducerDisconnected(this);
}
246
DisposeConnection()247 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
248 // Keep the old service around as a dead connection in case it has active
249 // trace writers. If any tracing sessions were created, we can't clear
250 // |service_| here because other threads may be concurrently creating new
251 // trace writers. Any reconnection attempt will atomically swap the new
252 // service in place of the old one.
253 if (did_setup_tracing_ || did_setup_startup_tracing_) {
254 dead_services_.push_back(service_);
255 } else {
256 service_.reset();
257 }
258 }
259
// Records that a tracing session has been set up on this producer and applies
// the configured batch-commit duration (and, if enabled, direct SMB patching)
// to the service's shared memory arbiter.
// NOTE(review): MaybeSharedMemoryArbiter() is dereferenced without a null
// check; presumably the arbiter always exists by the time this callback runs
// -- confirm.
void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  did_setup_tracing_ = true;
  service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
      shmem_batch_commits_duration_ms_);
  if (shmem_direct_patching_enabled_) {
    service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching();
  }
}
269
// Records that startup tracing has been set up on this producer. The flag is
// consulted by DisposeConnection() to decide whether the old service must be
// kept around.
void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  did_setup_startup_tracing_ = true;
}
274
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)275 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
276 DataSourceInstanceID id,
277 const DataSourceConfig& cfg) {
278 PERFETTO_DCHECK_THREAD(thread_checker_);
279 if (!muxer_)
280 return;
281 muxer_->SetupDataSource(
282 backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg);
283 }
284
// Asks the muxer to start data source instance |id| on this backend and then
// tells the service that the data source has started. The config argument is
// unused. Does nothing once the producer is being destroyed.
void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
                                                     const DataSourceConfig&) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!muxer_)
    return;
  muxer_->StartDataSource(backend_id_, id);
  service_->NotifyDataSourceStarted(id);
}
293
StopDataSource(DataSourceInstanceID id)294 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
295 PERFETTO_DCHECK_THREAD(thread_checker_);
296 if (!muxer_)
297 return;
298 muxer_->StopDataSource_AsyncBegin(backend_id_, id);
299 }
300
Flush(FlushRequestID flush_id,const DataSourceInstanceID * instances,size_t instance_count,FlushFlags flush_flags)301 void TracingMuxerImpl::ProducerImpl::Flush(
302 FlushRequestID flush_id,
303 const DataSourceInstanceID* instances,
304 size_t instance_count,
305 FlushFlags flush_flags) {
306 PERFETTO_DCHECK_THREAD(thread_checker_);
307 bool all_handled = true;
308 if (muxer_) {
309 for (size_t i = 0; i < instance_count; i++) {
310 DataSourceInstanceID ds_id = instances[i];
311 bool handled = muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id,
312 flush_id, flush_flags);
313 if (!handled) {
314 pending_flushes_[flush_id].insert(ds_id);
315 all_handled = false;
316 }
317 }
318 }
319
320 if (all_handled) {
321 service_->NotifyFlushComplete(flush_id);
322 }
323 }
324
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)325 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
326 const DataSourceInstanceID* instances,
327 size_t instance_count) {
328 PERFETTO_DCHECK_THREAD(thread_checker_);
329 if (!muxer_)
330 return;
331 for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
332 muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
333 }
334 }
335
SweepDeadServices()336 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
337 PERFETTO_DCHECK_THREAD(thread_checker_);
338 auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
339 auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
340 return !arbiter || arbiter->TryShutdown();
341 };
342 for (auto it = dead_services_.begin(); it != dead_services_.end();) {
343 auto next_it = it;
344 next_it++;
345 if (is_unused(*it)) {
346 dead_services_.erase(it);
347 }
348 it = next_it;
349 }
350 return dead_services_.empty();
351 }
352
SendOnConnectTriggers()353 void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
354 PERFETTO_DCHECK_THREAD(thread_checker_);
355 base::TimeMillis now = base::GetWallTimeMs();
356 std::vector<std::string> triggers;
357 while (!on_connect_triggers_.empty()) {
358 // Skip if we passed TTL.
359 if (on_connect_triggers_.front().second > now) {
360 triggers.push_back(std::move(on_connect_triggers_.front().first));
361 }
362 on_connect_triggers_.pop_front();
363 }
364 if (!triggers.empty()) {
365 service_->ActivateTriggers(triggers);
366 }
367 }
368
NotifyFlushForDataSourceDone(DataSourceInstanceID ds_id,FlushRequestID flush_id)369 void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
370 DataSourceInstanceID ds_id,
371 FlushRequestID flush_id) {
372 if (!connected_) {
373 return;
374 }
375
376 {
377 auto it = pending_flushes_.find(flush_id);
378 if (it == pending_flushes_.end()) {
379 return;
380 }
381 std::set<DataSourceInstanceID>& ds_ids = it->second;
382 ds_ids.erase(ds_id);
383 }
384
385 std::optional<DataSourceInstanceID> biggest_flush_id;
386 for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
387 if (it->second.empty()) {
388 biggest_flush_id = it->first;
389 it = pending_flushes_.erase(it);
390 } else {
391 break;
392 }
393 }
394
395 if (biggest_flush_id) {
396 service_->NotifyFlushComplete(*biggest_flush_id);
397 }
398 }
399
400 // ----- End of TracingMuxerImpl::ProducerImpl methods.
401
402 // ----- Begin of TracingMuxerImpl::ConsumerImpl
// Stores the owning muxer, the backend type and the global id of the tracing
// session this consumer serves. No endpoint is attached here; see
// Initialize().
TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
                                             BackendType backend_type,
                                             TracingSessionGlobalID session_id)
    : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
407
// Clears |muxer_|. OnDisconnect() treats a null |muxer_| as "being destroyed"
// and bails out.
TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
  muxer_ = nullptr;
}
411
// Takes ownership of the consumer endpoint and stores it in |service_|.
void TracingMuxerImpl::ConsumerImpl::Initialize(
    std::unique_ptr<ConsumerEndpoint> endpoint) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_ = std::move(endpoint);
  // Don't try to use the service here since it may not have connected yet. See
  // OnConnect().
}
419
// Called when the consumer endpoint has connected. Subscribes to data source
// instance events and then replays, in order, whatever the API client asked
// for while the connection was still pending: setup, start, trace stats,
// service state query, session clone and finally stop.
void TracingMuxerImpl::ConsumerImpl::OnConnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!connected_);
  connected_ = true;

  // Observe data source instance events so we get notified when tracing starts.
  service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
                          ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);

  // If the API client configured and started tracing before we connected,
  // tell the backend about it now.
  if (trace_config_)
    muxer_->SetupTracingSession(session_id_, trace_config_);
  if (start_pending_)
    muxer_->StartTracingSession(session_id_);
  if (get_trace_stats_pending_) {
    // Take the stored callback out of the member before handing it over.
    auto callback = std::move(get_trace_stats_callback_);
    get_trace_stats_callback_ = nullptr;
    muxer_->GetTraceStats(session_id_, std::move(callback));
  }
  if (query_service_state_callback_) {
    // Same hand-over pattern as for the trace stats callback above.
    auto callback = std::move(query_service_state_callback_);
    query_service_state_callback_ = nullptr;
    muxer_->QueryServiceState(session_id_, std::move(callback));
  }
  if (session_to_clone_) {
    service_->CloneSession(*session_to_clone_);
    session_to_clone_ = std::nullopt;
  }

  // Stop is replayed last, after every other pending request.
  if (stop_pending_)
    muxer_->StopTracingSession(session_id_);
}
453
// Called when the consumer endpoint has disconnected. Reports a kDisconnected
// error to the client, fires any pending start/stop completion callbacks so
// that blocking callers are released, marks the consumer as disconnected and
// finally tells the muxer that this consumer can be destroyed.
void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // If we're being destroyed, bail out.
  if (!muxer_)
    return;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // A disconnect before ever connecting means the connection attempt failed.
  if (!connected_ && backend_type_ == kSystemBackend) {
    PERFETTO_ELOG(
        "Unable to connect to the system tracing service as a consumer. On "
        "Android, use the \"perfetto\" command line tool instead to start "
        "system-wide tracing sessions");
  }
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)

  // Notify the client about disconnection.
  NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});

  // Make sure the client doesn't hang in a blocking start/stop because of the
  // disconnection.
  NotifyStartComplete();
  NotifyStopComplete();

  // It shouldn't be necessary to call StopTracingSession. If we get this call
  // it means that the service did shutdown before us, so there is no point
  // trying it to ask it to stop the session. We should just remember to cleanup
  // the consumer vector.
  connected_ = false;

  // Notify the muxer that it is safe to destroy |this|. This is needed because
  // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
  // access until OnDisconnect() is called.
  muxer_->OnConsumerDisconnected(this);
}
487
// Starts tearing down the connection by releasing the consumer endpoint held
// in |service_|.
void TracingMuxerImpl::ConsumerImpl::Disconnect() {
  // This is weird and deserves a comment.
  //
  // When we called the ConnectConsumer method on the service it returns
  // us a ConsumerEndpoint which we stored in |service_|, however this
  // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
  // |this|. Part of the API contract to TracingService::ConnectConsumer is that
  // the ConsumerImpl pointer has to be valid until the
  // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
  // ConsumerEndpoint |service_|. Eventually this will call
  // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
  // call the destructor of |this|.
  service_.reset();
}
502
// Called when tracing has been disabled for this session. Marks the session
// as stopped, reports |error| (if non-empty) to the client as kTracingFailed
// and fires any pending start/stop completion callbacks.
void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
    const std::string& error) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!stopped_);
  stopped_ = true;

  if (!error.empty())
    NotifyError(TracingError{TracingError::kTracingFailed, error});

  // If we're still waiting for the start event, fire it now. This may happen if
  // there are no active data sources in the session.
  NotifyStartComplete();
  NotifyStopComplete();
}
517
NotifyStartComplete()518 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
519 PERFETTO_DCHECK_THREAD(thread_checker_);
520 if (start_complete_callback_) {
521 muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
522 start_complete_callback_ = nullptr;
523 }
524 if (blocking_start_complete_callback_) {
525 muxer_->task_runner_->PostTask(
526 std::move(blocking_start_complete_callback_));
527 blocking_start_complete_callback_ = nullptr;
528 }
529 }
530
NotifyError(const TracingError & error)531 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
532 PERFETTO_DCHECK_THREAD(thread_checker_);
533 if (error_callback_) {
534 muxer_->task_runner_->PostTask(
535 std::bind(std::move(error_callback_), error));
536 }
537 }
538
NotifyStopComplete()539 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
540 PERFETTO_DCHECK_THREAD(thread_checker_);
541 if (stop_complete_callback_) {
542 muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
543 stop_complete_callback_ = nullptr;
544 }
545 if (blocking_stop_complete_callback_) {
546 muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
547 blocking_stop_complete_callback_ = nullptr;
548 }
549 }
550
OnTraceData(std::vector<TracePacket> packets,bool has_more)551 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
552 std::vector<TracePacket> packets,
553 bool has_more) {
554 PERFETTO_DCHECK_THREAD(thread_checker_);
555 if (!read_trace_callback_)
556 return;
557
558 size_t capacity = 0;
559 for (const auto& packet : packets) {
560 // 16 is an over-estimation of the proto preamble size
561 capacity += packet.size() + 16;
562 }
563
564 // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
565 std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
566 buf->reserve(capacity);
567 for (auto& packet : packets) {
568 char* start;
569 size_t size;
570 std::tie(start, size) = packet.GetProtoPreamble();
571 buf->insert(buf->end(), start, start + size);
572 for (auto& slice : packet.slices()) {
573 const auto* slice_data = reinterpret_cast<const char*>(slice.start);
574 buf->insert(buf->end(), slice_data, slice_data + slice.size);
575 }
576 }
577
578 auto callback = read_trace_callback_;
579 muxer_->task_runner_->PostTask([callback, buf, has_more] {
580 TracingSession::ReadTraceCallbackArgs callback_arg{};
581 callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
582 callback_arg.size = buf->size();
583 callback_arg.has_more = has_more;
584 callback(callback_arg);
585 });
586
587 if (!has_more)
588 read_trace_callback_ = nullptr;
589 }
590
OnObservableEvents(const ObservableEvents & events)591 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
592 const ObservableEvents& events) {
593 if (events.instance_state_changes_size()) {
594 for (const auto& state_change : events.instance_state_changes()) {
595 DataSourceHandle handle{state_change.producer_name(),
596 state_change.data_source_name()};
597 data_source_states_[handle] =
598 state_change.state() ==
599 ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
600 }
601 }
602
603 if (events.instance_state_changes_size() ||
604 events.all_data_sources_started()) {
605 // Data sources are first reported as being stopped before starting, so once
606 // all the data sources we know about have started we can declare tracing
607 // begun. In the case where there are no matching data sources for the
608 // session, the service will report the all_data_sources_started() event
609 // without adding any instances (only since Android S / Perfetto v10.0).
610 if (start_complete_callback_ || blocking_start_complete_callback_) {
611 bool all_data_sources_started = std::all_of(
612 data_source_states_.cbegin(), data_source_states_.cend(),
613 [](std::pair<DataSourceHandle, bool> state) { return state.second; });
614 if (all_data_sources_started)
615 NotifyStartComplete();
616 }
617 }
618 }
619
OnSessionCloned(const OnSessionClonedArgs & args)620 void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
621 const OnSessionClonedArgs& args) {
622 if (!clone_trace_callback_)
623 return;
624 TracingSession::CloneTraceCallbackArgs callback_arg{};
625 callback_arg.success = args.success;
626 callback_arg.error = std::move(args.error);
627 callback_arg.uuid_msb = args.uuid.msb();
628 callback_arg.uuid_lsb = args.uuid.lsb();
629 muxer_->task_runner_->PostTask(
630 std::bind(std::move(clone_trace_callback_), std::move(callback_arg)));
631 clone_trace_callback_ = nullptr;
632 }
633
// Called with the result of a trace stats request. Serializes |trace_stats|
// into the public callback arguments and posts the client's callback, which
// is consumed by this call. Does nothing if no stats callback is registered.
void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
    bool success,
    const TraceStats& trace_stats) {
  if (!get_trace_stats_callback_)
    return;
  TracingSession::GetTraceStatsCallbackArgs callback_arg{};
  callback_arg.success = success;
  callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
  muxer_->task_runner_->PostTask(
      std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
  // Reset explicitly after the move so the member is in a known empty state.
  get_trace_stats_callback_ = nullptr;
}
646
// The callbacks below are not used: both are intentionally empty and ignore
// their arguments.
void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
650 // ----- End of TracingMuxerImpl::ConsumerImpl
651
652 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
653
654 // TracingSessionImpl is the RAII object returned to API clients when they
655 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
656 // tracing.
657
// Stores the muxer, the global session id and the backend type; every other
// method of this class posts work to the muxer's task runner using them.
TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
    TracingMuxerImpl* muxer,
    TracingSessionGlobalID session_id,
    BackendType backend_type)
    : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
663
// Can be destroyed from any thread.
// Posts a task that asks the muxer to destroy the tracing session. The members
// are copied into locals first so the task does not capture |this|.
TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
  auto* muxer = muxer_;
  auto session_id = session_id_;
  muxer->task_runner_->PostTask(
      [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
}
671
// Can be called from any thread.
// Copies |cfg| and posts a task that sets up the tracing session on the muxer.
// When |fd| is non-negative the config copy is switched to write-into-file
// mode and a duplicate of |fd| is handed to the muxer, so the caller keeps
// ownership of the descriptor it passed in.
void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
                                                 int fd) {
  auto* muxer = muxer_;
  auto session_id = session_id_;
  // Shared so the posted task can hold the config without another deep copy.
  std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
  if (fd >= 0) {
    base::ignore_result(backend_type_);  // For -Wunused in the amalgamation.
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
    if (backend_type_ != kInProcessBackend) {
      PERFETTO_FATAL(
          "Passing a file descriptor to TracingSession::Setup() is only "
          "supported with the kInProcessBackend on Windows. Use "
          "TracingSession::ReadTrace() instead");
    }
#endif
    trace_config->set_write_into_file(true);
    // NOTE(review): the result of dup() is not checked; on failure |fd|
    // becomes -1 while write_into_file stays set -- confirm this is handled
    // downstream.
    fd = dup(fd);
  }
  // The duplicated descriptor travels as a raw int and is only wrapped in a
  // ScopedFile once the task runs.
  muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
    muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
  });
}
695
696 // Can be called from any thread.
Start()697 void TracingMuxerImpl::TracingSessionImpl::Start() {
698 auto* muxer = muxer_;
699 auto session_id = session_id_;
700 muxer->task_runner_->PostTask(
701 [muxer, session_id] { muxer->StartTracingSession(session_id); });
702 }
703
CloneTrace(CloneTraceArgs args,CloneTraceCallback cb)704 void TracingMuxerImpl::TracingSessionImpl::CloneTrace(CloneTraceArgs args,
705 CloneTraceCallback cb) {
706 auto* muxer = muxer_;
707 auto session_id = session_id_;
708 muxer->task_runner_->PostTask([muxer, session_id, args, cb] {
709 muxer->CloneTracingSession(session_id, args, std::move(cb));
710 });
711 }
712
713 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)714 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
715 const TraceConfig& cfg) {
716 auto* muxer = muxer_;
717 auto session_id = session_id_;
718 muxer->task_runner_->PostTask([muxer, session_id, cfg] {
719 muxer->ChangeTracingSessionConfig(session_id, cfg);
720 });
721 }
722
// Can be called from any thread except the service thread.
// Starts the tracing session and blocks the calling thread until the start
// has been signalled. If the consumer for this session cannot be found the
// wait is released immediately.
void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
  // Blocking on the task runner's own thread would prevent the posted task
  // below from ever running.
  PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
  auto* muxer = muxer_;
  auto session_id = session_id_;
  base::WaitableEvent tracing_started;
  // |tracing_started| is captured by reference; it stays alive because this
  // function does not return until Wait() below is released.
  muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
    auto* consumer = muxer->FindConsumer(session_id);
    if (!consumer) {
      // TODO(skyostil): Signal an error to the user.
      tracing_started.Notify();
      return;
    }
    PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
    consumer->blocking_start_complete_callback_ = [&] {
      tracing_started.Notify();
    };
    muxer->StartTracingSession(session_id);
  });
  tracing_started.Wait();
}
744
745 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)746 void TracingMuxerImpl::TracingSessionImpl::Flush(
747 std::function<void(bool)> user_callback,
748 uint32_t timeout_ms) {
749 auto* muxer = muxer_;
750 auto session_id = session_id_;
751 muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
752 auto* consumer = muxer->FindConsumer(session_id);
753 if (!consumer) {
754 std::move(user_callback)(false);
755 return;
756 }
757 muxer->FlushTracingSession(session_id, timeout_ms,
758 std::move(user_callback));
759 });
760 }
761
762 // Can be called from any thread.
Stop()763 void TracingMuxerImpl::TracingSessionImpl::Stop() {
764 auto* muxer = muxer_;
765 auto session_id = session_id_;
766 muxer->task_runner_->PostTask(
767 [muxer, session_id] { muxer->StopTracingSession(session_id); });
768 }
769
// Can be called from any thread except the service thread.
// Stops the tracing session and blocks the calling thread until the stop has
// been signalled. If the consumer for this session cannot be found the wait is
// released immediately.
void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
  // Blocking on the task runner's own thread would prevent the posted task
  // below from ever running.
  PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
  auto* muxer = muxer_;
  auto session_id = session_id_;
  base::WaitableEvent tracing_stopped;
  // |tracing_stopped| is captured by reference; it stays alive because this
  // function does not return until Wait() below is released.
  muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
    auto* consumer = muxer->FindConsumer(session_id);
    if (!consumer) {
      // TODO(skyostil): Signal an error to the user.
      tracing_stopped.Notify();
      return;
    }
    PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
    consumer->blocking_stop_complete_callback_ = [&] {
      tracing_stopped.Notify();
    };
    muxer->StopTracingSession(session_id);
  });
  tracing_stopped.Wait();
}
791
792 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)793 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
794 auto* muxer = muxer_;
795 auto session_id = session_id_;
796 muxer->task_runner_->PostTask([muxer, session_id, cb] {
797 muxer->ReadTracingSessionData(session_id, std::move(cb));
798 });
799 }
800
801 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)802 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
803 std::function<void()> cb) {
804 auto* muxer = muxer_;
805 auto session_id = session_id_;
806 muxer->task_runner_->PostTask([muxer, session_id, cb] {
807 auto* consumer = muxer->FindConsumer(session_id);
808 if (!consumer)
809 return;
810 consumer->start_complete_callback_ = cb;
811 });
812 }
813
814 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)815 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
816 std::function<void(TracingError)> cb) {
817 auto* muxer = muxer_;
818 auto session_id = session_id_;
819 muxer->task_runner_->PostTask([muxer, session_id, cb] {
820 auto* consumer = muxer->FindConsumer(session_id);
821 if (!consumer) {
822 // Notify the client about concurrent disconnection of the session.
823 if (cb)
824 cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
825 return;
826 }
827 consumer->error_callback_ = cb;
828 });
829 }
830
831 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)832 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
833 std::function<void()> cb) {
834 auto* muxer = muxer_;
835 auto session_id = session_id_;
836 muxer->task_runner_->PostTask([muxer, session_id, cb] {
837 auto* consumer = muxer->FindConsumer(session_id);
838 if (!consumer)
839 return;
840 consumer->stop_complete_callback_ = cb;
841 });
842 }
843
844 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)845 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
846 GetTraceStatsCallback cb) {
847 auto* muxer = muxer_;
848 auto session_id = session_id_;
849 muxer->task_runner_->PostTask([muxer, session_id, cb] {
850 muxer->GetTraceStats(session_id, std::move(cb));
851 });
852 }
853
854 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)855 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
856 QueryServiceStateCallback cb) {
857 auto* muxer = muxer_;
858 auto session_id = session_id_;
859 muxer->task_runner_->PostTask([muxer, session_id, cb] {
860 muxer->QueryServiceState(session_id, std::move(cb));
861 });
862 }
863
864 // ----- End of TracingMuxerImpl::TracingSessionImpl
865
866 // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
867
StartupTracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)868 TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
869 TracingMuxerImpl* muxer,
870 TracingSessionGlobalID session_id,
871 BackendType backend_type)
872 : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
873
// Can be destroyed from any thread.
// The destructor is defaulted: it performs no explicit action of its own (in
// particular it does not abort the startup session).
TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
    default;
877
Abort()878 void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
879 auto* muxer = muxer_;
880 auto session_id = session_id_;
881 auto backend_type = backend_type_;
882 muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
883 muxer->AbortStartupTracingSession(session_id, backend_type);
884 });
885 }
886
887 // Must not be called from the SDK's internal thread.
AbortBlocking()888 void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
889 auto* muxer = muxer_;
890 auto session_id = session_id_;
891 auto backend_type = backend_type_;
892 PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
893 base::WaitableEvent event;
894 muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
895 muxer->AbortStartupTracingSession(session_id, backend_type);
896 event.Notify();
897 });
898 event.Wait();
899 }
900
901 // ----- End of TracingMuxerImpl::StartupTracingSessionImpl
902
// static
// Pointer to the current muxer. It starts out as whatever
// TracingMuxerFake::Get() returns and is overwritten with the real muxer by
// the TracingMuxerImpl constructor (instance_ = this).
TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
905
906 // This is called by perfetto::Tracing::Initialize().
907 // Can be called on any thread. Typically, but not necessarily, that will be
908 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)909 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
910 : TracingMuxer(args.platform ? args.platform
911 : Platform::GetDefaultPlatform()) {
912 PERFETTO_DETACH_FROM_THREAD(thread_checker_);
913 instance_ = this;
914
915 // Create the thread where muxer, producers and service will live.
916 Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
917 task_runner_.reset(new NonReentrantTaskRunner(
918 this, platform_->CreateTaskRunner(std::move(tr_args))));
919
920 // Run the initializer on that thread.
921 task_runner_->PostTask([this, args] {
922 Initialize(args);
923 AddBackends(args);
924 });
925 }
926
Initialize(const TracingInitArgs & args)927 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
928 PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
929
930 policy_ = args.tracing_policy;
931 supports_multiple_data_source_instances_ =
932 args.supports_multiple_data_source_instances;
933
934 // Fallback backend for producer creation for an unsupported backend type.
935 PERFETTO_CHECK(producer_backends_.empty());
936 AddProducerBackend(internal::TracingBackendFake::GetInstance(),
937 BackendType::kUnspecifiedBackend, args);
938 // Fallback backend for consumer creation for an unsupported backend type.
939 // This backend simply fails any attempt to start a tracing session.
940 PERFETTO_CHECK(consumer_backends_.empty());
941 AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
942 BackendType::kUnspecifiedBackend);
943 }
944
AddConsumerBackend(TracingConsumerBackend * backend,BackendType type)945 void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
946 BackendType type) {
947 if (!backend) {
948 // We skip the log in release builds because the *_backend_fake.cc code
949 // has already an ELOG before returning a nullptr.
950 PERFETTO_DLOG("Consumer backend creation failed, type %d",
951 static_cast<int>(type));
952 return;
953 }
954 // Keep the backends sorted by type.
955 auto it =
956 std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
957 type, CompareBackendByType<RegisteredConsumerBackend>());
958 it = consumer_backends_.emplace(it);
959
960 RegisteredConsumerBackend& rb = *it;
961 rb.backend = backend;
962 rb.type = type;
963 }
964
AddProducerBackend(TracingProducerBackend * backend,BackendType type,const TracingInitArgs & args)965 void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
966 BackendType type,
967 const TracingInitArgs& args) {
968 if (!backend) {
969 // We skip the log in release builds because the *_backend_fake.cc code
970 // has already an ELOG before returning a nullptr.
971 PERFETTO_DLOG("Producer backend creation failed, type %d",
972 static_cast<int>(type));
973 return;
974 }
975 TracingBackendId backend_id = producer_backends_.size();
976 // Keep the backends sorted by type.
977 auto it =
978 std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
979 type, CompareBackendByType<RegisteredProducerBackend>());
980 it = producer_backends_.emplace(it);
981
982 RegisteredProducerBackend& rb = *it;
983 rb.backend = backend;
984 rb.id = backend_id;
985 rb.type = type;
986 rb.producer.reset(new ProducerImpl(this, backend_id,
987 args.shmem_batch_commits_duration_ms,
988 args.shmem_direct_patching_enabled));
989 rb.producer_conn_args.producer = rb.producer.get();
990 rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
991 rb.producer_conn_args.task_runner = task_runner_.get();
992 rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
993 rb.producer_conn_args.shmem_page_size_hint_bytes =
994 args.shmem_page_size_hint_kb * 1024;
995 rb.producer_conn_args.create_socket_async = args.create_socket_async;
996 rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
997 }
998
999 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendById(TracingBackendId id)1000 TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
1001 for (RegisteredProducerBackend& b : producer_backends_) {
1002 if (b.id == id) {
1003 return &b;
1004 }
1005 }
1006 return nullptr;
1007 }
1008
1009 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendByType(BackendType type)1010 TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
1011 for (RegisteredProducerBackend& b : producer_backends_) {
1012 if (b.type == type) {
1013 return &b;
1014 }
1015 }
1016 return nullptr;
1017 }
1018
1019 TracingMuxerImpl::RegisteredConsumerBackend*
FindConsumerBackendByType(BackendType type)1020 TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
1021 for (RegisteredConsumerBackend& b : consumer_backends_) {
1022 if (b.type == type) {
1023 return &b;
1024 }
1025 }
1026 return nullptr;
1027 }
1028
AddBackends(const TracingInitArgs & args)1029 void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
1030 if (args.backends & kSystemBackend) {
1031 PERFETTO_CHECK(args.system_producer_backend_factory_);
1032 if (FindProducerBackendByType(kSystemBackend) == nullptr) {
1033 AddProducerBackend(args.system_producer_backend_factory_(),
1034 kSystemBackend, args);
1035 }
1036 if (args.enable_system_consumer) {
1037 PERFETTO_CHECK(args.system_consumer_backend_factory_);
1038 if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
1039 AddConsumerBackend(args.system_consumer_backend_factory_(),
1040 kSystemBackend);
1041 }
1042 }
1043 }
1044
1045 if (args.backends & kInProcessBackend) {
1046 TracingBackend* b = nullptr;
1047 if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
1048 if (!b) {
1049 PERFETTO_CHECK(args.in_process_backend_factory_);
1050 b = args.in_process_backend_factory_();
1051 }
1052 AddProducerBackend(b, kInProcessBackend, args);
1053 }
1054 if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
1055 if (!b) {
1056 PERFETTO_CHECK(args.in_process_backend_factory_);
1057 b = args.in_process_backend_factory_();
1058 }
1059 AddConsumerBackend(b, kInProcessBackend);
1060 }
1061 }
1062
1063 if (args.backends & kCustomBackend) {
1064 PERFETTO_CHECK(args.custom_backend);
1065 if (FindProducerBackendByType(kCustomBackend) == nullptr) {
1066 AddProducerBackend(args.custom_backend, kCustomBackend, args);
1067 }
1068 if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
1069 AddConsumerBackend(args.custom_backend, kCustomBackend);
1070 }
1071 }
1072
1073 if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
1074 PERFETTO_FATAL("Unsupported tracing backend type");
1075 }
1076 }
1077
1078 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceParams params,bool no_flush,DataSourceStaticState * static_state)1079 bool TracingMuxerImpl::RegisterDataSource(
1080 const DataSourceDescriptor& descriptor,
1081 DataSourceFactory factory,
1082 DataSourceParams params,
1083 bool no_flush,
1084 DataSourceStaticState* static_state) {
1085 // Ignore repeated registrations.
1086 if (static_state->index != kMaxDataSources)
1087 return true;
1088
1089 uint32_t new_index = next_data_source_index_++;
1090 if (new_index >= kMaxDataSources) {
1091 PERFETTO_DLOG(
1092 "RegisterDataSource failed: too many data sources already registered");
1093 return false;
1094 }
1095
1096 // Initialize the static state.
1097 static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
1098 "instances[] size mismatch");
1099 for (size_t i = 0; i < static_state->instances.size(); i++)
1100 new (&static_state->instances[i]) DataSourceState{};
1101
1102 static_state->index = new_index;
1103
1104 // Generate a semi-unique id for this data source.
1105 base::Hasher hash;
1106 hash.Update(reinterpret_cast<intptr_t>(static_state));
1107 hash.Update(base::GetWallTimeNs().count());
1108 static_state->id = hash.digest() ? hash.digest() : 1;
1109
1110 task_runner_->PostTask([this, descriptor, factory, static_state, params,
1111 no_flush] {
1112 data_sources_.emplace_back();
1113 RegisteredDataSource& rds = data_sources_.back();
1114 rds.descriptor = descriptor;
1115 rds.factory = factory;
1116 rds.supports_multiple_instances =
1117 supports_multiple_data_source_instances_ &&
1118 params.supports_multiple_instances;
1119 rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
1120 rds.static_state = static_state;
1121 rds.no_flush = no_flush;
1122
1123 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1124 });
1125 return true;
1126 }
1127
1128 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)1129 void TracingMuxerImpl::UpdateDataSourceDescriptor(
1130 const DataSourceDescriptor& descriptor,
1131 const DataSourceStaticState* static_state) {
1132 task_runner_->PostTask([this, descriptor, static_state] {
1133 for (auto& rds : data_sources_) {
1134 if (rds.static_state == static_state) {
1135 PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
1136 rds.descriptor = descriptor;
1137 rds.descriptor.set_id(static_state->id);
1138 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
1139 return;
1140 }
1141 }
1142 });
1143 }
1144
1145 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)1146 void TracingMuxerImpl::RegisterInterceptor(
1147 const InterceptorDescriptor& descriptor,
1148 InterceptorFactory factory,
1149 InterceptorBase::TLSFactory tls_factory,
1150 InterceptorBase::TracePacketCallback packet_callback) {
1151 task_runner_->PostTask([this, descriptor, factory, tls_factory,
1152 packet_callback] {
1153 // Ignore repeated registrations.
1154 for (const auto& interceptor : interceptors_) {
1155 if (interceptor.descriptor.name() == descriptor.name()) {
1156 PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
1157 PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
1158 return;
1159 }
1160 }
1161 // Only allow certain interceptors for now.
1162 if (descriptor.name() != "test_interceptor" &&
1163 descriptor.name() != "console" && descriptor.name() != "etwexport") {
1164 PERFETTO_ELOG(
1165 "Interceptors are experimental. If you want to use them, please "
1166 "get in touch with the project maintainers "
1167 "(https://perfetto.dev/docs/contributing/"
1168 "getting-started#community).");
1169 return;
1170 }
1171 interceptors_.emplace_back();
1172 RegisteredInterceptor& interceptor = interceptors_.back();
1173 interceptor.descriptor = descriptor;
1174 interceptor.factory = factory;
1175 interceptor.tls_factory = tls_factory;
1176 interceptor.packet_callback = packet_callback;
1177 });
1178 }
1179
ActivateTriggers(const std::vector<std::string> & triggers,uint32_t ttl_ms)1180 void TracingMuxerImpl::ActivateTriggers(
1181 const std::vector<std::string>& triggers,
1182 uint32_t ttl_ms) {
1183 base::TimeMillis expire_time =
1184 base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
1185 task_runner_->PostTask([this, triggers, expire_time] {
1186 for (RegisteredProducerBackend& backend : producer_backends_) {
1187 if (backend.producer->connected_) {
1188 backend.producer->service_->ActivateTriggers(triggers);
1189 } else {
1190 for (const std::string& trigger : triggers) {
1191 backend.producer->on_connect_triggers_.emplace_back(trigger,
1192 expire_time);
1193 }
1194 }
1195 }
1196 });
1197 }
1198
1199 // Checks if there is any matching startup tracing data source instance for a
1200 // new SetupDataSource call. If so, moves the data source to this tracing
1201 // session (and its target buffer) and returns true, otherwise returns false.
MaybeAdoptStartupTracingInDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,const std::vector<RegisteredDataSource> & data_sources)1202 static bool MaybeAdoptStartupTracingInDataSource(
1203 TracingBackendId backend_id,
1204 uint32_t backend_connection_id,
1205 DataSourceInstanceID instance_id,
1206 const DataSourceConfig& cfg,
1207 const std::vector<RegisteredDataSource>& data_sources) {
1208 for (const auto& rds : data_sources) {
1209 DataSourceStaticState* static_state = rds.static_state;
1210 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1211 auto* internal_state = static_state->TryGet(i);
1212
1213 if (internal_state &&
1214 internal_state->startup_target_buffer_reservation.load(
1215 std::memory_order_relaxed) &&
1216 internal_state->data_source_instance_id == 0 &&
1217 internal_state->backend_id == backend_id &&
1218 internal_state->backend_connection_id == backend_connection_id &&
1219 internal_state->config &&
1220 internal_state->data_source->CanAdoptStartupSession(
1221 *internal_state->config, cfg)) {
1222 PERFETTO_DLOG("Setting up data source %" PRIu64
1223 " %s by adopting it from a startup tracing session",
1224 instance_id, cfg.name().c_str());
1225
1226 std::lock_guard<std::recursive_mutex> lock(internal_state->lock);
1227 // Set the associations. The actual takeover happens in
1228 // StartDataSource().
1229 internal_state->data_source_instance_id = instance_id;
1230 internal_state->buffer_id =
1231 static_cast<internal::BufferId>(cfg.target_buffer());
1232 internal_state->config.reset(new DataSourceConfig(cfg));
1233
1234 // TODO(eseckler): Should the data souce config provided by the service
1235 // be allowed to specify additional interceptors / additional data
1236 // source params?
1237
1238 return true;
1239 }
1240 }
1241 }
1242 return false;
1243 }
1244
1245 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)1246 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
1247 uint32_t backend_connection_id,
1248 DataSourceInstanceID instance_id,
1249 const DataSourceConfig& cfg) {
1250 PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
1251 cfg.name().c_str());
1252 PERFETTO_DCHECK_THREAD(thread_checker_);
1253
1254 // First check if there is any matching startup tracing data source instance.
1255 if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
1256 instance_id, cfg, data_sources_)) {
1257 return;
1258 }
1259
1260 for (const auto& rds : data_sources_) {
1261 if (rds.descriptor.name() != cfg.name())
1262 continue;
1263 DataSourceStaticState& static_state = *rds.static_state;
1264
1265 // If this data source is already active for this exact config, don't start
1266 // another instance. This happens when we have several data sources with the
1267 // same name, in which case the service sends one SetupDataSource event for
1268 // each one. Since we can't map which event maps to which data source, we
1269 // ensure each event only starts one data source instance.
1270 // TODO(skyostil): Register a unique id with each data source to the service
1271 // to disambiguate.
1272 bool active_for_config = false;
1273 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1274 if (!static_state.TryGet(i))
1275 continue;
1276 auto* internal_state =
1277 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1278 if (internal_state->backend_id == backend_id &&
1279 internal_state->backend_connection_id == backend_connection_id &&
1280 internal_state->config && *internal_state->config == cfg) {
1281 active_for_config = true;
1282 break;
1283 }
1284 }
1285 if (active_for_config) {
1286 PERFETTO_DLOG(
1287 "Data source %s is already active with this config, skipping",
1288 cfg.name().c_str());
1289 continue;
1290 }
1291
1292 SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
1293 cfg, /*startup_session_id=*/0);
1294 return;
1295 }
1296 }
1297
SetupDataSourceImpl(const RegisteredDataSource & rds,TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,TracingSessionGlobalID startup_session_id)1298 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
1299 const RegisteredDataSource& rds,
1300 TracingBackendId backend_id,
1301 uint32_t backend_connection_id,
1302 DataSourceInstanceID instance_id,
1303 const DataSourceConfig& cfg,
1304 TracingSessionGlobalID startup_session_id) {
1305 PERFETTO_DCHECK_THREAD(thread_checker_);
1306 DataSourceStaticState& static_state = *rds.static_state;
1307
1308 // If any bit is set in `static_state.valid_instances` then at least one
1309 // other instance of data source is running.
1310 if (!rds.supports_multiple_instances &&
1311 static_state.valid_instances.load(std::memory_order_acquire) != 0) {
1312 PERFETTO_ELOG(
1313 "Failed to setup data source because some another instance of this "
1314 "data source is already active");
1315 return FindDataSourceRes();
1316 }
1317
1318 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1319 // Find a free slot.
1320 if (static_state.TryGet(i))
1321 continue;
1322
1323 auto* internal_state =
1324 reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1325 std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
1326 static_assert(
1327 std::is_same<decltype(internal_state->data_source_instance_id),
1328 DataSourceInstanceID>::value,
1329 "data_source_instance_id type mismatch");
1330 internal_state->muxer_id_for_testing = muxer_id_for_testing_;
1331 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1332
1333 if (startup_session_id) {
1334 uint16_t& last_reservation =
1335 backend.producer->last_startup_target_buffer_reservation_;
1336 if (last_reservation == std::numeric_limits<uint16_t>::max()) {
1337 PERFETTO_ELOG(
1338 "Startup buffer reservations exhausted, dropping data source");
1339 return FindDataSourceRes();
1340 }
1341 internal_state->startup_target_buffer_reservation.store(
1342 ++last_reservation, std::memory_order_relaxed);
1343 } else {
1344 internal_state->startup_target_buffer_reservation.store(
1345 0, std::memory_order_relaxed);
1346 }
1347
1348 internal_state->backend_id = backend_id;
1349 internal_state->backend_connection_id = backend_connection_id;
1350 internal_state->data_source_instance_id = instance_id;
1351 internal_state->buffer_id =
1352 static_cast<internal::BufferId>(cfg.target_buffer());
1353 internal_state->config.reset(new DataSourceConfig(cfg));
1354 internal_state->startup_session_id = startup_session_id;
1355 internal_state->data_source = rds.factory();
1356 internal_state->interceptor = nullptr;
1357 internal_state->interceptor_id = 0;
1358 internal_state->will_notify_on_stop = rds.descriptor.will_notify_on_stop();
1359
1360 if (cfg.has_interceptor_config()) {
1361 for (size_t j = 0; j < interceptors_.size(); j++) {
1362 if (cfg.interceptor_config().name() ==
1363 interceptors_[j].descriptor.name()) {
1364 PERFETTO_DLOG("Intercepting data source %" PRIu64
1365 " \"%s\" into \"%s\"",
1366 instance_id, cfg.name().c_str(),
1367 cfg.interceptor_config().name().c_str());
1368 internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
1369 internal_state->interceptor = interceptors_[j].factory();
1370 internal_state->interceptor->OnSetup({cfg});
1371 break;
1372 }
1373 }
1374 if (!internal_state->interceptor_id) {
1375 PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
1376 cfg.interceptor_config().name().c_str());
1377 }
1378 }
1379
1380 // This must be made at the end. See matching acquire-load in
1381 // DataSource::Trace().
1382 static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
1383
1384 DataSourceBase::SetupArgs setup_args;
1385 setup_args.config = &cfg;
1386 setup_args.backend_type = backend.type;
1387 setup_args.internal_instance_index = i;
1388
1389 if (!rds.requires_callbacks_under_lock)
1390 lock.unlock();
1391 internal_state->data_source->OnSetup(setup_args);
1392
1393 return FindDataSourceRes(&static_state, internal_state, i,
1394 rds.requires_callbacks_under_lock);
1395 }
1396 PERFETTO_ELOG(
1397 "Maximum number of data source instances exhausted. "
1398 "Dropping data source %" PRIu64,
1399 instance_id);
1400 return FindDataSourceRes();
1401 }
1402
1403 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1404 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1405 DataSourceInstanceID instance_id) {
1406 PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1407 PERFETTO_DCHECK_THREAD(thread_checker_);
1408
1409 auto ds = FindDataSource(backend_id, instance_id);
1410 if (!ds) {
1411 PERFETTO_ELOG("Could not find data source to start");
1412 return;
1413 }
1414
1415 // Check if the data source was already started for startup tracing.
1416 uint16_t startup_reservation =
1417 ds.internal_state->startup_target_buffer_reservation.load(
1418 std::memory_order_relaxed);
1419 if (startup_reservation) {
1420 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1421 TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
1422 auto session_it = std::find_if(
1423 backend.startup_sessions.begin(), backend.startup_sessions.end(),
1424 [session_id](const RegisteredStartupSession& session) {
1425 return session.session_id == session_id;
1426 });
1427 PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1428
1429 if (session_it->is_aborting) {
1430 PERFETTO_DLOG("Data source %" PRIu64
1431 " was already aborted for startup tracing, not starting it",
1432 instance_id);
1433 return;
1434 }
1435
1436 PERFETTO_DLOG(
1437 "Data source %" PRIu64
1438 " was already started for startup tracing, binding its target buffer",
1439 instance_id);
1440
1441 backend.producer->service_->MaybeSharedMemoryArbiter()
1442 ->BindStartupTargetBuffer(startup_reservation,
1443 ds.internal_state->buffer_id);
1444
1445 // The reservation ID can be used even after binding it, so there's no need
1446 // for any barriers here - we just need atomicity.
1447 ds.internal_state->startup_target_buffer_reservation.store(
1448 0, std::memory_order_relaxed);
1449
1450 // TODO(eseckler): Should we reset incremental state at this point, or
1451 // notify the data source some other way?
1452
1453 // The session should not have been fully bound yet (or aborted).
1454 PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);
1455
1456 session_it->num_unbound_data_sources--;
1457 if (session_it->num_unbound_data_sources == 0) {
1458 if (session_it->on_adopted)
1459 task_runner_->PostTask(session_it->on_adopted);
1460 backend.startup_sessions.erase(session_it);
1461 }
1462 return;
1463 }
1464
1465 StartDataSourceImpl(ds);
1466 }
1467
StartDataSourceImpl(const FindDataSourceRes & ds)1468 void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
1469 PERFETTO_DCHECK_THREAD(thread_checker_);
1470
1471 DataSourceBase::StartArgs start_args{};
1472 start_args.internal_instance_index = ds.instance_idx;
1473
1474 std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1475 if (ds.internal_state->interceptor)
1476 ds.internal_state->interceptor->OnStart({});
1477 ds.internal_state->trace_lambda_enabled.store(true,
1478 std::memory_order_relaxed);
1479 PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);
1480
1481 if (!ds.requires_callbacks_under_lock)
1482 lock.unlock();
1483 ds.internal_state->data_source->OnStart(start_args);
1484 }
1485
1486 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1487 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1488 TracingBackendId backend_id,
1489 DataSourceInstanceID instance_id) {
1490 PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1491 PERFETTO_DCHECK_THREAD(thread_checker_);
1492
1493 auto ds = FindDataSource(backend_id, instance_id);
1494 if (!ds) {
1495 PERFETTO_ELOG("Could not find data source to stop");
1496 return;
1497 }
1498
1499 StopDataSource_AsyncBeginImpl(ds);
1500 }
1501
StopDataSource_AsyncBeginImpl(const FindDataSourceRes & ds)1502 void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
1503 const FindDataSourceRes& ds) {
1504 TracingBackendId backend_id = ds.internal_state->backend_id;
1505 uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1506 DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;
1507
1508 StopArgsImpl stop_args{};
1509 stop_args.internal_instance_index = ds.instance_idx;
1510 stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
1511 instance_id, ds] {
1512 // TracingMuxerImpl is long lived, capturing |this| is okay.
1513 // The notification closure can be moved out of the StopArgs by the
1514 // embedder to handle stop asynchronously. The embedder might then
1515 // call the closure on a different thread than the current one, hence
1516 // this nested PostTask().
1517 task_runner_->PostTask(
1518 [this, backend_id, backend_connection_id, instance_id, ds] {
1519 StopDataSource_AsyncEnd(backend_id, backend_connection_id,
1520 instance_id, ds);
1521 });
1522 };
1523
1524 {
1525 std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1526
1527 // Don't call OnStop again if the datasource is already stopping.
1528 if (ds.internal_state->async_stop_in_progress)
1529 return;
1530 ds.internal_state->async_stop_in_progress = true;
1531
1532 if (ds.internal_state->interceptor)
1533 ds.internal_state->interceptor->OnStop({});
1534
1535 if (!ds.requires_callbacks_under_lock)
1536 lock.unlock();
1537 ds.internal_state->data_source->OnStop(stop_args);
1538 }
1539
1540 // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1541 // async closure here. In theory we could avoid the PostTask and call
1542 // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1543 // divergencies between the deferred-stop vs non-deferred-stop code paths.
1544 if (stop_args.async_stop_closure)
1545 std::move(stop_args.async_stop_closure)();
1546 }
1547
StopDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds)1548 void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
1549 uint32_t backend_connection_id,
1550 DataSourceInstanceID instance_id,
1551 const FindDataSourceRes& ds) {
1552 PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1553 PERFETTO_DCHECK_THREAD(thread_checker_);
1554
1555 // Check that the data source instance is still active and was not modified
1556 // while it was being stopped.
1557 if (!ds.static_state->TryGet(ds.instance_idx) ||
1558 ds.internal_state->backend_id != backend_id ||
1559 ds.internal_state->backend_connection_id != backend_connection_id ||
1560 ds.internal_state->data_source_instance_id != instance_id) {
1561 PERFETTO_ELOG(
1562 "Async stop of data source %" PRIu64
1563 " failed. This might be due to calling the async_stop_closure twice.",
1564 instance_id);
1565 return;
1566 }
1567
1568 const uint32_t mask = ~(1 << ds.instance_idx);
1569 ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1570
1571 bool will_notify_on_stop;
1572 // Take the mutex to prevent that the data source is in the middle of
1573 // a Trace() execution where it called GetDataSourceLocked() while we
1574 // destroy it.
1575 uint16_t startup_buffer_reservation;
1576 TracingSessionGlobalID startup_session_id;
1577 {
1578 std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1579 ds.internal_state->trace_lambda_enabled.store(false,
1580 std::memory_order_relaxed);
1581 ds.internal_state->data_source.reset();
1582 ds.internal_state->interceptor.reset();
1583 ds.internal_state->config.reset();
1584 ds.internal_state->async_stop_in_progress = false;
1585 will_notify_on_stop = ds.internal_state->will_notify_on_stop;
1586 startup_buffer_reservation =
1587 ds.internal_state->startup_target_buffer_reservation.load(
1588 std::memory_order_relaxed);
1589 startup_session_id = ds.internal_state->startup_session_id;
1590 }
1591
1592 // The other fields of internal_state are deliberately *not* cleared.
1593 // See races-related comments of DataSource::Trace().
1594
1595 TracingMuxer::generation_++;
1596
1597 // |producer_backends_| is append-only, Backend instances are always valid.
1598 PERFETTO_CHECK(backend_id < producer_backends_.size());
1599 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1600 ProducerImpl* producer = backend.producer.get();
1601 if (!producer)
1602 return;
1603
1604 // If the data source instance still has a startup buffer reservation, it was
1605 // only active for startup tracing and never started by the service. Discard
1606 // the startup buffer reservation.
1607 if (startup_buffer_reservation) {
1608 PERFETTO_DCHECK(startup_session_id);
1609
1610 if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
1611 producer->service_->MaybeSharedMemoryArbiter()
1612 ->AbortStartupTracingForReservation(startup_buffer_reservation);
1613 }
1614
1615 auto session_it = std::find_if(
1616 backend.startup_sessions.begin(), backend.startup_sessions.end(),
1617 [startup_session_id](const RegisteredStartupSession& session) {
1618 return session.session_id == startup_session_id;
1619 });
1620
1621 // Session should not be removed until abortion of all data source instances
1622 // is complete.
1623 PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1624
1625 session_it->num_aborting_data_sources--;
1626 if (session_it->num_aborting_data_sources == 0) {
1627 if (session_it->on_aborted)
1628 task_runner_->PostTask(session_it->on_aborted);
1629
1630 backend.startup_sessions.erase(session_it);
1631 }
1632 }
1633
1634 if (producer->connected_ &&
1635 backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1636 backend_connection_id) {
1637 // Flush any commits that might have been batched by SharedMemoryArbiter.
1638 producer->service_->MaybeSharedMemoryArbiter()
1639 ->FlushPendingCommitDataRequests();
1640 if (instance_id && will_notify_on_stop)
1641 producer->service_->NotifyDataSourceStopped(instance_id);
1642 }
1643 producer->SweepDeadServices();
1644 }
1645
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1646 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1647 TracingBackendId backend_id,
1648 DataSourceInstanceID instance_id) {
1649 PERFETTO_DCHECK_THREAD(thread_checker_);
1650 PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1651 instance_id);
1652 auto ds = FindDataSource(backend_id, instance_id);
1653 if (!ds) {
1654 PERFETTO_ELOG("Could not find data source to clear incremental state for");
1655 return;
1656 }
1657
1658 DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
1659 clear_incremental_state_args.internal_instance_index = ds.instance_idx;
1660 {
1661 std::unique_lock<std::recursive_mutex> lock;
1662 if (ds.requires_callbacks_under_lock)
1663 lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1664 ds.internal_state->data_source->WillClearIncrementalState(
1665 clear_incremental_state_args);
1666 }
1667
1668 // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1669 // the incremental state should be cleared.
1670 ds.static_state->GetUnsafe(ds.instance_idx)
1671 ->incremental_state_generation.fetch_add(1, std::memory_order_relaxed);
1672 }
1673
FlushDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id,FlushRequestID flush_id,FlushFlags flush_flags)1674 bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
1675 TracingBackendId backend_id,
1676 DataSourceInstanceID instance_id,
1677 FlushRequestID flush_id,
1678 FlushFlags flush_flags) {
1679 PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
1680 auto ds = FindDataSource(backend_id, instance_id);
1681 if (!ds) {
1682 PERFETTO_ELOG("Could not find data source to flush");
1683 return true;
1684 }
1685
1686 uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1687
1688 FlushArgsImpl flush_args;
1689 flush_args.flush_flags = flush_flags;
1690 flush_args.internal_instance_index = ds.instance_idx;
1691 flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
1692 instance_id, ds, flush_id] {
1693 // TracingMuxerImpl is long lived, capturing |this| is okay.
1694 // The notification closure can be moved out of the StopArgs by the
1695 // embedder to handle stop asynchronously. The embedder might then
1696 // call the closure on a different thread than the current one, hence
1697 // this nested PostTask().
1698 task_runner_->PostTask(
1699 [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
1700 FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
1701 instance_id, ds, flush_id);
1702 });
1703 };
1704 {
1705 std::unique_lock<std::recursive_mutex> lock;
1706 if (ds.requires_callbacks_under_lock)
1707 lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1708 ds.internal_state->data_source->OnFlush(flush_args);
1709 }
1710
1711 // |async_flush_closure| is moved out of |flush_args| if the producer
1712 // requested to handle the flush asynchronously.
1713 bool handled = static_cast<bool>(flush_args.async_flush_closure);
1714 return handled;
1715 }
1716
FlushDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds,FlushRequestID flush_id)1717 void TracingMuxerImpl::FlushDataSource_AsyncEnd(
1718 TracingBackendId backend_id,
1719 uint32_t backend_connection_id,
1720 DataSourceInstanceID instance_id,
1721 const FindDataSourceRes& ds,
1722 FlushRequestID flush_id) {
1723 PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
1724 PERFETTO_DCHECK_THREAD(thread_checker_);
1725
1726 // Check that the data source instance is still active and was not modified
1727 // while it was being flushed.
1728 if (!ds.static_state->TryGet(ds.instance_idx) ||
1729 ds.internal_state->backend_id != backend_id ||
1730 ds.internal_state->backend_connection_id != backend_connection_id ||
1731 ds.internal_state->data_source_instance_id != instance_id) {
1732 PERFETTO_ELOG("Async flush of data source %" PRIu64
1733 " failed. This might be due to the data source being stopped "
1734 "in the meantime",
1735 instance_id);
1736 return;
1737 }
1738
1739 // |producer_backends_| is append-only, Backend instances are always valid.
1740 PERFETTO_CHECK(backend_id < producer_backends_.size());
1741 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1742
1743 ProducerImpl* producer = backend.producer.get();
1744 if (!producer)
1745 return;
1746
1747 // If the tracing service disconnects and reconnects while a data source is
1748 // handling a flush request, there's no point is sending the flush reply to
1749 // the newly reconnected producer.
1750 if (producer->connected_ &&
1751 backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1752 backend_connection_id) {
1753 producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
1754 }
1755 }
1756
SyncProducersForTesting()1757 void TracingMuxerImpl::SyncProducersForTesting() {
1758 std::mutex mutex;
1759 std::condition_variable cv;
1760
1761 // IPC-based producers don't report connection errors explicitly for each
1762 // command, but instead with an asynchronous callback
1763 // (ProducerImpl::OnDisconnected). This means that the sync command below
1764 // may have completed but failed to reach the service because of a
1765 // disconnection, but we can't tell until the disconnection message comes
1766 // through. To guard against this, we run two whole rounds of sync round-trips
1767 // before returning; the first one will detect any disconnected producers and
1768 // the second one will ensure any reconnections have completed and all data
1769 // sources are registered in the service again.
1770 for (size_t i = 0; i < 2; i++) {
1771 size_t countdown = std::numeric_limits<size_t>::max();
1772 task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1773 {
1774 std::unique_lock<std::mutex> countdown_lock(mutex);
1775 countdown = producer_backends_.size();
1776 }
1777 for (auto& backend : producer_backends_) {
1778 auto* producer = backend.producer.get();
1779 producer->service_->Sync([&mutex, &cv, &countdown] {
1780 std::unique_lock<std::mutex> countdown_lock(mutex);
1781 countdown--;
1782 cv.notify_one();
1783 });
1784 }
1785 });
1786
1787 {
1788 std::unique_lock<std::mutex> countdown_lock(mutex);
1789 cv.wait(countdown_lock, [&countdown] { return !countdown; });
1790 }
1791 }
1792
1793 // Check that all producers are indeed connected.
1794 bool done = false;
1795 bool all_producers_connected = true;
1796 task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1797 for (auto& backend : producer_backends_)
1798 all_producers_connected &= backend.producer->connected_;
1799 std::unique_lock<std::mutex> lock(mutex);
1800 done = true;
1801 cv.notify_one();
1802 });
1803
1804 {
1805 std::unique_lock<std::mutex> lock(mutex);
1806 cv.wait(lock, [&done] { return done; });
1807 }
1808 PERFETTO_DCHECK(all_producers_connected);
1809 }
1810
DestroyStoppedTraceWritersForCurrentThread()1811 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1812 // Iterate across all possible data source types.
1813 auto cur_generation = generation_.load(std::memory_order_acquire);
1814 auto* root_tls = GetOrCreateTracingTLS();
1815
1816 auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1817 // |tls| has a vector of per-data-source-instance thread-local state.
1818 DataSourceStaticState* static_state = tls.static_state;
1819 if (!static_state)
1820 return; // Slot not used.
1821
1822 // Iterate across all possible instances for this data source.
1823 for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1824 DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1825 if (!ds_tls.trace_writer)
1826 continue;
1827
1828 DataSourceState* ds_state = static_state->TryGet(inst);
1829 if (ds_state &&
1830 ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1831 ds_state->backend_id == ds_tls.backend_id &&
1832 ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1833 ds_state->startup_target_buffer_reservation.load(
1834 std::memory_order_relaxed) ==
1835 ds_tls.startup_target_buffer_reservation &&
1836 ds_state->buffer_id == ds_tls.buffer_id &&
1837 ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1838 continue;
1839 }
1840
1841 // The DataSource instance has been destroyed or recycled.
1842 ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
1843 }
1844 };
1845
1846 for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1847 // |tls| has a vector of per-data-source-instance thread-local state.
1848 DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1849 destroy_stopped_instances(tls);
1850 }
1851 destroy_stopped_instances(root_tls->track_event_tls);
1852 root_tls->generation = cur_generation;
1853 }
1854
1855 // Called both when a new data source is registered or when a new backend
1856 // connects. In both cases we want to be sure we reflected the data source
1857 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1858 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1859 PERFETTO_DCHECK_THREAD(thread_checker_);
1860 for (RegisteredDataSource& rds : data_sources_) {
1861 UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1862 }
1863 }
1864
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1865 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1866 bool is_changed) {
1867 PERFETTO_DCHECK_THREAD(thread_checker_);
1868 for (RegisteredProducerBackend& backend : producer_backends_) {
1869 // We cannot call RegisterDataSource on the backend before it connects.
1870 if (!backend.producer->connected_)
1871 continue;
1872
1873 PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1874 bool is_registered = backend.producer->registered_data_sources_.test(
1875 rds.static_state->index);
1876 if (is_registered && !is_changed)
1877 continue;
1878
1879 if (!rds.descriptor.no_flush()) {
1880 rds.descriptor.set_no_flush(rds.no_flush);
1881 }
1882 rds.descriptor.set_will_notify_on_start(true);
1883 if (!rds.descriptor.has_will_notify_on_stop()) {
1884 rds.descriptor.set_will_notify_on_stop(true);
1885 }
1886
1887 rds.descriptor.set_handles_incremental_state_clear(true);
1888 rds.descriptor.set_id(rds.static_state->id);
1889 if (is_registered) {
1890 backend.producer->service_->UpdateDataSource(rds.descriptor);
1891 } else {
1892 backend.producer->service_->RegisterDataSource(rds.descriptor);
1893 }
1894 backend.producer->registered_data_sources_.set(rds.static_state->index);
1895 }
1896 }
1897
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1898 void TracingMuxerImpl::SetupTracingSession(
1899 TracingSessionGlobalID session_id,
1900 const std::shared_ptr<TraceConfig>& trace_config,
1901 base::ScopedFile trace_fd) {
1902 PERFETTO_DCHECK_THREAD(thread_checker_);
1903 PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1904
1905 auto* consumer = FindConsumer(session_id);
1906 if (!consumer)
1907 return;
1908
1909 consumer->trace_config_ = trace_config;
1910 if (trace_fd)
1911 consumer->trace_fd_ = std::move(trace_fd);
1912
1913 if (!consumer->connected_)
1914 return;
1915
1916 // Only used in the deferred start mode.
1917 if (trace_config->deferred_start()) {
1918 consumer->service_->EnableTracing(*trace_config,
1919 std::move(consumer->trace_fd_));
1920 }
1921 }
1922
StartTracingSession(TracingSessionGlobalID session_id)1923 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1924 PERFETTO_DCHECK_THREAD(thread_checker_);
1925
1926 auto* consumer = FindConsumer(session_id);
1927
1928 if (!consumer)
1929 return;
1930
1931 if (!consumer->trace_config_) {
1932 PERFETTO_ELOG("Must call Setup(config) first");
1933 return;
1934 }
1935
1936 if (!consumer->connected_) {
1937 consumer->start_pending_ = true;
1938 return;
1939 }
1940
1941 consumer->start_pending_ = false;
1942 if (consumer->trace_config_->deferred_start()) {
1943 consumer->service_->StartTracing();
1944 } else {
1945 consumer->service_->EnableTracing(*consumer->trace_config_,
1946 std::move(consumer->trace_fd_));
1947 }
1948
1949 // TODO implement support for the deferred-start + fast-triggering case.
1950 }
1951
CloneTracingSession(TracingSessionGlobalID session_id,TracingSession::CloneTraceArgs args,TracingSession::CloneTraceCallback callback)1952 void TracingMuxerImpl::CloneTracingSession(
1953 TracingSessionGlobalID session_id,
1954 TracingSession::CloneTraceArgs args,
1955 TracingSession::CloneTraceCallback callback) {
1956 PERFETTO_DCHECK_THREAD(thread_checker_);
1957 auto* consumer = FindConsumer(session_id);
1958 if (!consumer) {
1959 TracingSession::CloneTraceCallbackArgs callback_arg{};
1960 callback_arg.success = false;
1961 callback_arg.error = "Tracing session not found";
1962 callback(callback_arg);
1963 return;
1964 }
1965 // Multiple concurrent cloning isn't supported.
1966 PERFETTO_DCHECK(!consumer->clone_trace_callback_);
1967 consumer->clone_trace_callback_ = std::move(callback);
1968 ConsumerEndpoint::CloneSessionArgs consumer_args{};
1969 consumer_args.unique_session_name = args.unique_session_name;
1970 if (!consumer->connected_) {
1971 consumer->session_to_clone_ = std::move(consumer_args);
1972 return;
1973 }
1974 consumer->session_to_clone_ = std::nullopt;
1975 consumer->service_->CloneSession(consumer_args);
1976 }
1977
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1978 void TracingMuxerImpl::ChangeTracingSessionConfig(
1979 TracingSessionGlobalID session_id,
1980 const TraceConfig& trace_config) {
1981 PERFETTO_DCHECK_THREAD(thread_checker_);
1982
1983 auto* consumer = FindConsumer(session_id);
1984
1985 if (!consumer)
1986 return;
1987
1988 if (!consumer->trace_config_) {
1989 // Changing the config is only supported for started sessions.
1990 PERFETTO_ELOG("Must call Setup(config) and Start() first");
1991 return;
1992 }
1993
1994 consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1995 if (consumer->connected_)
1996 consumer->service_->ChangeTraceConfig(trace_config);
1997 }
1998
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1999 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
2000 uint32_t timeout_ms,
2001 std::function<void(bool)> callback) {
2002 PERFETTO_DCHECK_THREAD(thread_checker_);
2003 auto* consumer = FindConsumer(session_id);
2004 if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
2005 !consumer->trace_config_) {
2006 PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
2007 std::move(callback)(false);
2008 return;
2009 }
2010
2011 // For now we don't want to expose the flush reason to the consumer-side SDK
2012 // users to avoid misuses until there is a strong need.
2013 consumer->service_->Flush(timeout_ms, std::move(callback),
2014 FlushFlags(FlushFlags::Initiator::kConsumerSdk,
2015 FlushFlags::Reason::kExplicit));
2016 }
2017
StopTracingSession(TracingSessionGlobalID session_id)2018 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
2019 PERFETTO_DCHECK_THREAD(thread_checker_);
2020 auto* consumer = FindConsumer(session_id);
2021 if (!consumer)
2022 return;
2023
2024 if (consumer->start_pending_) {
2025 // If the session hasn't started yet, wait until it does before stopping.
2026 consumer->stop_pending_ = true;
2027 return;
2028 }
2029
2030 consumer->stop_pending_ = false;
2031 if (consumer->stopped_) {
2032 // If the session was already stopped (e.g., it failed to start), don't try
2033 // stopping again.
2034 consumer->NotifyStopComplete();
2035 } else if (!consumer->trace_config_) {
2036 PERFETTO_ELOG("Must call Setup(config) and Start() first");
2037 return;
2038 } else {
2039 consumer->service_->DisableTracing();
2040 }
2041
2042 consumer->trace_config_.reset();
2043 }
2044
DestroyTracingSession(TracingSessionGlobalID session_id)2045 void TracingMuxerImpl::DestroyTracingSession(
2046 TracingSessionGlobalID session_id) {
2047 PERFETTO_DCHECK_THREAD(thread_checker_);
2048 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2049 // We need to find the consumer (if any) and call Disconnect as we destroy
2050 // the tracing session. We can't call Disconnect() inside this for loop
2051 // because in the in-process case this will end up to a synchronous call to
2052 // OnConsumerDisconnect which will invalidate all the iterators to
2053 // |backend.consumers|.
2054 ConsumerImpl* consumer = nullptr;
2055 for (auto& con : backend.consumers) {
2056 if (con->session_id_ == session_id) {
2057 consumer = con.get();
2058 break;
2059 }
2060 }
2061 if (consumer) {
2062 // We broke out of the loop above on the assumption that each backend will
2063 // only have a single consumer per session. This DCHECK ensures that
2064 // this is the case.
2065 PERFETTO_DCHECK(
2066 std::count_if(backend.consumers.begin(), backend.consumers.end(),
2067 [session_id](const std::unique_ptr<ConsumerImpl>& con) {
2068 return con->session_id_ == session_id;
2069 }) == 1u);
2070 consumer->Disconnect();
2071 }
2072 }
2073 }
2074
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)2075 void TracingMuxerImpl::ReadTracingSessionData(
2076 TracingSessionGlobalID session_id,
2077 std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
2078 PERFETTO_DCHECK_THREAD(thread_checker_);
2079 auto* consumer = FindConsumer(session_id);
2080 if (!consumer) {
2081 // TODO(skyostil): Signal an error to the user.
2082 TracingSession::ReadTraceCallbackArgs callback_arg{};
2083 callback(callback_arg);
2084 return;
2085 }
2086 PERFETTO_DCHECK(!consumer->read_trace_callback_);
2087 consumer->read_trace_callback_ = std::move(callback);
2088 consumer->service_->ReadBuffers();
2089 }
2090
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)2091 void TracingMuxerImpl::GetTraceStats(
2092 TracingSessionGlobalID session_id,
2093 TracingSession::GetTraceStatsCallback callback) {
2094 PERFETTO_DCHECK_THREAD(thread_checker_);
2095 auto* consumer = FindConsumer(session_id);
2096 if (!consumer) {
2097 TracingSession::GetTraceStatsCallbackArgs callback_arg{};
2098 callback_arg.success = false;
2099 callback(std::move(callback_arg));
2100 return;
2101 }
2102 PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
2103 consumer->get_trace_stats_callback_ = std::move(callback);
2104 if (!consumer->connected_) {
2105 consumer->get_trace_stats_pending_ = true;
2106 return;
2107 }
2108 consumer->get_trace_stats_pending_ = false;
2109 consumer->service_->GetTraceStats();
2110 }
2111
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)2112 void TracingMuxerImpl::QueryServiceState(
2113 TracingSessionGlobalID session_id,
2114 TracingSession::QueryServiceStateCallback callback) {
2115 PERFETTO_DCHECK_THREAD(thread_checker_);
2116 auto* consumer = FindConsumer(session_id);
2117 if (!consumer) {
2118 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2119 callback_arg.success = false;
2120 callback(std::move(callback_arg));
2121 return;
2122 }
2123 PERFETTO_DCHECK(!consumer->query_service_state_callback_);
2124 if (!consumer->connected_) {
2125 consumer->query_service_state_callback_ = std::move(callback);
2126 return;
2127 }
2128 auto callback_wrapper = [callback](bool success,
2129 protos::gen::TracingServiceState state) {
2130 TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2131 callback_arg.success = success;
2132 callback_arg.service_state_data = state.SerializeAsArray();
2133 callback(std::move(callback_arg));
2134 };
2135 consumer->service_->QueryServiceState({}, std::move(callback_wrapper));
2136 }
2137
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)2138 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
2139 uint32_t batch_commits_duration_ms,
2140 BackendType backend_type) {
2141 for (RegisteredProducerBackend& backend : producer_backends_) {
2142 if (backend.producer && backend.producer->connected_ &&
2143 backend.type == backend_type) {
2144 backend.producer->service_->MaybeSharedMemoryArbiter()
2145 ->SetBatchCommitsDuration(batch_commits_duration_ms);
2146 }
2147 }
2148 }
2149
EnableDirectSMBPatchingForTesting(BackendType backend_type)2150 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
2151 BackendType backend_type) {
2152 for (RegisteredProducerBackend& backend : producer_backends_) {
2153 if (backend.producer && backend.producer->connected_ &&
2154 backend.type == backend_type &&
2155 !backend.producer->service_->MaybeSharedMemoryArbiter()
2156 ->EnableDirectSMBPatching()) {
2157 return false;
2158 }
2159 }
2160 return true;
2161 }
2162
FindConsumer(TracingSessionGlobalID session_id)2163 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
2164 TracingSessionGlobalID session_id) {
2165 PERFETTO_DCHECK_THREAD(thread_checker_);
2166 return FindConsumerAndBackend(session_id).first;
2167 }
2168
2169 std::pair<TracingMuxerImpl::ConsumerImpl*,
2170 TracingMuxerImpl::RegisteredConsumerBackend*>
FindConsumerAndBackend(TracingSessionGlobalID session_id)2171 TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
2172 PERFETTO_DCHECK_THREAD(thread_checker_);
2173 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2174 for (auto& consumer : backend.consumers) {
2175 if (consumer->session_id_ == session_id) {
2176 return {consumer.get(), &backend};
2177 }
2178 }
2179 }
2180 return {nullptr, nullptr};
2181 }
2182
InitializeConsumer(TracingSessionGlobalID session_id)2183 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
2184 PERFETTO_DCHECK_THREAD(thread_checker_);
2185
2186 auto res = FindConsumerAndBackend(session_id);
2187 if (!res.first || !res.second)
2188 return;
2189 TracingMuxerImpl::ConsumerImpl* consumer = res.first;
2190 RegisteredConsumerBackend& backend = *res.second;
2191
2192 TracingBackend::ConnectConsumerArgs conn_args;
2193 conn_args.consumer = consumer;
2194 conn_args.task_runner = task_runner_.get();
2195 consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
2196 }
2197
OnConsumerDisconnected(ConsumerImpl * consumer)2198 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
2199 PERFETTO_DCHECK_THREAD(thread_checker_);
2200 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2201 auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
2202 return con.get() == consumer;
2203 };
2204 backend.consumers.erase(std::remove_if(backend.consumers.begin(),
2205 backend.consumers.end(), pred),
2206 backend.consumers.end());
2207 }
2208 }
2209
SetMaxProducerReconnectionsForTesting(uint32_t count)2210 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
2211 max_producer_reconnections_.store(count);
2212 }
2213
OnProducerDisconnected(ProducerImpl * producer)2214 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
2215 PERFETTO_DCHECK_THREAD(thread_checker_);
2216 for (RegisteredProducerBackend& backend : producer_backends_) {
2217 if (backend.producer.get() != producer)
2218 continue;
2219
2220 // The tracing service is disconnected. It does not make sense to keep
2221 // tracing (we wouldn't be able to commit). On reconnection, the tracing
2222 // service will restart the data sources.
2223 for (const auto& rds : data_sources_) {
2224 DataSourceStaticState* static_state = rds.static_state;
2225 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2226 auto* internal_state = static_state->TryGet(i);
2227 if (internal_state && internal_state->backend_id == backend.id &&
2228 internal_state->backend_connection_id ==
2229 backend.producer->connection_id_.load(
2230 std::memory_order_relaxed)) {
2231 StopDataSource_AsyncBeginImpl(
2232 FindDataSourceRes(static_state, internal_state, i,
2233 rds.requires_callbacks_under_lock));
2234 }
2235 }
2236 }
2237
2238 // Try reconnecting the disconnected producer. If the connection succeeds,
2239 // all the data sources will be automatically re-registered.
2240 if (producer->connection_id_.load(std::memory_order_relaxed) >
2241 max_producer_reconnections_.load()) {
2242 // Avoid reconnecting a failing producer too many times. Instead we just
2243 // leak the producer instead of trying to avoid further complicating
2244 // cross-thread trace writer creation.
2245 PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
2246 continue;
2247 }
2248
2249 backend.producer->Initialize(
2250 backend.backend->ConnectProducer(backend.producer_conn_args));
2251 // Don't use producer-provided SMBs for the next connection unless startup
2252 // tracing requires it again.
2253 backend.producer_conn_args.use_producer_provided_smb = false;
2254 }
2255 }
2256
SweepDeadBackends()2257 void TracingMuxerImpl::SweepDeadBackends() {
2258 PERFETTO_DCHECK_THREAD(thread_checker_);
2259 for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
2260 auto next_it = it;
2261 next_it++;
2262 if (it->producer->SweepDeadServices())
2263 dead_backends_.erase(it);
2264 it = next_it;
2265 }
2266 }
2267
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)2268 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
2269 TracingBackendId backend_id,
2270 DataSourceInstanceID instance_id) {
2271 PERFETTO_DCHECK_THREAD(thread_checker_);
2272 RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
2273 for (const auto& rds : data_sources_) {
2274 DataSourceStaticState* static_state = rds.static_state;
2275 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2276 auto* internal_state = static_state->TryGet(i);
2277 if (internal_state && internal_state->backend_id == backend_id &&
2278 internal_state->backend_connection_id ==
2279 backend.producer->connection_id_.load(
2280 std::memory_order_relaxed) &&
2281 internal_state->data_source_instance_id == instance_id) {
2282 return FindDataSourceRes(static_state, internal_state, i,
2283 rds.requires_callbacks_under_lock);
2284 }
2285 }
2286 }
2287 return FindDataSourceRes();
2288 }
2289
2290 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)2291 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
2292 DataSourceStaticState* static_state,
2293 uint32_t data_source_instance_index,
2294 DataSourceState* data_source,
2295 BufferExhaustedPolicy buffer_exhausted_policy) {
2296 if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
2297 // If the session is being intercepted, return a heap-backed trace writer
2298 // instead. This is safe because all the data given to the interceptor is
2299 // either thread-local (|instance_index|), statically allocated
2300 // (|static_state|) or constant after initialization (|interceptor|). Access
2301 // to the interceptor instance itself through |data_source| is protected by
2302 // a statically allocated lock (similarly to the data source instance).
2303 auto& interceptor = interceptors_[data_source->interceptor_id - 1];
2304 return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
2305 interceptor.tls_factory(static_state, data_source_instance_index),
2306 interceptor.packet_callback, static_state, data_source_instance_index));
2307 }
2308 ProducerImpl* producer =
2309 FindProducerBackendById(data_source->backend_id)->producer.get();
2310 // Atomically load the current service endpoint. We keep the pointer as a
2311 // shared pointer on the stack to guard against it from being concurrently
2312 // modified on the thread by ProducerImpl::Initialize() swapping in a
2313 // reconnected service on the muxer task runner thread.
2314 //
2315 // The endpoint may also be concurrently modified by SweepDeadServices()
2316 // clearing out old disconnected services. We guard against that by
2317 // SharedMemoryArbiter keeping track of any outstanding trace writers. After
2318 // shutdown has started, the trace writer created below will be a null one
2319 // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
2320 //
2321 // We use an atomic pointer instead of holding a lock because
2322 // CreateTraceWriter posts tasks under the hood.
2323 std::shared_ptr<ProducerEndpoint> service =
2324 std::atomic_load(&producer->service_);
2325
2326 // The service may have been disconnected and reconnected concurrently after
2327 // the data source was enabled, in which case we may not have an arbiter, or
2328 // would be creating a TraceWriter for the wrong (a newer) connection / SMB.
2329 // Instead, early-out now. A relaxed load is fine here because the atomic_load
2330 // above ensures that the |service| isn't newer.
2331 if (producer->connection_id_.load(std::memory_order_relaxed) !=
2332 data_source->backend_connection_id) {
2333 return std::unique_ptr<TraceWriter>(new NullTraceWriter());
2334 }
2335
2336 // We just need a relaxed atomic read here: We can use the reservation ID even
2337 // after the buffer was bound, we just need to be sure to read it atomically.
2338 uint16_t startup_buffer_reservation =
2339 data_source->startup_target_buffer_reservation.load(
2340 std::memory_order_relaxed);
2341 if (startup_buffer_reservation) {
2342 return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
2343 startup_buffer_reservation);
2344 }
2345 return service->CreateTraceWriter(data_source->buffer_id,
2346 buffer_exhausted_policy);
2347 }
2348
2349 // This is called via the public API Tracing::NewTrace().
2350 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type,TracingConsumerBackend * (* system_backend_factory)())2351 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
2352 BackendType requested_backend_type,
2353 TracingConsumerBackend* (*system_backend_factory)()) {
2354 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2355
2356 // |backend_type| can only specify one backend, not an OR-ed mask.
2357 PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
2358
2359 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2360 task_runner_->PostTask([this, requested_backend_type, session_id,
2361 system_backend_factory] {
2362 if (requested_backend_type == kSystemBackend && system_backend_factory &&
2363 !FindConsumerBackendByType(kSystemBackend)) {
2364 AddConsumerBackend(system_backend_factory(), kSystemBackend);
2365 }
2366 for (RegisteredConsumerBackend& backend : consumer_backends_) {
2367 if (requested_backend_type && backend.type &&
2368 backend.type != requested_backend_type) {
2369 continue;
2370 }
2371
2372 // Create the consumer now, even if we have to ask the embedder below, so
2373 // that any other tasks executing after this one can find the consumer and
2374 // change its pending attributes.
2375 backend.consumers.emplace_back(
2376 new ConsumerImpl(this, backend.type, session_id));
2377
2378 // The last registered backend in |consumer_backends_| is the unsupported
2379 // backend without a valid type.
2380 if (!backend.type) {
2381 PERFETTO_ELOG(
2382 "No tracing backend ready for type=%d, consumer will disconnect",
2383 requested_backend_type);
2384 InitializeConsumer(session_id);
2385 return;
2386 }
2387
2388 // Check if the embedder wants to be asked for permission before
2389 // connecting the consumer.
2390 if (!policy_) {
2391 InitializeConsumer(session_id);
2392 return;
2393 }
2394
2395 BackendType type = backend.type;
2396 TracingPolicy::ShouldAllowConsumerSessionArgs args;
2397 args.backend_type = backend.type;
2398 args.result_callback = [this, type, session_id](bool allow) {
2399 task_runner_->PostTask([this, type, session_id, allow] {
2400 if (allow) {
2401 InitializeConsumer(session_id);
2402 return;
2403 }
2404
2405 PERFETTO_ELOG(
2406 "Consumer session for backend type type=%d forbidden, "
2407 "consumer will disconnect",
2408 type);
2409
2410 auto* consumer = FindConsumer(session_id);
2411 if (!consumer)
2412 return;
2413
2414 consumer->OnDisconnect();
2415 });
2416 };
2417 policy_->ShouldAllowConsumerSession(args);
2418 return;
2419 }
2420 PERFETTO_DFATAL("Not reached");
2421 });
2422
2423 return std::unique_ptr<TracingSession>(
2424 new TracingSessionImpl(this, session_id, requested_backend_type));
2425 }
2426
2427 // static
2428 // This is called via the public API Tracing::SetupStartupTracing().
2429 // Can be called from any thread.
2430 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSession(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2431 TracingMuxerImpl::CreateStartupTracingSession(
2432 const TraceConfig& config,
2433 Tracing::SetupStartupTracingOpts opts) {
2434 BackendType backend_type = opts.backend;
2435 // |backend_type| can only specify one backend, not an OR-ed mask.
2436 PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
2437 // The in-process backend doesn't support startup tracing.
2438 PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
2439
2440 TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2441
2442 // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2443 task_runner_->PostTask([this, config, opts, backend_type, session_id] {
2444 for (RegisteredProducerBackend& backend : producer_backends_) {
2445 if (backend_type && backend.type && backend.type != backend_type) {
2446 continue;
2447 }
2448
2449 TracingBackendId backend_id = backend.id;
2450
2451 // The last registered backend in |producer_backends_| is the unsupported
2452 // backend without a valid type.
2453 if (!backend.type) {
2454 PERFETTO_ELOG(
2455 "No tracing backend initialized for type=%d, startup tracing "
2456 "failed",
2457 backend_type);
2458 if (opts.on_setup)
2459 opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2460 0 /* num_data_sources_started */});
2461 return;
2462 }
2463
2464 if (!backend.producer->service_ ||
2465 !backend.producer->service_->shared_memory()) {
2466 // If we unsuccessfully attempted to use a producer-provided SMB in the
2467 // past, don't try again.
2468 if (backend.producer->producer_provided_smb_failed_) {
2469 PERFETTO_ELOG(
2470 "Backend %zu doesn't seem to support producer-provided "
2471 "SMBs, startup tracing failed",
2472 backend_id);
2473 if (opts.on_setup)
2474 opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2475 0 /* num_data_sources_started */});
2476 return;
2477 }
2478
2479 PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
2480 backend_id);
2481 backend.producer_conn_args.use_producer_provided_smb = true;
2482 backend.producer->service_->Disconnect(); // Causes a reconnect.
2483 PERFETTO_DCHECK(backend.producer->service_ &&
2484 backend.producer->service_->MaybeSharedMemoryArbiter());
2485 }
2486
2487 RegisteredStartupSession session;
2488 session.session_id = session_id;
2489 session.on_aborted = opts.on_aborted;
2490 session.on_adopted = opts.on_adopted;
2491
2492 for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
2493 // Find all matching data sources and start one instance of each.
2494 for (const auto& rds : data_sources_) {
2495 if (rds.descriptor.name() != ds_cfg.config().name())
2496 continue;
2497
2498 PERFETTO_DLOG(
2499 "Setting up data source %s for startup tracing with target "
2500 "buffer reservation %" PRIi32,
2501 rds.descriptor.name().c_str(),
2502 backend.producer->last_startup_target_buffer_reservation_ + 1u);
2503 auto ds = SetupDataSourceImpl(
2504 rds, backend_id,
2505 backend.producer->connection_id_.load(std::memory_order_relaxed),
2506 /*instance_id=*/0, ds_cfg.config(),
2507 /*startup_session_id=*/session_id);
2508 if (ds) {
2509 StartDataSourceImpl(ds);
2510 session.num_unbound_data_sources++;
2511 }
2512 }
2513 }
2514
2515 int num_ds = session.num_unbound_data_sources;
2516 auto on_setup = opts.on_setup;
2517 if (on_setup) {
2518 backend.producer->OnStartupTracingSetup();
2519 task_runner_->PostTask([on_setup, num_ds] {
2520 on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
2521 });
2522 }
2523
2524 if (num_ds > 0) {
2525 backend.startup_sessions.push_back(std::move(session));
2526
2527 if (opts.timeout_ms > 0) {
2528 task_runner_->PostDelayedTask(
2529 [this, session_id, backend_type] {
2530 AbortStartupTracingSession(session_id, backend_type);
2531 },
2532 opts.timeout_ms);
2533 }
2534 }
2535 return;
2536 }
2537 PERFETTO_DFATAL("Invalid startup tracing session backend");
2538 });
2539
2540 return std::unique_ptr<StartupTracingSession>(
2541 new StartupTracingSessionImpl(this, session_id, backend_type));
2542 }
2543
2544 // Must not be called from the SDK's internal thread.
2545 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSessionBlocking(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2546 TracingMuxerImpl::CreateStartupTracingSessionBlocking(
2547 const TraceConfig& config,
2548 Tracing::SetupStartupTracingOpts opts) {
2549 auto previous_on_setup = std::move(opts.on_setup);
2550 PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
2551 base::WaitableEvent event;
2552 // It is safe to capture by reference because once on_setup is called only
2553 // once before this method returns.
2554 opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
2555 if (previous_on_setup) {
2556 previous_on_setup(std::move(args));
2557 }
2558 event.Notify();
2559 };
2560 auto session = CreateStartupTracingSession(config, std::move(opts));
2561 event.Wait();
2562 return session;
2563 }
2564
AbortStartupTracingSession(TracingSessionGlobalID session_id,BackendType backend_type)2565 void TracingMuxerImpl::AbortStartupTracingSession(
2566 TracingSessionGlobalID session_id,
2567 BackendType backend_type) {
2568 PERFETTO_DCHECK_THREAD(thread_checker_);
2569
2570 for (RegisteredProducerBackend& backend : producer_backends_) {
2571 if (backend_type != backend.type)
2572 continue;
2573
2574 auto session_it = std::find_if(
2575 backend.startup_sessions.begin(), backend.startup_sessions.end(),
2576 [session_id](const RegisteredStartupSession& session) {
2577 return session.session_id == session_id;
2578 });
2579
2580 // The startup session may have already been aborted or fully adopted.
2581 if (session_it == backend.startup_sessions.end())
2582 return;
2583 if (session_it->is_aborting)
2584 return;
2585
2586 session_it->is_aborting = true;
2587
2588 // Iterate all data sources and abort them if they weren't adopted yet.
2589 for (const auto& rds : data_sources_) {
2590 DataSourceStaticState* static_state = rds.static_state;
2591 for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2592 auto* internal_state = static_state->TryGet(i);
2593 if (internal_state &&
2594 internal_state->startup_target_buffer_reservation.load(
2595 std::memory_order_relaxed) &&
2596 internal_state->data_source_instance_id == 0 &&
2597 internal_state->startup_session_id == session_id) {
2598 PERFETTO_DLOG(
2599 "Aborting startup tracing for data source %s (target buffer "
2600 "reservation %" PRIu16 ")",
2601 rds.descriptor.name().c_str(),
2602 internal_state->startup_target_buffer_reservation.load(
2603 std::memory_order_relaxed));
2604
2605 // Abort the instance asynchronously by stopping it. From this point
2606 // onwards, the service will not be able to adopt it via
2607 // StartDataSource().
2608 session_it->num_aborting_data_sources++;
2609 StopDataSource_AsyncBeginImpl(
2610 FindDataSourceRes(static_state, internal_state, i,
2611 rds.requires_callbacks_under_lock));
2612 }
2613 }
2614 }
2615
2616 // If we did everything right, we should have aborted all still-unbound data
2617 // source instances.
2618 PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
2619 session_it->num_aborting_data_sources);
2620
2621 if (session_it->num_aborting_data_sources == 0) {
2622 if (session_it->on_aborted)
2623 task_runner_->PostTask(session_it->on_aborted);
2624
2625 backend.startup_sessions.erase(session_it);
2626 }
2627 return;
2628 }
2629 // We might reach here in tests because when we start a trace, we post the
2630 // Task(AbortStartupTrace, delay=timeout). When we do
2631 // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
2632 // kill those delayed tasks because TaskRunner doesn't have support for
2633 // deleting scheduled future tasks and TaskRunner doesn't have any API for us
2634 // to wait for the completion of all the scheduled tasks (apart from
2635 // deleting the TaskRunner) and we want to avoid doing that because we need
2636 // a long running TaskRunner in muxer.
2637 PERFETTO_DLOG("Invalid startup tracing session backend");
2638 }
2639
InitializeInstance(const TracingInitArgs & args)2640 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
2641 if (instance_ != TracingMuxerFake::Get()) {
2642 // The tracing muxer was already initialized. We might need to initialize
2643 // additional backends that were not configured earlier.
2644 auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
2645 muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
2646 return;
2647 }
2648 // If we previously had a TracingMuxerImpl instance which was reset,
2649 // reinitialize and reuse it instead of trying to create a new one. See
2650 // ResetForTesting().
2651 if (g_prev_instance) {
2652 auto* muxer = g_prev_instance;
2653 g_prev_instance = nullptr;
2654 instance_ = muxer;
2655 muxer->task_runner_->PostTask([muxer, args] {
2656 muxer->Initialize(args);
2657 muxer->AddBackends(args);
2658 });
2659 } else {
2660 new TracingMuxerImpl(args);
2661 }
2662 }
2663
2664 // static
ResetForTesting()2665 void TracingMuxerImpl::ResetForTesting() {
2666 // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
2667 // various objects make that a non-starter. In particular:
2668 //
2669 // 1) Any thread that has entered a trace event has a TraceWriter, which holds
2670 // a reference back to ProducerImpl::service_.
2671 //
2672 // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
2673 //
2674 // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
2675 // turn depends on TracingMuxerImpl itself.
2676 //
2677 // Because of this, it's not safe to deallocate TracingMuxerImpl until all
2678 // threads have dropped their TraceWriters. Since we can't really ask the
2679 // caller to guarantee this, we'll instead reset enough of the muxer's state
2680 // so that it can be reinitialized later and ensure all necessary objects from
2681 // the old state remain alive until all references have gone away.
2682 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2683
2684 base::WaitableEvent reset_done;
2685 auto do_reset = [muxer, &reset_done] {
2686 muxer->DestroyStoppedTraceWritersForCurrentThread();
2687 // Unregister all data sources so they don't interfere with any future
2688 // tracing sessions.
2689 for (RegisteredDataSource& rds : muxer->data_sources_) {
2690 for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
2691 if (!backend.producer->service_ || !backend.producer->connected_)
2692 continue;
2693 backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
2694 }
2695 }
2696 for (auto& backend : muxer->consumer_backends_) {
2697 // Check that no consumer session is currently active on any backend.
2698 for (auto& consumer : backend.consumers)
2699 PERFETTO_CHECK(!consumer->service_);
2700 }
2701 for (auto& backend : muxer->producer_backends_) {
2702 backend.producer->muxer_ = nullptr;
2703 backend.producer->DisposeConnection();
2704 muxer->dead_backends_.push_back(std::move(backend));
2705 }
2706 muxer->consumer_backends_.clear();
2707 muxer->producer_backends_.clear();
2708 muxer->interceptors_.clear();
2709
2710 for (auto& ds : muxer->data_sources_) {
2711 ds.static_state->ResetForTesting();
2712 }
2713
2714 muxer->data_sources_.clear();
2715 muxer->next_data_source_index_ = 0;
2716
2717 // Free all backends without active trace writers or other inbound
2718 // references. Note that even if all the backends get swept, the muxer still
2719 // needs to stay around since |task_runner_| is assumed to be long-lived.
2720 muxer->SweepDeadBackends();
2721
2722 // Make sure we eventually discard any per-thread trace writers from the
2723 // previous instance.
2724 muxer->muxer_id_for_testing_++;
2725
2726 g_prev_instance = muxer;
2727 instance_ = TracingMuxerFake::Get();
2728
2729 // Call the user provided cleanups on the muxer thread.
2730 for (auto& cb : muxer->reset_callbacks_) {
2731 cb();
2732 }
2733
2734 reset_done.Notify();
2735 };
2736
2737 // Some tests run the muxer and the test on the same thread. In these cases,
2738 // we can reset synchronously.
2739 if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
2740 do_reset();
2741 } else {
2742 muxer->DestroyStoppedTraceWritersForCurrentThread();
2743 muxer->task_runner_->PostTask(std::move(do_reset));
2744 reset_done.Wait();
2745 // Call the user provided cleanups also on this thread.
2746 for (auto& cb : muxer->reset_callbacks_) {
2747 cb();
2748 }
2749 }
2750 muxer->reset_callbacks_.clear();
2751 }
2752
2753 // static
Shutdown()2754 void TracingMuxerImpl::Shutdown() {
2755 auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2756
2757 // Shutting down on the muxer thread would lead to a deadlock.
2758 PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
2759 muxer->DestroyStoppedTraceWritersForCurrentThread();
2760
2761 std::unique_ptr<base::TaskRunner> owned_task_runner(
2762 muxer->task_runner_.get());
2763 base::WaitableEvent shutdown_done;
2764 owned_task_runner->PostTask([muxer, &shutdown_done] {
2765 // Check that no consumer session is currently active on any backend.
2766 // Producers will be automatically disconnected as a part of deleting the
2767 // muxer below.
2768 for (auto& backend : muxer->consumer_backends_) {
2769 for (auto& consumer : backend.consumers) {
2770 PERFETTO_CHECK(!consumer->service_);
2771 }
2772 }
2773 // Make sure no trace writers are lingering around on the muxer thread. Note
2774 // that we can't do this for any arbitrary thread in the process; it is the
2775 // caller's responsibility to clean them up before shutting down Perfetto.
2776 muxer->DestroyStoppedTraceWritersForCurrentThread();
2777 // The task runner must be deleted outside the muxer thread. This is done by
2778 // `owned_task_runner` above.
2779 muxer->task_runner_.release();
2780 auto* platform = muxer->platform_;
2781 delete muxer;
2782 instance_ = TracingMuxerFake::Get();
2783 platform->Shutdown();
2784 shutdown_done.Notify();
2785 });
2786 shutdown_done.Wait();
2787 }
2788
AppendResetForTestingCallback(std::function<void ()> cb)2789 void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
2790 reset_callbacks_.push_back(std::move(cb));
2791 }
2792
2793 TracingMuxer::~TracingMuxer() = default;
2794
2795 static_assert(std::is_same<internal::BufferId, BufferID>::value,
2796 "public's BufferId and tracing/core's BufferID diverged");
2797
2798 } // namespace internal
2799 } // namespace perfetto
2800