xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/surface/wait_for_cq_end_op.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
16 #define GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include "src/core/lib/iomgr/exec_ctx.h"
21 #include "src/core/lib/promise/activity.h"
22 #include "src/core/lib/surface/completion_queue.h"
23 
24 namespace grpc_core {
25 
26 // Defines a promise that calls grpc_cq_end_op() (on first poll) and then waits
27 // for the callback supplied to grpc_cq_end_op() to be called, before resolving
28 // to Empty{}
29 class WaitForCqEndOp {
30  public:
WaitForCqEndOp(bool is_closure,void * tag,grpc_error_handle error,grpc_completion_queue * cq)31   WaitForCqEndOp(bool is_closure, void* tag, grpc_error_handle error,
32                  grpc_completion_queue* cq)
33       : state_{NotStarted{is_closure, tag, std::move(error), cq}} {}
34 
35   Poll<Empty> operator()();
36 
37   WaitForCqEndOp(const WaitForCqEndOp&) = delete;
38   WaitForCqEndOp& operator=(const WaitForCqEndOp&) = delete;
WaitForCqEndOp(WaitForCqEndOp && other)39   WaitForCqEndOp(WaitForCqEndOp&& other) noexcept
40       : state_(std::move(absl::get<NotStarted>(other.state_))) {
41     other.state_.emplace<Invalid>();
42   }
43   WaitForCqEndOp& operator=(WaitForCqEndOp&& other) noexcept {
44     state_ = std::move(absl::get<NotStarted>(other.state_));
45     other.state_.emplace<Invalid>();
46     return *this;
47   }
48 
49  private:
50   struct NotStarted {
51     bool is_closure;
52     void* tag;
53     grpc_error_handle error;
54     grpc_completion_queue* cq;
55   };
56   struct Started {
StartedStarted57     explicit Started(Waker waker) : waker(std::move(waker)) {}
58     Waker waker;
59     grpc_cq_completion completion;
60     std::atomic<bool> done{false};
61   };
62   struct Invalid {};
63   using State = absl::variant<NotStarted, Started, Invalid>;
64 
65   static std::string StateString(const State& state);
66 
67   State state_{Invalid{}};
68 };
69 
70 }  // namespace grpc_core
71 
72 #endif  // GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
73