xref: /aosp_15_r20/external/perfetto/src/tracing/internal/tracing_muxer_impl.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/internal/tracing_muxer_impl.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <mutex>
22 #include <optional>
23 #include <vector>
24 
25 #include "perfetto/base/build_config.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/task_runner.h"
28 #include "perfetto/base/time.h"
29 #include "perfetto/ext/base/hash.h"
30 #include "perfetto/ext/base/thread_checker.h"
31 #include "perfetto/ext/base/waitable_event.h"
32 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
33 #include "perfetto/ext/tracing/core/trace_packet.h"
34 #include "perfetto/ext/tracing/core/trace_stats.h"
35 #include "perfetto/ext/tracing/core/trace_writer.h"
36 #include "perfetto/ext/tracing/core/tracing_service.h"
37 #include "perfetto/tracing/buffer_exhausted_policy.h"
38 #include "perfetto/tracing/core/data_source_config.h"
39 #include "perfetto/tracing/core/tracing_service_state.h"
40 #include "perfetto/tracing/data_source.h"
41 #include "perfetto/tracing/internal/data_source_internal.h"
42 #include "perfetto/tracing/internal/interceptor_trace_writer.h"
43 #include "perfetto/tracing/internal/tracing_backend_fake.h"
44 #include "perfetto/tracing/trace_writer_base.h"
45 #include "perfetto/tracing/tracing.h"
46 #include "perfetto/tracing/tracing_backend.h"
47 #include "src/tracing/core/null_trace_writer.h"
48 #include "src/tracing/internal/tracing_muxer_fake.h"
49 
50 #include "protos/perfetto/config/interceptor_config.gen.h"
51 
52 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
53 #include <io.h>  // For dup()
54 #else
55 #include <unistd.h>  // For dup()
56 #endif
57 
58 namespace perfetto {
59 namespace internal {
60 
61 namespace {
62 
63 using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
64 
65 // A task runner which prevents calls to DataSource::Trace() while an operation
66 // is in progress. Used to guard against unexpected re-entrancy where the
67 // user-provided task runner implementation tries to enter a trace point under
68 // the hood.
69 class NonReentrantTaskRunner : public base::TaskRunner {
70  public:
NonReentrantTaskRunner(TracingMuxer * muxer,std::unique_ptr<base::TaskRunner> task_runner)71   NonReentrantTaskRunner(TracingMuxer* muxer,
72                          std::unique_ptr<base::TaskRunner> task_runner)
73       : muxer_(muxer), task_runner_(std::move(task_runner)) {}
74 
75   // base::TaskRunner implementation.
PostTask(std::function<void ()> task)76   void PostTask(std::function<void()> task) override {
77     CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
78   }
79 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)80   void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
81     CallWithGuard(
82         [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
83   }
84 
AddFileDescriptorWatch(base::PlatformHandle fd,std::function<void ()> callback)85   void AddFileDescriptorWatch(base::PlatformHandle fd,
86                               std::function<void()> callback) override {
87     CallWithGuard(
88         [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
89   }
90 
RemoveFileDescriptorWatch(base::PlatformHandle fd)91   void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
92     CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
93   }
94 
RunsTasksOnCurrentThread() const95   bool RunsTasksOnCurrentThread() const override {
96     bool result;
97     CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
98     return result;
99   }
100 
101  private:
102   template <typename T>
CallWithGuard(T lambda) const103   void CallWithGuard(T lambda) const {
104     auto* root_tls = muxer_->GetOrCreateTracingTLS();
105     if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
106       lambda();
107       return;
108     }
109     ScopedReentrancyAnnotator scoped_annotator(*root_tls);
110     lambda();
111   }
112 
113   TracingMuxer* const muxer_;
114   std::unique_ptr<base::TaskRunner> task_runner_;
115 };
116 
117 class StopArgsImpl : public DataSourceBase::StopArgs {
118  public:
HandleStopAsynchronously() const119   std::function<void()> HandleStopAsynchronously() const override {
120     auto closure = std::move(async_stop_closure);
121     async_stop_closure = std::function<void()>();
122     return closure;
123   }
124 
125   mutable std::function<void()> async_stop_closure;
126 };
127 
128 class FlushArgsImpl : public DataSourceBase::FlushArgs {
129  public:
HandleFlushAsynchronously() const130   std::function<void()> HandleFlushAsynchronously() const override {
131     auto closure = std::move(async_flush_closure);
132     async_flush_closure = std::function<void()>();
133     return closure;
134   }
135 
136   mutable std::function<void()> async_flush_closure;
137 };
138 
139 // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
140 static TracingMuxerImpl* g_prev_instance{};
141 
142 template <typename RegisteredBackend>
143 struct CompareBackendByType {
BackendTypePriorityperfetto::internal::__anon6824a9f60111::CompareBackendByType144   static int BackendTypePriority(BackendType type) {
145     switch (type) {
146       case kSystemBackend:
147         return 0;
148       case kInProcessBackend:
149         return 1;
150       case kCustomBackend:
151         return 2;
152       // The UnspecifiedBackend has the highest priority so that
153       // TracingBackendFake is the last one on the backend lists.
154       case kUnspecifiedBackend:
155         break;
156     }
157     return 3;
158   }
operator ()perfetto::internal::__anon6824a9f60111::CompareBackendByType159   bool operator()(BackendType type, const RegisteredBackend& b) {
160     return BackendTypePriority(type) < BackendTypePriority(b.type);
161   }
162 };
163 
164 }  // namespace
165 
166 // ----- Begin of TracingMuxerImpl::ProducerImpl
ProducerImpl(TracingMuxerImpl * muxer,TracingBackendId backend_id,uint32_t shmem_batch_commits_duration_ms,bool shmem_direct_patching_enabled)167 TracingMuxerImpl::ProducerImpl::ProducerImpl(
168     TracingMuxerImpl* muxer,
169     TracingBackendId backend_id,
170     uint32_t shmem_batch_commits_duration_ms,
171     bool shmem_direct_patching_enabled)
172     : muxer_(muxer),
173       backend_id_(backend_id),
174       shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms),
175       shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {}
176 
~ProducerImpl()177 TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
178   muxer_ = nullptr;
179 }
180 
Initialize(std::unique_ptr<ProducerEndpoint> endpoint)181 void TracingMuxerImpl::ProducerImpl::Initialize(
182     std::unique_ptr<ProducerEndpoint> endpoint) {
183   PERFETTO_DCHECK_THREAD(thread_checker_);
184   PERFETTO_DCHECK(!connected_);
185   connection_id_.fetch_add(1, std::memory_order_relaxed);
186   is_producer_provided_smb_ = endpoint->shared_memory();
187   last_startup_target_buffer_reservation_ = 0;
188 
189   // Adopt the endpoint into a shared pointer so that we can safely share it
190   // across threads that create trace writers. The custom deleter function
191   // ensures that the endpoint is always destroyed on the muxer's thread. (Note
192   // that |task_runner| is assumed to outlive tracing sessions on all threads.)
193   auto* task_runner = muxer_->task_runner_.get();
194   auto deleter = [task_runner](ProducerEndpoint* e) {
195     if (task_runner->RunsTasksOnCurrentThread()) {
196       delete e;
197       return;
198     }
199     task_runner->PostTask([e] { delete e; });
200   };
201   std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
202   // This atomic store is needed because another thread might be concurrently
203   // creating a trace writer using the previous (disconnected) |service_|. See
204   // CreateTraceWriter().
205   std::atomic_store(&service_, std::move(service));
206   // Don't try to use the service here since it may not have connected yet. See
207   // OnConnect().
208 }
209 
OnConnect()210 void TracingMuxerImpl::ProducerImpl::OnConnect() {
211   PERFETTO_DLOG("Producer connected");
212   PERFETTO_DCHECK_THREAD(thread_checker_);
213   PERFETTO_DCHECK(!connected_);
214   if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) {
215     PERFETTO_ELOG(
216         "The service likely doesn't support producer-provided SMBs. Preventing "
217         "future attempts to use producer-provided SMB again with this "
218         "backend.");
219     producer_provided_smb_failed_ = true;
220     // Will call OnDisconnect() and cause a reconnect without producer-provided
221     // SMB.
222     service_->Disconnect();
223     return;
224   }
225   connected_ = true;
226   muxer_->UpdateDataSourcesOnAllBackends();
227   SendOnConnectTriggers();
228 }
229 
OnDisconnect()230 void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
231   PERFETTO_DCHECK_THREAD(thread_checker_);
232   // If we're being destroyed, bail out.
233   if (!muxer_)
234     return;
235   connected_ = false;
236   // Active data sources for this producer will be stopped by
237   // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer
238   // will have a different connection id (even before it has finished
239   // connecting).
240   registered_data_sources_.reset();
241   DisposeConnection();
242 
243   // Try reconnecting the producer.
244   muxer_->OnProducerDisconnected(this);
245 }
246 
DisposeConnection()247 void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
248   // Keep the old service around as a dead connection in case it has active
249   // trace writers. If any tracing sessions were created, we can't clear
250   // |service_| here because other threads may be concurrently creating new
251   // trace writers. Any reconnection attempt will atomically swap the new
252   // service in place of the old one.
253   if (did_setup_tracing_ || did_setup_startup_tracing_) {
254     dead_services_.push_back(service_);
255   } else {
256     service_.reset();
257   }
258 }
259 
OnTracingSetup()260 void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
261   PERFETTO_DCHECK_THREAD(thread_checker_);
262   did_setup_tracing_ = true;
263   service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration(
264       shmem_batch_commits_duration_ms_);
265   if (shmem_direct_patching_enabled_) {
266     service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching();
267   }
268 }
269 
OnStartupTracingSetup()270 void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
271   PERFETTO_DCHECK_THREAD(thread_checker_);
272   did_setup_startup_tracing_ = true;
273 }
274 
SetupDataSource(DataSourceInstanceID id,const DataSourceConfig & cfg)275 void TracingMuxerImpl::ProducerImpl::SetupDataSource(
276     DataSourceInstanceID id,
277     const DataSourceConfig& cfg) {
278   PERFETTO_DCHECK_THREAD(thread_checker_);
279   if (!muxer_)
280     return;
281   muxer_->SetupDataSource(
282       backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg);
283 }
284 
StartDataSource(DataSourceInstanceID id,const DataSourceConfig &)285 void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
286                                                      const DataSourceConfig&) {
287   PERFETTO_DCHECK_THREAD(thread_checker_);
288   if (!muxer_)
289     return;
290   muxer_->StartDataSource(backend_id_, id);
291   service_->NotifyDataSourceStarted(id);
292 }
293 
StopDataSource(DataSourceInstanceID id)294 void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
295   PERFETTO_DCHECK_THREAD(thread_checker_);
296   if (!muxer_)
297     return;
298   muxer_->StopDataSource_AsyncBegin(backend_id_, id);
299 }
300 
Flush(FlushRequestID flush_id,const DataSourceInstanceID * instances,size_t instance_count,FlushFlags flush_flags)301 void TracingMuxerImpl::ProducerImpl::Flush(
302     FlushRequestID flush_id,
303     const DataSourceInstanceID* instances,
304     size_t instance_count,
305     FlushFlags flush_flags) {
306   PERFETTO_DCHECK_THREAD(thread_checker_);
307   bool all_handled = true;
308   if (muxer_) {
309     for (size_t i = 0; i < instance_count; i++) {
310       DataSourceInstanceID ds_id = instances[i];
311       bool handled = muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id,
312                                                         flush_id, flush_flags);
313       if (!handled) {
314         pending_flushes_[flush_id].insert(ds_id);
315         all_handled = false;
316       }
317     }
318   }
319 
320   if (all_handled) {
321     service_->NotifyFlushComplete(flush_id);
322   }
323 }
324 
ClearIncrementalState(const DataSourceInstanceID * instances,size_t instance_count)325 void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
326     const DataSourceInstanceID* instances,
327     size_t instance_count) {
328   PERFETTO_DCHECK_THREAD(thread_checker_);
329   if (!muxer_)
330     return;
331   for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) {
332     muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]);
333   }
334 }
335 
SweepDeadServices()336 bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
337   PERFETTO_DCHECK_THREAD(thread_checker_);
338   auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
339     auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
340     return !arbiter || arbiter->TryShutdown();
341   };
342   for (auto it = dead_services_.begin(); it != dead_services_.end();) {
343     auto next_it = it;
344     next_it++;
345     if (is_unused(*it)) {
346       dead_services_.erase(it);
347     }
348     it = next_it;
349   }
350   return dead_services_.empty();
351 }
352 
SendOnConnectTriggers()353 void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
354   PERFETTO_DCHECK_THREAD(thread_checker_);
355   base::TimeMillis now = base::GetWallTimeMs();
356   std::vector<std::string> triggers;
357   while (!on_connect_triggers_.empty()) {
358     // Skip if we passed TTL.
359     if (on_connect_triggers_.front().second > now) {
360       triggers.push_back(std::move(on_connect_triggers_.front().first));
361     }
362     on_connect_triggers_.pop_front();
363   }
364   if (!triggers.empty()) {
365     service_->ActivateTriggers(triggers);
366   }
367 }
368 
NotifyFlushForDataSourceDone(DataSourceInstanceID ds_id,FlushRequestID flush_id)369 void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
370     DataSourceInstanceID ds_id,
371     FlushRequestID flush_id) {
372   if (!connected_) {
373     return;
374   }
375 
376   {
377     auto it = pending_flushes_.find(flush_id);
378     if (it == pending_flushes_.end()) {
379       return;
380     }
381     std::set<DataSourceInstanceID>& ds_ids = it->second;
382     ds_ids.erase(ds_id);
383   }
384 
385   std::optional<DataSourceInstanceID> biggest_flush_id;
386   for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
387     if (it->second.empty()) {
388       biggest_flush_id = it->first;
389       it = pending_flushes_.erase(it);
390     } else {
391       break;
392     }
393   }
394 
395   if (biggest_flush_id) {
396     service_->NotifyFlushComplete(*biggest_flush_id);
397   }
398 }
399 
400 // ----- End of TracingMuxerImpl::ProducerImpl methods.
401 
402 // ----- Begin of TracingMuxerImpl::ConsumerImpl
ConsumerImpl(TracingMuxerImpl * muxer,BackendType backend_type,TracingSessionGlobalID session_id)403 TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
404                                              BackendType backend_type,
405                                              TracingSessionGlobalID session_id)
406     : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
407 
~ConsumerImpl()408 TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
409   muxer_ = nullptr;
410 }
411 
Initialize(std::unique_ptr<ConsumerEndpoint> endpoint)412 void TracingMuxerImpl::ConsumerImpl::Initialize(
413     std::unique_ptr<ConsumerEndpoint> endpoint) {
414   PERFETTO_DCHECK_THREAD(thread_checker_);
415   service_ = std::move(endpoint);
416   // Don't try to use the service here since it may not have connected yet. See
417   // OnConnect().
418 }
419 
OnConnect()420 void TracingMuxerImpl::ConsumerImpl::OnConnect() {
421   PERFETTO_DCHECK_THREAD(thread_checker_);
422   PERFETTO_DCHECK(!connected_);
423   connected_ = true;
424 
425   // Observe data source instance events so we get notified when tracing starts.
426   service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
427                           ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);
428 
429   // If the API client configured and started tracing before we connected,
430   // tell the backend about it now.
431   if (trace_config_)
432     muxer_->SetupTracingSession(session_id_, trace_config_);
433   if (start_pending_)
434     muxer_->StartTracingSession(session_id_);
435   if (get_trace_stats_pending_) {
436     auto callback = std::move(get_trace_stats_callback_);
437     get_trace_stats_callback_ = nullptr;
438     muxer_->GetTraceStats(session_id_, std::move(callback));
439   }
440   if (query_service_state_callback_) {
441     auto callback = std::move(query_service_state_callback_);
442     query_service_state_callback_ = nullptr;
443     muxer_->QueryServiceState(session_id_, std::move(callback));
444   }
445   if (session_to_clone_) {
446     service_->CloneSession(*session_to_clone_);
447     session_to_clone_ = std::nullopt;
448   }
449 
450   if (stop_pending_)
451     muxer_->StopTracingSession(session_id_);
452 }
453 
OnDisconnect()454 void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
455   PERFETTO_DCHECK_THREAD(thread_checker_);
456   // If we're being destroyed, bail out.
457   if (!muxer_)
458     return;
459 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
460   if (!connected_ && backend_type_ == kSystemBackend) {
461     PERFETTO_ELOG(
462         "Unable to connect to the system tracing service as a consumer. On "
463         "Android, use the \"perfetto\" command line tool instead to start "
464         "system-wide tracing sessions");
465   }
466 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
467 
468   // Notify the client about disconnection.
469   NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});
470 
471   // Make sure the client doesn't hang in a blocking start/stop because of the
472   // disconnection.
473   NotifyStartComplete();
474   NotifyStopComplete();
475 
476   // It shouldn't be necessary to call StopTracingSession. If we get this call
477   // it means that the service did shutdown before us, so there is no point
478   // trying it to ask it to stop the session. We should just remember to cleanup
479   // the consumer vector.
480   connected_ = false;
481 
482   // Notify the muxer that it is safe to destroy |this|. This is needed because
483   // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
484   // access until OnDisconnect() is called.
485   muxer_->OnConsumerDisconnected(this);
486 }
487 
Disconnect()488 void TracingMuxerImpl::ConsumerImpl::Disconnect() {
489   // This is weird and deserves a comment.
490   //
491   // When we called the ConnectConsumer method on the service it returns
492   // us a ConsumerEndpoint which we stored in |service_|, however this
493   // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
494   // |this|. Part of the API contract to TracingService::ConnectConsumer is that
495   // the ConsumerImpl pointer has to be valid until the
496   // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
497   // ConsumerEndpoint |service_|. Eventually this will call
498   // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
499   // call the destructor of |this|.
500   service_.reset();
501 }
502 
OnTracingDisabled(const std::string & error)503 void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
504     const std::string& error) {
505   PERFETTO_DCHECK_THREAD(thread_checker_);
506   PERFETTO_DCHECK(!stopped_);
507   stopped_ = true;
508 
509   if (!error.empty())
510     NotifyError(TracingError{TracingError::kTracingFailed, error});
511 
512   // If we're still waiting for the start event, fire it now. This may happen if
513   // there are no active data sources in the session.
514   NotifyStartComplete();
515   NotifyStopComplete();
516 }
517 
NotifyStartComplete()518 void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
519   PERFETTO_DCHECK_THREAD(thread_checker_);
520   if (start_complete_callback_) {
521     muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
522     start_complete_callback_ = nullptr;
523   }
524   if (blocking_start_complete_callback_) {
525     muxer_->task_runner_->PostTask(
526         std::move(blocking_start_complete_callback_));
527     blocking_start_complete_callback_ = nullptr;
528   }
529 }
530 
NotifyError(const TracingError & error)531 void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
532   PERFETTO_DCHECK_THREAD(thread_checker_);
533   if (error_callback_) {
534     muxer_->task_runner_->PostTask(
535         std::bind(std::move(error_callback_), error));
536   }
537 }
538 
NotifyStopComplete()539 void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
540   PERFETTO_DCHECK_THREAD(thread_checker_);
541   if (stop_complete_callback_) {
542     muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
543     stop_complete_callback_ = nullptr;
544   }
545   if (blocking_stop_complete_callback_) {
546     muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
547     blocking_stop_complete_callback_ = nullptr;
548   }
549 }
550 
OnTraceData(std::vector<TracePacket> packets,bool has_more)551 void TracingMuxerImpl::ConsumerImpl::OnTraceData(
552     std::vector<TracePacket> packets,
553     bool has_more) {
554   PERFETTO_DCHECK_THREAD(thread_checker_);
555   if (!read_trace_callback_)
556     return;
557 
558   size_t capacity = 0;
559   for (const auto& packet : packets) {
560     // 16 is an over-estimation of the proto preamble size
561     capacity += packet.size() + 16;
562   }
563 
564   // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
565   std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
566   buf->reserve(capacity);
567   for (auto& packet : packets) {
568     char* start;
569     size_t size;
570     std::tie(start, size) = packet.GetProtoPreamble();
571     buf->insert(buf->end(), start, start + size);
572     for (auto& slice : packet.slices()) {
573       const auto* slice_data = reinterpret_cast<const char*>(slice.start);
574       buf->insert(buf->end(), slice_data, slice_data + slice.size);
575     }
576   }
577 
578   auto callback = read_trace_callback_;
579   muxer_->task_runner_->PostTask([callback, buf, has_more] {
580     TracingSession::ReadTraceCallbackArgs callback_arg{};
581     callback_arg.data = buf->empty() ? nullptr : &(*buf)[0];
582     callback_arg.size = buf->size();
583     callback_arg.has_more = has_more;
584     callback(callback_arg);
585   });
586 
587   if (!has_more)
588     read_trace_callback_ = nullptr;
589 }
590 
OnObservableEvents(const ObservableEvents & events)591 void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
592     const ObservableEvents& events) {
593   if (events.instance_state_changes_size()) {
594     for (const auto& state_change : events.instance_state_changes()) {
595       DataSourceHandle handle{state_change.producer_name(),
596                               state_change.data_source_name()};
597       data_source_states_[handle] =
598           state_change.state() ==
599           ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
600     }
601   }
602 
603   if (events.instance_state_changes_size() ||
604       events.all_data_sources_started()) {
605     // Data sources are first reported as being stopped before starting, so once
606     // all the data sources we know about have started we can declare tracing
607     // begun. In the case where there are no matching data sources for the
608     // session, the service will report the all_data_sources_started() event
609     // without adding any instances (only since Android S / Perfetto v10.0).
610     if (start_complete_callback_ || blocking_start_complete_callback_) {
611       bool all_data_sources_started = std::all_of(
612           data_source_states_.cbegin(), data_source_states_.cend(),
613           [](std::pair<DataSourceHandle, bool> state) { return state.second; });
614       if (all_data_sources_started)
615         NotifyStartComplete();
616     }
617   }
618 }
619 
OnSessionCloned(const OnSessionClonedArgs & args)620 void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
621     const OnSessionClonedArgs& args) {
622   if (!clone_trace_callback_)
623     return;
624   TracingSession::CloneTraceCallbackArgs callback_arg{};
625   callback_arg.success = args.success;
626   callback_arg.error = std::move(args.error);
627   callback_arg.uuid_msb = args.uuid.msb();
628   callback_arg.uuid_lsb = args.uuid.lsb();
629   muxer_->task_runner_->PostTask(
630       std::bind(std::move(clone_trace_callback_), std::move(callback_arg)));
631   clone_trace_callback_ = nullptr;
632 }
633 
OnTraceStats(bool success,const TraceStats & trace_stats)634 void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
635     bool success,
636     const TraceStats& trace_stats) {
637   if (!get_trace_stats_callback_)
638     return;
639   TracingSession::GetTraceStatsCallbackArgs callback_arg{};
640   callback_arg.success = success;
641   callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
642   muxer_->task_runner_->PostTask(
643       std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
644   get_trace_stats_callback_ = nullptr;
645 }
646 
647 // The callbacks below are not used.
OnDetach(bool)648 void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
OnAttach(bool,const TraceConfig &)649 void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
650 // ----- End of TracingMuxerImpl::ConsumerImpl
651 
652 // ----- Begin of TracingMuxerImpl::TracingSessionImpl
653 
654 // TracingSessionImpl is the RAII object returned to API clients when they
655 // invoke Tracing::CreateTracingSession. They use it for starting/stopping
656 // tracing.
657 
TracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)658 TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
659     TracingMuxerImpl* muxer,
660     TracingSessionGlobalID session_id,
661     BackendType backend_type)
662     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
663 
664 // Can be destroyed from any thread.
~TracingSessionImpl()665 TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
666   auto* muxer = muxer_;
667   auto session_id = session_id_;
668   muxer->task_runner_->PostTask(
669       [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
670 }
671 
672 // Can be called from any thread.
Setup(const TraceConfig & cfg,int fd)673 void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
674                                                  int fd) {
675   auto* muxer = muxer_;
676   auto session_id = session_id_;
677   std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
678   if (fd >= 0) {
679     base::ignore_result(backend_type_);  // For -Wunused in the amalgamation.
680 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
681     if (backend_type_ != kInProcessBackend) {
682       PERFETTO_FATAL(
683           "Passing a file descriptor to TracingSession::Setup() is only "
684           "supported with the kInProcessBackend on Windows. Use "
685           "TracingSession::ReadTrace() instead");
686     }
687 #endif
688     trace_config->set_write_into_file(true);
689     fd = dup(fd);
690   }
691   muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
692     muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
693   });
694 }
695 
696 // Can be called from any thread.
Start()697 void TracingMuxerImpl::TracingSessionImpl::Start() {
698   auto* muxer = muxer_;
699   auto session_id = session_id_;
700   muxer->task_runner_->PostTask(
701       [muxer, session_id] { muxer->StartTracingSession(session_id); });
702 }
703 
CloneTrace(CloneTraceArgs args,CloneTraceCallback cb)704 void TracingMuxerImpl::TracingSessionImpl::CloneTrace(CloneTraceArgs args,
705                                                       CloneTraceCallback cb) {
706   auto* muxer = muxer_;
707   auto session_id = session_id_;
708   muxer->task_runner_->PostTask([muxer, session_id, args, cb] {
709     muxer->CloneTracingSession(session_id, args, std::move(cb));
710   });
711 }
712 
713 // Can be called from any thread.
ChangeTraceConfig(const TraceConfig & cfg)714 void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
715     const TraceConfig& cfg) {
716   auto* muxer = muxer_;
717   auto session_id = session_id_;
718   muxer->task_runner_->PostTask([muxer, session_id, cfg] {
719     muxer->ChangeTracingSessionConfig(session_id, cfg);
720   });
721 }
722 
723 // Can be called from any thread except the service thread.
StartBlocking()724 void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
725   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
726   auto* muxer = muxer_;
727   auto session_id = session_id_;
728   base::WaitableEvent tracing_started;
729   muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
730     auto* consumer = muxer->FindConsumer(session_id);
731     if (!consumer) {
732       // TODO(skyostil): Signal an error to the user.
733       tracing_started.Notify();
734       return;
735     }
736     PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
737     consumer->blocking_start_complete_callback_ = [&] {
738       tracing_started.Notify();
739     };
740     muxer->StartTracingSession(session_id);
741   });
742   tracing_started.Wait();
743 }
744 
745 // Can be called from any thread.
Flush(std::function<void (bool)> user_callback,uint32_t timeout_ms)746 void TracingMuxerImpl::TracingSessionImpl::Flush(
747     std::function<void(bool)> user_callback,
748     uint32_t timeout_ms) {
749   auto* muxer = muxer_;
750   auto session_id = session_id_;
751   muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] {
752     auto* consumer = muxer->FindConsumer(session_id);
753     if (!consumer) {
754       std::move(user_callback)(false);
755       return;
756     }
757     muxer->FlushTracingSession(session_id, timeout_ms,
758                                std::move(user_callback));
759   });
760 }
761 
762 // Can be called from any thread.
Stop()763 void TracingMuxerImpl::TracingSessionImpl::Stop() {
764   auto* muxer = muxer_;
765   auto session_id = session_id_;
766   muxer->task_runner_->PostTask(
767       [muxer, session_id] { muxer->StopTracingSession(session_id); });
768 }
769 
770 // Can be called from any thread except the service thread.
StopBlocking()771 void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
772   PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
773   auto* muxer = muxer_;
774   auto session_id = session_id_;
775   base::WaitableEvent tracing_stopped;
776   muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
777     auto* consumer = muxer->FindConsumer(session_id);
778     if (!consumer) {
779       // TODO(skyostil): Signal an error to the user.
780       tracing_stopped.Notify();
781       return;
782     }
783     PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
784     consumer->blocking_stop_complete_callback_ = [&] {
785       tracing_stopped.Notify();
786     };
787     muxer->StopTracingSession(session_id);
788   });
789   tracing_stopped.Wait();
790 }
791 
792 // Can be called from any thread.
ReadTrace(ReadTraceCallback cb)793 void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
794   auto* muxer = muxer_;
795   auto session_id = session_id_;
796   muxer->task_runner_->PostTask([muxer, session_id, cb] {
797     muxer->ReadTracingSessionData(session_id, std::move(cb));
798   });
799 }
800 
801 // Can be called from any thread.
SetOnStartCallback(std::function<void ()> cb)802 void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
803     std::function<void()> cb) {
804   auto* muxer = muxer_;
805   auto session_id = session_id_;
806   muxer->task_runner_->PostTask([muxer, session_id, cb] {
807     auto* consumer = muxer->FindConsumer(session_id);
808     if (!consumer)
809       return;
810     consumer->start_complete_callback_ = cb;
811   });
812 }
813 
814 // Can be called from any thread
SetOnErrorCallback(std::function<void (TracingError)> cb)815 void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
816     std::function<void(TracingError)> cb) {
817   auto* muxer = muxer_;
818   auto session_id = session_id_;
819   muxer->task_runner_->PostTask([muxer, session_id, cb] {
820     auto* consumer = muxer->FindConsumer(session_id);
821     if (!consumer) {
822       // Notify the client about concurrent disconnection of the session.
823       if (cb)
824         cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
825       return;
826     }
827     consumer->error_callback_ = cb;
828   });
829 }
830 
831 // Can be called from any thread.
SetOnStopCallback(std::function<void ()> cb)832 void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
833     std::function<void()> cb) {
834   auto* muxer = muxer_;
835   auto session_id = session_id_;
836   muxer->task_runner_->PostTask([muxer, session_id, cb] {
837     auto* consumer = muxer->FindConsumer(session_id);
838     if (!consumer)
839       return;
840     consumer->stop_complete_callback_ = cb;
841   });
842 }
843 
844 // Can be called from any thread.
GetTraceStats(GetTraceStatsCallback cb)845 void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
846     GetTraceStatsCallback cb) {
847   auto* muxer = muxer_;
848   auto session_id = session_id_;
849   muxer->task_runner_->PostTask([muxer, session_id, cb] {
850     muxer->GetTraceStats(session_id, std::move(cb));
851   });
852 }
853 
854 // Can be called from any thread.
QueryServiceState(QueryServiceStateCallback cb)855 void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
856     QueryServiceStateCallback cb) {
857   auto* muxer = muxer_;
858   auto session_id = session_id_;
859   muxer->task_runner_->PostTask([muxer, session_id, cb] {
860     muxer->QueryServiceState(session_id, std::move(cb));
861   });
862 }
863 
864 // ----- End of TracingMuxerImpl::TracingSessionImpl
865 
866 // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
867 
StartupTracingSessionImpl(TracingMuxerImpl * muxer,TracingSessionGlobalID session_id,BackendType backend_type)868 TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
869     TracingMuxerImpl* muxer,
870     TracingSessionGlobalID session_id,
871     BackendType backend_type)
872     : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
873 
874 // Can be destroyed from any thread.
875 TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
876     default;
877 
Abort()878 void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
879   auto* muxer = muxer_;
880   auto session_id = session_id_;
881   auto backend_type = backend_type_;
882   muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
883     muxer->AbortStartupTracingSession(session_id, backend_type);
884   });
885 }
886 
887 // Must not be called from the SDK's internal thread.
AbortBlocking()888 void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
889   auto* muxer = muxer_;
890   auto session_id = session_id_;
891   auto backend_type = backend_type_;
892   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
893   base::WaitableEvent event;
894   muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
895     muxer->AbortStartupTracingSession(session_id, backend_type);
896     event.Notify();
897   });
898   event.Wait();
899 }
900 
901 // ----- End of TracingMuxerImpl::StartupTracingSessionImpl
902 
903 // static
904 TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
905 
906 // This is called by perfetto::Tracing::Initialize().
907 // Can be called on any thread. Typically, but not necessarily, that will be
908 // the embedder's main thread.
TracingMuxerImpl(const TracingInitArgs & args)909 TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
910     : TracingMuxer(args.platform ? args.platform
911                                  : Platform::GetDefaultPlatform()) {
912   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
913   instance_ = this;
914 
915   // Create the thread where muxer, producers and service will live.
916   Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
917   task_runner_.reset(new NonReentrantTaskRunner(
918       this, platform_->CreateTaskRunner(std::move(tr_args))));
919 
920   // Run the initializer on that thread.
921   task_runner_->PostTask([this, args] {
922     Initialize(args);
923     AddBackends(args);
924   });
925 }
926 
Initialize(const TracingInitArgs & args)927 void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
928   PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.
929 
930   policy_ = args.tracing_policy;
931   supports_multiple_data_source_instances_ =
932       args.supports_multiple_data_source_instances;
933 
934   // Fallback backend for producer creation for an unsupported backend type.
935   PERFETTO_CHECK(producer_backends_.empty());
936   AddProducerBackend(internal::TracingBackendFake::GetInstance(),
937                      BackendType::kUnspecifiedBackend, args);
938   // Fallback backend for consumer creation for an unsupported backend type.
939   // This backend simply fails any attempt to start a tracing session.
940   PERFETTO_CHECK(consumer_backends_.empty());
941   AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
942                      BackendType::kUnspecifiedBackend);
943 }
944 
AddConsumerBackend(TracingConsumerBackend * backend,BackendType type)945 void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
946                                           BackendType type) {
947   if (!backend) {
948     // We skip the log in release builds because the *_backend_fake.cc code
949     // has already an ELOG before returning a nullptr.
950     PERFETTO_DLOG("Consumer backend creation failed, type %d",
951                   static_cast<int>(type));
952     return;
953   }
954   // Keep the backends sorted by type.
955   auto it =
956       std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
957                        type, CompareBackendByType<RegisteredConsumerBackend>());
958   it = consumer_backends_.emplace(it);
959 
960   RegisteredConsumerBackend& rb = *it;
961   rb.backend = backend;
962   rb.type = type;
963 }
964 
AddProducerBackend(TracingProducerBackend * backend,BackendType type,const TracingInitArgs & args)965 void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
966                                           BackendType type,
967                                           const TracingInitArgs& args) {
968   if (!backend) {
969     // We skip the log in release builds because the *_backend_fake.cc code
970     // has already an ELOG before returning a nullptr.
971     PERFETTO_DLOG("Producer backend creation failed, type %d",
972                   static_cast<int>(type));
973     return;
974   }
975   TracingBackendId backend_id = producer_backends_.size();
976   // Keep the backends sorted by type.
977   auto it =
978       std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
979                        type, CompareBackendByType<RegisteredProducerBackend>());
980   it = producer_backends_.emplace(it);
981 
982   RegisteredProducerBackend& rb = *it;
983   rb.backend = backend;
984   rb.id = backend_id;
985   rb.type = type;
986   rb.producer.reset(new ProducerImpl(this, backend_id,
987                                      args.shmem_batch_commits_duration_ms,
988                                      args.shmem_direct_patching_enabled));
989   rb.producer_conn_args.producer = rb.producer.get();
990   rb.producer_conn_args.producer_name = platform_->GetCurrentProcessName();
991   rb.producer_conn_args.task_runner = task_runner_.get();
992   rb.producer_conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
993   rb.producer_conn_args.shmem_page_size_hint_bytes =
994       args.shmem_page_size_hint_kb * 1024;
995   rb.producer_conn_args.create_socket_async = args.create_socket_async;
996   rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
997 }
998 
999 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendById(TracingBackendId id)1000 TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
1001   for (RegisteredProducerBackend& b : producer_backends_) {
1002     if (b.id == id) {
1003       return &b;
1004     }
1005   }
1006   return nullptr;
1007 }
1008 
1009 TracingMuxerImpl::RegisteredProducerBackend*
FindProducerBackendByType(BackendType type)1010 TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
1011   for (RegisteredProducerBackend& b : producer_backends_) {
1012     if (b.type == type) {
1013       return &b;
1014     }
1015   }
1016   return nullptr;
1017 }
1018 
1019 TracingMuxerImpl::RegisteredConsumerBackend*
FindConsumerBackendByType(BackendType type)1020 TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
1021   for (RegisteredConsumerBackend& b : consumer_backends_) {
1022     if (b.type == type) {
1023       return &b;
1024     }
1025   }
1026   return nullptr;
1027 }
1028 
AddBackends(const TracingInitArgs & args)1029 void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
1030   if (args.backends & kSystemBackend) {
1031     PERFETTO_CHECK(args.system_producer_backend_factory_);
1032     if (FindProducerBackendByType(kSystemBackend) == nullptr) {
1033       AddProducerBackend(args.system_producer_backend_factory_(),
1034                          kSystemBackend, args);
1035     }
1036     if (args.enable_system_consumer) {
1037       PERFETTO_CHECK(args.system_consumer_backend_factory_);
1038       if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
1039         AddConsumerBackend(args.system_consumer_backend_factory_(),
1040                            kSystemBackend);
1041       }
1042     }
1043   }
1044 
1045   if (args.backends & kInProcessBackend) {
1046     TracingBackend* b = nullptr;
1047     if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
1048       if (!b) {
1049         PERFETTO_CHECK(args.in_process_backend_factory_);
1050         b = args.in_process_backend_factory_();
1051       }
1052       AddProducerBackend(b, kInProcessBackend, args);
1053     }
1054     if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
1055       if (!b) {
1056         PERFETTO_CHECK(args.in_process_backend_factory_);
1057         b = args.in_process_backend_factory_();
1058       }
1059       AddConsumerBackend(b, kInProcessBackend);
1060     }
1061   }
1062 
1063   if (args.backends & kCustomBackend) {
1064     PERFETTO_CHECK(args.custom_backend);
1065     if (FindProducerBackendByType(kCustomBackend) == nullptr) {
1066       AddProducerBackend(args.custom_backend, kCustomBackend, args);
1067     }
1068     if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
1069       AddConsumerBackend(args.custom_backend, kCustomBackend);
1070     }
1071   }
1072 
1073   if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
1074     PERFETTO_FATAL("Unsupported tracing backend type");
1075   }
1076 }
1077 
1078 // Can be called from any thread (but not concurrently).
RegisterDataSource(const DataSourceDescriptor & descriptor,DataSourceFactory factory,DataSourceParams params,bool no_flush,DataSourceStaticState * static_state)1079 bool TracingMuxerImpl::RegisterDataSource(
1080     const DataSourceDescriptor& descriptor,
1081     DataSourceFactory factory,
1082     DataSourceParams params,
1083     bool no_flush,
1084     DataSourceStaticState* static_state) {
1085   // Ignore repeated registrations.
1086   if (static_state->index != kMaxDataSources)
1087     return true;
1088 
1089   uint32_t new_index = next_data_source_index_++;
1090   if (new_index >= kMaxDataSources) {
1091     PERFETTO_DLOG(
1092         "RegisterDataSource failed: too many data sources already registered");
1093     return false;
1094   }
1095 
1096   // Initialize the static state.
1097   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
1098                 "instances[] size mismatch");
1099   for (size_t i = 0; i < static_state->instances.size(); i++)
1100     new (&static_state->instances[i]) DataSourceState{};
1101 
1102   static_state->index = new_index;
1103 
1104   // Generate a semi-unique id for this data source.
1105   base::Hasher hash;
1106   hash.Update(reinterpret_cast<intptr_t>(static_state));
1107   hash.Update(base::GetWallTimeNs().count());
1108   static_state->id = hash.digest() ? hash.digest() : 1;
1109 
1110   task_runner_->PostTask([this, descriptor, factory, static_state, params,
1111                           no_flush] {
1112     data_sources_.emplace_back();
1113     RegisteredDataSource& rds = data_sources_.back();
1114     rds.descriptor = descriptor;
1115     rds.factory = factory;
1116     rds.supports_multiple_instances =
1117         supports_multiple_data_source_instances_ &&
1118         params.supports_multiple_instances;
1119     rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
1120     rds.static_state = static_state;
1121     rds.no_flush = no_flush;
1122 
1123     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1124   });
1125   return true;
1126 }
1127 
1128 // Can be called from any thread (but not concurrently).
UpdateDataSourceDescriptor(const DataSourceDescriptor & descriptor,const DataSourceStaticState * static_state)1129 void TracingMuxerImpl::UpdateDataSourceDescriptor(
1130     const DataSourceDescriptor& descriptor,
1131     const DataSourceStaticState* static_state) {
1132   task_runner_->PostTask([this, descriptor, static_state] {
1133     for (auto& rds : data_sources_) {
1134       if (rds.static_state == static_state) {
1135         PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
1136         rds.descriptor = descriptor;
1137         rds.descriptor.set_id(static_state->id);
1138         UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
1139         return;
1140       }
1141     }
1142   });
1143 }
1144 
1145 // Can be called from any thread (but not concurrently).
RegisterInterceptor(const InterceptorDescriptor & descriptor,InterceptorFactory factory,InterceptorBase::TLSFactory tls_factory,InterceptorBase::TracePacketCallback packet_callback)1146 void TracingMuxerImpl::RegisterInterceptor(
1147     const InterceptorDescriptor& descriptor,
1148     InterceptorFactory factory,
1149     InterceptorBase::TLSFactory tls_factory,
1150     InterceptorBase::TracePacketCallback packet_callback) {
1151   task_runner_->PostTask([this, descriptor, factory, tls_factory,
1152                           packet_callback] {
1153     // Ignore repeated registrations.
1154     for (const auto& interceptor : interceptors_) {
1155       if (interceptor.descriptor.name() == descriptor.name()) {
1156         PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
1157         PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
1158         return;
1159       }
1160     }
1161     // Only allow certain interceptors for now.
1162     if (descriptor.name() != "test_interceptor" &&
1163         descriptor.name() != "console" && descriptor.name() != "etwexport") {
1164       PERFETTO_ELOG(
1165           "Interceptors are experimental. If you want to use them, please "
1166           "get in touch with the project maintainers "
1167           "(https://perfetto.dev/docs/contributing/"
1168           "getting-started#community).");
1169       return;
1170     }
1171     interceptors_.emplace_back();
1172     RegisteredInterceptor& interceptor = interceptors_.back();
1173     interceptor.descriptor = descriptor;
1174     interceptor.factory = factory;
1175     interceptor.tls_factory = tls_factory;
1176     interceptor.packet_callback = packet_callback;
1177   });
1178 }
1179 
ActivateTriggers(const std::vector<std::string> & triggers,uint32_t ttl_ms)1180 void TracingMuxerImpl::ActivateTriggers(
1181     const std::vector<std::string>& triggers,
1182     uint32_t ttl_ms) {
1183   base::TimeMillis expire_time =
1184       base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
1185   task_runner_->PostTask([this, triggers, expire_time] {
1186     for (RegisteredProducerBackend& backend : producer_backends_) {
1187       if (backend.producer->connected_) {
1188         backend.producer->service_->ActivateTriggers(triggers);
1189       } else {
1190         for (const std::string& trigger : triggers) {
1191           backend.producer->on_connect_triggers_.emplace_back(trigger,
1192                                                               expire_time);
1193         }
1194       }
1195     }
1196   });
1197 }
1198 
1199 // Checks if there is any matching startup tracing data source instance for a
1200 // new SetupDataSource call. If so, moves the data source to this tracing
1201 // session (and its target buffer) and returns true, otherwise returns false.
MaybeAdoptStartupTracingInDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,const std::vector<RegisteredDataSource> & data_sources)1202 static bool MaybeAdoptStartupTracingInDataSource(
1203     TracingBackendId backend_id,
1204     uint32_t backend_connection_id,
1205     DataSourceInstanceID instance_id,
1206     const DataSourceConfig& cfg,
1207     const std::vector<RegisteredDataSource>& data_sources) {
1208   for (const auto& rds : data_sources) {
1209     DataSourceStaticState* static_state = rds.static_state;
1210     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1211       auto* internal_state = static_state->TryGet(i);
1212 
1213       if (internal_state &&
1214           internal_state->startup_target_buffer_reservation.load(
1215               std::memory_order_relaxed) &&
1216           internal_state->data_source_instance_id == 0 &&
1217           internal_state->backend_id == backend_id &&
1218           internal_state->backend_connection_id == backend_connection_id &&
1219           internal_state->config &&
1220           internal_state->data_source->CanAdoptStartupSession(
1221               *internal_state->config, cfg)) {
1222         PERFETTO_DLOG("Setting up data source %" PRIu64
1223                       " %s by adopting it from a startup tracing session",
1224                       instance_id, cfg.name().c_str());
1225 
1226         std::lock_guard<std::recursive_mutex> lock(internal_state->lock);
1227         // Set the associations. The actual takeover happens in
1228         // StartDataSource().
1229         internal_state->data_source_instance_id = instance_id;
1230         internal_state->buffer_id =
1231             static_cast<internal::BufferId>(cfg.target_buffer());
1232         internal_state->config.reset(new DataSourceConfig(cfg));
1233 
1234         // TODO(eseckler): Should the data souce config provided by the service
1235         // be allowed to specify additional interceptors / additional data
1236         // source params?
1237 
1238         return true;
1239       }
1240     }
1241   }
1242   return false;
1243 }
1244 
1245 // Called by the service of one of the backends.
SetupDataSource(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg)1246 void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
1247                                        uint32_t backend_connection_id,
1248                                        DataSourceInstanceID instance_id,
1249                                        const DataSourceConfig& cfg) {
1250   PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
1251                 cfg.name().c_str());
1252   PERFETTO_DCHECK_THREAD(thread_checker_);
1253 
1254   // First check if there is any matching startup tracing data source instance.
1255   if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
1256                                            instance_id, cfg, data_sources_)) {
1257     return;
1258   }
1259 
1260   for (const auto& rds : data_sources_) {
1261     if (rds.descriptor.name() != cfg.name())
1262       continue;
1263     DataSourceStaticState& static_state = *rds.static_state;
1264 
1265     // If this data source is already active for this exact config, don't start
1266     // another instance. This happens when we have several data sources with the
1267     // same name, in which case the service sends one SetupDataSource event for
1268     // each one. Since we can't map which event maps to which data source, we
1269     // ensure each event only starts one data source instance.
1270     // TODO(skyostil): Register a unique id with each data source to the service
1271     // to disambiguate.
1272     bool active_for_config = false;
1273     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1274       if (!static_state.TryGet(i))
1275         continue;
1276       auto* internal_state =
1277           reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1278       if (internal_state->backend_id == backend_id &&
1279           internal_state->backend_connection_id == backend_connection_id &&
1280           internal_state->config && *internal_state->config == cfg) {
1281         active_for_config = true;
1282         break;
1283       }
1284     }
1285     if (active_for_config) {
1286       PERFETTO_DLOG(
1287           "Data source %s is already active with this config, skipping",
1288           cfg.name().c_str());
1289       continue;
1290     }
1291 
1292     SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
1293                         cfg, /*startup_session_id=*/0);
1294     return;
1295   }
1296 }
1297 
SetupDataSourceImpl(const RegisteredDataSource & rds,TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const DataSourceConfig & cfg,TracingSessionGlobalID startup_session_id)1298 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
1299     const RegisteredDataSource& rds,
1300     TracingBackendId backend_id,
1301     uint32_t backend_connection_id,
1302     DataSourceInstanceID instance_id,
1303     const DataSourceConfig& cfg,
1304     TracingSessionGlobalID startup_session_id) {
1305   PERFETTO_DCHECK_THREAD(thread_checker_);
1306   DataSourceStaticState& static_state = *rds.static_state;
1307 
1308   // If any bit is set in `static_state.valid_instances` then at least one
1309   // other instance of data source is running.
1310   if (!rds.supports_multiple_instances &&
1311       static_state.valid_instances.load(std::memory_order_acquire) != 0) {
1312     PERFETTO_ELOG(
1313         "Failed to setup data source because some another instance of this "
1314         "data source is already active");
1315     return FindDataSourceRes();
1316   }
1317 
1318   for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
1319     // Find a free slot.
1320     if (static_state.TryGet(i))
1321       continue;
1322 
1323     auto* internal_state =
1324         reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
1325     std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
1326     static_assert(
1327         std::is_same<decltype(internal_state->data_source_instance_id),
1328                      DataSourceInstanceID>::value,
1329         "data_source_instance_id type mismatch");
1330     internal_state->muxer_id_for_testing = muxer_id_for_testing_;
1331     RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1332 
1333     if (startup_session_id) {
1334       uint16_t& last_reservation =
1335           backend.producer->last_startup_target_buffer_reservation_;
1336       if (last_reservation == std::numeric_limits<uint16_t>::max()) {
1337         PERFETTO_ELOG(
1338             "Startup buffer reservations exhausted, dropping data source");
1339         return FindDataSourceRes();
1340       }
1341       internal_state->startup_target_buffer_reservation.store(
1342           ++last_reservation, std::memory_order_relaxed);
1343     } else {
1344       internal_state->startup_target_buffer_reservation.store(
1345           0, std::memory_order_relaxed);
1346     }
1347 
1348     internal_state->backend_id = backend_id;
1349     internal_state->backend_connection_id = backend_connection_id;
1350     internal_state->data_source_instance_id = instance_id;
1351     internal_state->buffer_id =
1352         static_cast<internal::BufferId>(cfg.target_buffer());
1353     internal_state->config.reset(new DataSourceConfig(cfg));
1354     internal_state->startup_session_id = startup_session_id;
1355     internal_state->data_source = rds.factory();
1356     internal_state->interceptor = nullptr;
1357     internal_state->interceptor_id = 0;
1358     internal_state->will_notify_on_stop = rds.descriptor.will_notify_on_stop();
1359 
1360     if (cfg.has_interceptor_config()) {
1361       for (size_t j = 0; j < interceptors_.size(); j++) {
1362         if (cfg.interceptor_config().name() ==
1363             interceptors_[j].descriptor.name()) {
1364           PERFETTO_DLOG("Intercepting data source %" PRIu64
1365                         " \"%s\" into \"%s\"",
1366                         instance_id, cfg.name().c_str(),
1367                         cfg.interceptor_config().name().c_str());
1368           internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
1369           internal_state->interceptor = interceptors_[j].factory();
1370           internal_state->interceptor->OnSetup({cfg});
1371           break;
1372         }
1373       }
1374       if (!internal_state->interceptor_id) {
1375         PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
1376                       cfg.interceptor_config().name().c_str());
1377       }
1378     }
1379 
1380     // This must be made at the end. See matching acquire-load in
1381     // DataSource::Trace().
1382     static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
1383 
1384     DataSourceBase::SetupArgs setup_args;
1385     setup_args.config = &cfg;
1386     setup_args.backend_type = backend.type;
1387     setup_args.internal_instance_index = i;
1388 
1389     if (!rds.requires_callbacks_under_lock)
1390       lock.unlock();
1391     internal_state->data_source->OnSetup(setup_args);
1392 
1393     return FindDataSourceRes(&static_state, internal_state, i,
1394                              rds.requires_callbacks_under_lock);
1395   }
1396   PERFETTO_ELOG(
1397       "Maximum number of data source instances exhausted. "
1398       "Dropping data source %" PRIu64,
1399       instance_id);
1400   return FindDataSourceRes();
1401 }
1402 
1403 // Called by the service of one of the backends.
StartDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)1404 void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
1405                                        DataSourceInstanceID instance_id) {
1406   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
1407   PERFETTO_DCHECK_THREAD(thread_checker_);
1408 
1409   auto ds = FindDataSource(backend_id, instance_id);
1410   if (!ds) {
1411     PERFETTO_ELOG("Could not find data source to start");
1412     return;
1413   }
1414 
1415   // Check if the data source was already started for startup tracing.
1416   uint16_t startup_reservation =
1417       ds.internal_state->startup_target_buffer_reservation.load(
1418           std::memory_order_relaxed);
1419   if (startup_reservation) {
1420     RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1421     TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
1422     auto session_it = std::find_if(
1423         backend.startup_sessions.begin(), backend.startup_sessions.end(),
1424         [session_id](const RegisteredStartupSession& session) {
1425           return session.session_id == session_id;
1426         });
1427     PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1428 
1429     if (session_it->is_aborting) {
1430       PERFETTO_DLOG("Data source %" PRIu64
1431                     " was already aborted for startup tracing, not starting it",
1432                     instance_id);
1433       return;
1434     }
1435 
1436     PERFETTO_DLOG(
1437         "Data source %" PRIu64
1438         " was already started for startup tracing, binding its target buffer",
1439         instance_id);
1440 
1441     backend.producer->service_->MaybeSharedMemoryArbiter()
1442         ->BindStartupTargetBuffer(startup_reservation,
1443                                   ds.internal_state->buffer_id);
1444 
1445     // The reservation ID can be used even after binding it, so there's no need
1446     // for any barriers here - we just need atomicity.
1447     ds.internal_state->startup_target_buffer_reservation.store(
1448         0, std::memory_order_relaxed);
1449 
1450     // TODO(eseckler): Should we reset incremental state at this point, or
1451     // notify the data source some other way?
1452 
1453     // The session should not have been fully bound yet (or aborted).
1454     PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);
1455 
1456     session_it->num_unbound_data_sources--;
1457     if (session_it->num_unbound_data_sources == 0) {
1458       if (session_it->on_adopted)
1459         task_runner_->PostTask(session_it->on_adopted);
1460       backend.startup_sessions.erase(session_it);
1461     }
1462     return;
1463   }
1464 
1465   StartDataSourceImpl(ds);
1466 }
1467 
StartDataSourceImpl(const FindDataSourceRes & ds)1468 void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
1469   PERFETTO_DCHECK_THREAD(thread_checker_);
1470 
1471   DataSourceBase::StartArgs start_args{};
1472   start_args.internal_instance_index = ds.instance_idx;
1473 
1474   std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1475   if (ds.internal_state->interceptor)
1476     ds.internal_state->interceptor->OnStart({});
1477   ds.internal_state->trace_lambda_enabled.store(true,
1478                                                 std::memory_order_relaxed);
1479   PERFETTO_DCHECK(ds.internal_state->data_source != nullptr);
1480 
1481   if (!ds.requires_callbacks_under_lock)
1482     lock.unlock();
1483   ds.internal_state->data_source->OnStart(start_args);
1484 }
1485 
1486 // Called by the service of one of the backends.
StopDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id)1487 void TracingMuxerImpl::StopDataSource_AsyncBegin(
1488     TracingBackendId backend_id,
1489     DataSourceInstanceID instance_id) {
1490   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
1491   PERFETTO_DCHECK_THREAD(thread_checker_);
1492 
1493   auto ds = FindDataSource(backend_id, instance_id);
1494   if (!ds) {
1495     PERFETTO_ELOG("Could not find data source to stop");
1496     return;
1497   }
1498 
1499   StopDataSource_AsyncBeginImpl(ds);
1500 }
1501 
StopDataSource_AsyncBeginImpl(const FindDataSourceRes & ds)1502 void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
1503     const FindDataSourceRes& ds) {
1504   TracingBackendId backend_id = ds.internal_state->backend_id;
1505   uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1506   DataSourceInstanceID instance_id = ds.internal_state->data_source_instance_id;
1507 
1508   StopArgsImpl stop_args{};
1509   stop_args.internal_instance_index = ds.instance_idx;
1510   stop_args.async_stop_closure = [this, backend_id, backend_connection_id,
1511                                   instance_id, ds] {
1512     // TracingMuxerImpl is long lived, capturing |this| is okay.
1513     // The notification closure can be moved out of the StopArgs by the
1514     // embedder to handle stop asynchronously. The embedder might then
1515     // call the closure on a different thread than the current one, hence
1516     // this nested PostTask().
1517     task_runner_->PostTask(
1518         [this, backend_id, backend_connection_id, instance_id, ds] {
1519           StopDataSource_AsyncEnd(backend_id, backend_connection_id,
1520                                   instance_id, ds);
1521         });
1522   };
1523 
1524   {
1525     std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock);
1526 
1527     // Don't call OnStop again if the datasource is already stopping.
1528     if (ds.internal_state->async_stop_in_progress)
1529       return;
1530     ds.internal_state->async_stop_in_progress = true;
1531 
1532     if (ds.internal_state->interceptor)
1533       ds.internal_state->interceptor->OnStop({});
1534 
1535     if (!ds.requires_callbacks_under_lock)
1536       lock.unlock();
1537     ds.internal_state->data_source->OnStop(stop_args);
1538   }
1539 
1540   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
1541   // async closure here. In theory we could avoid the PostTask and call
1542   // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
1543   // divergencies between the deferred-stop vs non-deferred-stop code paths.
1544   if (stop_args.async_stop_closure)
1545     std::move(stop_args.async_stop_closure)();
1546 }
1547 
StopDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds)1548 void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
1549                                                uint32_t backend_connection_id,
1550                                                DataSourceInstanceID instance_id,
1551                                                const FindDataSourceRes& ds) {
1552   PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
1553   PERFETTO_DCHECK_THREAD(thread_checker_);
1554 
1555   // Check that the data source instance is still active and was not modified
1556   // while it was being stopped.
1557   if (!ds.static_state->TryGet(ds.instance_idx) ||
1558       ds.internal_state->backend_id != backend_id ||
1559       ds.internal_state->backend_connection_id != backend_connection_id ||
1560       ds.internal_state->data_source_instance_id != instance_id) {
1561     PERFETTO_ELOG(
1562         "Async stop of data source %" PRIu64
1563         " failed. This might be due to calling the async_stop_closure twice.",
1564         instance_id);
1565     return;
1566   }
1567 
1568   const uint32_t mask = ~(1 << ds.instance_idx);
1569   ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
1570 
1571   bool will_notify_on_stop;
1572   // Take the mutex to prevent that the data source is in the middle of
1573   // a Trace() execution where it called GetDataSourceLocked() while we
1574   // destroy it.
1575   uint16_t startup_buffer_reservation;
1576   TracingSessionGlobalID startup_session_id;
1577   {
1578     std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
1579     ds.internal_state->trace_lambda_enabled.store(false,
1580                                                   std::memory_order_relaxed);
1581     ds.internal_state->data_source.reset();
1582     ds.internal_state->interceptor.reset();
1583     ds.internal_state->config.reset();
1584     ds.internal_state->async_stop_in_progress = false;
1585     will_notify_on_stop = ds.internal_state->will_notify_on_stop;
1586     startup_buffer_reservation =
1587         ds.internal_state->startup_target_buffer_reservation.load(
1588             std::memory_order_relaxed);
1589     startup_session_id = ds.internal_state->startup_session_id;
1590   }
1591 
1592   // The other fields of internal_state are deliberately *not* cleared.
1593   // See races-related comments of DataSource::Trace().
1594 
1595   TracingMuxer::generation_++;
1596 
1597   // |producer_backends_| is append-only, Backend instances are always valid.
1598   PERFETTO_CHECK(backend_id < producer_backends_.size());
1599   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1600   ProducerImpl* producer = backend.producer.get();
1601   if (!producer)
1602     return;
1603 
1604   // If the data source instance still has a startup buffer reservation, it was
1605   // only active for startup tracing and never started by the service. Discard
1606   // the startup buffer reservation.
1607   if (startup_buffer_reservation) {
1608     PERFETTO_DCHECK(startup_session_id);
1609 
1610     if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
1611       producer->service_->MaybeSharedMemoryArbiter()
1612           ->AbortStartupTracingForReservation(startup_buffer_reservation);
1613     }
1614 
1615     auto session_it = std::find_if(
1616         backend.startup_sessions.begin(), backend.startup_sessions.end(),
1617         [startup_session_id](const RegisteredStartupSession& session) {
1618           return session.session_id == startup_session_id;
1619         });
1620 
1621     // Session should not be removed until abortion of all data source instances
1622     // is complete.
1623     PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
1624 
1625     session_it->num_aborting_data_sources--;
1626     if (session_it->num_aborting_data_sources == 0) {
1627       if (session_it->on_aborted)
1628         task_runner_->PostTask(session_it->on_aborted);
1629 
1630       backend.startup_sessions.erase(session_it);
1631     }
1632   }
1633 
1634   if (producer->connected_ &&
1635       backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1636           backend_connection_id) {
1637     // Flush any commits that might have been batched by SharedMemoryArbiter.
1638     producer->service_->MaybeSharedMemoryArbiter()
1639         ->FlushPendingCommitDataRequests();
1640     if (instance_id && will_notify_on_stop)
1641       producer->service_->NotifyDataSourceStopped(instance_id);
1642   }
1643   producer->SweepDeadServices();
1644 }
1645 
ClearDataSourceIncrementalState(TracingBackendId backend_id,DataSourceInstanceID instance_id)1646 void TracingMuxerImpl::ClearDataSourceIncrementalState(
1647     TracingBackendId backend_id,
1648     DataSourceInstanceID instance_id) {
1649   PERFETTO_DCHECK_THREAD(thread_checker_);
1650   PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
1651                 instance_id);
1652   auto ds = FindDataSource(backend_id, instance_id);
1653   if (!ds) {
1654     PERFETTO_ELOG("Could not find data source to clear incremental state for");
1655     return;
1656   }
1657 
1658   DataSourceBase::ClearIncrementalStateArgs clear_incremental_state_args;
1659   clear_incremental_state_args.internal_instance_index = ds.instance_idx;
1660   {
1661     std::unique_lock<std::recursive_mutex> lock;
1662     if (ds.requires_callbacks_under_lock)
1663       lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1664     ds.internal_state->data_source->WillClearIncrementalState(
1665         clear_incremental_state_args);
1666   }
1667 
1668   // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
1669   // the incremental state should be cleared.
1670   ds.static_state->GetUnsafe(ds.instance_idx)
1671       ->incremental_state_generation.fetch_add(1, std::memory_order_relaxed);
1672 }
1673 
FlushDataSource_AsyncBegin(TracingBackendId backend_id,DataSourceInstanceID instance_id,FlushRequestID flush_id,FlushFlags flush_flags)1674 bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
1675     TracingBackendId backend_id,
1676     DataSourceInstanceID instance_id,
1677     FlushRequestID flush_id,
1678     FlushFlags flush_flags) {
1679   PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
1680   auto ds = FindDataSource(backend_id, instance_id);
1681   if (!ds) {
1682     PERFETTO_ELOG("Could not find data source to flush");
1683     return true;
1684   }
1685 
1686   uint32_t backend_connection_id = ds.internal_state->backend_connection_id;
1687 
1688   FlushArgsImpl flush_args;
1689   flush_args.flush_flags = flush_flags;
1690   flush_args.internal_instance_index = ds.instance_idx;
1691   flush_args.async_flush_closure = [this, backend_id, backend_connection_id,
1692                                     instance_id, ds, flush_id] {
1693     // TracingMuxerImpl is long lived, capturing |this| is okay.
1694     // The notification closure can be moved out of the StopArgs by the
1695     // embedder to handle stop asynchronously. The embedder might then
1696     // call the closure on a different thread than the current one, hence
1697     // this nested PostTask().
1698     task_runner_->PostTask(
1699         [this, backend_id, backend_connection_id, instance_id, ds, flush_id] {
1700           FlushDataSource_AsyncEnd(backend_id, backend_connection_id,
1701                                    instance_id, ds, flush_id);
1702         });
1703   };
1704   {
1705     std::unique_lock<std::recursive_mutex> lock;
1706     if (ds.requires_callbacks_under_lock)
1707       lock = std::unique_lock<std::recursive_mutex>(ds.internal_state->lock);
1708     ds.internal_state->data_source->OnFlush(flush_args);
1709   }
1710 
1711   // |async_flush_closure| is moved out of |flush_args| if the producer
1712   // requested to handle the flush asynchronously.
1713   bool handled = static_cast<bool>(flush_args.async_flush_closure);
1714   return handled;
1715 }
1716 
FlushDataSource_AsyncEnd(TracingBackendId backend_id,uint32_t backend_connection_id,DataSourceInstanceID instance_id,const FindDataSourceRes & ds,FlushRequestID flush_id)1717 void TracingMuxerImpl::FlushDataSource_AsyncEnd(
1718     TracingBackendId backend_id,
1719     uint32_t backend_connection_id,
1720     DataSourceInstanceID instance_id,
1721     const FindDataSourceRes& ds,
1722     FlushRequestID flush_id) {
1723   PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
1724   PERFETTO_DCHECK_THREAD(thread_checker_);
1725 
1726   // Check that the data source instance is still active and was not modified
1727   // while it was being flushed.
1728   if (!ds.static_state->TryGet(ds.instance_idx) ||
1729       ds.internal_state->backend_id != backend_id ||
1730       ds.internal_state->backend_connection_id != backend_connection_id ||
1731       ds.internal_state->data_source_instance_id != instance_id) {
1732     PERFETTO_ELOG("Async flush of data source %" PRIu64
1733                   " failed. This might be due to the data source being stopped "
1734                   "in the meantime",
1735                   instance_id);
1736     return;
1737   }
1738 
1739   // |producer_backends_| is append-only, Backend instances are always valid.
1740   PERFETTO_CHECK(backend_id < producer_backends_.size());
1741   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
1742 
1743   ProducerImpl* producer = backend.producer.get();
1744   if (!producer)
1745     return;
1746 
1747   // If the tracing service disconnects and reconnects while a data source is
1748   // handling a flush request, there's no point is sending the flush reply to
1749   // the newly reconnected producer.
1750   if (producer->connected_ &&
1751       backend.producer->connection_id_.load(std::memory_order_relaxed) ==
1752           backend_connection_id) {
1753     producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
1754   }
1755 }
1756 
SyncProducersForTesting()1757 void TracingMuxerImpl::SyncProducersForTesting() {
1758   std::mutex mutex;
1759   std::condition_variable cv;
1760 
1761   // IPC-based producers don't report connection errors explicitly for each
1762   // command, but instead with an asynchronous callback
1763   // (ProducerImpl::OnDisconnected). This means that the sync command below
1764   // may have completed but failed to reach the service because of a
1765   // disconnection, but we can't tell until the disconnection message comes
1766   // through. To guard against this, we run two whole rounds of sync round-trips
1767   // before returning; the first one will detect any disconnected producers and
1768   // the second one will ensure any reconnections have completed and all data
1769   // sources are registered in the service again.
1770   for (size_t i = 0; i < 2; i++) {
1771     size_t countdown = std::numeric_limits<size_t>::max();
1772     task_runner_->PostTask([this, &mutex, &cv, &countdown] {
1773       {
1774         std::unique_lock<std::mutex> countdown_lock(mutex);
1775         countdown = producer_backends_.size();
1776       }
1777       for (auto& backend : producer_backends_) {
1778         auto* producer = backend.producer.get();
1779         producer->service_->Sync([&mutex, &cv, &countdown] {
1780           std::unique_lock<std::mutex> countdown_lock(mutex);
1781           countdown--;
1782           cv.notify_one();
1783         });
1784       }
1785     });
1786 
1787     {
1788       std::unique_lock<std::mutex> countdown_lock(mutex);
1789       cv.wait(countdown_lock, [&countdown] { return !countdown; });
1790     }
1791   }
1792 
1793   // Check that all producers are indeed connected.
1794   bool done = false;
1795   bool all_producers_connected = true;
1796   task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
1797     for (auto& backend : producer_backends_)
1798       all_producers_connected &= backend.producer->connected_;
1799     std::unique_lock<std::mutex> lock(mutex);
1800     done = true;
1801     cv.notify_one();
1802   });
1803 
1804   {
1805     std::unique_lock<std::mutex> lock(mutex);
1806     cv.wait(lock, [&done] { return done; });
1807   }
1808   PERFETTO_DCHECK(all_producers_connected);
1809 }
1810 
DestroyStoppedTraceWritersForCurrentThread()1811 void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
1812   // Iterate across all possible data source types.
1813   auto cur_generation = generation_.load(std::memory_order_acquire);
1814   auto* root_tls = GetOrCreateTracingTLS();
1815 
1816   auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
1817     // |tls| has a vector of per-data-source-instance thread-local state.
1818     DataSourceStaticState* static_state = tls.static_state;
1819     if (!static_state)
1820       return;  // Slot not used.
1821 
1822     // Iterate across all possible instances for this data source.
1823     for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
1824       DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
1825       if (!ds_tls.trace_writer)
1826         continue;
1827 
1828       DataSourceState* ds_state = static_state->TryGet(inst);
1829       if (ds_state &&
1830           ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
1831           ds_state->backend_id == ds_tls.backend_id &&
1832           ds_state->backend_connection_id == ds_tls.backend_connection_id &&
1833           ds_state->startup_target_buffer_reservation.load(
1834               std::memory_order_relaxed) ==
1835               ds_tls.startup_target_buffer_reservation &&
1836           ds_state->buffer_id == ds_tls.buffer_id &&
1837           ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
1838         continue;
1839       }
1840 
1841       // The DataSource instance has been destroyed or recycled.
1842       ds_tls.Reset();  // Will also destroy the |ds_tls.trace_writer|.
1843     }
1844   };
1845 
1846   for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
1847     // |tls| has a vector of per-data-source-instance thread-local state.
1848     DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
1849     destroy_stopped_instances(tls);
1850   }
1851   destroy_stopped_instances(root_tls->track_event_tls);
1852   root_tls->generation = cur_generation;
1853 }
1854 
1855 // Called both when a new data source is registered or when a new backend
1856 // connects. In both cases we want to be sure we reflected the data source
1857 // registrations on the backends.
UpdateDataSourcesOnAllBackends()1858 void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
1859   PERFETTO_DCHECK_THREAD(thread_checker_);
1860   for (RegisteredDataSource& rds : data_sources_) {
1861     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
1862   }
1863 }
1864 
UpdateDataSourceOnAllBackends(RegisteredDataSource & rds,bool is_changed)1865 void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
1866                                                      bool is_changed) {
1867   PERFETTO_DCHECK_THREAD(thread_checker_);
1868   for (RegisteredProducerBackend& backend : producer_backends_) {
1869     // We cannot call RegisterDataSource on the backend before it connects.
1870     if (!backend.producer->connected_)
1871       continue;
1872 
1873     PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
1874     bool is_registered = backend.producer->registered_data_sources_.test(
1875         rds.static_state->index);
1876     if (is_registered && !is_changed)
1877       continue;
1878 
1879     if (!rds.descriptor.no_flush()) {
1880       rds.descriptor.set_no_flush(rds.no_flush);
1881     }
1882     rds.descriptor.set_will_notify_on_start(true);
1883     if (!rds.descriptor.has_will_notify_on_stop()) {
1884       rds.descriptor.set_will_notify_on_stop(true);
1885     }
1886 
1887     rds.descriptor.set_handles_incremental_state_clear(true);
1888     rds.descriptor.set_id(rds.static_state->id);
1889     if (is_registered) {
1890       backend.producer->service_->UpdateDataSource(rds.descriptor);
1891     } else {
1892       backend.producer->service_->RegisterDataSource(rds.descriptor);
1893     }
1894     backend.producer->registered_data_sources_.set(rds.static_state->index);
1895   }
1896 }
1897 
SetupTracingSession(TracingSessionGlobalID session_id,const std::shared_ptr<TraceConfig> & trace_config,base::ScopedFile trace_fd)1898 void TracingMuxerImpl::SetupTracingSession(
1899     TracingSessionGlobalID session_id,
1900     const std::shared_ptr<TraceConfig>& trace_config,
1901     base::ScopedFile trace_fd) {
1902   PERFETTO_DCHECK_THREAD(thread_checker_);
1903   PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
1904 
1905   auto* consumer = FindConsumer(session_id);
1906   if (!consumer)
1907     return;
1908 
1909   consumer->trace_config_ = trace_config;
1910   if (trace_fd)
1911     consumer->trace_fd_ = std::move(trace_fd);
1912 
1913   if (!consumer->connected_)
1914     return;
1915 
1916   // Only used in the deferred start mode.
1917   if (trace_config->deferred_start()) {
1918     consumer->service_->EnableTracing(*trace_config,
1919                                       std::move(consumer->trace_fd_));
1920   }
1921 }
1922 
StartTracingSession(TracingSessionGlobalID session_id)1923 void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
1924   PERFETTO_DCHECK_THREAD(thread_checker_);
1925 
1926   auto* consumer = FindConsumer(session_id);
1927 
1928   if (!consumer)
1929     return;
1930 
1931   if (!consumer->trace_config_) {
1932     PERFETTO_ELOG("Must call Setup(config) first");
1933     return;
1934   }
1935 
1936   if (!consumer->connected_) {
1937     consumer->start_pending_ = true;
1938     return;
1939   }
1940 
1941   consumer->start_pending_ = false;
1942   if (consumer->trace_config_->deferred_start()) {
1943     consumer->service_->StartTracing();
1944   } else {
1945     consumer->service_->EnableTracing(*consumer->trace_config_,
1946                                       std::move(consumer->trace_fd_));
1947   }
1948 
1949   // TODO implement support for the deferred-start + fast-triggering case.
1950 }
1951 
CloneTracingSession(TracingSessionGlobalID session_id,TracingSession::CloneTraceArgs args,TracingSession::CloneTraceCallback callback)1952 void TracingMuxerImpl::CloneTracingSession(
1953     TracingSessionGlobalID session_id,
1954     TracingSession::CloneTraceArgs args,
1955     TracingSession::CloneTraceCallback callback) {
1956   PERFETTO_DCHECK_THREAD(thread_checker_);
1957   auto* consumer = FindConsumer(session_id);
1958   if (!consumer) {
1959     TracingSession::CloneTraceCallbackArgs callback_arg{};
1960     callback_arg.success = false;
1961     callback_arg.error = "Tracing session not found";
1962     callback(callback_arg);
1963     return;
1964   }
1965   // Multiple concurrent cloning isn't supported.
1966   PERFETTO_DCHECK(!consumer->clone_trace_callback_);
1967   consumer->clone_trace_callback_ = std::move(callback);
1968   ConsumerEndpoint::CloneSessionArgs consumer_args{};
1969   consumer_args.unique_session_name = args.unique_session_name;
1970   if (!consumer->connected_) {
1971     consumer->session_to_clone_ = std::move(consumer_args);
1972     return;
1973   }
1974   consumer->session_to_clone_ = std::nullopt;
1975   consumer->service_->CloneSession(consumer_args);
1976 }
1977 
ChangeTracingSessionConfig(TracingSessionGlobalID session_id,const TraceConfig & trace_config)1978 void TracingMuxerImpl::ChangeTracingSessionConfig(
1979     TracingSessionGlobalID session_id,
1980     const TraceConfig& trace_config) {
1981   PERFETTO_DCHECK_THREAD(thread_checker_);
1982 
1983   auto* consumer = FindConsumer(session_id);
1984 
1985   if (!consumer)
1986     return;
1987 
1988   if (!consumer->trace_config_) {
1989     // Changing the config is only supported for started sessions.
1990     PERFETTO_ELOG("Must call Setup(config) and Start() first");
1991     return;
1992   }
1993 
1994   consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config);
1995   if (consumer->connected_)
1996     consumer->service_->ChangeTraceConfig(trace_config);
1997 }
1998 
FlushTracingSession(TracingSessionGlobalID session_id,uint32_t timeout_ms,std::function<void (bool)> callback)1999 void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
2000                                            uint32_t timeout_ms,
2001                                            std::function<void(bool)> callback) {
2002   PERFETTO_DCHECK_THREAD(thread_checker_);
2003   auto* consumer = FindConsumer(session_id);
2004   if (!consumer || consumer->start_pending_ || consumer->stop_pending_ ||
2005       !consumer->trace_config_) {
2006     PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
2007     std::move(callback)(false);
2008     return;
2009   }
2010 
2011   // For now we don't want to expose the flush reason to the consumer-side SDK
2012   // users to avoid misuses until there is a strong need.
2013   consumer->service_->Flush(timeout_ms, std::move(callback),
2014                             FlushFlags(FlushFlags::Initiator::kConsumerSdk,
2015                                        FlushFlags::Reason::kExplicit));
2016 }
2017 
StopTracingSession(TracingSessionGlobalID session_id)2018 void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
2019   PERFETTO_DCHECK_THREAD(thread_checker_);
2020   auto* consumer = FindConsumer(session_id);
2021   if (!consumer)
2022     return;
2023 
2024   if (consumer->start_pending_) {
2025     // If the session hasn't started yet, wait until it does before stopping.
2026     consumer->stop_pending_ = true;
2027     return;
2028   }
2029 
2030   consumer->stop_pending_ = false;
2031   if (consumer->stopped_) {
2032     // If the session was already stopped (e.g., it failed to start), don't try
2033     // stopping again.
2034     consumer->NotifyStopComplete();
2035   } else if (!consumer->trace_config_) {
2036     PERFETTO_ELOG("Must call Setup(config) and Start() first");
2037     return;
2038   } else {
2039     consumer->service_->DisableTracing();
2040   }
2041 
2042   consumer->trace_config_.reset();
2043 }
2044 
DestroyTracingSession(TracingSessionGlobalID session_id)2045 void TracingMuxerImpl::DestroyTracingSession(
2046     TracingSessionGlobalID session_id) {
2047   PERFETTO_DCHECK_THREAD(thread_checker_);
2048   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2049     // We need to find the consumer (if any) and call Disconnect as we destroy
2050     // the tracing session. We can't call Disconnect() inside this for loop
2051     // because in the in-process case this will end up to a synchronous call to
2052     // OnConsumerDisconnect which will invalidate all the iterators to
2053     // |backend.consumers|.
2054     ConsumerImpl* consumer = nullptr;
2055     for (auto& con : backend.consumers) {
2056       if (con->session_id_ == session_id) {
2057         consumer = con.get();
2058         break;
2059       }
2060     }
2061     if (consumer) {
2062       // We broke out of the loop above on the assumption that each backend will
2063       // only have a single consumer per session. This DCHECK ensures that
2064       // this is the case.
2065       PERFETTO_DCHECK(
2066           std::count_if(backend.consumers.begin(), backend.consumers.end(),
2067                         [session_id](const std::unique_ptr<ConsumerImpl>& con) {
2068                           return con->session_id_ == session_id;
2069                         }) == 1u);
2070       consumer->Disconnect();
2071     }
2072   }
2073 }
2074 
ReadTracingSessionData(TracingSessionGlobalID session_id,std::function<void (TracingSession::ReadTraceCallbackArgs)> callback)2075 void TracingMuxerImpl::ReadTracingSessionData(
2076     TracingSessionGlobalID session_id,
2077     std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
2078   PERFETTO_DCHECK_THREAD(thread_checker_);
2079   auto* consumer = FindConsumer(session_id);
2080   if (!consumer) {
2081     // TODO(skyostil): Signal an error to the user.
2082     TracingSession::ReadTraceCallbackArgs callback_arg{};
2083     callback(callback_arg);
2084     return;
2085   }
2086   PERFETTO_DCHECK(!consumer->read_trace_callback_);
2087   consumer->read_trace_callback_ = std::move(callback);
2088   consumer->service_->ReadBuffers();
2089 }
2090 
GetTraceStats(TracingSessionGlobalID session_id,TracingSession::GetTraceStatsCallback callback)2091 void TracingMuxerImpl::GetTraceStats(
2092     TracingSessionGlobalID session_id,
2093     TracingSession::GetTraceStatsCallback callback) {
2094   PERFETTO_DCHECK_THREAD(thread_checker_);
2095   auto* consumer = FindConsumer(session_id);
2096   if (!consumer) {
2097     TracingSession::GetTraceStatsCallbackArgs callback_arg{};
2098     callback_arg.success = false;
2099     callback(std::move(callback_arg));
2100     return;
2101   }
2102   PERFETTO_DCHECK(!consumer->get_trace_stats_callback_);
2103   consumer->get_trace_stats_callback_ = std::move(callback);
2104   if (!consumer->connected_) {
2105     consumer->get_trace_stats_pending_ = true;
2106     return;
2107   }
2108   consumer->get_trace_stats_pending_ = false;
2109   consumer->service_->GetTraceStats();
2110 }
2111 
QueryServiceState(TracingSessionGlobalID session_id,TracingSession::QueryServiceStateCallback callback)2112 void TracingMuxerImpl::QueryServiceState(
2113     TracingSessionGlobalID session_id,
2114     TracingSession::QueryServiceStateCallback callback) {
2115   PERFETTO_DCHECK_THREAD(thread_checker_);
2116   auto* consumer = FindConsumer(session_id);
2117   if (!consumer) {
2118     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2119     callback_arg.success = false;
2120     callback(std::move(callback_arg));
2121     return;
2122   }
2123   PERFETTO_DCHECK(!consumer->query_service_state_callback_);
2124   if (!consumer->connected_) {
2125     consumer->query_service_state_callback_ = std::move(callback);
2126     return;
2127   }
2128   auto callback_wrapper = [callback](bool success,
2129                                      protos::gen::TracingServiceState state) {
2130     TracingSession::QueryServiceStateCallbackArgs callback_arg{};
2131     callback_arg.success = success;
2132     callback_arg.service_state_data = state.SerializeAsArray();
2133     callback(std::move(callback_arg));
2134   };
2135   consumer->service_->QueryServiceState({}, std::move(callback_wrapper));
2136 }
2137 
SetBatchCommitsDurationForTesting(uint32_t batch_commits_duration_ms,BackendType backend_type)2138 void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
2139     uint32_t batch_commits_duration_ms,
2140     BackendType backend_type) {
2141   for (RegisteredProducerBackend& backend : producer_backends_) {
2142     if (backend.producer && backend.producer->connected_ &&
2143         backend.type == backend_type) {
2144       backend.producer->service_->MaybeSharedMemoryArbiter()
2145           ->SetBatchCommitsDuration(batch_commits_duration_ms);
2146     }
2147   }
2148 }
2149 
EnableDirectSMBPatchingForTesting(BackendType backend_type)2150 bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
2151     BackendType backend_type) {
2152   for (RegisteredProducerBackend& backend : producer_backends_) {
2153     if (backend.producer && backend.producer->connected_ &&
2154         backend.type == backend_type &&
2155         !backend.producer->service_->MaybeSharedMemoryArbiter()
2156              ->EnableDirectSMBPatching()) {
2157       return false;
2158     }
2159   }
2160   return true;
2161 }
2162 
FindConsumer(TracingSessionGlobalID session_id)2163 TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
2164     TracingSessionGlobalID session_id) {
2165   PERFETTO_DCHECK_THREAD(thread_checker_);
2166   return FindConsumerAndBackend(session_id).first;
2167 }
2168 
2169 std::pair<TracingMuxerImpl::ConsumerImpl*,
2170           TracingMuxerImpl::RegisteredConsumerBackend*>
FindConsumerAndBackend(TracingSessionGlobalID session_id)2171 TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
2172   PERFETTO_DCHECK_THREAD(thread_checker_);
2173   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2174     for (auto& consumer : backend.consumers) {
2175       if (consumer->session_id_ == session_id) {
2176         return {consumer.get(), &backend};
2177       }
2178     }
2179   }
2180   return {nullptr, nullptr};
2181 }
2182 
InitializeConsumer(TracingSessionGlobalID session_id)2183 void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
2184   PERFETTO_DCHECK_THREAD(thread_checker_);
2185 
2186   auto res = FindConsumerAndBackend(session_id);
2187   if (!res.first || !res.second)
2188     return;
2189   TracingMuxerImpl::ConsumerImpl* consumer = res.first;
2190   RegisteredConsumerBackend& backend = *res.second;
2191 
2192   TracingBackend::ConnectConsumerArgs conn_args;
2193   conn_args.consumer = consumer;
2194   conn_args.task_runner = task_runner_.get();
2195   consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
2196 }
2197 
OnConsumerDisconnected(ConsumerImpl * consumer)2198 void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
2199   PERFETTO_DCHECK_THREAD(thread_checker_);
2200   for (RegisteredConsumerBackend& backend : consumer_backends_) {
2201     auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
2202       return con.get() == consumer;
2203     };
2204     backend.consumers.erase(std::remove_if(backend.consumers.begin(),
2205                                            backend.consumers.end(), pred),
2206                             backend.consumers.end());
2207   }
2208 }
2209 
SetMaxProducerReconnectionsForTesting(uint32_t count)2210 void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
2211   max_producer_reconnections_.store(count);
2212 }
2213 
OnProducerDisconnected(ProducerImpl * producer)2214 void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
2215   PERFETTO_DCHECK_THREAD(thread_checker_);
2216   for (RegisteredProducerBackend& backend : producer_backends_) {
2217     if (backend.producer.get() != producer)
2218       continue;
2219 
2220     // The tracing service is disconnected. It does not make sense to keep
2221     // tracing (we wouldn't be able to commit). On reconnection, the tracing
2222     // service will restart the data sources.
2223     for (const auto& rds : data_sources_) {
2224       DataSourceStaticState* static_state = rds.static_state;
2225       for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2226         auto* internal_state = static_state->TryGet(i);
2227         if (internal_state && internal_state->backend_id == backend.id &&
2228             internal_state->backend_connection_id ==
2229                 backend.producer->connection_id_.load(
2230                     std::memory_order_relaxed)) {
2231           StopDataSource_AsyncBeginImpl(
2232               FindDataSourceRes(static_state, internal_state, i,
2233                                 rds.requires_callbacks_under_lock));
2234         }
2235       }
2236     }
2237 
2238     // Try reconnecting the disconnected producer. If the connection succeeds,
2239     // all the data sources will be automatically re-registered.
2240     if (producer->connection_id_.load(std::memory_order_relaxed) >
2241         max_producer_reconnections_.load()) {
2242       // Avoid reconnecting a failing producer too many times. Instead we just
2243       // leak the producer instead of trying to avoid further complicating
2244       // cross-thread trace writer creation.
2245       PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
2246       continue;
2247     }
2248 
2249     backend.producer->Initialize(
2250         backend.backend->ConnectProducer(backend.producer_conn_args));
2251     // Don't use producer-provided SMBs for the next connection unless startup
2252     // tracing requires it again.
2253     backend.producer_conn_args.use_producer_provided_smb = false;
2254   }
2255 }
2256 
SweepDeadBackends()2257 void TracingMuxerImpl::SweepDeadBackends() {
2258   PERFETTO_DCHECK_THREAD(thread_checker_);
2259   for (auto it = dead_backends_.begin(); it != dead_backends_.end();) {
2260     auto next_it = it;
2261     next_it++;
2262     if (it->producer->SweepDeadServices())
2263       dead_backends_.erase(it);
2264     it = next_it;
2265   }
2266 }
2267 
FindDataSource(TracingBackendId backend_id,DataSourceInstanceID instance_id)2268 TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
2269     TracingBackendId backend_id,
2270     DataSourceInstanceID instance_id) {
2271   PERFETTO_DCHECK_THREAD(thread_checker_);
2272   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
2273   for (const auto& rds : data_sources_) {
2274     DataSourceStaticState* static_state = rds.static_state;
2275     for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2276       auto* internal_state = static_state->TryGet(i);
2277       if (internal_state && internal_state->backend_id == backend_id &&
2278           internal_state->backend_connection_id ==
2279               backend.producer->connection_id_.load(
2280                   std::memory_order_relaxed) &&
2281           internal_state->data_source_instance_id == instance_id) {
2282         return FindDataSourceRes(static_state, internal_state, i,
2283                                  rds.requires_callbacks_under_lock);
2284       }
2285     }
2286   }
2287   return FindDataSourceRes();
2288 }
2289 
2290 // Can be called from any thread.
CreateTraceWriter(DataSourceStaticState * static_state,uint32_t data_source_instance_index,DataSourceState * data_source,BufferExhaustedPolicy buffer_exhausted_policy)2291 std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
2292     DataSourceStaticState* static_state,
2293     uint32_t data_source_instance_index,
2294     DataSourceState* data_source,
2295     BufferExhaustedPolicy buffer_exhausted_policy) {
2296   if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
2297     // If the session is being intercepted, return a heap-backed trace writer
2298     // instead. This is safe because all the data given to the interceptor is
2299     // either thread-local (|instance_index|), statically allocated
2300     // (|static_state|) or constant after initialization (|interceptor|). Access
2301     // to the interceptor instance itself through |data_source| is protected by
2302     // a statically allocated lock (similarly to the data source instance).
2303     auto& interceptor = interceptors_[data_source->interceptor_id - 1];
2304     return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
2305         interceptor.tls_factory(static_state, data_source_instance_index),
2306         interceptor.packet_callback, static_state, data_source_instance_index));
2307   }
2308   ProducerImpl* producer =
2309       FindProducerBackendById(data_source->backend_id)->producer.get();
2310   // Atomically load the current service endpoint. We keep the pointer as a
2311   // shared pointer on the stack to guard against it from being concurrently
2312   // modified on the thread by ProducerImpl::Initialize() swapping in a
2313   // reconnected service on the muxer task runner thread.
2314   //
2315   // The endpoint may also be concurrently modified by SweepDeadServices()
2316   // clearing out old disconnected services. We guard against that by
2317   // SharedMemoryArbiter keeping track of any outstanding trace writers. After
2318   // shutdown has started, the trace writer created below will be a null one
2319   // which will drop any written data. See SharedMemoryArbiter::TryShutdown().
2320   //
2321   // We use an atomic pointer instead of holding a lock because
2322   // CreateTraceWriter posts tasks under the hood.
2323   std::shared_ptr<ProducerEndpoint> service =
2324       std::atomic_load(&producer->service_);
2325 
2326   // The service may have been disconnected and reconnected concurrently after
2327   // the data source was enabled, in which case we may not have an arbiter, or
2328   // would be creating a TraceWriter for the wrong (a newer) connection / SMB.
2329   // Instead, early-out now. A relaxed load is fine here because the atomic_load
2330   // above ensures that the |service| isn't newer.
2331   if (producer->connection_id_.load(std::memory_order_relaxed) !=
2332       data_source->backend_connection_id) {
2333     return std::unique_ptr<TraceWriter>(new NullTraceWriter());
2334   }
2335 
2336   // We just need a relaxed atomic read here: We can use the reservation ID even
2337   // after the buffer was bound, we just need to be sure to read it atomically.
2338   uint16_t startup_buffer_reservation =
2339       data_source->startup_target_buffer_reservation.load(
2340           std::memory_order_relaxed);
2341   if (startup_buffer_reservation) {
2342     return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
2343         startup_buffer_reservation);
2344   }
2345   return service->CreateTraceWriter(data_source->buffer_id,
2346                                     buffer_exhausted_policy);
2347 }
2348 
2349 // This is called via the public API Tracing::NewTrace().
2350 // Can be called from any thread.
CreateTracingSession(BackendType requested_backend_type,TracingConsumerBackend * (* system_backend_factory)())2351 std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
2352     BackendType requested_backend_type,
2353     TracingConsumerBackend* (*system_backend_factory)()) {
2354   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2355 
2356   // |backend_type| can only specify one backend, not an OR-ed mask.
2357   PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);
2358 
2359   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2360   task_runner_->PostTask([this, requested_backend_type, session_id,
2361                           system_backend_factory] {
2362     if (requested_backend_type == kSystemBackend && system_backend_factory &&
2363         !FindConsumerBackendByType(kSystemBackend)) {
2364       AddConsumerBackend(system_backend_factory(), kSystemBackend);
2365     }
2366     for (RegisteredConsumerBackend& backend : consumer_backends_) {
2367       if (requested_backend_type && backend.type &&
2368           backend.type != requested_backend_type) {
2369         continue;
2370       }
2371 
2372       // Create the consumer now, even if we have to ask the embedder below, so
2373       // that any other tasks executing after this one can find the consumer and
2374       // change its pending attributes.
2375       backend.consumers.emplace_back(
2376           new ConsumerImpl(this, backend.type, session_id));
2377 
2378       // The last registered backend in |consumer_backends_| is the unsupported
2379       // backend without a valid type.
2380       if (!backend.type) {
2381         PERFETTO_ELOG(
2382             "No tracing backend ready for type=%d, consumer will disconnect",
2383             requested_backend_type);
2384         InitializeConsumer(session_id);
2385         return;
2386       }
2387 
2388       // Check if the embedder wants to be asked for permission before
2389       // connecting the consumer.
2390       if (!policy_) {
2391         InitializeConsumer(session_id);
2392         return;
2393       }
2394 
2395       BackendType type = backend.type;
2396       TracingPolicy::ShouldAllowConsumerSessionArgs args;
2397       args.backend_type = backend.type;
2398       args.result_callback = [this, type, session_id](bool allow) {
2399         task_runner_->PostTask([this, type, session_id, allow] {
2400           if (allow) {
2401             InitializeConsumer(session_id);
2402             return;
2403           }
2404 
2405           PERFETTO_ELOG(
2406               "Consumer session for backend type type=%d forbidden, "
2407               "consumer will disconnect",
2408               type);
2409 
2410           auto* consumer = FindConsumer(session_id);
2411           if (!consumer)
2412             return;
2413 
2414           consumer->OnDisconnect();
2415         });
2416       };
2417       policy_->ShouldAllowConsumerSession(args);
2418       return;
2419     }
2420     PERFETTO_DFATAL("Not reached");
2421   });
2422 
2423   return std::unique_ptr<TracingSession>(
2424       new TracingSessionImpl(this, session_id, requested_backend_type));
2425 }
2426 
2427 // static
2428 // This is called via the public API Tracing::SetupStartupTracing().
2429 // Can be called from any thread.
2430 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSession(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2431 TracingMuxerImpl::CreateStartupTracingSession(
2432     const TraceConfig& config,
2433     Tracing::SetupStartupTracingOpts opts) {
2434   BackendType backend_type = opts.backend;
2435   // |backend_type| can only specify one backend, not an OR-ed mask.
2436   PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
2437   // The in-process backend doesn't support startup tracing.
2438   PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
2439 
2440   TracingSessionGlobalID session_id = ++next_tracing_session_id_;
2441 
2442   // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
2443   task_runner_->PostTask([this, config, opts, backend_type, session_id] {
2444     for (RegisteredProducerBackend& backend : producer_backends_) {
2445       if (backend_type && backend.type && backend.type != backend_type) {
2446         continue;
2447       }
2448 
2449       TracingBackendId backend_id = backend.id;
2450 
2451       // The last registered backend in |producer_backends_| is the unsupported
2452       // backend without a valid type.
2453       if (!backend.type) {
2454         PERFETTO_ELOG(
2455             "No tracing backend initialized for type=%d, startup tracing "
2456             "failed",
2457             backend_type);
2458         if (opts.on_setup)
2459           opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2460               0 /* num_data_sources_started */});
2461         return;
2462       }
2463 
2464       if (!backend.producer->service_ ||
2465           !backend.producer->service_->shared_memory()) {
2466         // If we unsuccessfully attempted to use a producer-provided SMB in the
2467         // past, don't try again.
2468         if (backend.producer->producer_provided_smb_failed_) {
2469           PERFETTO_ELOG(
2470               "Backend %zu doesn't seem to support producer-provided "
2471               "SMBs, startup tracing failed",
2472               backend_id);
2473           if (opts.on_setup)
2474             opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
2475                 0 /* num_data_sources_started */});
2476           return;
2477         }
2478 
2479         PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
2480                       backend_id);
2481         backend.producer_conn_args.use_producer_provided_smb = true;
2482         backend.producer->service_->Disconnect();  // Causes a reconnect.
2483         PERFETTO_DCHECK(backend.producer->service_ &&
2484                         backend.producer->service_->MaybeSharedMemoryArbiter());
2485       }
2486 
2487       RegisteredStartupSession session;
2488       session.session_id = session_id;
2489       session.on_aborted = opts.on_aborted;
2490       session.on_adopted = opts.on_adopted;
2491 
2492       for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
2493         // Find all matching data sources and start one instance of each.
2494         for (const auto& rds : data_sources_) {
2495           if (rds.descriptor.name() != ds_cfg.config().name())
2496             continue;
2497 
2498           PERFETTO_DLOG(
2499               "Setting up data source %s for startup tracing with target "
2500               "buffer reservation %" PRIi32,
2501               rds.descriptor.name().c_str(),
2502               backend.producer->last_startup_target_buffer_reservation_ + 1u);
2503           auto ds = SetupDataSourceImpl(
2504               rds, backend_id,
2505               backend.producer->connection_id_.load(std::memory_order_relaxed),
2506               /*instance_id=*/0, ds_cfg.config(),
2507               /*startup_session_id=*/session_id);
2508           if (ds) {
2509             StartDataSourceImpl(ds);
2510             session.num_unbound_data_sources++;
2511           }
2512         }
2513       }
2514 
2515       int num_ds = session.num_unbound_data_sources;
2516       auto on_setup = opts.on_setup;
2517       if (on_setup) {
2518         backend.producer->OnStartupTracingSetup();
2519         task_runner_->PostTask([on_setup, num_ds] {
2520           on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
2521         });
2522       }
2523 
2524       if (num_ds > 0) {
2525         backend.startup_sessions.push_back(std::move(session));
2526 
2527         if (opts.timeout_ms > 0) {
2528           task_runner_->PostDelayedTask(
2529               [this, session_id, backend_type] {
2530                 AbortStartupTracingSession(session_id, backend_type);
2531               },
2532               opts.timeout_ms);
2533         }
2534       }
2535       return;
2536     }
2537     PERFETTO_DFATAL("Invalid startup tracing session backend");
2538   });
2539 
2540   return std::unique_ptr<StartupTracingSession>(
2541       new StartupTracingSessionImpl(this, session_id, backend_type));
2542 }
2543 
2544 // Must not be called from the SDK's internal thread.
2545 std::unique_ptr<StartupTracingSession>
CreateStartupTracingSessionBlocking(const TraceConfig & config,Tracing::SetupStartupTracingOpts opts)2546 TracingMuxerImpl::CreateStartupTracingSessionBlocking(
2547     const TraceConfig& config,
2548     Tracing::SetupStartupTracingOpts opts) {
2549   auto previous_on_setup = std::move(opts.on_setup);
2550   PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
2551   base::WaitableEvent event;
2552   // It is safe to capture by reference because once on_setup is called only
2553   // once before this method returns.
2554   opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
2555     if (previous_on_setup) {
2556       previous_on_setup(std::move(args));
2557     }
2558     event.Notify();
2559   };
2560   auto session = CreateStartupTracingSession(config, std::move(opts));
2561   event.Wait();
2562   return session;
2563 }
2564 
AbortStartupTracingSession(TracingSessionGlobalID session_id,BackendType backend_type)2565 void TracingMuxerImpl::AbortStartupTracingSession(
2566     TracingSessionGlobalID session_id,
2567     BackendType backend_type) {
2568   PERFETTO_DCHECK_THREAD(thread_checker_);
2569 
2570   for (RegisteredProducerBackend& backend : producer_backends_) {
2571     if (backend_type != backend.type)
2572       continue;
2573 
2574     auto session_it = std::find_if(
2575         backend.startup_sessions.begin(), backend.startup_sessions.end(),
2576         [session_id](const RegisteredStartupSession& session) {
2577           return session.session_id == session_id;
2578         });
2579 
2580     // The startup session may have already been aborted or fully adopted.
2581     if (session_it == backend.startup_sessions.end())
2582       return;
2583     if (session_it->is_aborting)
2584       return;
2585 
2586     session_it->is_aborting = true;
2587 
2588     // Iterate all data sources and abort them if they weren't adopted yet.
2589     for (const auto& rds : data_sources_) {
2590       DataSourceStaticState* static_state = rds.static_state;
2591       for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
2592         auto* internal_state = static_state->TryGet(i);
2593         if (internal_state &&
2594             internal_state->startup_target_buffer_reservation.load(
2595                 std::memory_order_relaxed) &&
2596             internal_state->data_source_instance_id == 0 &&
2597             internal_state->startup_session_id == session_id) {
2598           PERFETTO_DLOG(
2599               "Aborting startup tracing for data source %s (target buffer "
2600               "reservation %" PRIu16 ")",
2601               rds.descriptor.name().c_str(),
2602               internal_state->startup_target_buffer_reservation.load(
2603                   std::memory_order_relaxed));
2604 
2605           // Abort the instance asynchronously by stopping it. From this point
2606           // onwards, the service will not be able to adopt it via
2607           // StartDataSource().
2608           session_it->num_aborting_data_sources++;
2609           StopDataSource_AsyncBeginImpl(
2610               FindDataSourceRes(static_state, internal_state, i,
2611                                 rds.requires_callbacks_under_lock));
2612         }
2613       }
2614     }
2615 
2616     // If we did everything right, we should have aborted all still-unbound data
2617     // source instances.
2618     PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
2619                     session_it->num_aborting_data_sources);
2620 
2621     if (session_it->num_aborting_data_sources == 0) {
2622       if (session_it->on_aborted)
2623         task_runner_->PostTask(session_it->on_aborted);
2624 
2625       backend.startup_sessions.erase(session_it);
2626     }
2627     return;
2628   }
2629   // We might reach here in tests because when we start a trace, we post the
2630   // Task(AbortStartupTrace, delay=timeout). When we do
2631   // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
2632   // kill those delayed tasks because TaskRunner doesn't have support for
2633   // deleting scheduled future tasks and TaskRunner doesn't have any API for us
2634   // to wait for the completion of all the scheduled tasks (apart from
2635   // deleting the TaskRunner) and we want to avoid doing that because we need
2636   // a long running TaskRunner in muxer.
2637   PERFETTO_DLOG("Invalid startup tracing session backend");
2638 }
2639 
InitializeInstance(const TracingInitArgs & args)2640 void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
2641   if (instance_ != TracingMuxerFake::Get()) {
2642     // The tracing muxer was already initialized. We might need to initialize
2643     // additional backends that were not configured earlier.
2644     auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
2645     muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
2646     return;
2647   }
2648   // If we previously had a TracingMuxerImpl instance which was reset,
2649   // reinitialize and reuse it instead of trying to create a new one. See
2650   // ResetForTesting().
2651   if (g_prev_instance) {
2652     auto* muxer = g_prev_instance;
2653     g_prev_instance = nullptr;
2654     instance_ = muxer;
2655     muxer->task_runner_->PostTask([muxer, args] {
2656       muxer->Initialize(args);
2657       muxer->AddBackends(args);
2658     });
2659   } else {
2660     new TracingMuxerImpl(args);
2661   }
2662 }
2663 
2664 // static
ResetForTesting()2665 void TracingMuxerImpl::ResetForTesting() {
2666   // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
2667   // various objects make that a non-starter. In particular:
2668   //
2669   // 1) Any thread that has entered a trace event has a TraceWriter, which holds
2670   //    a reference back to ProducerImpl::service_.
2671   //
2672   // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
2673   //
2674   // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
2675   //    turn depends on TracingMuxerImpl itself.
2676   //
2677   // Because of this, it's not safe to deallocate TracingMuxerImpl until all
2678   // threads have dropped their TraceWriters. Since we can't really ask the
2679   // caller to guarantee this, we'll instead reset enough of the muxer's state
2680   // so that it can be reinitialized later and ensure all necessary objects from
2681   // the old state remain alive until all references have gone away.
2682   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2683 
2684   base::WaitableEvent reset_done;
2685   auto do_reset = [muxer, &reset_done] {
2686     muxer->DestroyStoppedTraceWritersForCurrentThread();
2687     // Unregister all data sources so they don't interfere with any future
2688     // tracing sessions.
2689     for (RegisteredDataSource& rds : muxer->data_sources_) {
2690       for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
2691         if (!backend.producer->service_ || !backend.producer->connected_)
2692           continue;
2693         backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
2694       }
2695     }
2696     for (auto& backend : muxer->consumer_backends_) {
2697       // Check that no consumer session is currently active on any backend.
2698       for (auto& consumer : backend.consumers)
2699         PERFETTO_CHECK(!consumer->service_);
2700     }
2701     for (auto& backend : muxer->producer_backends_) {
2702       backend.producer->muxer_ = nullptr;
2703       backend.producer->DisposeConnection();
2704       muxer->dead_backends_.push_back(std::move(backend));
2705     }
2706     muxer->consumer_backends_.clear();
2707     muxer->producer_backends_.clear();
2708     muxer->interceptors_.clear();
2709 
2710     for (auto& ds : muxer->data_sources_) {
2711       ds.static_state->ResetForTesting();
2712     }
2713 
2714     muxer->data_sources_.clear();
2715     muxer->next_data_source_index_ = 0;
2716 
2717     // Free all backends without active trace writers or other inbound
2718     // references. Note that even if all the backends get swept, the muxer still
2719     // needs to stay around since |task_runner_| is assumed to be long-lived.
2720     muxer->SweepDeadBackends();
2721 
2722     // Make sure we eventually discard any per-thread trace writers from the
2723     // previous instance.
2724     muxer->muxer_id_for_testing_++;
2725 
2726     g_prev_instance = muxer;
2727     instance_ = TracingMuxerFake::Get();
2728 
2729     // Call the user provided cleanups on the muxer thread.
2730     for (auto& cb : muxer->reset_callbacks_) {
2731       cb();
2732     }
2733 
2734     reset_done.Notify();
2735   };
2736 
2737   // Some tests run the muxer and the test on the same thread. In these cases,
2738   // we can reset synchronously.
2739   if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
2740     do_reset();
2741   } else {
2742     muxer->DestroyStoppedTraceWritersForCurrentThread();
2743     muxer->task_runner_->PostTask(std::move(do_reset));
2744     reset_done.Wait();
2745     // Call the user provided cleanups also on this thread.
2746     for (auto& cb : muxer->reset_callbacks_) {
2747       cb();
2748     }
2749   }
2750   muxer->reset_callbacks_.clear();
2751 }
2752 
2753 // static
Shutdown()2754 void TracingMuxerImpl::Shutdown() {
2755   auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_);
2756 
2757   // Shutting down on the muxer thread would lead to a deadlock.
2758   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
2759   muxer->DestroyStoppedTraceWritersForCurrentThread();
2760 
2761   std::unique_ptr<base::TaskRunner> owned_task_runner(
2762       muxer->task_runner_.get());
2763   base::WaitableEvent shutdown_done;
2764   owned_task_runner->PostTask([muxer, &shutdown_done] {
2765     // Check that no consumer session is currently active on any backend.
2766     // Producers will be automatically disconnected as a part of deleting the
2767     // muxer below.
2768     for (auto& backend : muxer->consumer_backends_) {
2769       for (auto& consumer : backend.consumers) {
2770         PERFETTO_CHECK(!consumer->service_);
2771       }
2772     }
2773     // Make sure no trace writers are lingering around on the muxer thread. Note
2774     // that we can't do this for any arbitrary thread in the process; it is the
2775     // caller's responsibility to clean them up before shutting down Perfetto.
2776     muxer->DestroyStoppedTraceWritersForCurrentThread();
2777     // The task runner must be deleted outside the muxer thread. This is done by
2778     // `owned_task_runner` above.
2779     muxer->task_runner_.release();
2780     auto* platform = muxer->platform_;
2781     delete muxer;
2782     instance_ = TracingMuxerFake::Get();
2783     platform->Shutdown();
2784     shutdown_done.Notify();
2785   });
2786   shutdown_done.Wait();
2787 }
2788 
AppendResetForTestingCallback(std::function<void ()> cb)2789 void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
2790   reset_callbacks_.push_back(std::move(cb));
2791 }
2792 
2793 TracingMuxer::~TracingMuxer() = default;
2794 
2795 static_assert(std::is_same<internal::BufferId, BufferID>::value,
2796               "public's BufferId and tracing/core's BufferID diverged");
2797 
2798 }  // namespace internal
2799 }  // namespace perfetto
2800