1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 22 #include <string> 23 24 #include "absl/base/thread_annotations.h" 25 #include "absl/strings/str_cat.h" 26 27 #include <grpc/support/log.h> 28 29 #include "src/core/lib/gprpp/sync.h" 30 #include "src/core/lib/promise/activity.h" 31 #include "src/core/lib/promise/poll.h" 32 #include "src/core/lib/promise/trace.h" 33 #include "src/core/lib/promise/wait_set.h" 34 35 namespace grpc_core { 36 37 // A latch providing true cross activity wakeups 38 template <typename T> 39 class InterActivityLatch { 40 public: 41 InterActivityLatch() = default; 42 InterActivityLatch(const InterActivityLatch&) = delete; 43 InterActivityLatch& operator=(const InterActivityLatch&) = delete; 44 45 // Produce a promise to wait for this latch. Wait()46 auto Wait() { 47 return [this]() -> Poll<T> { 48 MutexLock lock(&mu_); 49 if (grpc_trace_promise_primitives.enabled()) { 50 gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(), 51 StateString().c_str()); 52 } 53 if (is_set_) { 54 return std::move(value_); 55 } else { 56 return waiters_.AddPending( 57 GetContext<Activity>()->MakeNonOwningWaker()); 58 } 59 }; 60 } 61 62 // Set the latch. Set(T value)63 void Set(T value) { 64 MutexLock lock(&mu_); 65 if (grpc_trace_promise_primitives.enabled()) { 66 gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str()); 67 } 68 is_set_ = true; 69 value_ = std::move(value); 70 waiters_.WakeupAsync(); 71 } 72 IsSet()73 bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) { 74 MutexLock lock(&mu_); 75 return is_set_; 76 } 77 78 private: DebugTag()79 std::string DebugTag() { 80 return absl::StrCat(GetContext<Activity>()->DebugTag(), 81 " INTER_ACTIVITY_LATCH[0x", 82 reinterpret_cast<uintptr_t>(this), "]: "); 83 } 84 StateString()85 std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 86 return absl::StrCat("is_set:", is_set_); 87 } 88 89 mutable Mutex mu_; 90 // True if we have a value set, false otherwise. 91 bool is_set_ ABSL_GUARDED_BY(mu_) = false; 92 WaitSet waiters_ ABSL_GUARDED_BY(mu_); 93 T value_ ABSL_GUARDED_BY(mu_); 94 }; 95 96 template <> 97 class InterActivityLatch<void> { 98 public: 99 InterActivityLatch() = default; 100 InterActivityLatch(const InterActivityLatch&) = delete; 101 InterActivityLatch& operator=(const InterActivityLatch&) = delete; 102 103 // Produce a promise to wait for this latch. Wait()104 auto Wait() { 105 return [this]() -> Poll<Empty> { 106 MutexLock lock(&mu_); 107 if (grpc_trace_promise_primitives.enabled()) { 108 gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(), 109 StateString().c_str()); 110 } 111 if (is_set_) { 112 return Empty{}; 113 } else { 114 return waiters_.AddPending( 115 GetContext<Activity>()->MakeNonOwningWaker()); 116 } 117 }; 118 } 119 120 // Set the latch. Set()121 void Set() { 122 MutexLock lock(&mu_); 123 if (grpc_trace_promise_primitives.enabled()) { 124 gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str()); 125 } 126 is_set_ = true; 127 waiters_.WakeupAsync(); 128 } 129 IsSet()130 bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) { 131 MutexLock lock(&mu_); 132 return is_set_; 133 } 134 135 private: DebugTag()136 std::string DebugTag() { 137 return absl::StrCat( 138 HasContext<Activity>() ? GetContext<Activity>()->DebugTag() 139 : "NO_ACTIVITY:", 140 " INTER_ACTIVITY_LATCH[0x", reinterpret_cast<uintptr_t>(this), "]: "); 141 } 142 StateString()143 std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 144 return absl::StrCat("is_set:", is_set_); 145 } 146 147 mutable Mutex mu_; 148 // True if we have a value set, false otherwise. 149 bool is_set_ ABSL_GUARDED_BY(mu_) = false; 150 WaitSet waiters_ ABSL_GUARDED_BY(mu_); 151 }; 152 153 } // namespace grpc_core 154 155 #endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H 156