1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 19 20 #include <stdint.h> 21 22 #include <functional> 23 #include <map> 24 #include <memory> 25 #include <mutex> 26 #include <vector> 27 28 #include "perfetto/ext/base/weak_ptr.h" 29 #include "perfetto/ext/tracing/core/basic_types.h" 30 #include "perfetto/ext/tracing/core/shared_memory_abi.h" 31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h" 32 #include "perfetto/tracing/core/forward_decls.h" 33 #include "src/tracing/core/id_allocator.h" 34 35 namespace perfetto { 36 37 class PatchList; 38 class Patch; 39 class TraceWriter; 40 class TraceWriterImpl; 41 42 namespace base { 43 class TaskRunner; 44 } // namespace base 45 46 // This class handles the shared memory buffer on the producer side. It is used 47 // to obtain thread-local chunks and to partition pages from several threads. 48 // There is one arbiter instance per Producer. 49 // This class is thread-safe and uses locks to do so. Data sources are supposed 50 // to interact with this sporadically, only when they run out of space on their 51 // current thread-local chunk. 52 // 53 // The arbiter can become "unbound" as a consequence of: 54 // (a) being created without an endpoint 55 // (b) CreateStartupTraceWriter calls after creation (whether created with or 56 // without endpoint). 57 // 58 // Entering the unbound state is only supported if all trace writers are created 59 // in kDrop mode. In the unbound state, the arbiter buffers commit messages 60 // until all trace writers are bound to a target buffer. 61 // 62 // The following state transitions are possible: 63 // 64 // CreateInstance() 65 // | 66 // | CreateUnboundInstance() 67 // | | 68 // | | 69 // | V 70 // | [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ] 71 // | | | 72 // | | | CreateStartupTraceWriter(buf) 73 // | | | buffer reservations += buf 74 // | | | 75 // | | | ---- 76 // | | | | | CreateStartupTraceWriter(buf) 77 // | | | | | buffer reservations += buf 78 // | | V | V 79 // | | [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ] 80 // | | | 81 // | | BindToProducerEndpoint() | 82 // | | | 83 // | | BindToProducerEndpoint() | 84 // | | V 85 // | | [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ] 86 // | | A | A | A 87 // | | | | | | | 88 // | | | ---- | | 89 // | | | CreateStartupTraceWriter(buf) | | 90 // | | | buffer reservations += buf | | 91 // | | | | | 92 // | | | CreateStartupTraceWriter(buf) | | 93 // | | | where buf is not yet bound | | 94 // | | | buffer reservations += buf | | (yes) 95 // | | | | | 96 // | | | BindStartupTargetBuffer(buf, id) |----- 97 // | | | buffer reservations -= buf | reservations > 0? 98 // | | | | 99 // | | | | (no) 100 // | V | V 101 // --> [ fully_bound_, endpoint_, 0 unbound buffer reservations ] 102 // | A 103 // | | CreateStartupTraceWriter(buf) 104 // | | where buf is already bound 105 // ---- 106 class SharedMemoryArbiterImpl : public SharedMemoryArbiter { 107 public: 108 // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the 109 // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may 110 // be |nullptr| if created unbound, see 111 // SharedMemoryArbiter::CreateUnboundInstance(). 112 113 // SharedMemoryArbiterImpl(void* start, 114 // size_t size, 115 // size_t page_size, 116 // TracingService::ProducerEndpoint* 117 // producer_endpoint, base::TaskRunner* task_runner) : 118 // SharedMemoryArbiterImpl(start, size, page_size, false, producer_endpoint, 119 // task_runner) { 120 // } 121 122 SharedMemoryArbiterImpl(void* start, 123 size_t size, 124 ShmemMode mode, 125 size_t page_size, 126 TracingService::ProducerEndpoint*, 127 base::TaskRunner*); 128 129 // Returns a new Chunk to write tracing data. Depending on the provided 130 // BufferExhaustedPolicy, this may return an invalid chunk if no valid free 131 // chunk could be found in the SMB. 132 SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&, 133 BufferExhaustedPolicy); 134 135 // Puts back a Chunk that has been completed and sends a request to the 136 // service to move it to the central tracing buffer. |target_buffer| is the 137 // absolute trace buffer ID where the service should move the chunk onto (the 138 // producer is just to copy back the same number received in the 139 // DataSourceConfig upon the StartDataSource() reques). 140 // PatchList is a pointer to the list of patches for previous chunks. The 141 // first patched entries will be removed from the patched list and sent over 142 // to the service in the same CommitData() IPC request. 143 void ReturnCompletedChunk(SharedMemoryABI::Chunk, 144 MaybeUnboundBufferID target_buffer, 145 PatchList*); 146 147 // Send a request to the service to apply completed patches from |patch_list|. 148 // |writer_id| is the ID of the TraceWriter that calls this method, 149 // |target_buffer| is the global trace buffer ID of its target buffer. 150 void SendPatches(WriterID writer_id, 151 MaybeUnboundBufferID target_buffer, 152 PatchList* patch_list); 153 shmem_abi_for_testing()154 SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; } 155 set_default_layout_for_testing(SharedMemoryABI::PageLayout l)156 static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) { 157 default_page_layout = l; 158 } 159 default_page_layout_for_testing()160 static SharedMemoryABI::PageLayout default_page_layout_for_testing() { 161 return default_page_layout; 162 } 163 164 // SharedMemoryArbiter implementation. 165 // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments. 166 std::unique_ptr<TraceWriter> CreateTraceWriter( 167 BufferID target_buffer, 168 BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override; 169 std::unique_ptr<TraceWriter> CreateStartupTraceWriter( 170 uint16_t target_buffer_reservation_id) override; 171 void BindToProducerEndpoint(TracingService::ProducerEndpoint*, 172 base::TaskRunner*) override; 173 void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id, 174 BufferID target_buffer_id) override; 175 void AbortStartupTracingForReservation( 176 uint16_t target_buffer_reservation_id) override; 177 void NotifyFlushComplete(FlushRequestID) override; 178 179 void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override; 180 181 bool EnableDirectSMBPatching() override; 182 183 void SetDirectSMBPatchingSupportedByService() override; 184 185 void FlushPendingCommitDataRequests( 186 std::function<void()> callback = {}) override; 187 bool TryShutdown() override; 188 task_runner()189 base::TaskRunner* task_runner() const { return task_runner_; } page_size()190 size_t page_size() const { return shmem_abi_.page_size(); } num_pages()191 size_t num_pages() const { return shmem_abi_.num_pages(); } 192 GetWeakPtr()193 base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const { 194 return weak_ptr_factory_.GetWeakPtr(); 195 } 196 197 private: 198 friend class TraceWriterImpl; 199 friend class StartupTraceWriterTest; 200 friend class SharedMemoryArbiterImplTest; 201 202 struct TargetBufferReservation { 203 bool resolved = false; 204 BufferID target_buffer = kInvalidBufferId; 205 }; 206 207 // Placeholder for the actual target buffer ID of a startup target buffer 208 // reservation ID in |target_buffer_reservations_|. 209 static constexpr BufferID kInvalidBufferId = 0; 210 211 static SharedMemoryABI::PageLayout default_page_layout; 212 213 SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete; 214 SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete; 215 216 void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk, 217 WriterID writer_id, 218 MaybeUnboundBufferID target_buffer, 219 PatchList* patch_list); 220 221 // Search the chunks that are being batched in |commit_data_req_| for a chunk 222 // that needs patching and that matches the provided |writer_id| and 223 // |patch.chunk_id|. If found, apply |patch| to that chunk, and if 224 // |chunk_needs_more_patching| is true, clear the needs patching flag of the 225 // chunk and mark it as complete - to allow the service to read it (and other 226 // chunks after it) during scraping. Returns true if the patch was applied, 227 // false otherwise. 228 // 229 // Note: the caller must be holding |lock_| for the duration of the call. 230 bool TryDirectPatchLocked(WriterID writer_id, 231 const Patch& patch, 232 bool chunk_needs_more_patching); 233 std::unique_ptr<TraceWriter> CreateTraceWriterInternal( 234 MaybeUnboundBufferID target_buffer, 235 BufferExhaustedPolicy); 236 237 // Called by the TraceWriter destructor. 238 void ReleaseWriterID(WriterID); 239 240 void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock, 241 uint16_t target_buffer_reservation_id, 242 BufferID target_buffer_id); 243 244 // Returns some statistics about chunks/pages in the shared memory buffer. 245 struct Stats { 246 size_t chunks_free = 0; 247 size_t chunks_being_written = 0; 248 size_t chunks_being_read = 0; 249 size_t chunks_complete = 0; 250 251 // No chunks are included from free/malformed pages. 252 size_t pages_free = 0; 253 size_t pages_unexpected = 0; 254 }; 255 Stats GetStats(); 256 257 // If any flush callbacks were queued up while the arbiter or any target 258 // buffer reservation was unbound, this wraps the pending callbacks into a new 259 // std::function and returns it. Otherwise returns an invalid std::function. 260 std::function<void()> TakePendingFlushCallbacksLocked(); 261 262 // Replace occurrences of target buffer reservation IDs in |commit_data_req_| 263 // with their respective actual BufferIDs if they were already bound. Returns 264 // true iff all occurrences were replaced. 265 bool ReplaceCommitPlaceholderBufferIdsLocked(); 266 267 // Update and return |fully_bound_| based on the arbiter's |pending_writers_| 268 // state. 269 bool UpdateFullyBoundLocked(); 270 271 // Only accessed on |task_runner_| after the producer endpoint was bound. 272 TracingService::ProducerEndpoint* producer_endpoint_ = nullptr; 273 274 // Set to true when this instance runs in a emulation mode for a producer 275 // endpoint that doesn't support shared memory (e.g. vsock). 276 const bool use_shmem_emulation_ = false; 277 278 // --- Begin lock-protected members --- 279 280 std::mutex lock_; 281 282 base::TaskRunner* task_runner_ = nullptr; 283 SharedMemoryABI shmem_abi_; 284 size_t page_idx_ = 0; 285 std::unique_ptr<CommitDataRequest> commit_data_req_; 286 size_t bytes_pending_commit_ = 0; // SUM(chunk.size() : commit_data_req_). 287 IdAllocator<WriterID> active_writer_ids_; 288 bool did_shutdown_ = false; 289 290 // Whether the arbiter itself and all startup target buffer reservations are 291 // bound. Note that this can become false again later if a new target buffer 292 // reservation is created by calling CreateStartupTraceWriter() with a new 293 // reservation id. 294 bool fully_bound_; 295 296 // Whether the arbiter was always bound. If false, the arbiter was unbound at 297 // one point in time. 298 bool was_always_bound_; 299 300 // Whether all created trace writers were created with kDrop policy. 301 bool all_writers_have_drop_policy_ = true; 302 303 // IDs of writers and their assigned target buffers that should be registered 304 // with the service after the arbiter and/or their startup target buffer is 305 // bound. 306 std::map<WriterID, MaybeUnboundBufferID> pending_writers_; 307 308 // Callbacks for flush requests issued while the arbiter or a target buffer 309 // reservation was unbound. 310 std::vector<std::function<void()>> pending_flush_callbacks_; 311 312 // See SharedMemoryArbiter::SetBatchCommitsDuration. 313 uint32_t batch_commits_duration_ms_ = 0; 314 315 // See SharedMemoryArbiter::EnableDirectSMBPatching. 316 bool direct_patching_enabled_ = false; 317 318 // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService. 319 bool direct_patching_supported_by_service_ = false; 320 321 // Indicates whether we have already scheduled a delayed flush for the 322 // purposes of batching. Set to true at the beginning of a batching period and 323 // cleared at the end of the period. Immediate flushes that happen during a 324 // batching period will empty the |commit_data_req| (triggering an immediate 325 // IPC to the service), but will not clear this flag and the 326 // previously-scheduled delayed flush will still occur at the end of the 327 // batching period. 328 bool delayed_flush_scheduled_ = false; 329 330 // Stores target buffer reservations for writers created via 331 // CreateStartupTraceWriter(). A bound reservation sets 332 // TargetBufferReservation::resolved to true and is associated with the actual 333 // BufferID supplied in BindStartupTargetBuffer(). 334 // 335 // TODO(eseckler): Clean up entries from this map. This would probably require 336 // a method in SharedMemoryArbiter that allows a producer to invalidate a 337 // reservation ID. 338 std::map<MaybeUnboundBufferID, TargetBufferReservation> 339 target_buffer_reservations_; 340 341 // --- End lock-protected members --- 342 343 // Keep at the end. 344 base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_; 345 }; 346 347 } // namespace perfetto 348 349 #endif // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_ 350