xref: /aosp_15_r20/external/perfetto/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18 
19 #include <string.h>
20 
21 #include <cinttypes>
22 
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ext/ipc/client.h"
25 #include "perfetto/ext/tracing/core/consumer.h"
26 #include "perfetto/ext/tracing/core/observable_events.h"
27 #include "perfetto/ext/tracing/core/trace_stats.h"
28 #include "perfetto/tracing/core/trace_config.h"
29 #include "perfetto/tracing/core/tracing_service_state.h"
30 
31 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
32 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
33 // Consumer* during the callbacks.
34 
35 namespace perfetto {
36 
37 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)38 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
39     const char* service_sock_name,
40     Consumer* consumer,
41     base::TaskRunner* task_runner) {
42   return std::unique_ptr<TracingService::ConsumerEndpoint>(
43       new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
44 }
45 
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)46 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
47                                              Consumer* consumer,
48                                              base::TaskRunner* task_runner)
49     : consumer_(consumer),
50       ipc_channel_(
51           ipc::Client::CreateInstance({service_sock_name, /*sock_retry=*/false},
52                                       task_runner)),
53       consumer_port_(this /* event_listener */),
54       weak_ptr_factory_(this) {
55   ipc_channel_->BindService(consumer_port_.GetWeakPtr());
56 }
57 
58 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
59 
60 // Called by the IPC layer if the BindService() succeeds.
OnConnect()61 void ConsumerIPCClientImpl::OnConnect() {
62   connected_ = true;
63   consumer_->OnConnect();
64 }
65 
OnDisconnect()66 void ConsumerIPCClientImpl::OnDisconnect() {
67   PERFETTO_DLOG("Tracing service connection failure");
68   connected_ = false;
69   consumer_->OnDisconnect();  // Note: may delete |this|.
70 }
71 
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)72 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
73                                           base::ScopedFile fd) {
74   if (!connected_) {
75     PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
76     return;
77   }
78 
79 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
80   if (fd) {
81     consumer_->OnTracingDisabled(
82         "Passing FDs for write_into_file is not supported on Windows");
83     return;
84   }
85 #endif
86 
87   protos::gen::EnableTracingRequest req;
88   *req.mutable_trace_config() = trace_config;
89   ipc::Deferred<protos::gen::EnableTracingResponse> async_response;
90   auto weak_this = weak_ptr_factory_.GetWeakPtr();
91   async_response.Bind(
92       [weak_this](
93           ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
94         if (weak_this)
95           weak_this->OnEnableTracingResponse(std::move(response));
96       });
97 
98   // |fd| will be closed when this function returns, but it's fine because the
99   // IPC layer dup()'s it when sending the IPC.
100   consumer_port_.EnableTracing(req, std::move(async_response), *fd);
101 }
102 
ChangeTraceConfig(const TraceConfig & trace_config)103 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig& trace_config) {
104   if (!connected_) {
105     PERFETTO_DLOG(
106         "Cannot ChangeTraceConfig(), not connected to tracing service");
107     return;
108   }
109 
110   ipc::Deferred<protos::gen::ChangeTraceConfigResponse> async_response;
111   async_response.Bind(
112       [](ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse> response) {
113         if (!response)
114           PERFETTO_DLOG("ChangeTraceConfig() failed");
115       });
116   protos::gen::ChangeTraceConfigRequest req;
117   *req.mutable_trace_config() = trace_config;
118   consumer_port_.ChangeTraceConfig(req, std::move(async_response));
119 }
120 
StartTracing()121 void ConsumerIPCClientImpl::StartTracing() {
122   if (!connected_) {
123     PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
124     return;
125   }
126 
127   ipc::Deferred<protos::gen::StartTracingResponse> async_response;
128   async_response.Bind(
129       [](ipc::AsyncResult<protos::gen::StartTracingResponse> response) {
130         if (!response)
131           PERFETTO_DLOG("StartTracing() failed");
132       });
133   protos::gen::StartTracingRequest req;
134   consumer_port_.StartTracing(req, std::move(async_response));
135 }
136 
DisableTracing()137 void ConsumerIPCClientImpl::DisableTracing() {
138   if (!connected_) {
139     PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
140     return;
141   }
142 
143   ipc::Deferred<protos::gen::DisableTracingResponse> async_response;
144   async_response.Bind(
145       [](ipc::AsyncResult<protos::gen::DisableTracingResponse> response) {
146         if (!response)
147           PERFETTO_DLOG("DisableTracing() failed");
148       });
149   consumer_port_.DisableTracing(protos::gen::DisableTracingRequest(),
150                                 std::move(async_response));
151 }
152 
ReadBuffers()153 void ConsumerIPCClientImpl::ReadBuffers() {
154   if (!connected_) {
155     PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
156     return;
157   }
158 
159   ipc::Deferred<protos::gen::ReadBuffersResponse> async_response;
160 
161   // The IPC layer guarantees that callbacks are destroyed after this object
162   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
163   // contract of this class expects the caller to not destroy the Consumer class
164   // before having destroyed this class. Hence binding |this| here is safe.
165   async_response.Bind(
166       [this](ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
167         OnReadBuffersResponse(std::move(response));
168       });
169   consumer_port_.ReadBuffers(protos::gen::ReadBuffersRequest(),
170                              std::move(async_response));
171 }
172 
OnReadBuffersResponse(ipc::AsyncResult<protos::gen::ReadBuffersResponse> response)173 void ConsumerIPCClientImpl::OnReadBuffersResponse(
174     ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
175   if (!response) {
176     PERFETTO_DLOG("ReadBuffers() failed");
177     return;
178   }
179   std::vector<TracePacket> trace_packets;
180   for (auto& resp_slice : response->slices()) {
181     const std::string& slice_data = resp_slice.data();
182     Slice slice = Slice::Allocate(slice_data.size());
183     memcpy(slice.own_data(), slice_data.data(), slice.size);
184     partial_packet_.AddSlice(std::move(slice));
185     if (resp_slice.last_slice_for_packet())
186       trace_packets.emplace_back(std::move(partial_packet_));
187   }
188   if (!trace_packets.empty() || !response.has_more())
189     consumer_->OnTraceData(std::move(trace_packets), response.has_more());
190 }
191 
OnEnableTracingResponse(ipc::AsyncResult<protos::gen::EnableTracingResponse> response)192 void ConsumerIPCClientImpl::OnEnableTracingResponse(
193     ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
194   std::string error;
195   // |response| might be empty when the request gets rejected (if the connection
196   // with the service is dropped all outstanding requests are auto-rejected).
197   if (!response) {
198     error =
199         "EnableTracing IPC request rejected. This is likely due to a loss of "
200         "the traced connection";
201   } else {
202     error = response->error();
203   }
204   if (!response || response->disabled())
205     consumer_->OnTracingDisabled(error);
206 }
207 
FreeBuffers()208 void ConsumerIPCClientImpl::FreeBuffers() {
209   if (!connected_) {
210     PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
211     return;
212   }
213 
214   protos::gen::FreeBuffersRequest req;
215   ipc::Deferred<protos::gen::FreeBuffersResponse> async_response;
216   async_response.Bind(
217       [](ipc::AsyncResult<protos::gen::FreeBuffersResponse> response) {
218         if (!response)
219           PERFETTO_DLOG("FreeBuffers() failed");
220       });
221   consumer_port_.FreeBuffers(req, std::move(async_response));
222 }
223 
Flush(uint32_t timeout_ms,FlushCallback callback,FlushFlags flush_flags)224 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms,
225                                   FlushCallback callback,
226                                   FlushFlags flush_flags) {
227   if (!connected_) {
228     PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
229     return callback(/*success=*/false);
230   }
231 
232   protos::gen::FlushRequest req;
233   req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
234   req.set_flags(flush_flags.flags());
235   ipc::Deferred<protos::gen::FlushResponse> async_response;
236   async_response.Bind(
237       [callback](ipc::AsyncResult<protos::gen::FlushResponse> response) {
238         callback(!!response);
239       });
240   consumer_port_.Flush(req, std::move(async_response));
241 }
242 
Detach(const std::string & key)243 void ConsumerIPCClientImpl::Detach(const std::string& key) {
244   if (!connected_) {
245     PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
246     return;
247   }
248 
249   protos::gen::DetachRequest req;
250   req.set_key(key);
251   ipc::Deferred<protos::gen::DetachResponse> async_response;
252   auto weak_this = weak_ptr_factory_.GetWeakPtr();
253 
254   async_response.Bind(
255       [weak_this](ipc::AsyncResult<protos::gen::DetachResponse> response) {
256         if (weak_this)
257           weak_this->consumer_->OnDetach(!!response);
258       });
259   consumer_port_.Detach(req, std::move(async_response));
260 }
261 
Attach(const std::string & key)262 void ConsumerIPCClientImpl::Attach(const std::string& key) {
263   if (!connected_) {
264     PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
265     return;
266   }
267 
268   {
269     protos::gen::AttachRequest req;
270     req.set_key(key);
271     ipc::Deferred<protos::gen::AttachResponse> async_response;
272     auto weak_this = weak_ptr_factory_.GetWeakPtr();
273 
274     async_response.Bind(
275         [weak_this](ipc::AsyncResult<protos::gen::AttachResponse> response) {
276           if (!weak_this)
277             return;
278           if (!response) {
279             weak_this->consumer_->OnAttach(/*success=*/false, TraceConfig());
280             return;
281           }
282           const TraceConfig& trace_config = response->trace_config();
283 
284           // If attached successfully, also attach to the end-of-trace
285           // notificaton callback, via EnableTracing(attach_notification_only).
286           protos::gen::EnableTracingRequest enable_req;
287           enable_req.set_attach_notification_only(true);
288           ipc::Deferred<protos::gen::EnableTracingResponse> enable_resp;
289           enable_resp.Bind(
290               [weak_this](
291                   ipc::AsyncResult<protos::gen::EnableTracingResponse> resp) {
292                 if (weak_this)
293                   weak_this->OnEnableTracingResponse(std::move(resp));
294               });
295           weak_this->consumer_port_.EnableTracing(enable_req,
296                                                   std::move(enable_resp));
297 
298           weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
299         });
300     consumer_port_.Attach(req, std::move(async_response));
301   }
302 }
303 
GetTraceStats()304 void ConsumerIPCClientImpl::GetTraceStats() {
305   if (!connected_) {
306     PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
307     return;
308   }
309 
310   protos::gen::GetTraceStatsRequest req;
311   ipc::Deferred<protos::gen::GetTraceStatsResponse> async_response;
312 
313   // The IPC layer guarantees that callbacks are destroyed after this object
314   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
315   // contract of this class expects the caller to not destroy the Consumer class
316   // before having destroyed this class. Hence binding |this| here is safe.
317   async_response.Bind(
318       [this](ipc::AsyncResult<protos::gen::GetTraceStatsResponse> response) {
319         if (!response) {
320           consumer_->OnTraceStats(/*success=*/false, TraceStats());
321           return;
322         }
323         consumer_->OnTraceStats(/*success=*/true, response->trace_stats());
324       });
325   consumer_port_.GetTraceStats(req, std::move(async_response));
326 }
327 
ObserveEvents(uint32_t enabled_event_types)328 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
329   if (!connected_) {
330     PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
331     return;
332   }
333 
334   protos::gen::ObserveEventsRequest req;
335   for (uint32_t i = 0; i < 32; i++) {
336     const uint32_t event_id = 1u << i;
337     if (enabled_event_types & event_id)
338       req.add_events_to_observe(static_cast<ObservableEvents::Type>(event_id));
339   }
340 
341   ipc::Deferred<protos::gen::ObserveEventsResponse> async_response;
342   // The IPC layer guarantees that callbacks are destroyed after this object
343   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
344   // contract of this class expects the caller to not destroy the Consumer class
345   // before having destroyed this class. Hence binding |this| here is safe.
346   async_response.Bind(
347       [this](ipc::AsyncResult<protos::gen::ObserveEventsResponse> response) {
348         // Skip empty response, which the service sends to close the stream.
349         if (!response.has_more()) {
350           PERFETTO_DCHECK(!response.success());
351           return;
352         }
353         consumer_->OnObservableEvents(response->events());
354       });
355   consumer_port_.ObserveEvents(req, std::move(async_response));
356 }
357 
QueryServiceState(QueryServiceStateArgs args,QueryServiceStateCallback callback)358 void ConsumerIPCClientImpl::QueryServiceState(
359     QueryServiceStateArgs args,
360     QueryServiceStateCallback callback) {
361   if (!connected_) {
362     PERFETTO_DLOG(
363         "Cannot QueryServiceState(), not connected to tracing service");
364     return;
365   }
366 
367   auto it = pending_query_svc_reqs_.insert(pending_query_svc_reqs_.end(),
368                                            {std::move(callback), {}});
369   protos::gen::QueryServiceStateRequest req;
370   req.set_sessions_only(args.sessions_only);
371   ipc::Deferred<protos::gen::QueryServiceStateResponse> async_response;
372   auto weak_this = weak_ptr_factory_.GetWeakPtr();
373   async_response.Bind(
374       [weak_this,
375        it](ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response) {
376         if (weak_this)
377           weak_this->OnQueryServiceStateResponse(std::move(response), it);
378       });
379   consumer_port_.QueryServiceState(req, std::move(async_response));
380 }
381 
OnQueryServiceStateResponse(ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,PendingQueryServiceRequests::iterator req_it)382 void ConsumerIPCClientImpl::OnQueryServiceStateResponse(
383     ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,
384     PendingQueryServiceRequests::iterator req_it) {
385   PERFETTO_DCHECK(req_it->callback);
386 
387   if (!response) {
388     auto callback = std::move(req_it->callback);
389     pending_query_svc_reqs_.erase(req_it);
390     callback(false, TracingServiceState());
391     return;
392   }
393 
394   // The QueryServiceState response can be split in several chunks if the
395   // service has several data sources. The client is supposed to merge all the
396   // replies. The easiest way to achieve this is to re-serialize the partial
397   // response and then re-decode the merged result in one shot.
398   std::vector<uint8_t>& merged_resp = req_it->merged_resp;
399   std::vector<uint8_t> part = response->service_state().SerializeAsArray();
400   merged_resp.insert(merged_resp.end(), part.begin(), part.end());
401 
402   if (response.has_more())
403     return;
404 
405   // All replies have been received. Decode the merged result and reply to the
406   // callback.
407   protos::gen::TracingServiceState svc_state;
408   bool ok = svc_state.ParseFromArray(merged_resp.data(), merged_resp.size());
409   if (!ok)
410     PERFETTO_ELOG("Failed to decode merged QueryServiceStateResponse");
411   auto callback = std::move(req_it->callback);
412   pending_query_svc_reqs_.erase(req_it);
413   callback(ok, std::move(svc_state));
414 }
415 
QueryCapabilities(QueryCapabilitiesCallback callback)416 void ConsumerIPCClientImpl::QueryCapabilities(
417     QueryCapabilitiesCallback callback) {
418   if (!connected_) {
419     PERFETTO_DLOG(
420         "Cannot QueryCapabilities(), not connected to tracing service");
421     return;
422   }
423 
424   protos::gen::QueryCapabilitiesRequest req;
425   ipc::Deferred<protos::gen::QueryCapabilitiesResponse> async_response;
426   async_response.Bind(
427       [callback](
428           ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse> response) {
429         if (!response) {
430           // If the IPC fails, we are talking to an older version of the service
431           // that didn't support QueryCapabilities at all. In this case return
432           // an empty capabilities message.
433           callback(TracingServiceCapabilities());
434         } else {
435           callback(response->capabilities());
436         }
437       });
438   consumer_port_.QueryCapabilities(req, std::move(async_response));
439 }
440 
SaveTraceForBugreport(SaveTraceForBugreportCallback callback)441 void ConsumerIPCClientImpl::SaveTraceForBugreport(
442     SaveTraceForBugreportCallback callback) {
443   if (!connected_) {
444     PERFETTO_DLOG(
445         "Cannot SaveTraceForBugreport(), not connected to tracing service");
446     return;
447   }
448 
449   protos::gen::SaveTraceForBugreportRequest req;
450   ipc::Deferred<protos::gen::SaveTraceForBugreportResponse> async_response;
451   async_response.Bind(
452       [callback](ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>
453                      response) {
454         if (!response) {
455           // If the IPC fails, we are talking to an older version of the service
456           // that didn't support SaveTraceForBugreport at all.
457           callback(
458               false,
459               "The tracing service doesn't support SaveTraceForBugreport()");
460         } else {
461           callback(response->success(), response->msg());
462         }
463       });
464   consumer_port_.SaveTraceForBugreport(req, std::move(async_response));
465 }
466 
CloneSession(CloneSessionArgs args)467 void ConsumerIPCClientImpl::CloneSession(CloneSessionArgs args) {
468   if (!connected_) {
469     PERFETTO_DLOG("Cannot CloneSession(), not connected to tracing service");
470     return;
471   }
472 
473   protos::gen::CloneSessionRequest req;
474   if (args.tsid) {
475     req.set_session_id(args.tsid);
476   }
477   if (!args.unique_session_name.empty()) {
478     req.set_unique_session_name(args.unique_session_name);
479   }
480   req.set_skip_trace_filter(args.skip_trace_filter);
481   req.set_for_bugreport(args.for_bugreport);
482   if (!args.clone_trigger_name.empty()) {
483     req.set_clone_trigger_name(args.clone_trigger_name);
484   }
485   if (!args.clone_trigger_producer_name.empty()) {
486     req.set_clone_trigger_producer_name(args.clone_trigger_producer_name);
487   }
488   if (args.clone_trigger_trusted_producer_uid != 0) {
489     req.set_clone_trigger_trusted_producer_uid(
490         static_cast<int32_t>(args.clone_trigger_trusted_producer_uid));
491   }
492   if (args.clone_trigger_boot_time_ns != 0) {
493     req.set_clone_trigger_boot_time_ns(args.clone_trigger_boot_time_ns);
494   }
495   ipc::Deferred<protos::gen::CloneSessionResponse> async_response;
496   auto weak_this = weak_ptr_factory_.GetWeakPtr();
497 
498   async_response.Bind(
499       [weak_this](
500           ipc::AsyncResult<protos::gen::CloneSessionResponse> response) {
501         if (!weak_this)
502           return;
503         if (!response) {
504           // If the IPC fails, we are talking to an older version of the service
505           // that didn't support CloneSession at all.
506           weak_this->consumer_->OnSessionCloned(
507               {false, "CloneSession IPC not supported", {}});
508         } else {
509           base::Uuid uuid(response->uuid_lsb(), response->uuid_msb());
510           weak_this->consumer_->OnSessionCloned(
511               {response->success(), response->error(), uuid});
512         }
513       });
514   consumer_port_.CloneSession(req, std::move(async_response));
515 }
516 }  // namespace perfetto
517