xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/promise/inter_activity_latch.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 
22 #include <string>
23 
24 #include "absl/base/thread_annotations.h"
25 #include "absl/strings/str_cat.h"
26 
27 #include <grpc/support/log.h>
28 
29 #include "src/core/lib/gprpp/sync.h"
30 #include "src/core/lib/promise/activity.h"
31 #include "src/core/lib/promise/poll.h"
32 #include "src/core/lib/promise/trace.h"
33 #include "src/core/lib/promise/wait_set.h"
34 
35 namespace grpc_core {
36 
37 // A latch providing true cross activity wakeups
38 template <typename T>
39 class InterActivityLatch {
40  public:
41   InterActivityLatch() = default;
42   InterActivityLatch(const InterActivityLatch&) = delete;
43   InterActivityLatch& operator=(const InterActivityLatch&) = delete;
44 
45   // Produce a promise to wait for this latch.
Wait()46   auto Wait() {
47     return [this]() -> Poll<T> {
48       MutexLock lock(&mu_);
49       if (grpc_trace_promise_primitives.enabled()) {
50         gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
51                 StateString().c_str());
52       }
53       if (is_set_) {
54         return std::move(value_);
55       } else {
56         return waiters_.AddPending(
57             GetContext<Activity>()->MakeNonOwningWaker());
58       }
59     };
60   }
61 
62   // Set the latch.
Set(T value)63   void Set(T value) {
64     MutexLock lock(&mu_);
65     if (grpc_trace_promise_primitives.enabled()) {
66       gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
67     }
68     is_set_ = true;
69     value_ = std::move(value);
70     waiters_.WakeupAsync();
71   }
72 
IsSet()73   bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) {
74     MutexLock lock(&mu_);
75     return is_set_;
76   }
77 
78  private:
DebugTag()79   std::string DebugTag() {
80     return absl::StrCat(GetContext<Activity>()->DebugTag(),
81                         " INTER_ACTIVITY_LATCH[0x",
82                         reinterpret_cast<uintptr_t>(this), "]: ");
83   }
84 
StateString()85   std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
86     return absl::StrCat("is_set:", is_set_);
87   }
88 
89   mutable Mutex mu_;
90   // True if we have a value set, false otherwise.
91   bool is_set_ ABSL_GUARDED_BY(mu_) = false;
92   WaitSet waiters_ ABSL_GUARDED_BY(mu_);
93   T value_ ABSL_GUARDED_BY(mu_);
94 };
95 
96 template <>
97 class InterActivityLatch<void> {
98  public:
99   InterActivityLatch() = default;
100   InterActivityLatch(const InterActivityLatch&) = delete;
101   InterActivityLatch& operator=(const InterActivityLatch&) = delete;
102 
103   // Produce a promise to wait for this latch.
Wait()104   auto Wait() {
105     return [this]() -> Poll<Empty> {
106       MutexLock lock(&mu_);
107       if (grpc_trace_promise_primitives.enabled()) {
108         gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
109                 StateString().c_str());
110       }
111       if (is_set_) {
112         return Empty{};
113       } else {
114         return waiters_.AddPending(
115             GetContext<Activity>()->MakeNonOwningWaker());
116       }
117     };
118   }
119 
120   // Set the latch.
Set()121   void Set() {
122     MutexLock lock(&mu_);
123     if (grpc_trace_promise_primitives.enabled()) {
124       gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
125     }
126     is_set_ = true;
127     waiters_.WakeupAsync();
128   }
129 
IsSet()130   bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) {
131     MutexLock lock(&mu_);
132     return is_set_;
133   }
134 
135  private:
DebugTag()136   std::string DebugTag() {
137     return absl::StrCat(
138         HasContext<Activity>() ? GetContext<Activity>()->DebugTag()
139                                : "NO_ACTIVITY:",
140         " INTER_ACTIVITY_LATCH[0x", reinterpret_cast<uintptr_t>(this), "]: ");
141   }
142 
StateString()143   std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
144     return absl::StrCat("is_set:", is_set_);
145   }
146 
147   mutable Mutex mu_;
148   // True if we have a value set, false otherwise.
149   bool is_set_ ABSL_GUARDED_BY(mu_) = false;
150   WaitSet waiters_ ABSL_GUARDED_BY(mu_);
151 };
152 
153 }  // namespace grpc_core
154 
155 #endif  // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
156