xref: /aosp_15_r20/external/perfetto/src/tracing/core/shared_memory_arbiter_impl.h (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include "perfetto/ext/base/weak_ptr.h"
29 #include "perfetto/ext/tracing/core/basic_types.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/tracing/core/forward_decls.h"
33 #include "src/tracing/core/id_allocator.h"
34 
35 namespace perfetto {
36 
37 class PatchList;
38 class Patch;
39 class TraceWriter;
40 class TraceWriterImpl;
41 
42 namespace base {
43 class TaskRunner;
44 }  // namespace base
45 
46 // This class handles the shared memory buffer on the producer side. It is used
47 // to obtain thread-local chunks and to partition pages from several threads.
48 // There is one arbiter instance per Producer.
49 // This class is thread-safe and uses locks to do so. Data sources are supposed
50 // to interact with this sporadically, only when they run out of space on their
51 // current thread-local chunk.
52 //
53 // The arbiter can become "unbound" as a consequence of:
54 //  (a) being created without an endpoint
55 //  (b) CreateStartupTraceWriter calls after creation (whether created with or
56 //      without endpoint).
57 //
58 // Entering the unbound state is only supported if all trace writers are created
59 // in kDrop mode. In the unbound state, the arbiter buffers commit messages
60 // until all trace writers are bound to a target buffer.
61 //
62 // The following state transitions are possible:
63 //
64 //   CreateInstance()
65 //    |
66 //    |  CreateUnboundInstance()
67 //    |    |
68 //    |    |
69 //    |    V
70 //    |  [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ]
71 //    |      |     |
72 //    |      |     | CreateStartupTraceWriter(buf)
73 //    |      |     |  buffer reservations += buf
74 //    |      |     |
75 //    |      |     |             ----
76 //    |      |     |            |    | CreateStartupTraceWriter(buf)
77 //    |      |     |            |    |  buffer reservations += buf
78 //    |      |     V            |    V
79 //    |      |   [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ]
80 //    |      |                                                |
81 //    |      |                       BindToProducerEndpoint() |
82 //    |      |                                                |
83 //    |      | BindToProducerEndpoint()                       |
84 //    |      |                                                V
85 //    |      |   [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ]
86 //    |      |   A    |    A                               |     A
87 //    |      |   |    |    |                               |     |
88 //    |      |   |     ----                                |     |
89 //    |      |   |    CreateStartupTraceWriter(buf)        |     |
90 //    |      |   |     buffer reservations += buf          |     |
91 //    |      |   |                                         |     |
92 //    |      |   | CreateStartupTraceWriter(buf)           |     |
93 //    |      |   |  where buf is not yet bound             |     |
94 //    |      |   |  buffer reservations += buf             |     | (yes)
95 //    |      |   |                                         |     |
96 //    |      |   |        BindStartupTargetBuffer(buf, id) |-----
97 //    |      |   |           buffer reservations -= buf    | reservations > 0?
98 //    |      |   |                                         |
99 //    |      |   |                                         | (no)
100 //    |      V   |                                         V
101 //     --> [ fully_bound_, endpoint_, 0 unbound buffer reservations ]
102 //              |    A
103 //              |    | CreateStartupTraceWriter(buf)
104 //              |    |  where buf is already bound
105 //               ----
106 class SharedMemoryArbiterImpl : public SharedMemoryArbiter {
107  public:
108   // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the
109   // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may
110   // be |nullptr| if created unbound, see
111   // SharedMemoryArbiter::CreateUnboundInstance().
112 
113   // SharedMemoryArbiterImpl(void* start,
114   //                         size_t size,
115   //                         size_t page_size,
116   //                         TracingService::ProducerEndpoint*
117   //                         producer_endpoint, base::TaskRunner* task_runner) :
118   //   SharedMemoryArbiterImpl(start, size, page_size, false, producer_endpoint,
119   //   task_runner) {
120   // }
121 
122   SharedMemoryArbiterImpl(void* start,
123                           size_t size,
124                           ShmemMode mode,
125                           size_t page_size,
126                           TracingService::ProducerEndpoint*,
127                           base::TaskRunner*);
128 
129   // Returns a new Chunk to write tracing data. Depending on the provided
130   // BufferExhaustedPolicy, this may return an invalid chunk if no valid free
131   // chunk could be found in the SMB.
132   SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
133                                      BufferExhaustedPolicy);
134 
135   // Puts back a Chunk that has been completed and sends a request to the
136   // service to move it to the central tracing buffer. |target_buffer| is the
137   // absolute trace buffer ID where the service should move the chunk onto (the
138   // producer is just to copy back the same number received in the
139   // DataSourceConfig upon the StartDataSource() reques).
140   // PatchList is a pointer to the list of patches for previous chunks. The
141   // first patched entries will be removed from the patched list and sent over
142   // to the service in the same CommitData() IPC request.
143   void ReturnCompletedChunk(SharedMemoryABI::Chunk,
144                             MaybeUnboundBufferID target_buffer,
145                             PatchList*);
146 
147   // Send a request to the service to apply completed patches from |patch_list|.
148   // |writer_id| is the ID of the TraceWriter that calls this method,
149   // |target_buffer| is the global trace buffer ID of its target buffer.
150   void SendPatches(WriterID writer_id,
151                    MaybeUnboundBufferID target_buffer,
152                    PatchList* patch_list);
153 
shmem_abi_for_testing()154   SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
155 
set_default_layout_for_testing(SharedMemoryABI::PageLayout l)156   static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
157     default_page_layout = l;
158   }
159 
default_page_layout_for_testing()160   static SharedMemoryABI::PageLayout default_page_layout_for_testing() {
161     return default_page_layout;
162   }
163 
164   // SharedMemoryArbiter implementation.
165   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
166   std::unique_ptr<TraceWriter> CreateTraceWriter(
167       BufferID target_buffer,
168       BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override;
169   std::unique_ptr<TraceWriter> CreateStartupTraceWriter(
170       uint16_t target_buffer_reservation_id) override;
171   void BindToProducerEndpoint(TracingService::ProducerEndpoint*,
172                               base::TaskRunner*) override;
173   void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id,
174                                BufferID target_buffer_id) override;
175   void AbortStartupTracingForReservation(
176       uint16_t target_buffer_reservation_id) override;
177   void NotifyFlushComplete(FlushRequestID) override;
178 
179   void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override;
180 
181   bool EnableDirectSMBPatching() override;
182 
183   void SetDirectSMBPatchingSupportedByService() override;
184 
185   void FlushPendingCommitDataRequests(
186       std::function<void()> callback = {}) override;
187   bool TryShutdown() override;
188 
task_runner()189   base::TaskRunner* task_runner() const { return task_runner_; }
page_size()190   size_t page_size() const { return shmem_abi_.page_size(); }
num_pages()191   size_t num_pages() const { return shmem_abi_.num_pages(); }
192 
GetWeakPtr()193   base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const {
194     return weak_ptr_factory_.GetWeakPtr();
195   }
196 
197  private:
198   friend class TraceWriterImpl;
199   friend class StartupTraceWriterTest;
200   friend class SharedMemoryArbiterImplTest;
201 
202   struct TargetBufferReservation {
203     bool resolved = false;
204     BufferID target_buffer = kInvalidBufferId;
205   };
206 
207   // Placeholder for the actual target buffer ID of a startup target buffer
208   // reservation ID in |target_buffer_reservations_|.
209   static constexpr BufferID kInvalidBufferId = 0;
210 
211   static SharedMemoryABI::PageLayout default_page_layout;
212 
213   SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete;
214   SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete;
215 
216   void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk,
217                                WriterID writer_id,
218                                MaybeUnboundBufferID target_buffer,
219                                PatchList* patch_list);
220 
221   // Search the chunks that are being batched in |commit_data_req_| for a chunk
222   // that needs patching and that matches the provided |writer_id| and
223   // |patch.chunk_id|. If found, apply |patch| to that chunk, and if
224   // |chunk_needs_more_patching| is true, clear the needs patching flag of the
225   // chunk and mark it as complete - to allow the service to read it (and other
226   // chunks after it) during scraping. Returns true if the patch was applied,
227   // false otherwise.
228   //
229   // Note: the caller must be holding |lock_| for the duration of the call.
230   bool TryDirectPatchLocked(WriterID writer_id,
231                             const Patch& patch,
232                             bool chunk_needs_more_patching);
233   std::unique_ptr<TraceWriter> CreateTraceWriterInternal(
234       MaybeUnboundBufferID target_buffer,
235       BufferExhaustedPolicy);
236 
237   // Called by the TraceWriter destructor.
238   void ReleaseWriterID(WriterID);
239 
240   void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock,
241                                    uint16_t target_buffer_reservation_id,
242                                    BufferID target_buffer_id);
243 
244   // Returns some statistics about chunks/pages in the shared memory buffer.
245   struct Stats {
246     size_t chunks_free = 0;
247     size_t chunks_being_written = 0;
248     size_t chunks_being_read = 0;
249     size_t chunks_complete = 0;
250 
251     // No chunks are included from free/malformed pages.
252     size_t pages_free = 0;
253     size_t pages_unexpected = 0;
254   };
255   Stats GetStats();
256 
257   // If any flush callbacks were queued up while the arbiter or any target
258   // buffer reservation was unbound, this wraps the pending callbacks into a new
259   // std::function and returns it. Otherwise returns an invalid std::function.
260   std::function<void()> TakePendingFlushCallbacksLocked();
261 
262   // Replace occurrences of target buffer reservation IDs in |commit_data_req_|
263   // with their respective actual BufferIDs if they were already bound. Returns
264   // true iff all occurrences were replaced.
265   bool ReplaceCommitPlaceholderBufferIdsLocked();
266 
267   // Update and return |fully_bound_| based on the arbiter's |pending_writers_|
268   // state.
269   bool UpdateFullyBoundLocked();
270 
271   // Only accessed on |task_runner_| after the producer endpoint was bound.
272   TracingService::ProducerEndpoint* producer_endpoint_ = nullptr;
273 
274   // Set to true when this instance runs in a emulation mode for a producer
275   // endpoint that doesn't support shared memory (e.g. vsock).
276   const bool use_shmem_emulation_ = false;
277 
278   // --- Begin lock-protected members ---
279 
280   std::mutex lock_;
281 
282   base::TaskRunner* task_runner_ = nullptr;
283   SharedMemoryABI shmem_abi_;
284   size_t page_idx_ = 0;
285   std::unique_ptr<CommitDataRequest> commit_data_req_;
286   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
287   IdAllocator<WriterID> active_writer_ids_;
288   bool did_shutdown_ = false;
289 
290   // Whether the arbiter itself and all startup target buffer reservations are
291   // bound. Note that this can become false again later if a new target buffer
292   // reservation is created by calling CreateStartupTraceWriter() with a new
293   // reservation id.
294   bool fully_bound_;
295 
296   // Whether the arbiter was always bound. If false, the arbiter was unbound at
297   // one point in time.
298   bool was_always_bound_;
299 
300   // Whether all created trace writers were created with kDrop policy.
301   bool all_writers_have_drop_policy_ = true;
302 
303   // IDs of writers and their assigned target buffers that should be registered
304   // with the service after the arbiter and/or their startup target buffer is
305   // bound.
306   std::map<WriterID, MaybeUnboundBufferID> pending_writers_;
307 
308   // Callbacks for flush requests issued while the arbiter or a target buffer
309   // reservation was unbound.
310   std::vector<std::function<void()>> pending_flush_callbacks_;
311 
312   // See SharedMemoryArbiter::SetBatchCommitsDuration.
313   uint32_t batch_commits_duration_ms_ = 0;
314 
315   // See SharedMemoryArbiter::EnableDirectSMBPatching.
316   bool direct_patching_enabled_ = false;
317 
318   // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService.
319   bool direct_patching_supported_by_service_ = false;
320 
321   // Indicates whether we have already scheduled a delayed flush for the
322   // purposes of batching. Set to true at the beginning of a batching period and
323   // cleared at the end of the period. Immediate flushes that happen during a
324   // batching period will empty the |commit_data_req| (triggering an immediate
325   // IPC to the service), but will not clear this flag and the
326   // previously-scheduled delayed flush will still occur at the end of the
327   // batching period.
328   bool delayed_flush_scheduled_ = false;
329 
330   // Stores target buffer reservations for writers created via
331   // CreateStartupTraceWriter(). A bound reservation sets
332   // TargetBufferReservation::resolved to true and is associated with the actual
333   // BufferID supplied in BindStartupTargetBuffer().
334   //
335   // TODO(eseckler): Clean up entries from this map. This would probably require
336   // a method in SharedMemoryArbiter that allows a producer to invalidate a
337   // reservation ID.
338   std::map<MaybeUnboundBufferID, TargetBufferReservation>
339       target_buffer_reservations_;
340 
341   // --- End lock-protected members ---
342 
343   // Keep at the end.
344   base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_;
345 };
346 
347 }  // namespace perfetto
348 
349 #endif  // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
350