// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_LIB_PROMISE_PARTY_H
#define GRPC_SRC_CORE_LIB_PROMISE_PARTY_H

#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <string>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/strings/string_view.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>

#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/trace.h"

// Two implementations of party synchronization are provided: one using a
// single atomic, the other using a mutex and a set of state variables.
// Originally only the atomic implementation existed, but we found some race
// conditions on Arm that were not reported by our default TSAN configuration.
// The mutex implementation was added to see whether it would fix the problem,
// and it did. We later found the underlying race condition, so there is no
// known reason to use the mutex version - however we keep it around as a
// just-in-case measure. There is a thought of fuzzing the two implementations
// against each other as a correctness check of both, but that is not
// implemented yet.

extern grpc_core::DebugOnlyTraceFlag grpc_trace_party_state;

#define GRPC_PARTY_SYNC_USING_ATOMICS
// #define GRPC_PARTY_SYNC_USING_MUTEX

#if defined(GRPC_PARTY_SYNC_USING_ATOMICS) + \
        defined(GRPC_PARTY_SYNC_USING_MUTEX) != \
    1
#error Must define a party sync mechanism
#endif

namespace grpc_core {

namespace party_detail {

// The number of bits reserved for wakeups gives us the maximum number of
// participants.
static constexpr size_t kMaxParticipants = 16;

}  // namespace party_detail

class PartySyncUsingAtomics {
 public:
  explicit PartySyncUsingAtomics(size_t initial_refs)
      : state_(kOneRef * initial_refs) {}

  void IncrementRefCount() {
    const uint64_t prev_state =
        state_.fetch_add(kOneRef, std::memory_order_relaxed);
    LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef);
  }
  GRPC_MUST_USE_RESULT bool RefIfNonZero();
  // Returns true if the ref count is now zero and the caller should call
  // PartyOver.
  GRPC_MUST_USE_RESULT bool Unref() {
    const uint64_t prev_state =
        state_.fetch_sub(kOneRef, std::memory_order_acq_rel);
    LogStateChange("Unref", prev_state, prev_state - kOneRef);
    if ((prev_state & kRefMask) == kOneRef) {
      return UnreffedLast();
    }
    return false;
  }
  void ForceImmediateRepoll(WakeupMask mask) {
    // Or in the bit for the currently polling participant.
    // Will be grabbed next round to force a repoll of this promise.
    const uint64_t prev_state =
        state_.fetch_or(mask, std::memory_order_relaxed);
    LogStateChange("ForceImmediateRepoll", prev_state, prev_state | mask);
  }

  // Run the update loop: poll_one_participant is called with an integral index
  // for the participant that should be polled. It should return true if the
  // participant completed and should be removed from the allocated set.
  template <typename F>
  GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
    uint64_t prev_state;
    iteration_.fetch_add(1, std::memory_order_relaxed);
    for (;;) {
      // Grab the current state, and clear the wakeup bits & add flag.
      prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
                                    std::memory_order_acquire);
      LogStateChange("Run", prev_state,
                     prev_state & (kRefMask | kLocked | kAllocatedMask));
      GPR_ASSERT(prev_state & kLocked);
      if (prev_state & kDestroying) return true;
      // From the previous state, extract which participants we're to wake up.
      uint64_t wakeups = prev_state & kWakeupMask;
      // Now update prev_state to be what we want the CAS to see below.
      prev_state &= kRefMask | kLocked | kAllocatedMask;
      // For each wakeup bit...
      for (size_t i = 0; wakeups != 0; i++, wakeups >>= 1) {
        // If the bit is not set, skip.
        if ((wakeups & 1) == 0) continue;
        if (poll_one_participant(i)) {
          const uint64_t allocated_bit = (1u << i << kAllocatedShift);
          prev_state &= ~allocated_bit;
          uint64_t finished_prev_state =
              state_.fetch_and(~allocated_bit, std::memory_order_release);
          LogStateChange("Run:ParticipantComplete", finished_prev_state,
                         finished_prev_state & ~allocated_bit);
        }
      }
      // Try to CAS the state we expected to have (with no wakeups or adds)
      // back to unlocked (by masking in only the ref mask - sans locked bit).
      // If this succeeds then no wakeups were added, no adds were added, and
      // we have successfully unlocked.
      // Otherwise, we need to loop again.
      // Note that if an owning waker is created or the weak CAS spuriously
      // fails we will also loop again, but in that case see no wakeups or adds
      // and so will get back here fairly quickly.
      // TODO(ctiller): consider mitigations for the accidental wakeup on
      // owning waker creation case -- I currently expect this will be more
      // expensive than this quick loop.
      if (wake_after_poll_ == 0) {
        if (state_.compare_exchange_weak(
                prev_state, (prev_state & (kRefMask | kAllocatedMask)),
                std::memory_order_acq_rel, std::memory_order_acquire)) {
          LogStateChange("Run:End", prev_state,
                         prev_state & (kRefMask | kAllocatedMask));
          return false;
        }
      } else {
        if (state_.compare_exchange_weak(
                prev_state,
                (prev_state & (kRefMask | kAllocatedMask | kLocked)) |
                    wake_after_poll_,
                std::memory_order_acq_rel, std::memory_order_acquire)) {
          LogStateChange("Run:EndIteration", prev_state,
                         prev_state & (kRefMask | kAllocatedMask));
          iteration_.fetch_add(1, std::memory_order_relaxed);
          wake_after_poll_ = 0;
        }
      }
    }
    return false;
  }
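  // Illustrative sketch (editorial, not part of the source): the caller
  // supplies poll_one_participant as a callable taking a participant index
  // and returning whether that participant completed. Party (below) drives
  // this roughly like so:
  //
  //   bool party_over = sync_.RunParty([this](size_t i) {
  //     return RunOneParticipant(i);
  //   });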
  // Add new participants to the party. Returns true if the caller should run
  // the party. store is called with an array of indices of the new
  // participants. Adds a ref that should be dropped by the caller after
  // RunParty has been called (if that was required).
  template <typename F>
  GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) {
    uint64_t state = state_.load(std::memory_order_acquire);
    uint64_t allocated;

    size_t slots[party_detail::kMaxParticipants];

    // Find slots for each new participant, ordering them from lowest available
    // slot upwards to ensure the same poll ordering as presentation ordering
    // to this function.
    WakeupMask wakeup_mask;
    do {
      wakeup_mask = 0;
      allocated = (state & kAllocatedMask) >> kAllocatedShift;
      size_t n = 0;
      for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants;
           bit++) {
        if (allocated & (1 << bit)) continue;
        wakeup_mask |= (1 << bit);
        slots[n++] = bit;
        allocated |= 1 << bit;
      }
      GPR_ASSERT(n == count);
      // Try to allocate these slots and take a ref (atomically).
      // The ref needs to be taken because once we store the participant it
      // could be spuriously woken up and unref the party.
    } while (!state_.compare_exchange_weak(
        state, (state | (allocated << kAllocatedShift)) + kOneRef,
        std::memory_order_acq_rel, std::memory_order_acquire));
    LogStateChange("AddParticipantsAndRef", state,
                   (state | (allocated << kAllocatedShift)) + kOneRef);

    store(slots);

    // Now we need to wake up the party.
    state = state_.fetch_or(wakeup_mask | kLocked, std::memory_order_release);
    LogStateChange("AddParticipantsAndRef:Wakeup", state,
                   state | wakeup_mask | kLocked);

    // If the party was already locked, we're done; otherwise the caller now
    // owns the lock and should run the party.
    return ((state & kLocked) == 0);
  }
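  // Illustrative sketch (editorial, assumed caller code) of the add/run/unref
  // protocol described above:
  //
  //   if (sync_.AddParticipantsAndRef(count, [&](size_t* slots) {
  //         for (size_t i = 0; i < count; i++) {
  //           participants_[slots[i]].store(new_participants[i],
  //                                         std::memory_order_release);
  //         }
  //       })) {
  //     // The wakeup transition found the party unlocked: this thread now
  //     // holds the lock and must run the party.
  //     if (sync_.RunParty(poll_one_participant)) { /* party is over */ }
  //   }
  //   // Drop the ref that AddParticipantsAndRef took on our behalf.
  //   Unref();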
  // Schedule a wakeup for the given participant.
  // Returns true if the caller should run the party.
  GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);

  void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; }
  uint32_t iteration() const {
    return iteration_.load(std::memory_order_relaxed);
  }

 private:
  bool UnreffedLast();

  void LogStateChange(const char* op, uint64_t prev_state, uint64_t new_state,
                      DebugLocation loc = {}) {
    if (grpc_trace_party_state.enabled()) {
      gpr_log(loc.file(), loc.line(), GPR_LOG_SEVERITY_INFO,
              "Party %p %30s: %016" PRIx64 " -> %016" PRIx64, this, op,
              prev_state, new_state);
    }
  }

  // State bits:
  // The atomic state_ field is composed of the following:
  //   - 24 bits for ref counts: one ref is owned by the party prior to
  //     Orphan(); all others are owned by owning wakers.
  //   - 1 bit to indicate whether the party is locked. The first thread to
  //     set this owns the party until it is unlocked; that thread will run
  //     the main loop until no further work needs to be done.
  //   - 1 bit to indicate whether there are participants waiting to be added.
  //   - 16 bits, one per participant, indicating which participants have been
  //     woken up and should be polled next time the main loop runs.

  // clang-format off
  // Bits used to store 16 bits of wakeups
  static constexpr uint64_t kWakeupMask    = 0x0000'0000'0000'ffff;
  // Bits used to store 16 bits of allocated participant slots.
  static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000;
  // Bit indicating destruction has begun (refs went to zero)
  static constexpr uint64_t kDestroying    = 0x0000'0001'0000'0000;
  // Bit indicating locked or not
  static constexpr uint64_t kLocked        = 0x0000'0008'0000'0000;
  // Bits used to store 24 bits of ref counts
  static constexpr uint64_t kRefMask       = 0xffff'ff00'0000'0000;
  // clang-format on

  // Shift to get from a participant mask to an allocated mask.
  static constexpr size_t kAllocatedShift = 16;
  // How far to shift to get the refcount.
  static constexpr size_t kRefShift = 40;
  // One ref count.
  static constexpr uint64_t kOneRef = 1ull << kRefShift;

  std::atomic<uint64_t> state_;
  std::atomic<uint32_t> iteration_{0};
  WakeupMask wake_after_poll_ = 0;
};
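// Worked example (editorial, derived from the masks above): the state word
// 0x0000'0208'0003'0001 decodes as
//   refs      = 0x000002  (state >> kRefShift: two refs held)
//   kLocked   = set       (some thread owns the run loop)
//   allocated = 0x0003    (participants 0 and 1 occupy slots)
//   wakeups   = 0x0001    (participant 0 should be polled next run)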
class PartySyncUsingMutex {
 public:
  explicit PartySyncUsingMutex(size_t initial_refs) : refs_(initial_refs) {}

  void IncrementRefCount() { refs_.Ref(); }
  GRPC_MUST_USE_RESULT bool RefIfNonZero() { return refs_.RefIfNonZero(); }
  GRPC_MUST_USE_RESULT bool Unref() { return refs_.Unref(); }
  void ForceImmediateRepoll(WakeupMask mask) {
    MutexLock lock(&mu_);
    wakeups_ |= mask;
  }
  template <typename F>
  GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
    WakeupMask freed = 0;
    while (true) {
      ReleasableMutexLock lock(&mu_);
      GPR_ASSERT(locked_);
      allocated_ &= ~std::exchange(freed, 0);
      auto wakeup = std::exchange(wakeups_, 0);
      if (wakeup == 0) {
        locked_ = false;
        return false;
      }
      lock.Release();
      for (size_t i = 0; wakeup != 0; i++, wakeup >>= 1) {
        if ((wakeup & 1) == 0) continue;
        if (poll_one_participant(i)) freed |= 1 << i;
      }
    }
  }

  template <typename F>
  GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) {
    IncrementRefCount();
    MutexLock lock(&mu_);
    size_t slots[party_detail::kMaxParticipants];
    WakeupMask wakeup_mask = 0;
    size_t n = 0;
    for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants;
         bit++) {
      if (allocated_ & (1 << bit)) continue;
      slots[n++] = bit;
      wakeup_mask |= 1 << bit;
      allocated_ |= 1 << bit;
    }
    GPR_ASSERT(n == count);
    store(slots);
    wakeups_ |= wakeup_mask;
    return !std::exchange(locked_, true);
  }

  GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);

 private:
  RefCount refs_;
  Mutex mu_;
  WakeupMask allocated_ ABSL_GUARDED_BY(mu_) = 0;
  WakeupMask wakeups_ ABSL_GUARDED_BY(mu_) = 0;
  bool locked_ ABSL_GUARDED_BY(mu_) = false;
};
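// Editorial note: `!std::exchange(locked_, true)` above both sets locked_ and
// reports whether this call made the false -> true transition; only that
// caller should go on to run the party. Under the mutex it is equivalent to:
//
//   bool was_locked = locked_;
//   locked_ = true;
//   return !was_locked;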
// A Party is an Activity with multiple participant promises.
class Party : public Activity, private Wakeable {
 private:
  // Non-owning wakeup handle.
  class Handle;

  // One participant in the party.
  class Participant {
   public:
    explicit Participant(absl::string_view name) : name_(name) {}
    // Poll the participant. Returns true if complete.
    // The participant should take care of its own deallocation in this case.
    virtual bool PollParticipantPromise() = 0;

    // Destroy the participant before finishing.
    virtual void Destroy() = 0;

    // Return a Handle instance for this participant.
    Wakeable* MakeNonOwningWakeable(Party* party);

    absl::string_view name() const { return name_; }

   protected:
    ~Participant();

   private:
    Handle* handle_ = nullptr;
    absl::string_view name_;
  };

 public:
  Party(const Party&) = delete;
  Party& operator=(const Party&) = delete;

  // Spawn one promise into the party.
  // The promise will be polled until it is resolved, or until the party is
  // shut down.
  // The on_complete callback will be called with the result of the promise if
  // it completes.
  // A maximum of sixteen promises can be spawned onto a party.
  template <typename Factory, typename OnComplete>
  void Spawn(absl::string_view name, Factory promise_factory,
             OnComplete on_complete);

  template <typename Factory>
  auto SpawnWaitable(absl::string_view name, Factory factory);
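  // Usage sketch (editorial; `party`, SendPing, and Fetch are illustrative
  // assumptions, not part of this header):
  //
  //   party->Spawn(
  //       "ping",
  //       [channel]() { return channel->SendPing(); },  // promise factory
  //       [](absl::Status status) { /* called with the final result */ });
  //
  //   // SpawnWaitable returns a promise that another activity can await:
  //   auto done = party->SpawnWaitable("fetch", []() { return Fetch(); });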
  void Orphan() final { Crash("unused"); }

  // Activity implementation: not allowed to be overridden by derived types.
  void ForceImmediateRepoll(WakeupMask mask) final;
  WakeupMask CurrentParticipant() const final {
    GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
    return 1u << currently_polling_;
  }
  Waker MakeOwningWaker() final;
  Waker MakeNonOwningWaker() final;
  std::string ActivityDebugTag(WakeupMask wakeup_mask) const final;

  void IncrementRefCount() { sync_.IncrementRefCount(); }
  void Unref() {
    if (sync_.Unref()) PartyIsOver();
  }
  RefCountedPtr<Party> Ref() {
    IncrementRefCount();
    return RefCountedPtr<Party>(this);
  }

  // Return a promise that resolves to Empty{} when the current party poll is
  // complete.
  // This is useful for implementing batching and the like: we can hold some
  // action until the rest of the party resolves itself.
  auto AfterCurrentPoll() {
    GPR_DEBUG_ASSERT(GetContext<Activity>() == this);
    sync_.WakeAfterPoll(CurrentParticipant());
    return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
      GPR_DEBUG_ASSERT(GetContext<Activity>() == this);
      if (iteration == sync_.iteration()) return Pending{};
      return Empty{};
    };
  }
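  // Usage sketch (editorial): hold an action until everyone else in the party
  // has been polled this iteration - e.g. to batch writes. Seq is from
  // promise/seq.h; FlushWrites is an assumed helper.
  //
  //   return Seq(AfterCurrentPoll(),
  //              [this]() { return FlushWrites(); });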
  class BulkSpawner {
   public:
    explicit BulkSpawner(Party* party) : party_(party) {}
    ~BulkSpawner() {
      party_->AddParticipants(participants_, num_participants_);
    }

    template <typename Factory, typename OnComplete>
    void Spawn(absl::string_view name, Factory promise_factory,
               OnComplete on_complete);

   private:
    Party* const party_;
    size_t num_participants_ = 0;
    Participant* participants_[party_detail::kMaxParticipants];
  };
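  // Usage sketch (editorial; MakeReader/MakeWriter and the callbacks are
  // assumed): spawn several promises while paying the wakeup/locking cost
  // only once, at scope exit:
  //
  //   {
  //     Party::BulkSpawner spawner(party.get());
  //     spawner.Spawn("read", MakeReader(), OnReadDone);
  //     spawner.Spawn("write", MakeWriter(), OnWriteDone);
  //   }  // ~BulkSpawner adds both participants and wakes the party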
 protected:
  explicit Party(size_t initial_refs) : sync_(initial_refs) {}
  ~Party() override;

  // Main run loop. Must be locked.
  // Polls participants and drains the add queue until there is no work left
  // to be done.
  // Derived types will likely want to override this to set up their contexts
  // before polling.
  // Should not be called by derived types except as a tail call to the base
  // class RunParty when overriding this method to add custom context.
  // Returns true if the party is over.
  GRPC_MUST_USE_RESULT virtual bool RunParty();

  bool RefIfNonZero() { return sync_.RefIfNonZero(); }

  // Destroy any remaining participants.
  // Should be called by derived types in response to PartyOver.
  // Needs to have normal context setup before calling.
  void CancelRemainingParticipants();

 private:
  // Concrete implementation of a participant for some promise & on-complete
  // type.
  template <typename SuppliedFactory, typename OnComplete>
  class ParticipantImpl final : public Participant {
    using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>;
    using Promise = typename Factory::Promise;

   public:
    ParticipantImpl(absl::string_view name, SuppliedFactory promise_factory,
                    OnComplete on_complete)
        : Participant(name), on_complete_(std::move(on_complete)) {
      Construct(&factory_, std::move(promise_factory));
    }
    ~ParticipantImpl() {
      if (!started_) {
        Destruct(&factory_);
      } else {
        Destruct(&promise_);
      }
    }

    bool PollParticipantPromise() override {
      if (!started_) {
        auto p = factory_.Make();
        Destruct(&factory_);
        Construct(&promise_, std::move(p));
        started_ = true;
      }
      auto p = promise_();
      if (auto* r = p.value_if_ready()) {
        on_complete_(std::move(*r));
        delete this;
        return true;
      }
      return false;
    }

    void Destroy() override { delete this; }

   private:
    union {
      GPR_NO_UNIQUE_ADDRESS Factory factory_;
      GPR_NO_UNIQUE_ADDRESS Promise promise_;
    };
    GPR_NO_UNIQUE_ADDRESS OnComplete on_complete_;
    bool started_ = false;
  };

  template <typename SuppliedFactory>
  class PromiseParticipantImpl final
      : public RefCounted<PromiseParticipantImpl<SuppliedFactory>,
                          NonPolymorphicRefCount>,
        public Participant {
    using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>;
    using Promise = typename Factory::Promise;
    using Result = typename Promise::Result;

   public:
    PromiseParticipantImpl(absl::string_view name,
                           SuppliedFactory promise_factory)
        : Participant(name) {
      Construct(&factory_, std::move(promise_factory));
    }

    ~PromiseParticipantImpl() {
      switch (state_.load(std::memory_order_acquire)) {
        case State::kFactory:
          Destruct(&factory_);
          break;
        case State::kPromise:
          Destruct(&promise_);
          break;
        case State::kResult:
          Destruct(&result_);
          break;
      }
    }

    // Inside party poll: drive from factory -> promise -> result.
    bool PollParticipantPromise() override {
      switch (state_.load(std::memory_order_relaxed)) {
        case State::kFactory: {
          auto p = factory_.Make();
          Destruct(&factory_);
          Construct(&promise_, std::move(p));
          state_.store(State::kPromise, std::memory_order_relaxed);
        }
          ABSL_FALLTHROUGH_INTENDED;
        case State::kPromise: {
          auto p = promise_();
          if (auto* r = p.value_if_ready()) {
            Destruct(&promise_);
            Construct(&result_, std::move(*r));
            state_.store(State::kResult, std::memory_order_release);
            waiter_.Wakeup();
            this->Unref();
            return true;
          }
          return false;
        }
        case State::kResult:
          Crash(
              "unreachable: promises should not be repolled after completion");
      }
    }

    // Outside party poll: check whether the spawning party has completed this
    // promise.
    Poll<Result> PollCompletion() {
      switch (state_.load(std::memory_order_acquire)) {
        case State::kFactory:
        case State::kPromise:
          return Pending{};
        case State::kResult:
          return std::move(result_);
      }
    }

    void Destroy() override { this->Unref(); }

   private:
    enum class State : uint8_t { kFactory, kPromise, kResult };
    union {
      GPR_NO_UNIQUE_ADDRESS Factory factory_;
      GPR_NO_UNIQUE_ADDRESS Promise promise_;
      GPR_NO_UNIQUE_ADDRESS Result result_;
    };
    Waker waiter_{GetContext<Activity>()->MakeOwningWaker()};
    std::atomic<State> state_{State::kFactory};
  };

  // Notification that the party has finished and this instance can be
  // deleted.
  // Derived types should arrange to call CancelRemainingParticipants during
  // this sequence.
  virtual void PartyOver() = 0;

  // Run the locked part of the party until it is unlocked.
  void RunLocked();
  // Called in response to Unref() hitting zero - ultimately calls PartyOver,
  // but needs to set some stuff up first.
  // Here so it gets compiled out of line.
  void PartyIsOver();

  // Wakeable implementation.
  void Wakeup(WakeupMask wakeup_mask) final;
  void WakeupAsync(WakeupMask wakeup_mask) final;
  void Drop(WakeupMask wakeup_mask) final;

  // Add a participant (backs Spawn, after type erasure to ParticipantFactory).
  void AddParticipants(Participant** participant, size_t count);
  bool RunOneParticipant(int i);

  virtual grpc_event_engine::experimental::EventEngine* event_engine()
      const = 0;

  // Sentinel value for currently_polling_ when no participant is being
  // polled.
  static constexpr uint8_t kNotPolling = 255;

#ifdef GRPC_PARTY_SYNC_USING_ATOMICS
  PartySyncUsingAtomics sync_;
#elif defined(GRPC_PARTY_SYNC_USING_MUTEX)
  PartySyncUsingMutex sync_;
#else
#error No synchronization method defined
#endif

  uint8_t currently_polling_ = kNotPolling;
  // All current participants, using a tagged format.
  // If the lower bit is unset, then this is a Participant*.
  // If the lower bit is set, then this is a ParticipantFactory*.
  std::atomic<Participant*> participants_[party_detail::kMaxParticipants] = {};
};

template <>
struct ContextSubclass<Party> {
  using Base = Activity;
};

template <typename Factory, typename OnComplete>
void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory,
                               OnComplete on_complete) {
  if (grpc_trace_promise_primitives.enabled()) {
    gpr_log(GPR_DEBUG, "%s[bulk_spawn] On %p queue %s",
            party_->DebugTag().c_str(), this, std::string(name).c_str());
  }
  participants_[num_participants_++] = new ParticipantImpl<Factory, OnComplete>(
      name, std::move(promise_factory), std::move(on_complete));
}

template <typename Factory, typename OnComplete>
void Party::Spawn(absl::string_view name, Factory promise_factory,
                  OnComplete on_complete) {
  BulkSpawner(this).Spawn(name, std::move(promise_factory),
                          std::move(on_complete));
}

template <typename Factory>
auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) {
  auto participant = MakeRefCounted<PromiseParticipantImpl<Factory>>(
      name, std::move(promise_factory));
  Participant* p = participant->Ref().release();
  AddParticipants(&p, 1);
  return [participant = std::move(participant)]() mutable {
    return participant->PollCompletion();
  };
}

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_PROMISE_PARTY_H