xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/client_callback.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
20 #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 
25 #include <grpc/grpc.h>
26 #include <grpc/impl/call.h>
27 #include <grpc/support/log.h>
28 #include <grpcpp/impl/call.h>
29 #include <grpcpp/impl/call_op_set.h>
30 #include <grpcpp/impl/sync.h>
31 #include <grpcpp/support/callback_common.h>
32 #include <grpcpp/support/config.h>
33 #include <grpcpp/support/status.h>
34 
35 namespace grpc {
36 class Channel;
37 class ClientContext;
38 
39 namespace internal {
40 class RpcMethod;
41 
42 /// Perform a callback-based unary call.  May optionally specify the base
43 /// class of the Request and Response so that the internal calls and structures
44 /// below this may be based on those base classes and thus achieve code reuse
45 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
46 /// class).
47 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
48 template <class InputMessage, class OutputMessage,
49           class BaseInputMessage = InputMessage,
50           class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)51 void CallbackUnaryCall(grpc::ChannelInterface* channel,
52                        const grpc::internal::RpcMethod& method,
53                        grpc::ClientContext* context,
54                        const InputMessage* request, OutputMessage* result,
55                        std::function<void(grpc::Status)> on_completion) {
56   static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
57                 "Invalid input message specification");
58   static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
59                 "Invalid output message specification");
60   CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
61       channel, method, context, request, result, on_completion);
62 }
63 
64 template <class InputMessage, class OutputMessage>
65 class CallbackUnaryCallImpl {
66  public:
CallbackUnaryCallImpl(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)67   CallbackUnaryCallImpl(grpc::ChannelInterface* channel,
68                         const grpc::internal::RpcMethod& method,
69                         grpc::ClientContext* context,
70                         const InputMessage* request, OutputMessage* result,
71                         std::function<void(grpc::Status)> on_completion) {
72     grpc::CompletionQueue* cq = channel->CallbackCQ();
73     GPR_ASSERT(cq != nullptr);
74     grpc::internal::Call call(channel->CreateCall(method, context, cq));
75 
76     using FullCallOpSet = grpc::internal::CallOpSet<
77         grpc::internal::CallOpSendInitialMetadata,
78         grpc::internal::CallOpSendMessage,
79         grpc::internal::CallOpRecvInitialMetadata,
80         grpc::internal::CallOpRecvMessage<OutputMessage>,
81         grpc::internal::CallOpClientSendClose,
82         grpc::internal::CallOpClientRecvStatus>;
83 
84     struct OpSetAndTag {
85       FullCallOpSet opset;
86       grpc::internal::CallbackWithStatusTag tag;
87     };
88     const size_t alloc_sz = sizeof(OpSetAndTag);
89     auto* const alloced =
90         static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
91     auto* ops = new (&alloced->opset) FullCallOpSet;
92     auto* tag = new (&alloced->tag)
93         grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
94 
95     // TODO(vjpai): Unify code with sync API as much as possible
96     grpc::Status s = ops->SendMessagePtr(request);
97     if (!s.ok()) {
98       tag->force_run(s);
99       return;
100     }
101     ops->SendInitialMetadata(&context->send_initial_metadata_,
102                              context->initial_metadata_flags());
103     ops->RecvInitialMetadata(context);
104     ops->RecvMessage(result);
105     ops->AllowNoMessage();
106     ops->ClientSendClose();
107     ops->ClientRecvStatus(context, tag->status_ptr());
108     ops->set_core_cq_tag(tag);
109     call.PerformOps(ops);
110   }
111 };
112 
113 // Base class for public API classes.
114 class ClientReactor {
115  public:
116   virtual ~ClientReactor() = default;
117 
118   /// Called by the library when all operations associated with this RPC have
119   /// completed and all Holds have been removed. OnDone provides the RPC status
120   /// outcome for both successful and failed RPCs. If it is never called on an
121   /// RPC, it indicates an application-level problem (like failure to remove a
122   /// hold).
123   ///
124   /// \param[in] s The status outcome of this RPC
125   virtual void OnDone(const grpc::Status& /*s*/) = 0;
126 
127   /// InternalTrailersOnly is not part of the API and is not meant to be
128   /// overridden. It is virtual to allow successful builds for certain bazel
129   /// build users that only want to depend on gRPC codegen headers and not the
130   /// full library (although this is not a generally-supported option). Although
131   /// the virtual call is slower than a direct call, this function is
132   /// heavyweight and the cost of the virtual call is not much in comparison.
133   /// This function may be removed or devirtualized in the future.
134   virtual bool InternalTrailersOnly(const grpc_call* call) const;
135 };
136 
137 }  // namespace internal
138 
139 // Forward declarations
140 template <class Request, class Response>
141 class ClientBidiReactor;
142 template <class Response>
143 class ClientReadReactor;
144 template <class Request>
145 class ClientWriteReactor;
146 class ClientUnaryReactor;
147 
148 // NOTE: The streaming objects are not actually implemented in the public API.
149 //       These interfaces are provided for mocking only. Typical applications
150 //       will interact exclusively with the reactors that they define.
151 template <class Request, class Response>
152 class ClientCallbackReaderWriter {
153  public:
~ClientCallbackReaderWriter()154   virtual ~ClientCallbackReaderWriter() {}
155   virtual void StartCall() = 0;
156   virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
157   virtual void WritesDone() = 0;
158   virtual void Read(Response* resp) = 0;
159   virtual void AddHold(int holds) = 0;
160   virtual void RemoveHold() = 0;
161 
162  protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)163   void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
164     reactor->BindStream(this);
165   }
166 };
167 
168 template <class Response>
169 class ClientCallbackReader {
170  public:
~ClientCallbackReader()171   virtual ~ClientCallbackReader() {}
172   virtual void StartCall() = 0;
173   virtual void Read(Response* resp) = 0;
174   virtual void AddHold(int holds) = 0;
175   virtual void RemoveHold() = 0;
176 
177  protected:
BindReactor(ClientReadReactor<Response> * reactor)178   void BindReactor(ClientReadReactor<Response>* reactor) {
179     reactor->BindReader(this);
180   }
181 };
182 
183 template <class Request>
184 class ClientCallbackWriter {
185  public:
~ClientCallbackWriter()186   virtual ~ClientCallbackWriter() {}
187   virtual void StartCall() = 0;
Write(const Request * req)188   void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
189   virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
WriteLast(const Request * req,grpc::WriteOptions options)190   void WriteLast(const Request* req, grpc::WriteOptions options) {
191     Write(req, options.set_last_message());
192   }
193   virtual void WritesDone() = 0;
194 
195   virtual void AddHold(int holds) = 0;
196   virtual void RemoveHold() = 0;
197 
198  protected:
BindReactor(ClientWriteReactor<Request> * reactor)199   void BindReactor(ClientWriteReactor<Request>* reactor) {
200     reactor->BindWriter(this);
201   }
202 };
203 
204 class ClientCallbackUnary {
205  public:
~ClientCallbackUnary()206   virtual ~ClientCallbackUnary() {}
207   virtual void StartCall() = 0;
208 
209  protected:
210   void BindReactor(ClientUnaryReactor* reactor);
211 };
212 
213 // The following classes are the reactor interfaces that are to be implemented
214 // by the user. They are passed in to the library as an argument to a call on a
215 // stub (either a codegen-ed call or a generic call). The streaming RPC is
216 // activated by calling StartCall, possibly after initiating StartRead,
217 // StartWrite, or AddHold operations on the streaming object. Note that none of
218 // the classes are pure; all reactions have a default empty reaction so that the
219 // user class only needs to override those reactions that it cares about.
220 // The reactor must be passed to the stub invocation before any of the below
221 // operations can be called and its reactions will be invoked by the library in
222 // response to the completion of various operations. Reactions must not include
223 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
224 // waiting on condition variables). Reactions may be invoked concurrently,
225 // except that OnDone is called after all others (assuming proper API usage).
226 // The reactor may not be deleted until OnDone is called.
227 
228 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
229 template <class Request, class Response>
230 class ClientBidiReactor : public internal::ClientReactor {
231  public:
232   /// Activate the RPC and initiate any reads or writes that have been Start'ed
233   /// before this call. All streaming RPCs issued by the client MUST have
234   /// StartCall invoked on them (even if they are canceled) as this call is the
235   /// activation of their lifecycle.
StartCall()236   void StartCall() { stream_->StartCall(); }
237 
238   /// Initiate a read operation (or post it for later initiation if StartCall
239   /// has not yet been invoked).
240   ///
241   /// \param[out] resp Where to eventually store the read message. Valid when
242   ///                  the library calls OnReadDone
StartRead(Response * resp)243   void StartRead(Response* resp) { stream_->Read(resp); }
244 
245   /// Initiate a write operation (or post it for later initiation if StartCall
246   /// has not yet been invoked).
247   ///
248   /// \param[in] req The message to be written. The library does not take
249   ///                ownership but the caller must ensure that the message is
250   ///                not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)251   void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
252 
253   /// Initiate/post a write operation with specified options.
254   ///
255   /// \param[in] req The message to be written. The library does not take
256   ///                ownership but the caller must ensure that the message is
257   ///                not deleted or modified until OnWriteDone is called.
258   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,grpc::WriteOptions options)259   void StartWrite(const Request* req, grpc::WriteOptions options) {
260     stream_->Write(req, options);
261   }
262 
263   /// Initiate/post a write operation with specified options and an indication
264   /// that this is the last write (like StartWrite and StartWritesDone, merged).
265   /// Note that calling this means that no more calls to StartWrite,
266   /// StartWriteLast, or StartWritesDone are allowed.
267   ///
268   /// \param[in] req The message to be written. The library does not take
269   ///                ownership but the caller must ensure that the message is
270   ///                not deleted or modified until OnWriteDone is called.
271   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,grpc::WriteOptions options)272   void StartWriteLast(const Request* req, grpc::WriteOptions options) {
273     StartWrite(req, options.set_last_message());
274   }
275 
276   /// Indicate that the RPC will have no more write operations. This can only be
277   /// issued once for a given RPC. This is not required or allowed if
278   /// StartWriteLast is used since that already has the same implication.
279   /// Note that calling this means that no more calls to StartWrite,
280   /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()281   void StartWritesDone() { stream_->WritesDone(); }
282 
283   /// Holds are needed if (and only if) this stream has operations that take
284   /// place on it after StartCall but from outside one of the reactions
285   /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
286   ///
287   /// Holds must be added before calling StartCall. If a stream still has a hold
288   /// in place, its resources will not be destroyed even if the status has
289   /// already come in from the wire and there are currently no active callbacks
290   /// outstanding. Similarly, the stream will not call OnDone if there are still
291   /// holds on it.
292   ///
293   /// For example, if a StartRead or StartWrite operation is going to be
294   /// initiated from elsewhere in the application, the application should call
295   /// AddHold or AddMultipleHolds before StartCall.  If there is going to be,
296   /// for example, a read-flow and a write-flow taking place outside the
297   /// reactions, then call AddMultipleHolds(2) before StartCall. When the
298   /// application knows that it won't issue any more read operations (such as
299   /// when a read comes back as not ok), it should issue a RemoveHold(). It
300   /// should also call RemoveHold() again after it does StartWriteLast or
301   /// StartWritesDone that indicates that there will be no more write ops.
302   /// The number of RemoveHold calls must match the total number of AddHold
303   /// calls plus the number of holds added by AddMultipleHolds.
304   /// The argument to AddMultipleHolds must be positive.
AddHold()305   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)306   void AddMultipleHolds(int holds) {
307     GPR_DEBUG_ASSERT(holds > 0);
308     stream_->AddHold(holds);
309   }
RemoveHold()310   void RemoveHold() { stream_->RemoveHold(); }
311 
312   /// Notifies the application that all operations associated with this RPC
313   /// have completed and all Holds have been removed. OnDone provides the RPC
314   /// status outcome for both successful and failed RPCs and will be called in
315   /// all cases. If it is not called, it indicates an application-level problem
316   /// (like failure to remove a hold).
317   ///
318   /// \param[in] s The status outcome of this RPC
OnDone(const grpc::Status &)319   void OnDone(const grpc::Status& /*s*/) override {}
320 
321   /// Notifies the application that a read of initial metadata from the
322   /// server is done. If the application chooses not to implement this method,
323   /// it can assume that the initial metadata has been read before the first
324   /// call of OnReadDone or OnDone.
325   ///
326   /// \param[in] ok Was the initial metadata read successfully? If false, no
327   ///               new read/write operation will succeed, and any further
328   ///               Start* operations should not be called.
OnReadInitialMetadataDone(bool)329   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
330 
331   /// Notifies the application that a StartRead operation completed.
332   ///
333   /// \param[in] ok Was it successful? If false, no new read/write operation
334   ///               will succeed, and any further Start* should not be called.
OnReadDone(bool)335   virtual void OnReadDone(bool /*ok*/) {}
336 
337   /// Notifies the application that a StartWrite or StartWriteLast operation
338   /// completed.
339   ///
340   /// \param[in] ok Was it successful? If false, no new read/write operation
341   ///               will succeed, and any further Start* should not be called.
OnWriteDone(bool)342   virtual void OnWriteDone(bool /*ok*/) {}
343 
344   /// Notifies the application that a StartWritesDone operation completed. Note
345   /// that this is only used on explicit StartWritesDone operations and not for
346   /// those that are implicitly invoked as part of a StartWriteLast.
347   ///
348   /// \param[in] ok Was it successful? If false, the application will later see
349   ///               the failure reflected as a bad status in OnDone and no
350   ///               further Start* should be called.
OnWritesDoneDone(bool)351   virtual void OnWritesDoneDone(bool /*ok*/) {}
352 
353  private:
354   friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)355   void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
356     stream_ = stream;
357   }
358   ClientCallbackReaderWriter<Request, Response>* stream_;
359 };
360 
361 /// \a ClientReadReactor is the interface for a server-streaming RPC.
362 /// All public methods behave as in ClientBidiReactor.
363 template <class Response>
364 class ClientReadReactor : public internal::ClientReactor {
365  public:
StartCall()366   void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)367   void StartRead(Response* resp) { reader_->Read(resp); }
368 
AddHold()369   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)370   void AddMultipleHolds(int holds) {
371     GPR_DEBUG_ASSERT(holds > 0);
372     reader_->AddHold(holds);
373   }
RemoveHold()374   void RemoveHold() { reader_->RemoveHold(); }
375 
OnDone(const grpc::Status &)376   void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)377   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)378   virtual void OnReadDone(bool /*ok*/) {}
379 
380  private:
381   friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)382   void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
383   ClientCallbackReader<Response>* reader_;
384 };
385 
386 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
387 /// All public methods behave as in ClientBidiReactor.
388 template <class Request>
389 class ClientWriteReactor : public internal::ClientReactor {
390  public:
StartCall()391   void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)392   void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
StartWrite(const Request * req,grpc::WriteOptions options)393   void StartWrite(const Request* req, grpc::WriteOptions options) {
394     writer_->Write(req, options);
395   }
StartWriteLast(const Request * req,grpc::WriteOptions options)396   void StartWriteLast(const Request* req, grpc::WriteOptions options) {
397     StartWrite(req, options.set_last_message());
398   }
StartWritesDone()399   void StartWritesDone() { writer_->WritesDone(); }
400 
AddHold()401   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)402   void AddMultipleHolds(int holds) {
403     GPR_DEBUG_ASSERT(holds > 0);
404     writer_->AddHold(holds);
405   }
RemoveHold()406   void RemoveHold() { writer_->RemoveHold(); }
407 
OnDone(const grpc::Status &)408   void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)409   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)410   virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)411   virtual void OnWritesDoneDone(bool /*ok*/) {}
412 
413  private:
414   friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)415   void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
416 
417   ClientCallbackWriter<Request>* writer_;
418 };
419 
420 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
421 /// This is _not_ a common way of invoking a unary RPC. In practice, this
422 /// option should be used only if the unary RPC wants to receive initial
423 /// metadata without waiting for the response to complete. Most deployments of
424 /// RPC systems do not use this option, but it is needed for generality.
425 /// All public methods behave as in ClientBidiReactor.
426 /// StartCall is included for consistency with the other reactor flavors: even
427 /// though there are no StartRead or StartWrite operations to queue before the
428 /// call (that is part of the unary call itself) and there is no reactor object
429 /// being created as a result of this call, we keep a consistent 2-phase
430 /// initiation API among all the reactor flavors.
431 class ClientUnaryReactor : public internal::ClientReactor {
432  public:
StartCall()433   void StartCall() { call_->StartCall(); }
OnDone(const grpc::Status &)434   void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)435   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
436 
437  private:
438   friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)439   void BindCall(ClientCallbackUnary* call) { call_ = call; }
440   ClientCallbackUnary* call_;
441 };
442 
443 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)444 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
445   reactor->BindCall(this);
446 }
447 
448 namespace internal {
449 
450 // Forward declare factory classes for friendship
451 template <class Request, class Response>
452 class ClientCallbackReaderWriterFactory;
453 template <class Response>
454 class ClientCallbackReaderFactory;
455 template <class Request>
456 class ClientCallbackWriterFactory;
457 
458 template <class Request, class Response>
459 class ClientCallbackReaderWriterImpl
460     : public ClientCallbackReaderWriter<Request, Response> {
461  public:
462   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)463   static void operator delete(void* /*ptr*/, std::size_t size) {
464     GPR_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
465   }
466 
467   // This operator should never be called as the memory should be freed as part
468   // of the arena destruction. It only exists to provide a matching operator
469   // delete to the operator new so that some compilers will not complain (see
470   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
471   // there are no tests catching the compiler warning.
delete(void *,void *)472   static void operator delete(void*, void*) { GPR_ASSERT(false); }
473 
StartCall()474   void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
475     // This call initiates two batches, plus any backlog, each with a callback
476     // 1. Send initial metadata (unless corked) + recv initial metadata
477     // 2. Any read backlog
478     // 3. Any write backlog
479     // 4. Recv trailing metadata (unless corked)
480     if (!start_corked_) {
481       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
482                                      context_->initial_metadata_flags());
483     }
484 
485     call_.PerformOps(&start_ops_);
486 
487     {
488       grpc::internal::MutexLock lock(&start_mu_);
489 
490       if (backlog_.read_ops) {
491         call_.PerformOps(&read_ops_);
492       }
493       if (backlog_.write_ops) {
494         call_.PerformOps(&write_ops_);
495       }
496       if (backlog_.writes_done_ops) {
497         call_.PerformOps(&writes_done_ops_);
498       }
499       call_.PerformOps(&finish_ops_);
500       // The last thing in this critical section is to set started_ so that it
501       // can be used lock-free as well.
502       started_.store(true, std::memory_order_release);
503     }
504     // MaybeFinish outside the lock to make sure that destruction of this object
505     // doesn't take place while holding the lock (which would cause the lock to
506     // be released after destruction)
507     this->MaybeFinish(/*from_reaction=*/false);
508   }
509 
Read(Response * msg)510   void Read(Response* msg) override {
511     read_ops_.RecvMessage(msg);
512     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
513     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
514       grpc::internal::MutexLock lock(&start_mu_);
515       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
516         backlog_.read_ops = true;
517         return;
518       }
519     }
520     call_.PerformOps(&read_ops_);
521   }
522 
Write(const Request * msg,grpc::WriteOptions options)523   void Write(const Request* msg, grpc::WriteOptions options)
524       ABSL_LOCKS_EXCLUDED(start_mu_) override {
525     if (options.is_last_message()) {
526       options.set_buffer_hint();
527       write_ops_.ClientSendClose();
528     }
529     // TODO(vjpai): don't assert
530     GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
531     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
532     if (GPR_UNLIKELY(corked_write_needed_)) {
533       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
534                                      context_->initial_metadata_flags());
535       corked_write_needed_ = false;
536     }
537 
538     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
539       grpc::internal::MutexLock lock(&start_mu_);
540       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
541         backlog_.write_ops = true;
542         return;
543       }
544     }
545     call_.PerformOps(&write_ops_);
546   }
WritesDone()547   void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
548     writes_done_ops_.ClientSendClose();
549     writes_done_tag_.Set(
550         call_.call(),
551         [this](bool ok) {
552           reactor_->OnWritesDoneDone(ok);
553           MaybeFinish(/*from_reaction=*/true);
554         },
555         &writes_done_ops_, /*can_inline=*/false);
556     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
557     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
558     if (GPR_UNLIKELY(corked_write_needed_)) {
559       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
560                                            context_->initial_metadata_flags());
561       corked_write_needed_ = false;
562     }
563     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
564       grpc::internal::MutexLock lock(&start_mu_);
565       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
566         backlog_.writes_done_ops = true;
567         return;
568       }
569     }
570     call_.PerformOps(&writes_done_ops_);
571   }
572 
AddHold(int holds)573   void AddHold(int holds) override {
574     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
575   }
RemoveHold()576   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
577 
578  private:
579   friend class ClientCallbackReaderWriterFactory<Request, Response>;
580 
ClientCallbackReaderWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)581   ClientCallbackReaderWriterImpl(grpc::internal::Call call,
582                                  grpc::ClientContext* context,
583                                  ClientBidiReactor<Request, Response>* reactor)
584       : context_(context),
585         call_(call),
586         reactor_(reactor),
587         start_corked_(context_->initial_metadata_corked_),
588         corked_write_needed_(start_corked_) {
589     this->BindReactor(reactor);
590 
591     // Set up the unchanging parts of the start, read, and write tags and ops.
592     start_tag_.Set(
593         call_.call(),
594         [this](bool ok) {
595           reactor_->OnReadInitialMetadataDone(
596               ok && !reactor_->InternalTrailersOnly(call_.call()));
597           MaybeFinish(/*from_reaction=*/true);
598         },
599         &start_ops_, /*can_inline=*/false);
600     start_ops_.RecvInitialMetadata(context_);
601     start_ops_.set_core_cq_tag(&start_tag_);
602 
603     write_tag_.Set(
604         call_.call(),
605         [this](bool ok) {
606           reactor_->OnWriteDone(ok);
607           MaybeFinish(/*from_reaction=*/true);
608         },
609         &write_ops_, /*can_inline=*/false);
610     write_ops_.set_core_cq_tag(&write_tag_);
611 
612     read_tag_.Set(
613         call_.call(),
614         [this](bool ok) {
615           reactor_->OnReadDone(ok);
616           MaybeFinish(/*from_reaction=*/true);
617         },
618         &read_ops_, /*can_inline=*/false);
619     read_ops_.set_core_cq_tag(&read_tag_);
620 
621     // Also set up the Finish tag and op set.
622     finish_tag_.Set(
623         call_.call(),
624         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
625         &finish_ops_,
626         /*can_inline=*/false);
627     finish_ops_.ClientRecvStatus(context_, &finish_status_);
628     finish_ops_.set_core_cq_tag(&finish_tag_);
629   }
630 
631   // MaybeFinish can be called from reactions or from user-initiated operations
632   // like StartCall or RemoveHold. If this is the last operation or hold on this
633   // object, it will invoke the OnDone reaction. If MaybeFinish was called from
634   // a reaction, it can call OnDone directly. If not, it would need to schedule
635   // OnDone onto an executor thread to avoid the possibility of deadlocking with
636   // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)637   void MaybeFinish(bool from_reaction) {
638     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
639                          1, std::memory_order_acq_rel) == 1)) {
640       grpc::Status s = std::move(finish_status_);
641       auto* reactor = reactor_;
642       auto* call = call_.call();
643       this->~ClientCallbackReaderWriterImpl();
644       if (GPR_LIKELY(from_reaction)) {
645         grpc_call_unref(call);
646         reactor->OnDone(s);
647       } else {
648         grpc_call_run_in_event_engine(
649             call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
650         grpc_call_unref(call);
651       }
652     }
653   }
654 
655   grpc::ClientContext* const context_;
656   grpc::internal::Call call_;
657   ClientBidiReactor<Request, Response>* const reactor_;
658 
659   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
660                             grpc::internal::CallOpRecvInitialMetadata>
661       start_ops_;
662   grpc::internal::CallbackWithSuccessTag start_tag_;
663   const bool start_corked_;
664   bool corked_write_needed_;  // no lock needed since only accessed in
665                               // Write/WritesDone which cannot be concurrent
666 
667   grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
668   grpc::internal::CallbackWithSuccessTag finish_tag_;
669   grpc::Status finish_status_;
670 
671   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
672                             grpc::internal::CallOpSendMessage,
673                             grpc::internal::CallOpClientSendClose>
674       write_ops_;
675   grpc::internal::CallbackWithSuccessTag write_tag_;
676 
677   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
678                             grpc::internal::CallOpClientSendClose>
679       writes_done_ops_;
680   grpc::internal::CallbackWithSuccessTag writes_done_tag_;
681 
682   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
683       read_ops_;
684   grpc::internal::CallbackWithSuccessTag read_tag_;
685 
686   struct StartCallBacklog {
687     bool write_ops = false;
688     bool writes_done_ops = false;
689     bool read_ops = false;
690   };
691   StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
692 
693   // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
694   std::atomic<intptr_t> callbacks_outstanding_{3};
695   std::atomic_bool started_{false};
696   grpc::internal::Mutex start_mu_;
697 };
698 
699 template <class Request, class Response>
700 class ClientCallbackReaderWriterFactory {
701  public:
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)702   static void Create(grpc::ChannelInterface* channel,
703                      const grpc::internal::RpcMethod& method,
704                      grpc::ClientContext* context,
705                      ClientBidiReactor<Request, Response>* reactor) {
706     grpc::internal::Call call =
707         channel->CreateCall(method, context, channel->CallbackCQ());
708 
709     grpc_call_ref(call.call());
710     new (grpc_call_arena_alloc(
711         call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
712         ClientCallbackReaderWriterImpl<Request, Response>(call, context,
713                                                           reactor);
714   }
715 };
716 
717 template <class Response>
718 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
719  public:
720   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)721   static void operator delete(void* /*ptr*/, std::size_t size) {
722     GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl));
723   }
724 
725   // This operator should never be called as the memory should be freed as part
726   // of the arena destruction. It only exists to provide a matching operator
727   // delete to the operator new so that some compilers will not complain (see
728   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
729   // there are no tests catching the compiler warning.
delete(void *,void *)730   static void operator delete(void*, void*) { GPR_ASSERT(false); }
731 
StartCall()732   void StartCall() override {
733     // This call initiates two batches, plus any backlog, each with a callback
734     // 1. Send initial metadata (unless corked) + recv initial metadata
735     // 2. Any backlog
736     // 3. Recv trailing metadata
737 
738     start_tag_.Set(
739         call_.call(),
740         [this](bool ok) {
741           reactor_->OnReadInitialMetadataDone(
742               ok && !reactor_->InternalTrailersOnly(call_.call()));
743           MaybeFinish(/*from_reaction=*/true);
744         },
745         &start_ops_, /*can_inline=*/false);
746     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
747                                    context_->initial_metadata_flags());
748     start_ops_.RecvInitialMetadata(context_);
749     start_ops_.set_core_cq_tag(&start_tag_);
750     call_.PerformOps(&start_ops_);
751 
752     // Also set up the read tag so it doesn't have to be set up each time
753     read_tag_.Set(
754         call_.call(),
755         [this](bool ok) {
756           reactor_->OnReadDone(ok);
757           MaybeFinish(/*from_reaction=*/true);
758         },
759         &read_ops_, /*can_inline=*/false);
760     read_ops_.set_core_cq_tag(&read_tag_);
761 
762     {
763       grpc::internal::MutexLock lock(&start_mu_);
764       if (backlog_.read_ops) {
765         call_.PerformOps(&read_ops_);
766       }
767       started_.store(true, std::memory_order_release);
768     }
769 
770     finish_tag_.Set(
771         call_.call(),
772         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
773         &finish_ops_, /*can_inline=*/false);
774     finish_ops_.ClientRecvStatus(context_, &finish_status_);
775     finish_ops_.set_core_cq_tag(&finish_tag_);
776     call_.PerformOps(&finish_ops_);
777   }
778 
Read(Response * msg)779   void Read(Response* msg) override {
780     read_ops_.RecvMessage(msg);
781     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
782     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
783       grpc::internal::MutexLock lock(&start_mu_);
784       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
785         backlog_.read_ops = true;
786         return;
787       }
788     }
789     call_.PerformOps(&read_ops_);
790   }
791 
AddHold(int holds)792   void AddHold(int holds) override {
793     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
794   }
RemoveHold()795   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
796 
797  private:
798   friend class ClientCallbackReaderFactory<Response>;
799 
800   template <class Request>
ClientCallbackReaderImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)801   ClientCallbackReaderImpl(grpc::internal::Call call,
802                            grpc::ClientContext* context, Request* request,
803                            ClientReadReactor<Response>* reactor)
804       : context_(context), call_(call), reactor_(reactor) {
805     this->BindReactor(reactor);
806     // TODO(vjpai): don't assert
807     GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
808     start_ops_.ClientSendClose();
809   }
810 
811   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)812   void MaybeFinish(bool from_reaction) {
813     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
814                          1, std::memory_order_acq_rel) == 1)) {
815       grpc::Status s = std::move(finish_status_);
816       auto* reactor = reactor_;
817       auto* call = call_.call();
818       this->~ClientCallbackReaderImpl();
819       if (GPR_LIKELY(from_reaction)) {
820         grpc_call_unref(call);
821         reactor->OnDone(s);
822       } else {
823         grpc_call_run_in_event_engine(
824             call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
825         grpc_call_unref(call);
826       }
827     }
828   }
829 
830   grpc::ClientContext* const context_;
831   grpc::internal::Call call_;
832   ClientReadReactor<Response>* const reactor_;
833 
834   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
835                             grpc::internal::CallOpSendMessage,
836                             grpc::internal::CallOpClientSendClose,
837                             grpc::internal::CallOpRecvInitialMetadata>
838       start_ops_;
839   grpc::internal::CallbackWithSuccessTag start_tag_;
840 
841   grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
842   grpc::internal::CallbackWithSuccessTag finish_tag_;
843   grpc::Status finish_status_;
844 
845   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
846       read_ops_;
847   grpc::internal::CallbackWithSuccessTag read_tag_;
848 
849   struct StartCallBacklog {
850     bool read_ops = false;
851   };
852   StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
853 
854   // Minimum of 2 callbacks to pre-register for start and finish
855   std::atomic<intptr_t> callbacks_outstanding_{2};
856   std::atomic_bool started_{false};
857   grpc::internal::Mutex start_mu_;
858 };
859 
860 template <class Response>
861 class ClientCallbackReaderFactory {
862  public:
863   template <class Request>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)864   static void Create(grpc::ChannelInterface* channel,
865                      const grpc::internal::RpcMethod& method,
866                      grpc::ClientContext* context, const Request* request,
867                      ClientReadReactor<Response>* reactor) {
868     grpc::internal::Call call =
869         channel->CreateCall(method, context, channel->CallbackCQ());
870 
871     grpc_call_ref(call.call());
872     new (grpc_call_arena_alloc(call.call(),
873                                sizeof(ClientCallbackReaderImpl<Response>)))
874         ClientCallbackReaderImpl<Response>(call, context, request, reactor);
875   }
876 };
877 
878 template <class Request>
879 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
880  public:
881   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)882   static void operator delete(void* /*ptr*/, std::size_t size) {
883     GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl));
884   }
885 
886   // This operator should never be called as the memory should be freed as part
887   // of the arena destruction. It only exists to provide a matching operator
888   // delete to the operator new so that some compilers will not complain (see
889   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
890   // there are no tests catching the compiler warning.
delete(void *,void *)891   static void operator delete(void*, void*) { GPR_ASSERT(false); }
892 
StartCall()893   void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
894     // This call initiates two batches, plus any backlog, each with a callback
895     // 1. Send initial metadata (unless corked) + recv initial metadata
896     // 2. Any backlog
897     // 3. Recv trailing metadata
898 
899     if (!start_corked_) {
900       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
901                                      context_->initial_metadata_flags());
902     }
903     call_.PerformOps(&start_ops_);
904 
905     {
906       grpc::internal::MutexLock lock(&start_mu_);
907 
908       if (backlog_.write_ops) {
909         call_.PerformOps(&write_ops_);
910       }
911       if (backlog_.writes_done_ops) {
912         call_.PerformOps(&writes_done_ops_);
913       }
914       call_.PerformOps(&finish_ops_);
915       // The last thing in this critical section is to set started_ so that it
916       // can be used lock-free as well.
917       started_.store(true, std::memory_order_release);
918     }
919     // MaybeFinish outside the lock to make sure that destruction of this object
920     // doesn't take place while holding the lock (which would cause the lock to
921     // be released after destruction)
922     this->MaybeFinish(/*from_reaction=*/false);
923   }
924 
Write(const Request * msg,grpc::WriteOptions options)925   void Write(const Request* msg, grpc::WriteOptions options)
926       ABSL_LOCKS_EXCLUDED(start_mu_) override {
927     if (GPR_UNLIKELY(options.is_last_message())) {
928       options.set_buffer_hint();
929       write_ops_.ClientSendClose();
930     }
931     // TODO(vjpai): don't assert
932     GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
933     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
934 
935     if (GPR_UNLIKELY(corked_write_needed_)) {
936       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
937                                      context_->initial_metadata_flags());
938       corked_write_needed_ = false;
939     }
940 
941     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
942       grpc::internal::MutexLock lock(&start_mu_);
943       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
944         backlog_.write_ops = true;
945         return;
946       }
947     }
948     call_.PerformOps(&write_ops_);
949   }
950 
WritesDone()951   void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
952     writes_done_ops_.ClientSendClose();
953     writes_done_tag_.Set(
954         call_.call(),
955         [this](bool ok) {
956           reactor_->OnWritesDoneDone(ok);
957           MaybeFinish(/*from_reaction=*/true);
958         },
959         &writes_done_ops_, /*can_inline=*/false);
960     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
961     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
962 
963     if (GPR_UNLIKELY(corked_write_needed_)) {
964       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
965                                            context_->initial_metadata_flags());
966       corked_write_needed_ = false;
967     }
968 
969     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
970       grpc::internal::MutexLock lock(&start_mu_);
971       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
972         backlog_.writes_done_ops = true;
973         return;
974       }
975     }
976     call_.PerformOps(&writes_done_ops_);
977   }
978 
AddHold(int holds)979   void AddHold(int holds) override {
980     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
981   }
RemoveHold()982   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
983 
984  private:
985   friend class ClientCallbackWriterFactory<Request>;
986 
987   template <class Response>
ClientCallbackWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)988   ClientCallbackWriterImpl(grpc::internal::Call call,
989                            grpc::ClientContext* context, Response* response,
990                            ClientWriteReactor<Request>* reactor)
991       : context_(context),
992         call_(call),
993         reactor_(reactor),
994         start_corked_(context_->initial_metadata_corked_),
995         corked_write_needed_(start_corked_) {
996     this->BindReactor(reactor);
997 
998     // Set up the unchanging parts of the start and write tags and ops.
999     start_tag_.Set(
1000         call_.call(),
1001         [this](bool ok) {
1002           reactor_->OnReadInitialMetadataDone(
1003               ok && !reactor_->InternalTrailersOnly(call_.call()));
1004           MaybeFinish(/*from_reaction=*/true);
1005         },
1006         &start_ops_, /*can_inline=*/false);
1007     start_ops_.RecvInitialMetadata(context_);
1008     start_ops_.set_core_cq_tag(&start_tag_);
1009 
1010     write_tag_.Set(
1011         call_.call(),
1012         [this](bool ok) {
1013           reactor_->OnWriteDone(ok);
1014           MaybeFinish(/*from_reaction=*/true);
1015         },
1016         &write_ops_, /*can_inline=*/false);
1017     write_ops_.set_core_cq_tag(&write_tag_);
1018 
1019     // Also set up the Finish tag and op set.
1020     finish_ops_.RecvMessage(response);
1021     finish_ops_.AllowNoMessage();
1022     finish_tag_.Set(
1023         call_.call(),
1024         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1025         &finish_ops_,
1026         /*can_inline=*/false);
1027     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1028     finish_ops_.set_core_cq_tag(&finish_tag_);
1029   }
1030 
1031   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1032   void MaybeFinish(bool from_reaction) {
1033     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1034                          1, std::memory_order_acq_rel) == 1)) {
1035       grpc::Status s = std::move(finish_status_);
1036       auto* reactor = reactor_;
1037       auto* call = call_.call();
1038       this->~ClientCallbackWriterImpl();
1039       if (GPR_LIKELY(from_reaction)) {
1040         grpc_call_unref(call);
1041         reactor->OnDone(s);
1042       } else {
1043         grpc_call_run_in_event_engine(
1044             call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
1045         grpc_call_unref(call);
1046       }
1047     }
1048   }
1049 
1050   grpc::ClientContext* const context_;
1051   grpc::internal::Call call_;
1052   ClientWriteReactor<Request>* const reactor_;
1053 
1054   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1055                             grpc::internal::CallOpRecvInitialMetadata>
1056       start_ops_;
1057   grpc::internal::CallbackWithSuccessTag start_tag_;
1058   const bool start_corked_;
1059   bool corked_write_needed_;  // no lock needed since only accessed in
1060                               // Write/WritesDone which cannot be concurrent
1061 
1062   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1063                             grpc::internal::CallOpClientRecvStatus>
1064       finish_ops_;
1065   grpc::internal::CallbackWithSuccessTag finish_tag_;
1066   grpc::Status finish_status_;
1067 
1068   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1069                             grpc::internal::CallOpSendMessage,
1070                             grpc::internal::CallOpClientSendClose>
1071       write_ops_;
1072   grpc::internal::CallbackWithSuccessTag write_tag_;
1073 
1074   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1075                             grpc::internal::CallOpClientSendClose>
1076       writes_done_ops_;
1077   grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1078 
1079   struct StartCallBacklog {
1080     bool write_ops = false;
1081     bool writes_done_ops = false;
1082   };
1083   StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1084 
1085   // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1086   std::atomic<intptr_t> callbacks_outstanding_{3};
1087   std::atomic_bool started_{false};
1088   grpc::internal::Mutex start_mu_;
1089 };
1090 
1091 template <class Request>
1092 class ClientCallbackWriterFactory {
1093  public:
1094   template <class Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1095   static void Create(grpc::ChannelInterface* channel,
1096                      const grpc::internal::RpcMethod& method,
1097                      grpc::ClientContext* context, Response* response,
1098                      ClientWriteReactor<Request>* reactor) {
1099     grpc::internal::Call call =
1100         channel->CreateCall(method, context, channel->CallbackCQ());
1101 
1102     grpc_call_ref(call.call());
1103     new (grpc_call_arena_alloc(call.call(),
1104                                sizeof(ClientCallbackWriterImpl<Request>)))
1105         ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1106   }
1107 };
1108 
1109 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1110  public:
1111   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1112   static void operator delete(void* /*ptr*/, std::size_t size) {
1113     GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1114   }
1115 
1116   // This operator should never be called as the memory should be freed as part
1117   // of the arena destruction. It only exists to provide a matching operator
1118   // delete to the operator new so that some compilers will not complain (see
1119   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1120   // there are no tests catching the compiler warning.
delete(void *,void *)1121   static void operator delete(void*, void*) { GPR_ASSERT(false); }
1122 
StartCall()1123   void StartCall() override {
1124     // This call initiates two batches, each with a callback
1125     // 1. Send initial metadata + write + writes done + recv initial metadata
1126     // 2. Read message, recv trailing metadata
1127 
1128     start_tag_.Set(
1129         call_.call(),
1130         [this](bool ok) {
1131           reactor_->OnReadInitialMetadataDone(
1132               ok && !reactor_->InternalTrailersOnly(call_.call()));
1133           MaybeFinish();
1134         },
1135         &start_ops_, /*can_inline=*/false);
1136     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1137                                    context_->initial_metadata_flags());
1138     start_ops_.RecvInitialMetadata(context_);
1139     start_ops_.set_core_cq_tag(&start_tag_);
1140     call_.PerformOps(&start_ops_);
1141 
1142     finish_tag_.Set(
1143         call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1144         /*can_inline=*/false);
1145     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1146     finish_ops_.set_core_cq_tag(&finish_tag_);
1147     call_.PerformOps(&finish_ops_);
1148   }
1149 
1150  private:
1151   friend class ClientCallbackUnaryFactory;
1152 
1153   template <class Request, class Response>
ClientCallbackUnaryImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1154   ClientCallbackUnaryImpl(grpc::internal::Call call,
1155                           grpc::ClientContext* context, Request* request,
1156                           Response* response, ClientUnaryReactor* reactor)
1157       : context_(context), call_(call), reactor_(reactor) {
1158     this->BindReactor(reactor);
1159     // TODO(vjpai): don't assert
1160     GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
1161     start_ops_.ClientSendClose();
1162     finish_ops_.RecvMessage(response);
1163     finish_ops_.AllowNoMessage();
1164   }
1165 
1166   // In the unary case, MaybeFinish is only ever invoked from a
1167   // library-initiated reaction, so it will just directly call OnDone if this is
1168   // the last reaction for this RPC.
MaybeFinish()1169   void MaybeFinish() {
1170     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1171                          1, std::memory_order_acq_rel) == 1)) {
1172       grpc::Status s = std::move(finish_status_);
1173       auto* reactor = reactor_;
1174       auto* call = call_.call();
1175       this->~ClientCallbackUnaryImpl();
1176       grpc_call_unref(call);
1177       reactor->OnDone(s);
1178     }
1179   }
1180 
1181   grpc::ClientContext* const context_;
1182   grpc::internal::Call call_;
1183   ClientUnaryReactor* const reactor_;
1184 
1185   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1186                             grpc::internal::CallOpSendMessage,
1187                             grpc::internal::CallOpClientSendClose,
1188                             grpc::internal::CallOpRecvInitialMetadata>
1189       start_ops_;
1190   grpc::internal::CallbackWithSuccessTag start_tag_;
1191 
1192   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1193                             grpc::internal::CallOpClientRecvStatus>
1194       finish_ops_;
1195   grpc::internal::CallbackWithSuccessTag finish_tag_;
1196   grpc::Status finish_status_;
1197 
1198   // This call will have 2 callbacks: start and finish
1199   std::atomic<intptr_t> callbacks_outstanding_{2};
1200 };
1201 
1202 class ClientCallbackUnaryFactory {
1203  public:
1204   template <class Request, class Response, class BaseRequest = Request,
1205             class BaseResponse = Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1206   static void Create(grpc::ChannelInterface* channel,
1207                      const grpc::internal::RpcMethod& method,
1208                      grpc::ClientContext* context, const Request* request,
1209                      Response* response, ClientUnaryReactor* reactor) {
1210     grpc::internal::Call call =
1211         channel->CreateCall(method, context, channel->CallbackCQ());
1212 
1213     grpc_call_ref(call.call());
1214 
1215     new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
1216         ClientCallbackUnaryImpl(call, context,
1217                                 static_cast<const BaseRequest*>(request),
1218                                 static_cast<BaseResponse*>(response), reactor);
1219   }
1220 };
1221 
1222 }  // namespace internal
1223 }  // namespace grpc
1224 
1225 #endif  // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
1226