1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H 16 #define GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include "src/core/lib/iomgr/exec_ctx.h" 21 #include "src/core/lib/promise/activity.h" 22 #include "src/core/lib/surface/completion_queue.h" 23 24 namespace grpc_core { 25 26 // Defines a promise that calls grpc_cq_end_op() (on first poll) and then waits 27 // for the callback supplied to grpc_cq_end_op() to be called, before resolving 28 // to Empty{} 29 class WaitForCqEndOp { 30 public: WaitForCqEndOp(bool is_closure,void * tag,grpc_error_handle error,grpc_completion_queue * cq)31 WaitForCqEndOp(bool is_closure, void* tag, grpc_error_handle error, 32 grpc_completion_queue* cq) 33 : state_{NotStarted{is_closure, tag, std::move(error), cq}} {} 34 35 Poll<Empty> operator()(); 36 37 WaitForCqEndOp(const WaitForCqEndOp&) = delete; 38 WaitForCqEndOp& operator=(const WaitForCqEndOp&) = delete; WaitForCqEndOp(WaitForCqEndOp && other)39 WaitForCqEndOp(WaitForCqEndOp&& other) noexcept 40 : state_(std::move(absl::get<NotStarted>(other.state_))) { 41 other.state_.emplace<Invalid>(); 42 } 43 WaitForCqEndOp& operator=(WaitForCqEndOp&& other) noexcept { 44 state_ = std::move(absl::get<NotStarted>(other.state_)); 45 other.state_.emplace<Invalid>(); 46 return *this; 47 } 48 49 private: 50 struct NotStarted { 51 bool is_closure; 52 void* tag; 53 grpc_error_handle error; 54 grpc_completion_queue* cq; 55 }; 56 struct Started { StartedStarted57 explicit Started(Waker waker) : waker(std::move(waker)) {} 58 Waker waker; 59 grpc_cq_completion completion; 60 std::atomic<bool> done{false}; 61 }; 62 struct Invalid {}; 63 using State = absl::variant<NotStarted, Started, Invalid>; 64 65 static std::string StateString(const State& state); 66 67 State state_{Invalid{}}; 68 }; 69 70 } // namespace grpc_core 71 72 #endif // GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H 73