xref: /aosp_15_r20/external/perfetto/src/profiling/memory/heapprofd_producer.h (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
18 #define SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
19 
#include <array>
#include <cinttypes>
#include <cstring>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/unix_socket.h"
#include "perfetto/ext/base/unix_task_runner.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/producer.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/forward_decls.h"
#include "src/profiling/common/interning_output.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/profiler_guardrails.h"
#include "src/profiling/memory/bookkeeping.h"
#include "src/profiling/memory/bookkeeping_dump.h"
#include "src/profiling/memory/log_histogram.h"
#include "src/profiling/memory/shared_ring_buffer.h"
#include "src/profiling/memory/system_property.h"
#include "src/profiling/memory/unwinding.h"
#include "src/profiling/memory/unwound_messages.h"

#include "protos/perfetto/config/profiling/heapprofd_config.gen.h"
48 
49 namespace perfetto {
50 namespace profiling {
51 
52 using HeapprofdConfig = protos::gen::HeapprofdConfig;
53 
// Identifies a process by its pid together with its command line.
struct Process {
  // Process id.
  pid_t pid;
  // Command line associated with |pid|.
  std::string cmdline;
};
58 
// TODO(rsavitski): central daemon can do less work if it knows that the global
// operating mode is fork-based, as it then will not be interacting with the
// clients. This can be implemented as an additional mode here.
//
// Mode in which a heap profiling producer is instantiated.
enum class HeapprofdMode {
  // Central mode.
  kCentral,
  // Child mode (also referred to as fork mode).
  kChild,
};
63 
64 bool HeapprofdConfigToClientConfiguration(
65     const HeapprofdConfig& heapprofd_config,
66     ClientConfiguration* cli_config);
67 
68 // Heap profiling producer. Can be instantiated in two modes, central and
69 // child (also referred to as fork mode).
70 //
71 // The central mode producer is instantiated by the system heapprofd daemon. Its
72 // primary responsibility is activating profiling (via system properties and
73 // signals) in targets identified by profiling configs. On debug platform
74 // builds, the central producer can also handle the out-of-process unwinding &
75 // writing of the profiles for all client processes.
76 //
77 // An alternative model is where the central heapprofd triggers the profiling in
78 // the target process, but the latter fork-execs a private heapprofd binary to
79 // handle unwinding only for that process. The forked heapprofd instantiates
80 // this producer in the "child" mode. In this scenario, the profiled process
81 // never talks to the system daemon.
82 //
83 // TODO(fmayer||rsavitski): cover interesting invariants/structure of the
84 // implementation (e.g. number of data sources in child mode), including
85 // threading structure.
86 class HeapprofdProducer : public Producer, public UnwindingWorker::Delegate {
87  public:
88   friend class SocketDelegate;
89 
90   // TODO(fmayer): Split into two delegates for the listening socket in kCentral
91   // and for the per-client sockets to make this easier to understand?
92   // Alternatively, find a better name for this.
93   class SocketDelegate : public base::UnixSocket::EventListener {
94    public:
SocketDelegate(HeapprofdProducer * producer)95     explicit SocketDelegate(HeapprofdProducer* producer)
96         : producer_(producer) {}
97 
98     void OnDisconnect(base::UnixSocket* self) override;
99     void OnNewIncomingConnection(
100         base::UnixSocket* self,
101         std::unique_ptr<base::UnixSocket> new_connection) override;
102     void OnDataAvailable(base::UnixSocket* self) override;
103 
104    private:
105     HeapprofdProducer* producer_;
106   };
107 
108   HeapprofdProducer(HeapprofdMode mode,
109                     base::TaskRunner* task_runner,
110                     bool exit_when_done);
111   ~HeapprofdProducer() override;
112 
113   // Producer Impl:
114   void OnConnect() override;
115   void OnDisconnect() override;
116   void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
117   void StartDataSource(DataSourceInstanceID, const DataSourceConfig&) override;
118   void StopDataSource(DataSourceInstanceID) override;
119   void OnTracingSetup() override;
120   void Flush(FlushRequestID,
121              const DataSourceInstanceID* data_source_ids,
122              size_t num_data_sources,
123              FlushFlags) override;
ClearIncrementalState(const DataSourceInstanceID *,size_t)124   void ClearIncrementalState(const DataSourceInstanceID* /*data_source_ids*/,
125                              size_t /*num_data_sources*/) override {}
126 
127   // TODO(fmayer): Refactor once/if we have generic reconnect logic.
128   void ConnectWithRetries(const char* socket_name);
129   void DumpAll();
130 
131   // UnwindingWorker::Delegate impl:
132   void PostAllocRecord(UnwindingWorker*, std::unique_ptr<AllocRecord>) override;
133   void PostFreeRecord(UnwindingWorker*, std::vector<FreeRecord>) override;
134   void PostHeapNameRecord(UnwindingWorker*, HeapNameRecord) override;
135   void PostSocketDisconnected(UnwindingWorker*,
136                               DataSourceInstanceID,
137                               pid_t,
138                               SharedRingBuffer::Stats) override;
139   void PostDrainDone(UnwindingWorker*, DataSourceInstanceID) override;
140 
141   void HandleAllocRecord(AllocRecord*);
142   void HandleFreeRecord(FreeRecord);
143   void HandleHeapNameRecord(HeapNameRecord);
144   void HandleSocketDisconnected(DataSourceInstanceID,
145                                 pid_t,
146                                 SharedRingBuffer::Stats);
147 
148   // Valid only if mode_ == kChild.
149   void SetTargetProcess(pid_t target_pid, std::string target_cmdline);
150   void SetDataSourceCallback(std::function<void()> fn);
151 
152   // Exposed for testing.
153   void SetProducerEndpoint(
154       std::unique_ptr<TracingService::ProducerEndpoint> endpoint);
155 
socket_delegate()156   base::UnixSocket::EventListener& socket_delegate() {
157     return socket_delegate_;
158   }
159 
160   // Adopts the (connected) sockets inherited from the target process, invoking
161   // the on-connection callback.
162   // Specific to mode_ == kChild
163   void AdoptSocket(base::ScopedFile fd);
164 
165   void TerminateWhenDone();
166 
167  private:
168   // State of the connection to tracing service (traced).
169   enum State {
170     kNotStarted = 0,
171     kNotConnected,
172     kConnecting,
173     kConnected,
174   };
175 
176   struct ProcessState {
177     struct HeapInfo {
HeapInfoProcessState::HeapInfo178       HeapInfo(GlobalCallstackTrie* cs, bool dam) : heap_tracker(cs, dam) {}
179 
180       HeapTracker heap_tracker;
181       std::string heap_name;
182       uint64_t sampling_interval = 0u;
183       uint64_t orig_sampling_interval = 0u;
184     };
ProcessStateProcessState185     ProcessState(GlobalCallstackTrie* c, bool d)
186         : callsites(c), dump_at_max_mode(d) {}
187     bool disconnected = false;
188     SharedRingBuffer::ErrorState error_state =
189         SharedRingBuffer::ErrorState::kNoError;
190     bool buffer_corrupted = false;
191 
192     uint64_t heap_samples = 0;
193     uint64_t map_reparses = 0;
194     uint64_t unwinding_errors = 0;
195 
196     uint64_t total_unwinding_time_us = 0;
197     uint64_t client_spinlock_blocked_us = 0;
198     GlobalCallstackTrie* callsites;
199     bool dump_at_max_mode;
200     LogHistogram unwinding_time_us;
201     std::map<uint32_t, HeapInfo> heap_infos;
202 
GetHeapInfoProcessState203     HeapInfo& GetHeapInfo(uint32_t heap_id) {
204       auto it = heap_infos.find(heap_id);
205       if (it == heap_infos.end()) {
206         it = heap_infos.emplace_hint(
207             it, std::piecewise_construct, std::forward_as_tuple(heap_id),
208             std::forward_as_tuple(callsites, dump_at_max_mode));
209       }
210       return it->second;
211     }
212 
GetHeapTrackerProcessState213     HeapTracker& GetHeapTracker(uint32_t heap_id) {
214       return GetHeapInfo(heap_id).heap_tracker;
215     }
216   };
217 
218   struct DataSource {
DataSourceDataSource219     explicit DataSource(std::unique_ptr<TraceWriter> tw)
220         : trace_writer(std::move(tw)) {
221       // Make MSAN happy.
222       memset(&client_configuration, 0, sizeof(client_configuration));
223     }
224 
225     DataSourceInstanceID id;
226     std::unique_ptr<TraceWriter> trace_writer;
227     DataSourceConfig ds_config;
228     HeapprofdConfig config;
229     ClientConfiguration client_configuration;
230     std::vector<SystemProperties::Handle> properties;
231     std::set<pid_t> signaled_pids;
232     std::set<pid_t> rejected_pids;
233     std::map<pid_t, ProcessState> process_states;
234     std::vector<std::string> normalized_cmdlines;
235     InterningOutputTracker intern_state;
236     bool shutting_down = false;
237     bool started = false;
238     bool hit_guardrail = false;
239     bool was_stopped = false;
240     uint32_t stop_timeout_ms;
241     uint32_t dump_interval_ms = 0;
242     size_t pending_free_drains = 0;
243     GuardrailConfig guardrail_config;
244   };
245 
246   struct PendingProcess {
247     std::unique_ptr<base::UnixSocket> sock;
248     DataSourceInstanceID data_source_instance_id;
249     SharedRingBuffer shmem;
250   };
251 
252   void HandleClientConnection(std::unique_ptr<base::UnixSocket> new_connection,
253                               Process process);
254 
255   void ConnectService();
256   void Restart();
257   void ResetConnectionBackoff();
258   void IncreaseConnectionBackoff();
259 
260   void CheckDataSourceMemoryTask();
261   void CheckDataSourceCpuTask();
262 
263   void FinishDataSourceFlush(FlushRequestID flush_id);
264   void DumpProcessesInDataSource(DataSource* ds);
265   void DumpProcessState(DataSource* ds, pid_t pid, ProcessState* process);
266   static void SetStats(protos::pbzero::ProfilePacket::ProcessStats* stats,
267                        const ProcessState& process_state);
268 
269   void DoDrainAndContinuousDump(DataSourceInstanceID id);
270   void DoContinuousDump(DataSource* ds);
271   void DrainDone(DataSourceInstanceID);
272 
273   UnwindingWorker& UnwinderForPID(pid_t);
274   bool IsPidProfiled(pid_t);
275   DataSource* GetDataSourceForProcess(const Process& proc);
276   void RecordOtherSourcesAsRejected(DataSource* active_ds, const Process& proc);
277 
278   void SetStartupProperties(DataSource* data_source);
279   void SignalRunningProcesses(DataSource* data_source);
280 
281   // Specific to mode_ == kChild
282   void TerminateProcess(int exit_status);
283 
284   void ShutdownDataSource(DataSource* ds);
285   bool MaybeFinishDataSource(DataSource* ds);
286 
287   void WriteRejectedConcurrentSession(BufferID buffer_id, pid_t pid);
288 
289   // Class state:
290 
291   // Task runner is owned by the main thread.
292   base::TaskRunner* const task_runner_;
293   const HeapprofdMode mode_;
294   // TODO(fmayer): Refactor to make this boolean unnecessary.
295   // Whether to terminate this producer after the first data-source has
296   // finished.
297   bool exit_when_done_;
298 
299   // State of connection to the tracing service.
300   State state_ = kNotStarted;
301   uint32_t connection_backoff_ms_ = 0;
302   const char* producer_sock_name_ = nullptr;
303 
304   // Client processes that have connected, but with which we have not yet
305   // finished the handshake.
306   std::map<pid_t, PendingProcess> pending_processes_;
307 
308   // Must outlive data_sources_ - owns at least the shared memory referenced by
309   // TraceWriters.
310   std::unique_ptr<TracingService::ProducerEndpoint> endpoint_;
311 
312   // Must outlive data_sources_ - HeapTracker references the trie.
313   GlobalCallstackTrie callsites_;
314 
315   // Must outlive data_sources_ - DataSource can hold
316   // SystemProperties::Handle-s.
317   // Specific to mode_ == kCentral
318   SystemProperties properties_;
319 
320   std::map<FlushRequestID, size_t> flushes_in_progress_;
321   std::map<DataSourceInstanceID, DataSource> data_sources_;
322 
323   // Specific to mode_ == kChild
324   Process target_process_{base::kInvalidPid, ""};
325   std::optional<std::function<void()>> data_source_callback_;
326 
327   SocketDelegate socket_delegate_;
328 
329   base::WeakPtrFactory<HeapprofdProducer> weak_factory_;
330 
331   // UnwindingWorker's destructor might attempt to post producer tasks, so this
332   // needs to outlive weak_factory_.
333   std::vector<UnwindingWorker> unwinding_workers_;
334 };
335 
336 }  // namespace profiling
337 }  // namespace perfetto
338 
339 #endif  // SRC_PROFILING_MEMORY_HEAPPROFD_PRODUCER_H_
340