xref: /aosp_15_r20/external/perfetto/src/tracing/service/tracing_service_impl.h (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
18 #define SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
19 
20 #include <algorithm>
21 #include <functional>
22 #include <map>
23 #include <memory>
24 #include <optional>
25 #include <set>
26 #include <utility>
27 #include <vector>
28 
29 #include "perfetto/base/logging.h"
30 #include "perfetto/base/status.h"
31 #include "perfetto/base/time.h"
32 #include "perfetto/ext/base/circular_queue.h"
33 #include "perfetto/ext/base/clock_snapshots.h"
34 #include "perfetto/ext/base/periodic_task.h"
35 #include "perfetto/ext/base/uuid.h"
36 #include "perfetto/ext/base/weak_ptr.h"
37 #include "perfetto/ext/base/weak_runner.h"
38 #include "perfetto/ext/tracing/core/basic_types.h"
39 #include "perfetto/ext/tracing/core/client_identity.h"
40 #include "perfetto/ext/tracing/core/commit_data_request.h"
41 #include "perfetto/ext/tracing/core/observable_events.h"
42 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
43 #include "perfetto/ext/tracing/core/trace_stats.h"
44 #include "perfetto/ext/tracing/core/tracing_service.h"
45 #include "perfetto/tracing/core/data_source_config.h"
46 #include "perfetto/tracing/core/data_source_descriptor.h"
47 #include "perfetto/tracing/core/forward_decls.h"
48 #include "perfetto/tracing/core/trace_config.h"
49 #include "src/android_stats/perfetto_atoms.h"
50 #include "src/tracing/core/id_allocator.h"
51 #include "src/tracing/service/clock.h"
52 #include "src/tracing/service/dependencies.h"
53 #include "src/tracing/service/random.h"
54 
55 namespace protozero {
56 class MessageFilter;
57 }
58 
59 namespace perfetto {
60 
61 namespace protos {
62 namespace gen {
63 enum TraceStats_FinalFlushOutcome : int;
64 }
65 }  // namespace protos
66 
67 class Consumer;
68 class Producer;
69 class SharedMemory;
70 class SharedMemoryArbiterImpl;
71 class TraceBuffer;
72 class TracePacket;
73 
74 // The tracing service business logic.
75 class TracingServiceImpl : public TracingService {
76  private:
77   struct DataSourceInstance;
78   struct TriggerInfo;
79 
80  public:
// Size limit, in bytes, for shared memory buffers (32 MiB).
static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;

// Fallback used by TracingSession::data_source_stop_timeout_ms() when the
// trace config leaves the stop timeout at 0.
static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;

// Fixed 16-byte sequence used as the trace sync marker.
static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
                                          0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
                                          0x6d, 0x57, 0xa0, 0x79};

// This is ipc::kIPCBufferSize - 512, see assertion in
// tracing_integration_test.cc and b/195065199.
static constexpr size_t kMaxTracePacketSliceSize = 128 * 1024 - 512;

// This is a rough threshold to determine how many bytes to read from the
// buffers on each iteration when writing into a file. Since filtering and
// compression allocate memory, this effectively limits the amount of memory
// allocated.
static constexpr size_t kWriteIntoFileChunkSize = 1024 * 1024ul;
95 
96   // The implementation behind the service endpoint exposed to each producer.
97   class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
98    public:
99     ProducerEndpointImpl(ProducerID,
100                          const ClientIdentity& client_identity,
101                          TracingServiceImpl*,
102                          base::TaskRunner*,
103                          Producer*,
104                          const std::string& producer_name,
105                          const std::string& sdk_version,
106                          bool in_process,
107                          bool smb_scraping_enabled);
108     ~ProducerEndpointImpl() override;
109 
110     // TracingService::ProducerEndpoint implementation.
111     void Disconnect() override;
112     void RegisterDataSource(const DataSourceDescriptor&) override;
113     void UpdateDataSource(const DataSourceDescriptor&) override;
114     void UnregisterDataSource(const std::string& name) override;
115     void RegisterTraceWriter(uint32_t writer_id,
116                              uint32_t target_buffer) override;
117     void UnregisterTraceWriter(uint32_t writer_id) override;
118     void CommitData(const CommitDataRequest&, CommitDataCallback) override;
119     void SetupSharedMemory(std::unique_ptr<SharedMemory>,
120                            size_t page_size_bytes,
121                            bool provided_by_producer);
122     std::unique_ptr<TraceWriter> CreateTraceWriter(
123         BufferID,
124         BufferExhaustedPolicy) override;
125     SharedMemoryArbiter* MaybeSharedMemoryArbiter() override;
126     bool IsShmemProvidedByProducer() const override;
127     void NotifyFlushComplete(FlushRequestID) override;
128     void NotifyDataSourceStarted(DataSourceInstanceID) override;
129     void NotifyDataSourceStopped(DataSourceInstanceID) override;
130     SharedMemory* shared_memory() const override;
131     size_t shared_buffer_page_size_kb() const override;
132     void ActivateTriggers(const std::vector<std::string>&) override;
133     void Sync(std::function<void()> callback) override;
134 
135     void OnTracingSetup();
136     void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
137     void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
138     void StopDataSource(DataSourceInstanceID);
139     void Flush(FlushRequestID,
140                const std::vector<DataSourceInstanceID>&,
141                FlushFlags);
142     void OnFreeBuffers(const std::vector<BufferID>& target_buffers);
143     void ClearIncrementalState(const std::vector<DataSourceInstanceID>&);
144 
is_allowed_target_buffer(BufferID buffer_id)145     bool is_allowed_target_buffer(BufferID buffer_id) const {
146       return allowed_target_buffers_.count(buffer_id);
147     }
148 
buffer_id_for_writer(WriterID writer_id)149     std::optional<BufferID> buffer_id_for_writer(WriterID writer_id) const {
150       const auto it = writers_.find(writer_id);
151       if (it != writers_.end())
152         return it->second;
153       return std::nullopt;
154     }
155 
156     bool IsAndroidProcessFrozen();
uid()157     uid_t uid() const { return client_identity_.uid(); }
pid()158     pid_t pid() const { return client_identity_.pid(); }
client_identity()159     const ClientIdentity& client_identity() const { return client_identity_; }
160 
161    private:
162     friend class TracingServiceImpl;
163     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
164     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
165 
166     ProducerID const id_;
167     ClientIdentity const client_identity_;
168     TracingServiceImpl* const service_;
169     Producer* producer_;
170     std::unique_ptr<SharedMemory> shared_memory_;
171     size_t shared_buffer_page_size_kb_ = 0;
172     SharedMemoryABI shmem_abi_;
173     size_t shmem_size_hint_bytes_ = 0;
174     size_t shmem_page_size_hint_bytes_ = 0;
175     bool is_shmem_provided_by_producer_ = false;
176     const std::string name_;
177     std::string sdk_version_;
178     bool in_process_;
179     bool smb_scraping_enabled_;
180 
181     // Set of the global target_buffer IDs that the producer is configured to
182     // write into in any active tracing session.
183     std::set<BufferID> allowed_target_buffers_;
184 
185     // Maps registered TraceWriter IDs to their target buffers as registered by
186     // the producer. Note that producers aren't required to register their
187     // writers, so we may see commits of chunks with WriterIDs that aren't
188     // contained in this map. However, if a producer does register a writer, the
189     // service will prevent the writer from writing into any other buffer than
190     // the one associated with it here. The BufferIDs stored in this map are
191     // untrusted, so need to be verified against |allowed_target_buffers_|
192     // before use.
193     std::map<WriterID, BufferID> writers_;
194 
195     // This is used only in in-process configurations.
196     // SharedMemoryArbiterImpl methods themselves are thread-safe.
197     std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
198 
199     PERFETTO_THREAD_CHECKER(thread_checker_)
200     base::WeakRunner weak_runner_;
201   };
202 
203   // The implementation behind the service endpoint exposed to each consumer.
204   class ConsumerEndpointImpl : public TracingService::ConsumerEndpoint {
205    public:
206     ConsumerEndpointImpl(TracingServiceImpl*,
207                          base::TaskRunner*,
208                          Consumer*,
209                          uid_t uid);
210     ~ConsumerEndpointImpl() override;
211 
212     void NotifyOnTracingDisabled(const std::string& error);
213 
214     // TracingService::ConsumerEndpoint implementation.
215     void EnableTracing(const TraceConfig&, base::ScopedFile) override;
216     void ChangeTraceConfig(const TraceConfig& cfg) override;
217     void StartTracing() override;
218     void DisableTracing() override;
219     void ReadBuffers() override;
220     void FreeBuffers() override;
221     void Flush(uint32_t timeout_ms, FlushCallback, FlushFlags) override;
222     void Detach(const std::string& key) override;
223     void Attach(const std::string& key) override;
224     void GetTraceStats() override;
225     void ObserveEvents(uint32_t enabled_event_types) override;
226     void QueryServiceState(QueryServiceStateArgs,
227                            QueryServiceStateCallback) override;
228     void QueryCapabilities(QueryCapabilitiesCallback) override;
229     void SaveTraceForBugreport(SaveTraceForBugreportCallback) override;
230     void CloneSession(CloneSessionArgs) override;
231 
232     // Will queue a task to notify the consumer about the state change.
233     void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
234                                          const DataSourceInstance&);
235     void OnAllDataSourcesStarted();
236 
GetWeakPtr()237     base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr() {
238       return weak_ptr_factory_.GetWeakPtr();
239     }
240 
241    private:
242     friend class TracingServiceImpl;
243     ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
244     ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;
245 
246     void NotifyCloneSnapshotTrigger(const TriggerInfo& trigger_name);
247 
248     // Returns a pointer to an ObservableEvents object that the caller can fill
249     // and schedules a task to send the ObservableEvents to the consumer.
250     ObservableEvents* AddObservableEvents();
251 
252     base::TaskRunner* const task_runner_;
253     TracingServiceImpl* const service_;
254     Consumer* const consumer_;
255     uid_t const uid_;
256     TracingSessionID tracing_session_id_ = 0;
257 
258     // Whether the consumer is interested in DataSourceInstance state change
259     // events.
260     uint32_t observable_events_mask_ = 0;
261 
262     // ObservableEvents that will be sent to the consumer. If set, a task to
263     // flush the events to the consumer has been queued.
264     std::unique_ptr<ObservableEvents> observable_events_;
265 
266     PERFETTO_THREAD_CHECKER(thread_checker_)
267     base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
268   };
269 
270   class RelayEndpointImpl : public TracingService::RelayEndpoint {
271    public:
272     using SyncMode = RelayEndpoint::SyncMode;
273 
274     struct SyncedClockSnapshots {
SyncedClockSnapshotsSyncedClockSnapshots275       SyncedClockSnapshots(SyncMode _sync_mode,
276                            base::ClockSnapshotVector _client_clocks,
277                            base::ClockSnapshotVector _host_clocks)
278           : sync_mode(_sync_mode),
279             client_clocks(std::move(_client_clocks)),
280             host_clocks(std::move(_host_clocks)) {}
281       SyncMode sync_mode;
282       base::ClockSnapshotVector client_clocks;
283       base::ClockSnapshotVector host_clocks;
284     };
285 
286     explicit RelayEndpointImpl(RelayClientID relay_client_id,
287                                TracingServiceImpl* service);
288     ~RelayEndpointImpl() override;
289     void SyncClocks(SyncMode sync_mode,
290                     base::ClockSnapshotVector client_clocks,
291                     base::ClockSnapshotVector host_clocks) override;
292     void Disconnect() override;
293 
machine_id()294     MachineID machine_id() const { return relay_client_id_.first; }
295 
synced_clocks()296     base::CircularQueue<SyncedClockSnapshots>& synced_clocks() {
297       return synced_clocks_;
298     }
299 
300    private:
301     RelayEndpointImpl(const RelayEndpointImpl&) = delete;
302     RelayEndpointImpl& operator=(const RelayEndpointImpl&) = delete;
303 
304     RelayClientID relay_client_id_;
305     TracingServiceImpl* const service_;
306     base::CircularQueue<SyncedClockSnapshots> synced_clocks_;
307 
308     PERFETTO_THREAD_CHECKER(thread_checker_)
309   };
310 
311   explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
312                               base::TaskRunner*,
313                               tracing_service::Dependencies,
314                               InitOpts = {});
315   ~TracingServiceImpl() override;
316 
317   // Called by ProducerEndpointImpl.
318   void DisconnectProducer(ProducerID);
319   void RegisterDataSource(ProducerID, const DataSourceDescriptor&);
320   void UpdateDataSource(ProducerID, const DataSourceDescriptor&);
321   void UnregisterDataSource(ProducerID, const std::string& name);
322   void CopyProducerPageIntoLogBuffer(ProducerID,
323                                      const ClientIdentity&,
324                                      WriterID,
325                                      ChunkID,
326                                      BufferID,
327                                      uint16_t num_fragments,
328                                      uint8_t chunk_flags,
329                                      bool chunk_complete,
330                                      const uint8_t* src,
331                                      size_t size);
332   void ApplyChunkPatches(ProducerID,
333                          const std::vector<CommitDataRequest::ChunkToPatch>&);
334   void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
335   void NotifyDataSourceStarted(ProducerID, DataSourceInstanceID);
336   void NotifyDataSourceStopped(ProducerID, DataSourceInstanceID);
337   void ActivateTriggers(ProducerID, const std::vector<std::string>& triggers);
338 
339   // Called by ConsumerEndpointImpl.
340   bool DetachConsumer(ConsumerEndpointImpl*, const std::string& key);
341   bool AttachConsumer(ConsumerEndpointImpl*, const std::string& key);
342   void DisconnectConsumer(ConsumerEndpointImpl*);
343   base::Status EnableTracing(ConsumerEndpointImpl*,
344                              const TraceConfig&,
345                              base::ScopedFile);
346   void ChangeTraceConfig(ConsumerEndpointImpl*, const TraceConfig&);
347 
348   void StartTracing(TracingSessionID);
349   void DisableTracing(TracingSessionID, bool disable_immediately = false);
350   void Flush(TracingSessionID tsid,
351              uint32_t timeout_ms,
352              ConsumerEndpoint::FlushCallback,
353              FlushFlags);
354   void FlushAndDisableTracing(TracingSessionID);
355   base::Status FlushAndCloneSession(ConsumerEndpointImpl*,
356                                     ConsumerEndpoint::CloneSessionArgs);
357 
358   // Starts reading the internal tracing buffers from the tracing session `tsid`
359   // and sends them to `*consumer` (which must be != nullptr).
360   //
361   // Only reads a limited amount of data in one call. If there's more data,
362   // immediately schedules itself on a PostTask.
363   //
364   // Returns false in case of error.
365   bool ReadBuffersIntoConsumer(TracingSessionID tsid,
366                                ConsumerEndpointImpl* consumer);
367 
368   // Reads all the tracing buffers from the tracing session `tsid` and writes
369   // them into the associated file.
370   //
371   // Reads all the data in the buffers (or until the file is full) before
372   // returning.
373   //
374   // If the tracing session write_period_ms is 0, the file is full or there has
375   // been an error, flushes the file and closes it. Otherwise, schedules itself
376   // to be executed after write_period_ms.
377   //
378   // Returns false in case of error.
379   bool ReadBuffersIntoFile(TracingSessionID);
380 
381   void FreeBuffers(TracingSessionID);
382 
383   // Service implementation.
384   std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
385       Producer*,
386       const ClientIdentity& client_identity,
387       const std::string& producer_name,
388       size_t shared_memory_size_hint_bytes = 0,
389       bool in_process = false,
390       ProducerSMBScrapingMode smb_scraping_mode =
391           ProducerSMBScrapingMode::kDefault,
392       size_t shared_memory_page_size_hint_bytes = 0,
393       std::unique_ptr<SharedMemory> shm = nullptr,
394       const std::string& sdk_version = {}) override;
395 
396   std::unique_ptr<TracingService::ConsumerEndpoint> ConnectConsumer(
397       Consumer*,
398       uid_t) override;
399 
400   std::unique_ptr<TracingService::RelayEndpoint> ConnectRelayClient(
401       RelayClientID) override;
402 
403   void DisconnectRelayClient(RelayClientID);
404 
405   // Set whether SMB scraping should be enabled by default or not. Producers can
406   // override this setting for their own SMBs.
SetSMBScrapingEnabled(bool enabled)407   void SetSMBScrapingEnabled(bool enabled) override {
408     smb_scraping_enabled_ = enabled;
409   }
410 
411   // Exposed mainly for testing.
num_producers()412   size_t num_producers() const { return producers_.size(); }
413   ProducerEndpointImpl* GetProducer(ProducerID) const;
414 
415  private:
// Record of one trigger occurrence: when it happened and a hash of its name.
// Instances order by timestamp only; |name_hash| does not take part in the
// comparison.
struct TriggerHistory {
  int64_t timestamp_ns;
  uint64_t name_hash;

