xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/promise/party.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PARTY_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_PARTY_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stddef.h>
21 #include <stdint.h>
22 
23 #include <atomic>
24 #include <string>
25 #include <utility>
26 
27 #include "absl/base/attributes.h"
28 #include "absl/base/thread_annotations.h"
29 #include "absl/strings/string_view.h"
30 
31 #include <grpc/event_engine/event_engine.h>
32 #include <grpc/support/log.h>
33 
34 #include "src/core/lib/gprpp/construct_destruct.h"
35 #include "src/core/lib/gprpp/crash.h"
36 #include "src/core/lib/gprpp/ref_counted.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/gprpp/sync.h"
39 #include "src/core/lib/promise/activity.h"
40 #include "src/core/lib/promise/context.h"
41 #include "src/core/lib/promise/detail/promise_factory.h"
42 #include "src/core/lib/promise/poll.h"
43 #include "src/core/lib/promise/trace.h"
44 
45 // Two implementations of party synchronization are provided: one using a single
46 // atomic, the other using a mutex and a set of state variables.
47 // Originally the atomic implementation was implemented, but we found some race
48 // conditions on Arm that were not reported by our default TSAN implementation.
49 // The mutex implementation was added to see if it would fix the problem, and
50 // it did. Later we found the race condition, so there's no known reason to use
51 // the mutex version - however we keep it around as a just in case measure.
52 // There's a thought of fuzzing the two implementations against each other as
53 // a correctness check of both, but that's not implemented yet.
54 
55 extern grpc_core::DebugOnlyTraceFlag grpc_trace_party_state;
56 
57 #define GRPC_PARTY_SYNC_USING_ATOMICS
58 // #define GRPC_PARTY_SYNC_USING_MUTEX
59 
60 #if defined(GRPC_PARTY_SYNC_USING_ATOMICS) +    \
61         defined(GRPC_PARTY_SYNC_USING_MUTEX) != \
62     1
63 #error Must define a party sync mechanism
64 #endif
65 
66 namespace grpc_core {
67 
68 namespace party_detail {
69 
70 // Number of bits reserved for wakeups gives us the maximum number of
71 // participants.
72 static constexpr size_t kMaxParticipants = 16;
73 
74 }  // namespace party_detail
75 
76 class PartySyncUsingAtomics {
77  public:
PartySyncUsingAtomics(size_t initial_refs)78   explicit PartySyncUsingAtomics(size_t initial_refs)
79       : state_(kOneRef * initial_refs) {}
80 
IncrementRefCount()81   void IncrementRefCount() {
82     const uint64_t prev_state =
83         state_.fetch_add(kOneRef, std::memory_order_relaxed);
84     LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef);
85   }
86   GRPC_MUST_USE_RESULT bool RefIfNonZero();
87   // Returns true if the ref count is now zero and the caller should call
88   // PartyOver
Unref()89   GRPC_MUST_USE_RESULT bool Unref() {
90     const uint64_t prev_state =
91         state_.fetch_sub(kOneRef, std::memory_order_acq_rel);
92     LogStateChange("Unref", prev_state, prev_state - kOneRef);
93     if ((prev_state & kRefMask) == kOneRef) {
94       return UnreffedLast();
95     }
96     return false;
97   }
ForceImmediateRepoll(WakeupMask mask)98   void ForceImmediateRepoll(WakeupMask mask) {
99     // Or in the bit for the currently polling participant.
100     // Will be grabbed next round to force a repoll of this promise.
101     const uint64_t prev_state =
102         state_.fetch_or(mask, std::memory_order_relaxed);
103     LogStateChange("ForceImmediateRepoll", prev_state, prev_state | mask);
104   }
105 
106   // Run the update loop: poll_one_participant is called with an integral index
107   // for the participant that should be polled. It should return true if the
108   // participant completed and should be removed from the allocated set.
109   template <typename F>
RunParty(F poll_one_participant)110   GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
111     uint64_t prev_state;
112     iteration_.fetch_add(1, std::memory_order_relaxed);
113     for (;;) {
114       // Grab the current state, and clear the wakeup bits & add flag.
115       prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask,
116                                     std::memory_order_acquire);
117       LogStateChange("Run", prev_state,
118                      prev_state & (kRefMask | kLocked | kAllocatedMask));
119       GPR_ASSERT(prev_state & kLocked);
120       if (prev_state & kDestroying) return true;
121       // From the previous state, extract which participants we're to wakeup.
122       uint64_t wakeups = prev_state & kWakeupMask;
123       // Now update prev_state to be what we want the CAS to see below.
124       prev_state &= kRefMask | kLocked | kAllocatedMask;
125       // For each wakeup bit...
126       for (size_t i = 0; wakeups != 0; i++, wakeups >>= 1) {
127         // If the bit is not set, skip.
128         if ((wakeups & 1) == 0) continue;
129         if (poll_one_participant(i)) {
130           const uint64_t allocated_bit = (1u << i << kAllocatedShift);
131           prev_state &= ~allocated_bit;
132           uint64_t finished_prev_state =
133               state_.fetch_and(~allocated_bit, std::memory_order_release);
134           LogStateChange("Run:ParticipantComplete", finished_prev_state,
135                          finished_prev_state & ~allocated_bit);
136         }
137       }
138       // Try to CAS the state we expected to have (with no wakeups or adds)
139       // back to unlocked (by masking in only the ref mask - sans locked bit).
140       // If this succeeds then no wakeups were added, no adds were added, and we
141       // have successfully unlocked.
142       // Otherwise, we need to loop again.
143       // Note that if an owning waker is created or the weak cas spuriously
144       // fails we will also loop again, but in that case see no wakeups or adds
145       // and so will get back here fairly quickly.
146       // TODO(ctiller): consider mitigations for the accidental wakeup on owning
147       // waker creation case -- I currently expect this will be more expensive
148       // than this quick loop.
149       if (wake_after_poll_ == 0) {
150         if (state_.compare_exchange_weak(
151                 prev_state, (prev_state & (kRefMask | kAllocatedMask)),
152                 std::memory_order_acq_rel, std::memory_order_acquire)) {
153           LogStateChange("Run:End", prev_state,
154                          prev_state & (kRefMask | kAllocatedMask));
155           return false;
156         }
157       } else {
158         if (state_.compare_exchange_weak(
159                 prev_state,
160                 (prev_state & (kRefMask | kAllocatedMask | kLocked)) |
161                     wake_after_poll_,
162                 std::memory_order_acq_rel, std::memory_order_acquire)) {
163           LogStateChange("Run:EndIteration", prev_state,
164                          prev_state & (kRefMask | kAllocatedMask));
165           iteration_.fetch_add(1, std::memory_order_relaxed);
166           wake_after_poll_ = 0;
167         }
168       }
169     }
170     return false;
171   }
172 
173   // Add new participants to the party. Returns true if the caller should run
174   // the party. store is called with an array of indices of the new
175   // participants. Adds a ref that should be dropped by the caller after
176   // RunParty has been called (if that was required).
177   template <typename F>
AddParticipantsAndRef(size_t count,F store)178   GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) {
179     uint64_t state = state_.load(std::memory_order_acquire);
180     uint64_t allocated;
181 
182     size_t slots[party_detail::kMaxParticipants];
183 
184     // Find slots for each new participant, ordering them from lowest available
185     // slot upwards to ensure the same poll ordering as presentation ordering to
186     // this function.
187     WakeupMask wakeup_mask;
188     do {
189       wakeup_mask = 0;
190       allocated = (state & kAllocatedMask) >> kAllocatedShift;
191       size_t n = 0;
192       for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants;
193            bit++) {
194         if (allocated & (1 << bit)) continue;
195         wakeup_mask |= (1 << bit);
196         slots[n++] = bit;
197         allocated |= 1 << bit;
198       }
199       GPR_ASSERT(n == count);
200       // Try to allocate this slot and take a ref (atomically).
201       // Ref needs to be taken because once we store the participant it could be
202       // spuriously woken up and unref the party.
203     } while (!state_.compare_exchange_weak(
204         state, (state | (allocated << kAllocatedShift)) + kOneRef,
205         std::memory_order_acq_rel, std::memory_order_acquire));
206     LogStateChange("AddParticipantsAndRef", state,
207                    (state | (allocated << kAllocatedShift)) + kOneRef);
208 
209     store(slots);
210 
211     // Now we need to wake up the party.
212     state = state_.fetch_or(wakeup_mask | kLocked, std::memory_order_release);
213     LogStateChange("AddParticipantsAndRef:Wakeup", state,
214                    state | wakeup_mask | kLocked);
215 
216     // If the party was already locked, we're done.
217     return ((state & kLocked) == 0);
218   }
219 
220   // Schedule a wakeup for the given participant.
221   // Returns true if the caller should run the party.
222   GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
223 
WakeAfterPoll(WakeupMask mask)224   void WakeAfterPoll(WakeupMask mask) { wake_after_poll_ |= mask; }
iteration()225   uint32_t iteration() const {
226     return iteration_.load(std::memory_order_relaxed);
227   }
228 
229  private:
230   bool UnreffedLast();
231 
232   void LogStateChange(const char* op, uint64_t prev_state, uint64_t new_state,
233                       DebugLocation loc = {}) {
234     if (grpc_trace_party_state.enabled()) {
235       gpr_log(loc.file(), loc.line(), GPR_LOG_SEVERITY_INFO,
236               "Party %p %30s: %016" PRIx64 " -> %016" PRIx64, this, op,
237               prev_state, new_state);
238     }
239   }
240 
241   // State bits:
242   // The atomic state_ field is composed of the following:
243   //   - 24 bits for ref counts
244   //     1 is owned by the party prior to Orphan()
245   //     All others are owned by owning wakers
246   //   - 1 bit to indicate whether the party is locked
247   //     The first thread to set this owns the party until it is unlocked
248   //     That thread will run the main loop until no further work needs to
249   //     be done.
250   //   - 1 bit to indicate whether there are participants waiting to be
251   //   added
252   //   - 16 bits, one per participant, indicating which participants have
253   //   been
254   //     woken up and should be polled next time the main loop runs.
255 
256   // clang-format off
257   // Bits used to store 16 bits of wakeups
258   static constexpr uint64_t kWakeupMask    = 0x0000'0000'0000'ffff;
259   // Bits used to store 16 bits of allocated participant slots.
260   static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000;
261   // Bit indicating destruction has begun (refs went to zero)
262   static constexpr uint64_t kDestroying    = 0x0000'0001'0000'0000;
263   // Bit indicating locked or not
264   static constexpr uint64_t kLocked        = 0x0000'0008'0000'0000;
265   // Bits used to store 24 bits of ref counts
266   static constexpr uint64_t kRefMask       = 0xffff'ff00'0000'0000;
267   // clang-format on
268 
269   // Shift to get from a participant mask to an allocated mask.
270   static constexpr size_t kAllocatedShift = 16;
271   // How far to shift to get the refcount
272   static constexpr size_t kRefShift = 40;
273   // One ref count
274   static constexpr uint64_t kOneRef = 1ull << kRefShift;
275 
276   std::atomic<uint64_t> state_;
277   std::atomic<uint32_t> iteration_{0};
278   WakeupMask wake_after_poll_ = 0;
279 };
280 
281 class PartySyncUsingMutex {
282  public:
PartySyncUsingMutex(size_t initial_refs)283   explicit PartySyncUsingMutex(size_t initial_refs) : refs_(initial_refs) {}
284 
285   void IncrementRefCount() { refs_.Ref(); }
286   GRPC_MUST_USE_RESULT bool RefIfNonZero() { return refs_.RefIfNonZero(); }
287   GRPC_MUST_USE_RESULT bool Unref() { return refs_.Unref(); }
288   void ForceImmediateRepoll(WakeupMask mask) {
289     MutexLock lock(&mu_);
290     wakeups_ |= mask;
291   }
292   template <typename F>
293   GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) {
294     WakeupMask freed = 0;
295     while (true) {
296       ReleasableMutexLock lock(&mu_);
297       GPR_ASSERT(locked_);
298       allocated_ &= ~std::exchange(freed, 0);
299       auto wakeup = std::exchange(wakeups_, 0);
300       if (wakeup == 0) {
301         locked_ = false;
302         return false;
303       }
304       lock.Release();
305       for (size_t i = 0; wakeup != 0; i++, wakeup >>= 1) {
306         if ((wakeup & 1) == 0) continue;
307         if (poll_one_participant(i)) freed |= 1 << i;
308       }
309     }
310   }
311 
312   template <typename F>
313   GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) {
314     IncrementRefCount();
315     MutexLock lock(&mu_);
316     size_t slots[party_detail::kMaxParticipants];
317     WakeupMask wakeup_mask = 0;
318     size_t n = 0;
319     for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants;
320          bit++) {
321       if (allocated_ & (1 << bit)) continue;
322       slots[n++] = bit;
323       wakeup_mask |= 1 << bit;
324       allocated_ |= 1 << bit;
325     }
326     GPR_ASSERT(n == count);
327     store(slots);
328     wakeups_ |= wakeup_mask;
329     return !std::exchange(locked_, true);
330   }
331 
332   GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask);
333 
334  private:
335   RefCount refs_;
336   Mutex mu_;
337   WakeupMask allocated_ ABSL_GUARDED_BY(mu_) = 0;
338   WakeupMask wakeups_ ABSL_GUARDED_BY(mu_) = 0;
339   bool locked_ ABSL_GUARDED_BY(mu_) = false;
340 };
341 
342 // A Party is an Activity with multiple participant promises.
343 class Party : public Activity, private Wakeable {
344  private:
345   // Non-owning wakeup handle.
346   class Handle;
347 
348   // One participant in the party.
349   class Participant {
350    public:
351     explicit Participant(absl::string_view name) : name_(name) {}
352     // Poll the participant. Return true if complete.
353     // Participant should take care of its own deallocation in this case.
354     virtual bool PollParticipantPromise() = 0;
355 
356     // Destroy the participant before finishing.
357     virtual void Destroy() = 0;
358 
359     // Return a Handle instance for this participant.
360     Wakeable* MakeNonOwningWakeable(Party* party);
361 
362     absl::string_view name() const { return name_; }
363 
364    protected:
365     ~Participant();
366 
367    private:
368     Handle* handle_ = nullptr;
369     absl::string_view name_;
370   };
371 
372  public:
373   Party(const Party&) = delete;
374   Party& operator=(const Party&) = delete;
375 
376   // Spawn one promise into the party.
377   // The promise will be polled until it is resolved, or until the party is shut
378   // down.
379   // The on_complete callback will be called with the result of the promise if
380   // it completes.
381   // A maximum of sixteen promises can be spawned onto a party.
382   template <typename Factory, typename OnComplete>
383   void Spawn(absl::string_view name, Factory promise_factory,
384              OnComplete on_complete);
385 
386   template <typename Factory>
387   auto SpawnWaitable(absl::string_view name, Factory factory);
388 
389   void Orphan() final { Crash("unused"); }
390 
391   // Activity implementation: not allowed to be overridden by derived types.
392   void ForceImmediateRepoll(WakeupMask mask) final;
393   WakeupMask CurrentParticipant() const final {
394     GPR_DEBUG_ASSERT(currently_polling_ != kNotPolling);
395     return 1u << currently_polling_;
396   }
397   Waker MakeOwningWaker() final;
398   Waker MakeNonOwningWaker() final;
399   std::string ActivityDebugTag(WakeupMask wakeup_mask) const final;
400 
401   void IncrementRefCount() { sync_.IncrementRefCount(); }
402   void Unref() {
403     if (sync_.Unref()) PartyIsOver();
404   }
405   RefCountedPtr<Party> Ref() {
406     IncrementRefCount();
407     return RefCountedPtr<Party>(this);
408   }
409 
410   // Return a promise that resolves to Empty{} when the current party poll is
411   // complete.
412   // This is useful for implementing batching and the like: we can hold some
413   // action until the rest of the party resolves itself.
414   auto AfterCurrentPoll() {
415     GPR_DEBUG_ASSERT(GetContext<Activity>() == this);
416     sync_.WakeAfterPoll(CurrentParticipant());
417     return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
418       GPR_DEBUG_ASSERT(GetContext<Activity>() == this);
419       if (iteration == sync_.iteration()) return Pending{};
420       return Empty{};
421     };
422   }
423 
424   class BulkSpawner {
425    public:
426     explicit BulkSpawner(Party* party) : party_(party) {}
427     ~BulkSpawner() {
428       party_->AddParticipants(participants_, num_participants_);
429     }
430 
431     template <typename Factory, typename OnComplete>
432     void Spawn(absl::string_view name, Factory promise_factory,
433                OnComplete on_complete);
434 
435    private:
436     Party* const party_;
437     size_t num_participants_ = 0;
438     Participant* participants_[party_detail::kMaxParticipants];
439   };
440 
441  protected:
442   explicit Party(size_t initial_refs) : sync_(initial_refs) {}
443   ~Party() override;
444 
445   // Main run loop. Must be locked.
446   // Polls participants and drains the add queue until there is no work left to
447   // be done.
448   // Derived types will likely want to override this to set up their
449   // contexts before polling.
450   // Should not be called by derived types except as a tail call to the base
451   // class RunParty when overriding this method to add custom context.
452   // Returns true if the party is over.
453   GRPC_MUST_USE_RESULT virtual bool RunParty();
454 
455   bool RefIfNonZero() { return sync_.RefIfNonZero(); }
456 
457   // Destroy any remaining participants.
458   // Should be called by derived types in response to PartyOver.
459   // Needs to have normal context setup before calling.
460   void CancelRemainingParticipants();
461 
462  private:
463   // Concrete implementation of a participant for some promise & oncomplete
464   // type.
465   template <typename SuppliedFactory, typename OnComplete>
466   class ParticipantImpl final : public Participant {
467     using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>;
468     using Promise = typename Factory::Promise;
469 
470    public:
471     ParticipantImpl(absl::string_view name, SuppliedFactory promise_factory,
472                     OnComplete on_complete)
473         : Participant(name), on_complete_(std::move(on_complete)) {
474       Construct(&factory_, std::move(promise_factory));
475     }
476     ~ParticipantImpl() {
477       if (!started_) {
478         Destruct(&factory_);
479       } else {
480         Destruct(&promise_);
481       }
482     }
483 
484     bool PollParticipantPromise() override {
485       if (!started_) {
486         auto p = factory_.Make();
487         Destruct(&factory_);
488         Construct(&promise_, std::move(p));
489         started_ = true;
490       }
491       auto p = promise_();
492       if (auto* r = p.value_if_ready()) {
493         on_complete_(std::move(*r));
494         delete this;
495         return true;
496       }
497       return false;
498     }
499 
500     void Destroy() override { delete this; }
501 
502    private:
503     union {
504       GPR_NO_UNIQUE_ADDRESS Factory factory_;
505       GPR_NO_UNIQUE_ADDRESS Promise promise_;
506     };
507     GPR_NO_UNIQUE_ADDRESS OnComplete on_complete_;
508     bool started_ = false;
509   };
510 
511   template <typename SuppliedFactory>
512   class PromiseParticipantImpl final
513       : public RefCounted<PromiseParticipantImpl<SuppliedFactory>,
514                           NonPolymorphicRefCount>,
515         public Participant {
516     using Factory = promise_detail::OncePromiseFactory<void, SuppliedFactory>;
517     using Promise = typename Factory::Promise;
518     using Result = typename Promise::Result;
519 
520    public:
521     PromiseParticipantImpl(absl::string_view name,
522                            SuppliedFactory promise_factory)
523         : Participant(name) {
524       Construct(&factory_, std::move(promise_factory));
525     }
526 
527     ~PromiseParticipantImpl() {
528       switch (state_.load(std::memory_order_acquire)) {
529         case State::kFactory:
530           Destruct(&factory_);
531           break;
532         case State::kPromise:
533           Destruct(&promise_);
534           break;
535         case State::kResult:
536           Destruct(&result_);
537           break;
538       }
539     }
540 
541     // Inside party poll: drive from factory -> promise -> result
542     bool PollParticipantPromise() override {
543       switch (state_.load(std::memory_order_relaxed)) {
544         case State::kFactory: {
545           auto p = factory_.Make();
546           Destruct(&factory_);
547           Construct(&promise_, std::move(p));
548           state_.store(State::kPromise, std::memory_order_relaxed);
549         }
550           ABSL_FALLTHROUGH_INTENDED;
551         case State::kPromise: {
552           auto p = promise_();
553           if (auto* r = p.value_if_ready()) {
554             Destruct(&promise_);
555             Construct(&result_, std::move(*r));
556             state_.store(State::kResult, std::memory_order_release);
557             waiter_.Wakeup();
558             this->Unref();
559             return true;
560           }
561           return false;
562         }
563         case State::kResult:
564           Crash(
565               "unreachable: promises should not be repolled after completion");
566       }
567     }
568 
569     // Outside party poll: check whether the spawning party has completed this
570     // promise.
571     Poll<Result> PollCompletion() {
572       switch (state_.load(std::memory_order_acquire)) {
573         case State::kFactory:
574         case State::kPromise:
575           return Pending{};
576         case State::kResult:
577           return std::move(result_);
578       }
579     }
580 
581     void Destroy() override { this->Unref(); }
582 
583    private:
584     enum class State : uint8_t { kFactory, kPromise, kResult };
585     union {
586       GPR_NO_UNIQUE_ADDRESS Factory factory_;
587       GPR_NO_UNIQUE_ADDRESS Promise promise_;
588       GPR_NO_UNIQUE_ADDRESS Result result_;
589     };
590     Waker waiter_{GetContext<Activity>()->MakeOwningWaker()};
591     std::atomic<State> state_{State::kFactory};
592   };
593 
594   // Notification that the party has finished and this instance can be deleted.
595   // Derived types should arrange to call CancelRemainingParticipants during
596   // this sequence.
597   virtual void PartyOver() = 0;
598 
599   // Run the locked part of the party until it is unlocked.
600   void RunLocked();
601   // Called in response to Unref() hitting zero - ultimately calls PartyOver,
602   // but needs to set some stuff up.
603   // Here so it gets compiled out of line.
604   void PartyIsOver();
605 
606   // Wakeable implementation
607   void Wakeup(WakeupMask wakeup_mask) final;
608   void WakeupAsync(WakeupMask wakeup_mask) final;
609   void Drop(WakeupMask wakeup_mask) final;
610 
611   // Add a participant (backs Spawn, after type erasure to ParticipantFactory).
612   void AddParticipants(Participant** participant, size_t count);
613   bool RunOneParticipant(int i);
614 
615   virtual grpc_event_engine::experimental::EventEngine* event_engine()
616       const = 0;
617 
618   // Sentinal value for currently_polling_ when no participant is being polled.
619   static constexpr uint8_t kNotPolling = 255;
620 
621 #ifdef GRPC_PARTY_SYNC_USING_ATOMICS
622   PartySyncUsingAtomics sync_;
623 #elif defined(GRPC_PARTY_SYNC_USING_MUTEX)
624   PartySyncUsingMutex sync_;
625 #else
626 #error No synchronization method defined
627 #endif
628 
629   uint8_t currently_polling_ = kNotPolling;
630   // All current participants, using a tagged format.
631   // If the lower bit is unset, then this is a Participant*.
632   // If the lower bit is set, then this is a ParticipantFactory*.
633   std::atomic<Participant*> participants_[party_detail::kMaxParticipants] = {};
634 };
635 
636 template <>
637 struct ContextSubclass<Party> {
638   using Base = Activity;
639 };
640 
641 template <typename Factory, typename OnComplete>
642 void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory,
643                                OnComplete on_complete) {
644   if (grpc_trace_promise_primitives.enabled()) {
645     gpr_log(GPR_DEBUG, "%s[bulk_spawn] On %p queue %s",
646             party_->DebugTag().c_str(), this, std::string(name).c_str());
647   }
648   participants_[num_participants_++] = new ParticipantImpl<Factory, OnComplete>(
649       name, std::move(promise_factory), std::move(on_complete));
650 }
651 
652 template <typename Factory, typename OnComplete>
653 void Party::Spawn(absl::string_view name, Factory promise_factory,
654                   OnComplete on_complete) {
655   BulkSpawner(this).Spawn(name, std::move(promise_factory),
656                           std::move(on_complete));
657 }
658 
659 template <typename Factory>
660 auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) {
661   auto participant = MakeRefCounted<PromiseParticipantImpl<Factory>>(
662       name, std::move(promise_factory));
663   Participant* p = participant->Ref().release();
664   AddParticipants(&p, 1);
665   return [participant = std::move(participant)]() mutable {
666     return participant->PollCompletion();
667   };
668 }
669 
670 }  // namespace grpc_core
671 
672 #endif  // GRPC_SRC_CORE_LIB_PROMISE_PARTY_H
673