  bool operator<(const TriggerHistory& other) const {
    return timestamp_ns < other.timestamp_ns;
  }
};
424 
425   struct RegisteredDataSource {
426     ProducerID producer_id;
427     DataSourceDescriptor descriptor;
428   };
429 
430   // Represents an active data source for a tracing session.
431   struct DataSourceInstance {
DataSourceInstanceDataSourceInstance432     DataSourceInstance(DataSourceInstanceID id,
433                        const DataSourceConfig& cfg,
434                        const std::string& ds_name,
435                        bool notify_on_start,
436                        bool notify_on_stop,
437                        bool handles_incremental_state_invalidation,
438                        bool no_flush_)
439         : instance_id(id),
440           config(cfg),
441           data_source_name(ds_name),
442           will_notify_on_start(notify_on_start),
443           will_notify_on_stop(notify_on_stop),
444           handles_incremental_state_clear(
445               handles_incremental_state_invalidation),
446           no_flush(no_flush_) {}
447     DataSourceInstance(const DataSourceInstance&) = delete;
448     DataSourceInstance& operator=(const DataSourceInstance&) = delete;
449 
450     DataSourceInstanceID instance_id;
451     DataSourceConfig config;
452     std::string data_source_name;
453     bool will_notify_on_start;
454     bool will_notify_on_stop;
455     bool handles_incremental_state_clear;
456     bool no_flush;
457 
458     enum DataSourceInstanceState {
459       CONFIGURED,
460       STARTING,
461       STARTED,
462       STOPPING,
463       STOPPED
464     };
465     DataSourceInstanceState state = CONFIGURED;
466   };
467 
468   struct PendingFlush {
469     std::set<ProducerID> producers;
470     ConsumerEndpoint::FlushCallback callback;
PendingFlushPendingFlush471     explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
472   };
473 
// Key type of TracingSession::pending_clones.
using PendingCloneID = uint64_t;
475 
// Describes one trigger: its boot-time timestamp, its name, and the name and
// uid of the producer associated with it. All fields default to zero/empty.
struct TriggerInfo {
  uint64_t boot_time_ns = 0;
  std::string trigger_name;
  std::string producer_name;
  uid_t producer_uid = 0;
};
482 
483   struct PendingClone {
484     size_t pending_flush_cnt = 0;
485     // This vector might not be populated all at once. Some buffers might be
486     // nullptr while flushing is not done.
487     std::vector<std::unique_ptr<TraceBuffer>> buffers;
488     bool flush_failed = false;
489     base::WeakPtr<ConsumerEndpointImpl> weak_consumer;
490     bool skip_trace_filter = false;
491     std::optional<TriggerInfo> clone_trigger;
492   };
493 
494   // Holds the state of a tracing session. A tracing session is uniquely bound
495   // to a specific Consumer. Each Consumer can own one or more sessions.
496   struct TracingSession {
497     enum State {
498       DISABLED = 0,
499       CONFIGURED,
500       STARTED,
501       DISABLING_WAITING_STOP_ACKS,
502       CLONED_READ_ONLY,
503     };
504 
505     TracingSession(TracingSessionID,
506                    ConsumerEndpointImpl*,
507                    const TraceConfig&,
508                    base::TaskRunner*);
509     TracingSession(TracingSession&&) = delete;
510     TracingSession& operator=(TracingSession&&) = delete;
511 
num_buffersTracingSession512     size_t num_buffers() const { return buffers_index.size(); }
513 
flush_timeout_msTracingSession514     uint32_t flush_timeout_ms() {
515       uint32_t timeout_ms = config.flush_timeout_ms();
516       return timeout_ms ? timeout_ms : kDefaultFlushTimeoutMs;
517     }
518 
data_source_stop_timeout_msTracingSession519     uint32_t data_source_stop_timeout_ms() {
520       uint32_t timeout_ms = config.data_source_stop_timeout_ms();
521       return timeout_ms ? timeout_ms : kDataSourceStopTimeoutMs;
522     }
523 
GetPacketSequenceIDTracingSession524     PacketSequenceID GetPacketSequenceID(MachineID machine_id,
525                                          ProducerID producer_id,
526                                          WriterID writer_id) {
527       auto key = std::make_tuple(machine_id, producer_id, writer_id);
528       auto it = packet_sequence_ids.find(key);
529       if (it != packet_sequence_ids.end())
530         return it->second;
531       // We shouldn't run out of sequence IDs (producer ID is 16 bit, writer IDs
532       // are limited to 1024).
533       static_assert(kMaxPacketSequenceID > kMaxProducerID * kMaxWriterID,
534                     "PacketSequenceID value space doesn't cover service "
535                     "sequence ID and all producer/writer ID combinations!");
536       PERFETTO_DCHECK(last_packet_sequence_id < kMaxPacketSequenceID);
537       PacketSequenceID sequence_id = ++last_packet_sequence_id;
538       packet_sequence_ids[key] = sequence_id;
539       return sequence_id;
540     }
541 
GetDataSourceInstanceTracingSession542     DataSourceInstance* GetDataSourceInstance(
543         ProducerID producer_id,
544         DataSourceInstanceID instance_id) {
545       for (auto& inst_kv : data_source_instances) {
546         if (inst_kv.first != producer_id ||
547             inst_kv.second.instance_id != instance_id) {
548           continue;
549         }
550         return &inst_kv.second;
551       }
552       return nullptr;
553     }
554 
AllDataSourceInstancesStartedTracingSession555     bool AllDataSourceInstancesStarted() {
556       return std::all_of(
557           data_source_instances.begin(), data_source_instances.end(),
558           [](decltype(data_source_instances)::const_reference x) {
559             return x.second.state == DataSourceInstance::STARTED;
560           });
561     }
562 
AllDataSourceInstancesStoppedTracingSession563     bool AllDataSourceInstancesStopped() {
564       return std::all_of(
565           data_source_instances.begin(), data_source_instances.end(),
566           [](decltype(data_source_instances)::const_reference x) {
567             return x.second.state == DataSourceInstance::STOPPED;
568           });
569     }
570 
571     // Checks whether |clone_uid| is allowed to clone the current tracing
572     // session.
573     bool IsCloneAllowed(uid_t clone_uid) const;
574 
575     const TracingSessionID id;
576 
577     // The consumer that started the session.
578     // Can be nullptr if the consumer detached from the session.
579     ConsumerEndpointImpl* consumer_maybe_null;
580 
581     // Unix uid of the consumer. This is valid even after the consumer detaches
582     // and does not change for the entire duration of the session. It is used to
583     // prevent that a consumer re-attaches to a session from a different uid.
584     uid_t const consumer_uid;
585 
586     // The list of triggers this session received while alive and the time they
587     // were received at. This is used to insert 'fake' packets back to the
588     // consumer so they can tell when some event happened. The order matches the
589     // order they were received.
590     std::vector<TriggerInfo> received_triggers;
591 
592     // The trace config provided by the Consumer when calling
593     // EnableTracing(), plus any updates performed by ChangeTraceConfig.
594     TraceConfig config;
595 
596     // List of data source instances that have been enabled on the various
597     // producers for this tracing session.
598     std::multimap<ProducerID, DataSourceInstance> data_source_instances;
599 
600     // For each Flush(N) request, keeps track of the set of producers for which
601     // we are still awaiting a NotifyFlushComplete(N) ack.
602     std::map<FlushRequestID, PendingFlush> pending_flushes;
603 
604     // For each Clone request, keeps track of the flushes acknowledgement that
605     // we are still waiting for.
606     std::map<PendingCloneID, PendingClone> pending_clones;
607 
608     PendingCloneID last_pending_clone_id_ = 0;
609 
610     // Maps a per-trace-session buffer index into the corresponding global
611     // BufferID (shared namespace amongst all consumers). This vector has as
612     // many entries as |config.buffers_size()|.
613     std::vector<BufferID> buffers_index;
614 
615     std::map<std::tuple<MachineID, ProducerID, WriterID>, PacketSequenceID>
616         packet_sequence_ids;
617     PacketSequenceID last_packet_sequence_id = kServicePacketSequenceID;
618 
619     // Whether we should emit the trace stats next time we reach EOF while
620     // performing ReadBuffers.
621     bool should_emit_stats = false;
622 
623     // Whether we should emit the sync marker the next time ReadBuffers() is
624     // called.
625     bool should_emit_sync_marker = false;
626 
627     // Whether we put the initial packets (trace config, system info,
628     // etc.) into the trace output yet.
629     bool did_emit_initial_packets = false;
630 
631     // Whether we emitted clock offsets for relay clients yet.
632     bool did_emit_remote_clock_sync_ = false;
633 
634     // Whether we should compress TracePackets after reading them.
635     bool compress_deflate = false;
636 
637     // The number of received triggers we've emitted into the trace output.
638     size_t num_triggers_emitted_into_trace = 0;
639 
640     // Packets that failed validation of the TrustedPacket.
641     uint64_t invalid_packets = 0;
642 
643     // Flush() stats. See comments in trace_stats.proto for more.
644     uint64_t flushes_requested = 0;
645     uint64_t flushes_succeeded = 0;
646     uint64_t flushes_failed = 0;
647 
648     // Outcome of the final Flush() done by FlushAndDisableTracing().
649     protos::gen::TraceStats_FinalFlushOutcome final_flush_outcome{};
650 
651     // Set to true on the first call to MaybeNotifyAllDataSourcesStarted().
652     bool did_notify_all_data_source_started = false;
653 
654     // Stores simple lifecycle events of a particular type (i.e. associated with
655     // a single field id in the TracingServiceEvent proto).
656     struct LifecycleEvent {
657       LifecycleEvent(uint32_t f_id, uint32_t m_size = 1)
field_idTracingSession::LifecycleEvent658           : field_id(f_id), max_size(m_size), timestamps(m_size) {}
659 
660       // The field id of the event in the TracingServiceEvent proto.
661       uint32_t field_id;
662 
663       // Stores the max size of |timestamps|. Set to 1 by default (in
664       // the constructor) but can be overriden in TraceSession constructor
665       // if a larger size is required.
666       uint32_t max_size;
667 
668       // Stores the timestamps emitted for each event type (in nanoseconds).
669       // Emitted into the trace and cleared when the consumer next calls
670       // ReadBuffers.
671       base::CircularQueue<int64_t> timestamps;
672     };
673     std::vector<LifecycleEvent> lifecycle_events;
674 
675     // Stores arbitrary lifecycle events that don't fit in lifecycle_events as
676     // serialized TracePacket protos.
677     struct ArbitraryLifecycleEvent {
678       int64_t timestamp;
679       std::vector<uint8_t> data;
680     };
681 
682     std::optional<ArbitraryLifecycleEvent> slow_start_event;
683 
684     std::vector<ArbitraryLifecycleEvent> last_flush_events;
685 
686     using ClockSnapshotData = base::ClockSnapshotVector;
687 
688     // Initial clock snapshot, captured at trace start time (when state goes to
689     // TracingSession::STARTED). Emitted into the trace when the consumer first
690     // calls ReadBuffers().
691     ClockSnapshotData initial_clock_snapshot;
692 
693     // Stores clock snapshots to emit into the trace as a ring buffer. This
694     // buffer is populated both periodically and when lifecycle events happen
695     // but only when significant clock drift is detected. Emitted into the trace
696     // and cleared when the consumer next calls ReadBuffers().
697     base::CircularQueue<ClockSnapshotData> clock_snapshot_ring_buffer;
698 
699     State state = DISABLED;
700 
701     // If the consumer detached the session, this variable defines the key used
702     // for identifying the session later when reattaching.
703     std::string detach_key;
704 
705     // This is set when the Consumer calls sets |write_into_file| == true in the
706     // TraceConfig. In this case this represents the file we should stream the
707     // trace packets into, rather than returning it to the consumer via
708     // OnTraceData().
709     base::ScopedFile write_into_file;
710     uint32_t write_period_ms = 0;
711     uint64_t max_file_size_bytes = 0;
712     uint64_t bytes_written_into_file = 0;
713 
714     // Periodic task for snapshotting service events (e.g. clocks, sync markers
715     // etc)
716     base::PeriodicTask snapshot_periodic_task;
717 
718     // Deferred task that stops the trace when |duration_ms| expires. This is
719     // to handle the case of |prefer_suspend_clock_for_duration| which cannot
720     // use PostDelayedTask.
721     base::PeriodicTask timed_stop_task;
722 
723     // When non-NULL the packets should be post-processed using the filter.
724     std::unique_ptr<protozero::MessageFilter> trace_filter;
725     uint64_t filter_input_packets = 0;
726     uint64_t filter_input_bytes = 0;
727     uint64_t filter_output_bytes = 0;
728     uint64_t filter_errors = 0;
729     uint64_t filter_time_taken_ns = 0;
730     std::vector<uint64_t> filter_bytes_discarded_per_buffer;
731 
732     // A randomly generated trace identifier. Note that this does NOT always
733     // match the requested TraceConfig.trace_uuid_msb/lsb. Specifically, it does
734     // until a gap-less snapshot is requested. Each snapshot re-generates the
735     // uuid to avoid emitting two different traces with the same uuid.
736     base::Uuid trace_uuid;
737 
738     // This is set when the clone operation was caused by a clone trigger.
739     std::optional<TriggerInfo> clone_trigger;
740 
741     // NOTE: when adding new fields here consider whether that state should be
742     // copied over in DoCloneSession() or not. Ask yourself: is this a
743     // "runtime state" (e.g. active data sources) or a "trace (meta)data state"?
744     // If the latter, it should be handled by DoCloneSession().
745   };
746 
747   TracingServiceImpl(const TracingServiceImpl&) = delete;  // Not copy-constructible.
748   TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;  // Not copy-assignable.
749 
750   bool IsInitiatorPrivileged(const TracingSession&);  // Session is read-only here (const&); the privilege criteria are not visible in this header.
751 
752   DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,  // Data-source section of the TraceConfig.
753                                       const TraceConfig::ProducerConfig&,  // Producer section of the TraceConfig.
754                                       const RegisteredDataSource&,  // Same type as the values stored in |data_sources_|.
755                                       TracingSession*);  // Mutable session. NOTE(review): a nullptr return presumably signals failure - confirm.
756 
757   // Returns the next available ProducerID that is not in |producers_|.
758   ProducerID GetNextProducerID();  // NOTE(review): presumably advances |last_producer_id_| - confirm.
759 
760   // Returns a pointer to the |tracing_sessions_| entry or nullptr if the
761   // session doesn't exist.
762   TracingSession* GetTracingSession(TracingSessionID);
763 
764   // Returns a pointer to the |tracing_sessions_| entry with
765   // |unique_session_name| in the config (or nullptr if the
766   // session doesn't exist). CLONED_READ_ONLY sessions are ignored.
767   TracingSession* GetTracingSessionByUniqueName(
768       const std::string& unique_session_name);
769 
770   // Returns a pointer to the tracing session that has the highest
771   // TraceConfig.bugreport_score, if any, or nullptr.
772   TracingSession* FindTracingSessionWithMaxBugreportScore();  // NOTE(review): presumably searches |tracing_sessions_| - confirm.
773 
774   // Returns a pointer to the |tracing_sessions_| entry, matching the given
775   // uid and detach key, or nullptr if no such session exists.
776   TracingSession* GetDetachedSession(uid_t, const std::string& key);  // NOTE(review): |key| is presumably compared with TracingSession::detach_key - confirm.
777 
778   // Updates the memory guardrail using the latest information from the
779   // shared memory and trace buffers.
780   void UpdateMemoryGuardrail();
781 
782   uint32_t DelayToNextWritePeriodMs(const TracingSession&);  // Milliseconds, per the name. NOTE(review): presumably derived from TracingSession::write_period_ms - confirm.
783   void StartDataSourceInstance(ProducerEndpointImpl*,  // NOTE(review): presumably the producer hosting the instance - confirm.
784                                TracingSession*,
785                                DataSourceInstance*);
786   void StopDataSourceInstance(ProducerEndpointImpl*,  // Same leading parameters as StartDataSourceInstance().
787                               TracingSession*,
788                               DataSourceInstance*,
789                               bool disable_immediately);  // NOTE(review): the effect of this flag is not visible in this header - see the definition.
790   void PeriodicSnapshotTask(TracingSessionID);  // NOTE(review): presumably run by TracingSession::snapshot_periodic_task - confirm.
791   void MaybeSnapshotClocksIntoRingBuffer(TracingSession*);  // NOTE(review): target is presumably TracingSession::clock_snapshot_ring_buffer - confirm.
792   bool SnapshotClocks(TracingSession::ClockSnapshotData*);  // Snapshot passed by pointer (presumably an out-param); meaning of the bool result is not visible here.
793   void SnapshotLifecyleEvent(TracingSession*,  // "Lifecyle" (sic) is the declared name; it must match the definition and callers.
794                              uint32_t field_id,
795                              bool snapshot_clocks);
796   void EmitClockSnapshot(TracingSession*,
797                          TracingSession::ClockSnapshotData,  // Taken by value.
798                          std::vector<TracePacket>*);  // Packet vector passed by pointer (presumably appended to - confirm).
799   void EmitSyncMarker(std::vector<TracePacket>*);  // NOTE(review): presumably uses the lazily initialized |sync_marker_packet_| - confirm.
800   void EmitStats(TracingSession*, std::vector<TracePacket>*);  // All Emit*/MaybeEmit* helpers here take the packet vector by pointer.
801   TraceStats GetTraceStats(TracingSession*);  // Returns the stats by value.
802   void EmitLifecycleEvents(TracingSession*, std::vector<TracePacket>*);
803   void EmitUuid(TracingSession*, std::vector<TracePacket>*);  // NOTE(review): presumably emits TracingSession::trace_uuid - confirm.
804   void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
805   void EmitSystemInfo(std::vector<TracePacket>*);  // Takes no session argument.
806   void MaybeEmitCloneTrigger(TracingSession*, std::vector<TracePacket>*);  // NOTE(review): presumably emits TracingSession::clone_trigger when set - confirm.
807   void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
808   void MaybeEmitRemoteClockSync(TracingSession*, std::vector<TracePacket>*);
809   void MaybeNotifyAllDataSourcesStarted(TracingSession*);
810   void OnFlushTimeout(TracingSessionID, FlushRequestID, FlushFlags);  // The On*Timeout handlers take a session ID, not a TracingSession*.
811   void OnDisableTracingTimeout(TracingSessionID);  // NOTE(review): ID presumably re-resolved via GetTracingSession() when the handler runs - confirm.
812   void OnAllDataSourceStartedTimeout(TracingSessionID);
813   void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
814   void PeriodicFlushTask(TracingSessionID, bool post_next_only);  // NOTE(review): |post_next_only| presumably only schedules the next run, without flushing - confirm.
815   void CompleteFlush(TracingSessionID tsid,
816                      ConsumerEndpoint::FlushCallback callback,  // Callback taken by value.
817                      bool success);  // NOTE(review): presumably forwarded to |callback| - confirm.
818   void ScrapeSharedMemoryBuffers(TracingSession*, ProducerEndpointImpl*);  // NOTE(review): presumably related to |smb_scraping_enabled_| - confirm.
819   void PeriodicClearIncrementalStateTask(TracingSessionID, bool post_next_only);  // Same (ID, post_next_only) signature as PeriodicFlushTask().
820   TraceBuffer* GetBufferByID(BufferID);  // NOTE(review): presumably looks up |buffers_|, returning nullptr when absent - confirm.
821   void FlushDataSourceInstances(
822       TracingSession*,
823       uint32_t timeout_ms,  // Milliseconds, per the name.
824       const std::map<ProducerID, std::vector<DataSourceInstanceID>>&,  // Data source instance IDs grouped by producer ID.
825       ConsumerEndpoint::FlushCallback,
826       FlushFlags);
827   std::map<ProducerID, std::vector<DataSourceInstanceID>>  // Same map type that FlushDataSourceInstances() accepts.
828   GetFlushableDataSourceInstancesForBuffers(TracingSession*,
829                                             const std::set<BufferID>&);
830   bool DoCloneBuffers(TracingSession*,
831                       const std::set<BufferID>&,  // IDs of the buffers to clone (assumed from the name - confirm).
832                       std::vector<std::unique_ptr<TraceBuffer>>*);  // Passed by pointer; presumably receives the cloned buffers - confirm.
833   base::Status FinishCloneSession(ConsumerEndpointImpl*,
834                                   TracingSessionID,
835                                   std::vector<std::unique_ptr<TraceBuffer>>,  // By value: the move-only buffers are handed over to the callee.
836                                   bool skip_filter,
837                                   bool final_flush_outcome,
838                                   std::optional<TriggerInfo> clone_trigger,  // Same type as TracingSession::clone_trigger.
839                                   base::Uuid*);  // NOTE(review): presumably an out-param for the clone's uuid - confirm.
840   void OnFlushDoneForClone(TracingSessionID src_tsid,  // ID of the source session, per the parameter name.
841                            PendingCloneID clone_id,
842                            const std::set<BufferID>& buf_ids,
843                            bool final_flush_outcome);
844 
845   // Returns true if `*tracing_session` is waiting for a trigger that hasn't
846   // happened.
847   static bool IsWaitingForTrigger(TracingSession* tracing_session);  // Static: callable without a TracingServiceImpl instance.
848 
849   // Reads the buffers from `*tracing_session` and returns them (along with some
850   // metadata packets).
851   //
852   // The function stops when the cumulative size of the returned packets exceeds
853   // `threshold` (so it's not a strict upper bound) and sets `*has_more` to
854   // true, or when there are no more packets (and sets `*has_more` to false).
855   std::vector<TracePacket> ReadBuffers(TracingSession* tracing_session,
856                                        size_t threshold,  // Soft limit, see above. NOTE(review): unit presumably bytes - confirm.
857                                        bool* has_more);  // Out-param, written as described above.
858 
859   // If `*tracing_session` has a filter, applies it to `*packets`. Doesn't
860   // change the number of `*packets`, only their content.
861   void MaybeFilterPackets(TracingSession* tracing_session,  // NOTE(review): the filter is presumably TracingSession::trace_filter - confirm.
862                           std::vector<TracePacket>* packets);  // Modified in place, per the comment above.
863 
864   // If `*tracing_session` has compression enabled, compresses `*packets`.
865   void MaybeCompressPackets(TracingSession* tracing_session,
866                             std::vector<TracePacket>* packets);  // Passed by pointer; compressed through it when enabled.
867 
868   // If `*tracing_session` is configured to write into a file, writes `packets`
869   // into the file.
870   //
871   // Returns true if the file should be closed (because it's full or there has
872   // been an error), false otherwise.
873   bool WriteIntoFile(TracingSession* tracing_session,  // NOTE(review): the file is presumably TracingSession::write_into_file - confirm.
874                      std::vector<TracePacket> packets);  // Taken by value, unlike the pointer-taking helpers above.
875   void OnStartTriggersTimeout(TracingSessionID tsid);  // Takes the session ID, like the other On*Timeout handlers.
876   void MaybeLogUploadEvent(const TraceConfig&,
877                            const base::Uuid&,
878                            PerfettoStatsdAtom atom,
879                            const std::string& trigger_name = "");  // Optional; defaults to the empty string.
880   void MaybeLogTriggerEvent(const TraceConfig&,
881                             PerfettoTriggerAtom atom,
882                             const std::string& trigger_name);  // Required here, unlike in MaybeLogUploadEvent().
883   size_t PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,  // Nanoseconds, per the name. NOTE(review): presumably operates on |trigger_history_| - confirm.
884                                              uint64_t trigger_name_hash);
885   void StopOnDurationMsExpiry(TracingSessionID);  // NOTE(review): presumably run by TracingSession::timed_stop_task - confirm.
886 
887   std::unique_ptr<tracing_service::Clock> clock_;  // Owned (unique_ptr).
888   std::unique_ptr<tracing_service::Random> random_;  // Owned (unique_ptr).
889   const InitOpts init_opts_;  // Const member: cannot change after construction.
890   std::unique_ptr<SharedMemory::Factory> shm_factory_;  // Owned (unique_ptr).
891   ProducerID last_producer_id_ = 0;  // The last_*_id_ fields start at 0. NOTE(review): presumably advanced to mint new IDs - confirm.
892   DataSourceInstanceID last_data_source_instance_id_ = 0;
893   TracingSessionID last_tracing_session_id_ = 0;
894   FlushRequestID last_flush_request_id_ = 0;
895   uid_t uid_ = 0;  // NOTE(review): presumably the uid the service runs as - confirm.
896 
897   // Buffer IDs are global across all consumers (because a Producer can produce
898   // data for more than one trace session, hence more than one consumer).
899   IdAllocator<BufferID> buffer_ids_;
900 
901   std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;  // Multimap: several registrations may share one name.
902   std::map<ProducerID, ProducerEndpointImpl*> producers_;  // Raw pointers. NOTE(review): presumably non-owning - confirm.
903   std::map<RelayClientID, RelayEndpointImpl*> relay_clients_;  // Raw pointers. NOTE(review): presumably non-owning - confirm.
904   std::map<TracingSessionID, TracingSession> tracing_sessions_;  // Sessions are stored by value, keyed by session ID.
905   std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;  // Owns the trace buffers, keyed by BufferID.
906   std::map<std::string, int64_t> session_to_last_trace_s_;  // NOTE(review): presumably session name -> time of its last trace, in seconds - confirm.
907 
908   // Contains timestamps of triggers.
909   // The queue is sorted by timestamp and invocations older than 24 hours are
910   // purged when a trigger happens.
911   base::CircularQueue<TriggerHistory> trigger_history_;
912 
913   bool smb_scraping_enabled_ = false;  // Off by default. NOTE(review): "smb" presumably stands for shared memory buffer - confirm.
914   bool lockdown_mode_ = false;  // Off by default.
915 
916   uint8_t sync_marker_packet_[32];  // Lazily initialized.
917   size_t sync_marker_packet_size_ = 0;  // NOTE(review): 0 presumably means |sync_marker_packet_| is not initialized yet - confirm.
918 
919   // Stats.
920   uint64_t chunks_discarded_ = 0;
921   uint64_t patches_discarded_ = 0;
922 
923   PERFETTO_THREAD_CHECKER(thread_checker_)  // Macro invocation; its expansion is not visible in this chunk.
924 
925   base::WeakRunner weak_runner_;  // Last member, so destroyed first (reverse declaration order). NOTE(review): presumably intentional - confirm.
926 };
927 
928 }  // namespace perfetto
929 
930 #endif  // SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
931