1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPCPP_SUPPORT_CLIENT_CALLBACK_H
20 #define GRPCPP_SUPPORT_CLIENT_CALLBACK_H
21
22 #include <atomic>
23 #include <functional>
24
25 #include <grpc/grpc.h>
26 #include <grpc/impl/call.h>
27 #include <grpc/support/log.h>
28 #include <grpcpp/impl/call.h>
29 #include <grpcpp/impl/call_op_set.h>
30 #include <grpcpp/impl/sync.h>
31 #include <grpcpp/support/callback_common.h>
32 #include <grpcpp/support/config.h>
33 #include <grpcpp/support/status.h>
34
35 namespace grpc {
36 class Channel;
37 class ClientContext;
38
39 namespace internal {
40 class RpcMethod;
41
42 /// Perform a callback-based unary call. May optionally specify the base
43 /// class of the Request and Response so that the internal calls and structures
44 /// below this may be based on those base classes and thus achieve code reuse
45 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
46 /// class).
47 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
48 template <class InputMessage, class OutputMessage,
49 class BaseInputMessage = InputMessage,
50 class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)51 void CallbackUnaryCall(grpc::ChannelInterface* channel,
52 const grpc::internal::RpcMethod& method,
53 grpc::ClientContext* context,
54 const InputMessage* request, OutputMessage* result,
55 std::function<void(grpc::Status)> on_completion) {
56 static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
57 "Invalid input message specification");
58 static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
59 "Invalid output message specification");
60 CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
61 channel, method, context, request, result, on_completion);
62 }
63
64 template <class InputMessage, class OutputMessage>
65 class CallbackUnaryCallImpl {
66 public:
CallbackUnaryCallImpl(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (grpc::Status)> on_completion)67 CallbackUnaryCallImpl(grpc::ChannelInterface* channel,
68 const grpc::internal::RpcMethod& method,
69 grpc::ClientContext* context,
70 const InputMessage* request, OutputMessage* result,
71 std::function<void(grpc::Status)> on_completion) {
72 grpc::CompletionQueue* cq = channel->CallbackCQ();
73 GPR_ASSERT(cq != nullptr);
74 grpc::internal::Call call(channel->CreateCall(method, context, cq));
75
76 using FullCallOpSet = grpc::internal::CallOpSet<
77 grpc::internal::CallOpSendInitialMetadata,
78 grpc::internal::CallOpSendMessage,
79 grpc::internal::CallOpRecvInitialMetadata,
80 grpc::internal::CallOpRecvMessage<OutputMessage>,
81 grpc::internal::CallOpClientSendClose,
82 grpc::internal::CallOpClientRecvStatus>;
83
84 struct OpSetAndTag {
85 FullCallOpSet opset;
86 grpc::internal::CallbackWithStatusTag tag;
87 };
88 const size_t alloc_sz = sizeof(OpSetAndTag);
89 auto* const alloced =
90 static_cast<OpSetAndTag*>(grpc_call_arena_alloc(call.call(), alloc_sz));
91 auto* ops = new (&alloced->opset) FullCallOpSet;
92 auto* tag = new (&alloced->tag)
93 grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
94
95 // TODO(vjpai): Unify code with sync API as much as possible
96 grpc::Status s = ops->SendMessagePtr(request);
97 if (!s.ok()) {
98 tag->force_run(s);
99 return;
100 }
101 ops->SendInitialMetadata(&context->send_initial_metadata_,
102 context->initial_metadata_flags());
103 ops->RecvInitialMetadata(context);
104 ops->RecvMessage(result);
105 ops->AllowNoMessage();
106 ops->ClientSendClose();
107 ops->ClientRecvStatus(context, tag->status_ptr());
108 ops->set_core_cq_tag(tag);
109 call.PerformOps(ops);
110 }
111 };
112
113 // Base class for public API classes.
114 class ClientReactor {
115 public:
116 virtual ~ClientReactor() = default;
117
118 /// Called by the library when all operations associated with this RPC have
119 /// completed and all Holds have been removed. OnDone provides the RPC status
120 /// outcome for both successful and failed RPCs. If it is never called on an
121 /// RPC, it indicates an application-level problem (like failure to remove a
122 /// hold).
123 ///
124 /// \param[in] s The status outcome of this RPC
125 virtual void OnDone(const grpc::Status& /*s*/) = 0;
126
127 /// InternalTrailersOnly is not part of the API and is not meant to be
128 /// overridden. It is virtual to allow successful builds for certain bazel
129 /// build users that only want to depend on gRPC codegen headers and not the
130 /// full library (although this is not a generally-supported option). Although
131 /// the virtual call is slower than a direct call, this function is
132 /// heavyweight and the cost of the virtual call is not much in comparison.
133 /// This function may be removed or devirtualized in the future.
134 virtual bool InternalTrailersOnly(const grpc_call* call) const;
135 };
136
137 } // namespace internal
138
139 // Forward declarations
140 template <class Request, class Response>
141 class ClientBidiReactor;
142 template <class Response>
143 class ClientReadReactor;
144 template <class Request>
145 class ClientWriteReactor;
146 class ClientUnaryReactor;
147
148 // NOTE: The streaming objects are not actually implemented in the public API.
149 // These interfaces are provided for mocking only. Typical applications
150 // will interact exclusively with the reactors that they define.
151 template <class Request, class Response>
152 class ClientCallbackReaderWriter {
153 public:
~ClientCallbackReaderWriter()154 virtual ~ClientCallbackReaderWriter() {}
155 virtual void StartCall() = 0;
156 virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
157 virtual void WritesDone() = 0;
158 virtual void Read(Response* resp) = 0;
159 virtual void AddHold(int holds) = 0;
160 virtual void RemoveHold() = 0;
161
162 protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)163 void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
164 reactor->BindStream(this);
165 }
166 };
167
168 template <class Response>
169 class ClientCallbackReader {
170 public:
~ClientCallbackReader()171 virtual ~ClientCallbackReader() {}
172 virtual void StartCall() = 0;
173 virtual void Read(Response* resp) = 0;
174 virtual void AddHold(int holds) = 0;
175 virtual void RemoveHold() = 0;
176
177 protected:
BindReactor(ClientReadReactor<Response> * reactor)178 void BindReactor(ClientReadReactor<Response>* reactor) {
179 reactor->BindReader(this);
180 }
181 };
182
183 template <class Request>
184 class ClientCallbackWriter {
185 public:
~ClientCallbackWriter()186 virtual ~ClientCallbackWriter() {}
187 virtual void StartCall() = 0;
Write(const Request * req)188 void Write(const Request* req) { Write(req, grpc::WriteOptions()); }
189 virtual void Write(const Request* req, grpc::WriteOptions options) = 0;
WriteLast(const Request * req,grpc::WriteOptions options)190 void WriteLast(const Request* req, grpc::WriteOptions options) {
191 Write(req, options.set_last_message());
192 }
193 virtual void WritesDone() = 0;
194
195 virtual void AddHold(int holds) = 0;
196 virtual void RemoveHold() = 0;
197
198 protected:
BindReactor(ClientWriteReactor<Request> * reactor)199 void BindReactor(ClientWriteReactor<Request>* reactor) {
200 reactor->BindWriter(this);
201 }
202 };
203
204 class ClientCallbackUnary {
205 public:
~ClientCallbackUnary()206 virtual ~ClientCallbackUnary() {}
207 virtual void StartCall() = 0;
208
209 protected:
210 void BindReactor(ClientUnaryReactor* reactor);
211 };
212
213 // The following classes are the reactor interfaces that are to be implemented
214 // by the user. They are passed in to the library as an argument to a call on a
215 // stub (either a codegen-ed call or a generic call). The streaming RPC is
216 // activated by calling StartCall, possibly after initiating StartRead,
217 // StartWrite, or AddHold operations on the streaming object. Note that none of
218 // the classes are pure; all reactions have a default empty reaction so that the
219 // user class only needs to override those reactions that it cares about.
220 // The reactor must be passed to the stub invocation before any of the below
221 // operations can be called and its reactions will be invoked by the library in
222 // response to the completion of various operations. Reactions must not include
223 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
224 // waiting on condition variables). Reactions may be invoked concurrently,
225 // except that OnDone is called after all others (assuming proper API usage).
226 // The reactor may not be deleted until OnDone is called.
227
228 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
229 template <class Request, class Response>
230 class ClientBidiReactor : public internal::ClientReactor {
231 public:
232 /// Activate the RPC and initiate any reads or writes that have been Start'ed
233 /// before this call. All streaming RPCs issued by the client MUST have
234 /// StartCall invoked on them (even if they are canceled) as this call is the
235 /// activation of their lifecycle.
StartCall()236 void StartCall() { stream_->StartCall(); }
237
238 /// Initiate a read operation (or post it for later initiation if StartCall
239 /// has not yet been invoked).
240 ///
241 /// \param[out] resp Where to eventually store the read message. Valid when
242 /// the library calls OnReadDone
StartRead(Response * resp)243 void StartRead(Response* resp) { stream_->Read(resp); }
244
245 /// Initiate a write operation (or post it for later initiation if StartCall
246 /// has not yet been invoked).
247 ///
248 /// \param[in] req The message to be written. The library does not take
249 /// ownership but the caller must ensure that the message is
250 /// not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)251 void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
252
253 /// Initiate/post a write operation with specified options.
254 ///
255 /// \param[in] req The message to be written. The library does not take
256 /// ownership but the caller must ensure that the message is
257 /// not deleted or modified until OnWriteDone is called.
258 /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,grpc::WriteOptions options)259 void StartWrite(const Request* req, grpc::WriteOptions options) {
260 stream_->Write(req, options);
261 }
262
263 /// Initiate/post a write operation with specified options and an indication
264 /// that this is the last write (like StartWrite and StartWritesDone, merged).
265 /// Note that calling this means that no more calls to StartWrite,
266 /// StartWriteLast, or StartWritesDone are allowed.
267 ///
268 /// \param[in] req The message to be written. The library does not take
269 /// ownership but the caller must ensure that the message is
270 /// not deleted or modified until OnWriteDone is called.
271 /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,grpc::WriteOptions options)272 void StartWriteLast(const Request* req, grpc::WriteOptions options) {
273 StartWrite(req, options.set_last_message());
274 }
275
276 /// Indicate that the RPC will have no more write operations. This can only be
277 /// issued once for a given RPC. This is not required or allowed if
278 /// StartWriteLast is used since that already has the same implication.
279 /// Note that calling this means that no more calls to StartWrite,
280 /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()281 void StartWritesDone() { stream_->WritesDone(); }
282
283 /// Holds are needed if (and only if) this stream has operations that take
284 /// place on it after StartCall but from outside one of the reactions
285 /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
286 ///
287 /// Holds must be added before calling StartCall. If a stream still has a hold
288 /// in place, its resources will not be destroyed even if the status has
289 /// already come in from the wire and there are currently no active callbacks
290 /// outstanding. Similarly, the stream will not call OnDone if there are still
291 /// holds on it.
292 ///
293 /// For example, if a StartRead or StartWrite operation is going to be
294 /// initiated from elsewhere in the application, the application should call
295 /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
296 /// for example, a read-flow and a write-flow taking place outside the
297 /// reactions, then call AddMultipleHolds(2) before StartCall. When the
298 /// application knows that it won't issue any more read operations (such as
299 /// when a read comes back as not ok), it should issue a RemoveHold(). It
300 /// should also call RemoveHold() again after it does StartWriteLast or
301 /// StartWritesDone that indicates that there will be no more write ops.
302 /// The number of RemoveHold calls must match the total number of AddHold
303 /// calls plus the number of holds added by AddMultipleHolds.
304 /// The argument to AddMultipleHolds must be positive.
AddHold()305 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)306 void AddMultipleHolds(int holds) {
307 GPR_DEBUG_ASSERT(holds > 0);
308 stream_->AddHold(holds);
309 }
RemoveHold()310 void RemoveHold() { stream_->RemoveHold(); }
311
312 /// Notifies the application that all operations associated with this RPC
313 /// have completed and all Holds have been removed. OnDone provides the RPC
314 /// status outcome for both successful and failed RPCs and will be called in
315 /// all cases. If it is not called, it indicates an application-level problem
316 /// (like failure to remove a hold).
317 ///
318 /// \param[in] s The status outcome of this RPC
OnDone(const grpc::Status &)319 void OnDone(const grpc::Status& /*s*/) override {}
320
321 /// Notifies the application that a read of initial metadata from the
322 /// server is done. If the application chooses not to implement this method,
323 /// it can assume that the initial metadata has been read before the first
324 /// call of OnReadDone or OnDone.
325 ///
326 /// \param[in] ok Was the initial metadata read successfully? If false, no
327 /// new read/write operation will succeed, and any further
328 /// Start* operations should not be called.
OnReadInitialMetadataDone(bool)329 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
330
331 /// Notifies the application that a StartRead operation completed.
332 ///
333 /// \param[in] ok Was it successful? If false, no new read/write operation
334 /// will succeed, and any further Start* should not be called.
OnReadDone(bool)335 virtual void OnReadDone(bool /*ok*/) {}
336
337 /// Notifies the application that a StartWrite or StartWriteLast operation
338 /// completed.
339 ///
340 /// \param[in] ok Was it successful? If false, no new read/write operation
341 /// will succeed, and any further Start* should not be called.
OnWriteDone(bool)342 virtual void OnWriteDone(bool /*ok*/) {}
343
344 /// Notifies the application that a StartWritesDone operation completed. Note
345 /// that this is only used on explicit StartWritesDone operations and not for
346 /// those that are implicitly invoked as part of a StartWriteLast.
347 ///
348 /// \param[in] ok Was it successful? If false, the application will later see
349 /// the failure reflected as a bad status in OnDone and no
350 /// further Start* should be called.
OnWritesDoneDone(bool)351 virtual void OnWritesDoneDone(bool /*ok*/) {}
352
353 private:
354 friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)355 void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
356 stream_ = stream;
357 }
358 ClientCallbackReaderWriter<Request, Response>* stream_;
359 };
360
361 /// \a ClientReadReactor is the interface for a server-streaming RPC.
362 /// All public methods behave as in ClientBidiReactor.
363 template <class Response>
364 class ClientReadReactor : public internal::ClientReactor {
365 public:
StartCall()366 void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)367 void StartRead(Response* resp) { reader_->Read(resp); }
368
AddHold()369 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)370 void AddMultipleHolds(int holds) {
371 GPR_DEBUG_ASSERT(holds > 0);
372 reader_->AddHold(holds);
373 }
RemoveHold()374 void RemoveHold() { reader_->RemoveHold(); }
375
OnDone(const grpc::Status &)376 void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)377 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)378 virtual void OnReadDone(bool /*ok*/) {}
379
380 private:
381 friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)382 void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
383 ClientCallbackReader<Response>* reader_;
384 };
385
386 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
387 /// All public methods behave as in ClientBidiReactor.
388 template <class Request>
389 class ClientWriteReactor : public internal::ClientReactor {
390 public:
StartCall()391 void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)392 void StartWrite(const Request* req) { StartWrite(req, grpc::WriteOptions()); }
StartWrite(const Request * req,grpc::WriteOptions options)393 void StartWrite(const Request* req, grpc::WriteOptions options) {
394 writer_->Write(req, options);
395 }
StartWriteLast(const Request * req,grpc::WriteOptions options)396 void StartWriteLast(const Request* req, grpc::WriteOptions options) {
397 StartWrite(req, options.set_last_message());
398 }
StartWritesDone()399 void StartWritesDone() { writer_->WritesDone(); }
400
AddHold()401 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)402 void AddMultipleHolds(int holds) {
403 GPR_DEBUG_ASSERT(holds > 0);
404 writer_->AddHold(holds);
405 }
RemoveHold()406 void RemoveHold() { writer_->RemoveHold(); }
407
OnDone(const grpc::Status &)408 void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)409 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)410 virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)411 virtual void OnWritesDoneDone(bool /*ok*/) {}
412
413 private:
414 friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)415 void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
416
417 ClientCallbackWriter<Request>* writer_;
418 };
419
420 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
421 /// This is _not_ a common way of invoking a unary RPC. In practice, this
422 /// option should be used only if the unary RPC wants to receive initial
423 /// metadata without waiting for the response to complete. Most deployments of
424 /// RPC systems do not use this option, but it is needed for generality.
425 /// All public methods behave as in ClientBidiReactor.
426 /// StartCall is included for consistency with the other reactor flavors: even
427 /// though there are no StartRead or StartWrite operations to queue before the
428 /// call (that is part of the unary call itself) and there is no reactor object
429 /// being created as a result of this call, we keep a consistent 2-phase
430 /// initiation API among all the reactor flavors.
431 class ClientUnaryReactor : public internal::ClientReactor {
432 public:
StartCall()433 void StartCall() { call_->StartCall(); }
OnDone(const grpc::Status &)434 void OnDone(const grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)435 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
436
437 private:
438 friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)439 void BindCall(ClientCallbackUnary* call) { call_ = call; }
440 ClientCallbackUnary* call_;
441 };
442
443 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)444 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
445 reactor->BindCall(this);
446 }
447
448 namespace internal {
449
450 // Forward declare factory classes for friendship
451 template <class Request, class Response>
452 class ClientCallbackReaderWriterFactory;
453 template <class Response>
454 class ClientCallbackReaderFactory;
455 template <class Request>
456 class ClientCallbackWriterFactory;
457
458 template <class Request, class Response>
459 class ClientCallbackReaderWriterImpl
460 : public ClientCallbackReaderWriter<Request, Response> {
461 public:
462 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)463 static void operator delete(void* /*ptr*/, std::size_t size) {
464 GPR_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
465 }
466
467 // This operator should never be called as the memory should be freed as part
468 // of the arena destruction. It only exists to provide a matching operator
469 // delete to the operator new so that some compilers will not complain (see
470 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
471 // there are no tests catching the compiler warning.
delete(void *,void *)472 static void operator delete(void*, void*) { GPR_ASSERT(false); }
473
StartCall()474 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
475 // This call initiates two batches, plus any backlog, each with a callback
476 // 1. Send initial metadata (unless corked) + recv initial metadata
477 // 2. Any read backlog
478 // 3. Any write backlog
479 // 4. Recv trailing metadata (unless corked)
480 if (!start_corked_) {
481 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
482 context_->initial_metadata_flags());
483 }
484
485 call_.PerformOps(&start_ops_);
486
487 {
488 grpc::internal::MutexLock lock(&start_mu_);
489
490 if (backlog_.read_ops) {
491 call_.PerformOps(&read_ops_);
492 }
493 if (backlog_.write_ops) {
494 call_.PerformOps(&write_ops_);
495 }
496 if (backlog_.writes_done_ops) {
497 call_.PerformOps(&writes_done_ops_);
498 }
499 call_.PerformOps(&finish_ops_);
500 // The last thing in this critical section is to set started_ so that it
501 // can be used lock-free as well.
502 started_.store(true, std::memory_order_release);
503 }
504 // MaybeFinish outside the lock to make sure that destruction of this object
505 // doesn't take place while holding the lock (which would cause the lock to
506 // be released after destruction)
507 this->MaybeFinish(/*from_reaction=*/false);
508 }
509
Read(Response * msg)510 void Read(Response* msg) override {
511 read_ops_.RecvMessage(msg);
512 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
513 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
514 grpc::internal::MutexLock lock(&start_mu_);
515 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
516 backlog_.read_ops = true;
517 return;
518 }
519 }
520 call_.PerformOps(&read_ops_);
521 }
522
Write(const Request * msg,grpc::WriteOptions options)523 void Write(const Request* msg, grpc::WriteOptions options)
524 ABSL_LOCKS_EXCLUDED(start_mu_) override {
525 if (options.is_last_message()) {
526 options.set_buffer_hint();
527 write_ops_.ClientSendClose();
528 }
529 // TODO(vjpai): don't assert
530 GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
531 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
532 if (GPR_UNLIKELY(corked_write_needed_)) {
533 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
534 context_->initial_metadata_flags());
535 corked_write_needed_ = false;
536 }
537
538 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
539 grpc::internal::MutexLock lock(&start_mu_);
540 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
541 backlog_.write_ops = true;
542 return;
543 }
544 }
545 call_.PerformOps(&write_ops_);
546 }
WritesDone()547 void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
548 writes_done_ops_.ClientSendClose();
549 writes_done_tag_.Set(
550 call_.call(),
551 [this](bool ok) {
552 reactor_->OnWritesDoneDone(ok);
553 MaybeFinish(/*from_reaction=*/true);
554 },
555 &writes_done_ops_, /*can_inline=*/false);
556 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
557 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
558 if (GPR_UNLIKELY(corked_write_needed_)) {
559 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
560 context_->initial_metadata_flags());
561 corked_write_needed_ = false;
562 }
563 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
564 grpc::internal::MutexLock lock(&start_mu_);
565 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
566 backlog_.writes_done_ops = true;
567 return;
568 }
569 }
570 call_.PerformOps(&writes_done_ops_);
571 }
572
AddHold(int holds)573 void AddHold(int holds) override {
574 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
575 }
RemoveHold()576 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
577
578 private:
579 friend class ClientCallbackReaderWriterFactory<Request, Response>;
580
ClientCallbackReaderWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)581 ClientCallbackReaderWriterImpl(grpc::internal::Call call,
582 grpc::ClientContext* context,
583 ClientBidiReactor<Request, Response>* reactor)
584 : context_(context),
585 call_(call),
586 reactor_(reactor),
587 start_corked_(context_->initial_metadata_corked_),
588 corked_write_needed_(start_corked_) {
589 this->BindReactor(reactor);
590
591 // Set up the unchanging parts of the start, read, and write tags and ops.
592 start_tag_.Set(
593 call_.call(),
594 [this](bool ok) {
595 reactor_->OnReadInitialMetadataDone(
596 ok && !reactor_->InternalTrailersOnly(call_.call()));
597 MaybeFinish(/*from_reaction=*/true);
598 },
599 &start_ops_, /*can_inline=*/false);
600 start_ops_.RecvInitialMetadata(context_);
601 start_ops_.set_core_cq_tag(&start_tag_);
602
603 write_tag_.Set(
604 call_.call(),
605 [this](bool ok) {
606 reactor_->OnWriteDone(ok);
607 MaybeFinish(/*from_reaction=*/true);
608 },
609 &write_ops_, /*can_inline=*/false);
610 write_ops_.set_core_cq_tag(&write_tag_);
611
612 read_tag_.Set(
613 call_.call(),
614 [this](bool ok) {
615 reactor_->OnReadDone(ok);
616 MaybeFinish(/*from_reaction=*/true);
617 },
618 &read_ops_, /*can_inline=*/false);
619 read_ops_.set_core_cq_tag(&read_tag_);
620
621 // Also set up the Finish tag and op set.
622 finish_tag_.Set(
623 call_.call(),
624 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
625 &finish_ops_,
626 /*can_inline=*/false);
627 finish_ops_.ClientRecvStatus(context_, &finish_status_);
628 finish_ops_.set_core_cq_tag(&finish_tag_);
629 }
630
631 // MaybeFinish can be called from reactions or from user-initiated operations
632 // like StartCall or RemoveHold. If this is the last operation or hold on this
633 // object, it will invoke the OnDone reaction. If MaybeFinish was called from
634 // a reaction, it can call OnDone directly. If not, it would need to schedule
635 // OnDone onto an executor thread to avoid the possibility of deadlocking with
636 // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)637 void MaybeFinish(bool from_reaction) {
638 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
639 1, std::memory_order_acq_rel) == 1)) {
640 grpc::Status s = std::move(finish_status_);
641 auto* reactor = reactor_;
642 auto* call = call_.call();
643 this->~ClientCallbackReaderWriterImpl();
644 if (GPR_LIKELY(from_reaction)) {
645 grpc_call_unref(call);
646 reactor->OnDone(s);
647 } else {
648 grpc_call_run_in_event_engine(
649 call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
650 grpc_call_unref(call);
651 }
652 }
653 }
654
655 grpc::ClientContext* const context_;
656 grpc::internal::Call call_;
657 ClientBidiReactor<Request, Response>* const reactor_;
658
659 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
660 grpc::internal::CallOpRecvInitialMetadata>
661 start_ops_;
662 grpc::internal::CallbackWithSuccessTag start_tag_;
663 const bool start_corked_;
664 bool corked_write_needed_; // no lock needed since only accessed in
665 // Write/WritesDone which cannot be concurrent
666
667 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
668 grpc::internal::CallbackWithSuccessTag finish_tag_;
669 grpc::Status finish_status_;
670
671 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
672 grpc::internal::CallOpSendMessage,
673 grpc::internal::CallOpClientSendClose>
674 write_ops_;
675 grpc::internal::CallbackWithSuccessTag write_tag_;
676
677 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
678 grpc::internal::CallOpClientSendClose>
679 writes_done_ops_;
680 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
681
682 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
683 read_ops_;
684 grpc::internal::CallbackWithSuccessTag read_tag_;
685
686 struct StartCallBacklog {
687 bool write_ops = false;
688 bool writes_done_ops = false;
689 bool read_ops = false;
690 };
691 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
692
693 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
694 std::atomic<intptr_t> callbacks_outstanding_{3};
695 std::atomic_bool started_{false};
696 grpc::internal::Mutex start_mu_;
697 };
698
699 template <class Request, class Response>
700 class ClientCallbackReaderWriterFactory {
701 public:
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)702 static void Create(grpc::ChannelInterface* channel,
703 const grpc::internal::RpcMethod& method,
704 grpc::ClientContext* context,
705 ClientBidiReactor<Request, Response>* reactor) {
706 grpc::internal::Call call =
707 channel->CreateCall(method, context, channel->CallbackCQ());
708
709 grpc_call_ref(call.call());
710 new (grpc_call_arena_alloc(
711 call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
712 ClientCallbackReaderWriterImpl<Request, Response>(call, context,
713 reactor);
714 }
715 };
716
717 template <class Response>
718 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
719 public:
720 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)721 static void operator delete(void* /*ptr*/, std::size_t size) {
722 GPR_ASSERT(size == sizeof(ClientCallbackReaderImpl));
723 }
724
725 // This operator should never be called as the memory should be freed as part
726 // of the arena destruction. It only exists to provide a matching operator
727 // delete to the operator new so that some compilers will not complain (see
728 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
729 // there are no tests catching the compiler warning.
delete(void *,void *)730 static void operator delete(void*, void*) { GPR_ASSERT(false); }
731
StartCall()732 void StartCall() override {
733 // This call initiates two batches, plus any backlog, each with a callback
734 // 1. Send initial metadata (unless corked) + recv initial metadata
735 // 2. Any backlog
736 // 3. Recv trailing metadata
737
738 start_tag_.Set(
739 call_.call(),
740 [this](bool ok) {
741 reactor_->OnReadInitialMetadataDone(
742 ok && !reactor_->InternalTrailersOnly(call_.call()));
743 MaybeFinish(/*from_reaction=*/true);
744 },
745 &start_ops_, /*can_inline=*/false);
746 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
747 context_->initial_metadata_flags());
748 start_ops_.RecvInitialMetadata(context_);
749 start_ops_.set_core_cq_tag(&start_tag_);
750 call_.PerformOps(&start_ops_);
751
752 // Also set up the read tag so it doesn't have to be set up each time
753 read_tag_.Set(
754 call_.call(),
755 [this](bool ok) {
756 reactor_->OnReadDone(ok);
757 MaybeFinish(/*from_reaction=*/true);
758 },
759 &read_ops_, /*can_inline=*/false);
760 read_ops_.set_core_cq_tag(&read_tag_);
761
762 {
763 grpc::internal::MutexLock lock(&start_mu_);
764 if (backlog_.read_ops) {
765 call_.PerformOps(&read_ops_);
766 }
767 started_.store(true, std::memory_order_release);
768 }
769
770 finish_tag_.Set(
771 call_.call(),
772 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
773 &finish_ops_, /*can_inline=*/false);
774 finish_ops_.ClientRecvStatus(context_, &finish_status_);
775 finish_ops_.set_core_cq_tag(&finish_tag_);
776 call_.PerformOps(&finish_ops_);
777 }
778
Read(Response * msg)779 void Read(Response* msg) override {
780 read_ops_.RecvMessage(msg);
781 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
782 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
783 grpc::internal::MutexLock lock(&start_mu_);
784 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
785 backlog_.read_ops = true;
786 return;
787 }
788 }
789 call_.PerformOps(&read_ops_);
790 }
791
AddHold(int holds)792 void AddHold(int holds) override {
793 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
794 }
RemoveHold()795 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
796
797 private:
798 friend class ClientCallbackReaderFactory<Response>;
799
800 template <class Request>
ClientCallbackReaderImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)801 ClientCallbackReaderImpl(grpc::internal::Call call,
802 grpc::ClientContext* context, Request* request,
803 ClientReadReactor<Response>* reactor)
804 : context_(context), call_(call), reactor_(reactor) {
805 this->BindReactor(reactor);
806 // TODO(vjpai): don't assert
807 GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
808 start_ops_.ClientSendClose();
809 }
810
811 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)812 void MaybeFinish(bool from_reaction) {
813 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
814 1, std::memory_order_acq_rel) == 1)) {
815 grpc::Status s = std::move(finish_status_);
816 auto* reactor = reactor_;
817 auto* call = call_.call();
818 this->~ClientCallbackReaderImpl();
819 if (GPR_LIKELY(from_reaction)) {
820 grpc_call_unref(call);
821 reactor->OnDone(s);
822 } else {
823 grpc_call_run_in_event_engine(
824 call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
825 grpc_call_unref(call);
826 }
827 }
828 }
829
830 grpc::ClientContext* const context_;
831 grpc::internal::Call call_;
832 ClientReadReactor<Response>* const reactor_;
833
834 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
835 grpc::internal::CallOpSendMessage,
836 grpc::internal::CallOpClientSendClose,
837 grpc::internal::CallOpRecvInitialMetadata>
838 start_ops_;
839 grpc::internal::CallbackWithSuccessTag start_tag_;
840
841 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
842 grpc::internal::CallbackWithSuccessTag finish_tag_;
843 grpc::Status finish_status_;
844
845 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
846 read_ops_;
847 grpc::internal::CallbackWithSuccessTag read_tag_;
848
849 struct StartCallBacklog {
850 bool read_ops = false;
851 };
852 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
853
854 // Minimum of 2 callbacks to pre-register for start and finish
855 std::atomic<intptr_t> callbacks_outstanding_{2};
856 std::atomic_bool started_{false};
857 grpc::internal::Mutex start_mu_;
858 };
859
860 template <class Response>
861 class ClientCallbackReaderFactory {
862 public:
863 template <class Request>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)864 static void Create(grpc::ChannelInterface* channel,
865 const grpc::internal::RpcMethod& method,
866 grpc::ClientContext* context, const Request* request,
867 ClientReadReactor<Response>* reactor) {
868 grpc::internal::Call call =
869 channel->CreateCall(method, context, channel->CallbackCQ());
870
871 grpc_call_ref(call.call());
872 new (grpc_call_arena_alloc(call.call(),
873 sizeof(ClientCallbackReaderImpl<Response>)))
874 ClientCallbackReaderImpl<Response>(call, context, request, reactor);
875 }
876 };
877
878 template <class Request>
879 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
880 public:
881 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)882 static void operator delete(void* /*ptr*/, std::size_t size) {
883 GPR_ASSERT(size == sizeof(ClientCallbackWriterImpl));
884 }
885
886 // This operator should never be called as the memory should be freed as part
887 // of the arena destruction. It only exists to provide a matching operator
888 // delete to the operator new so that some compilers will not complain (see
889 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
890 // there are no tests catching the compiler warning.
delete(void *,void *)891 static void operator delete(void*, void*) { GPR_ASSERT(false); }
892
StartCall()893 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
894 // This call initiates two batches, plus any backlog, each with a callback
895 // 1. Send initial metadata (unless corked) + recv initial metadata
896 // 2. Any backlog
897 // 3. Recv trailing metadata
898
899 if (!start_corked_) {
900 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
901 context_->initial_metadata_flags());
902 }
903 call_.PerformOps(&start_ops_);
904
905 {
906 grpc::internal::MutexLock lock(&start_mu_);
907
908 if (backlog_.write_ops) {
909 call_.PerformOps(&write_ops_);
910 }
911 if (backlog_.writes_done_ops) {
912 call_.PerformOps(&writes_done_ops_);
913 }
914 call_.PerformOps(&finish_ops_);
915 // The last thing in this critical section is to set started_ so that it
916 // can be used lock-free as well.
917 started_.store(true, std::memory_order_release);
918 }
919 // MaybeFinish outside the lock to make sure that destruction of this object
920 // doesn't take place while holding the lock (which would cause the lock to
921 // be released after destruction)
922 this->MaybeFinish(/*from_reaction=*/false);
923 }
924
Write(const Request * msg,grpc::WriteOptions options)925 void Write(const Request* msg, grpc::WriteOptions options)
926 ABSL_LOCKS_EXCLUDED(start_mu_) override {
927 if (GPR_UNLIKELY(options.is_last_message())) {
928 options.set_buffer_hint();
929 write_ops_.ClientSendClose();
930 }
931 // TODO(vjpai): don't assert
932 GPR_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
933 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
934
935 if (GPR_UNLIKELY(corked_write_needed_)) {
936 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
937 context_->initial_metadata_flags());
938 corked_write_needed_ = false;
939 }
940
941 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
942 grpc::internal::MutexLock lock(&start_mu_);
943 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
944 backlog_.write_ops = true;
945 return;
946 }
947 }
948 call_.PerformOps(&write_ops_);
949 }
950
WritesDone()951 void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
952 writes_done_ops_.ClientSendClose();
953 writes_done_tag_.Set(
954 call_.call(),
955 [this](bool ok) {
956 reactor_->OnWritesDoneDone(ok);
957 MaybeFinish(/*from_reaction=*/true);
958 },
959 &writes_done_ops_, /*can_inline=*/false);
960 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
961 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
962
963 if (GPR_UNLIKELY(corked_write_needed_)) {
964 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
965 context_->initial_metadata_flags());
966 corked_write_needed_ = false;
967 }
968
969 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
970 grpc::internal::MutexLock lock(&start_mu_);
971 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
972 backlog_.writes_done_ops = true;
973 return;
974 }
975 }
976 call_.PerformOps(&writes_done_ops_);
977 }
978
AddHold(int holds)979 void AddHold(int holds) override {
980 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
981 }
RemoveHold()982 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
983
984 private:
985 friend class ClientCallbackWriterFactory<Request>;
986
987 template <class Response>
ClientCallbackWriterImpl(grpc::internal::Call call,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)988 ClientCallbackWriterImpl(grpc::internal::Call call,
989 grpc::ClientContext* context, Response* response,
990 ClientWriteReactor<Request>* reactor)
991 : context_(context),
992 call_(call),
993 reactor_(reactor),
994 start_corked_(context_->initial_metadata_corked_),
995 corked_write_needed_(start_corked_) {
996 this->BindReactor(reactor);
997
998 // Set up the unchanging parts of the start and write tags and ops.
999 start_tag_.Set(
1000 call_.call(),
1001 [this](bool ok) {
1002 reactor_->OnReadInitialMetadataDone(
1003 ok && !reactor_->InternalTrailersOnly(call_.call()));
1004 MaybeFinish(/*from_reaction=*/true);
1005 },
1006 &start_ops_, /*can_inline=*/false);
1007 start_ops_.RecvInitialMetadata(context_);
1008 start_ops_.set_core_cq_tag(&start_tag_);
1009
1010 write_tag_.Set(
1011 call_.call(),
1012 [this](bool ok) {
1013 reactor_->OnWriteDone(ok);
1014 MaybeFinish(/*from_reaction=*/true);
1015 },
1016 &write_ops_, /*can_inline=*/false);
1017 write_ops_.set_core_cq_tag(&write_tag_);
1018
1019 // Also set up the Finish tag and op set.
1020 finish_ops_.RecvMessage(response);
1021 finish_ops_.AllowNoMessage();
1022 finish_tag_.Set(
1023 call_.call(),
1024 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1025 &finish_ops_,
1026 /*can_inline=*/false);
1027 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1028 finish_ops_.set_core_cq_tag(&finish_tag_);
1029 }
1030
1031 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1032 void MaybeFinish(bool from_reaction) {
1033 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1034 1, std::memory_order_acq_rel) == 1)) {
1035 grpc::Status s = std::move(finish_status_);
1036 auto* reactor = reactor_;
1037 auto* call = call_.call();
1038 this->~ClientCallbackWriterImpl();
1039 if (GPR_LIKELY(from_reaction)) {
1040 grpc_call_unref(call);
1041 reactor->OnDone(s);
1042 } else {
1043 grpc_call_run_in_event_engine(
1044 call, [reactor, s = std::move(s)]() { reactor->OnDone(s); });
1045 grpc_call_unref(call);
1046 }
1047 }
1048 }
1049
1050 grpc::ClientContext* const context_;
1051 grpc::internal::Call call_;
1052 ClientWriteReactor<Request>* const reactor_;
1053
1054 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1055 grpc::internal::CallOpRecvInitialMetadata>
1056 start_ops_;
1057 grpc::internal::CallbackWithSuccessTag start_tag_;
1058 const bool start_corked_;
1059 bool corked_write_needed_; // no lock needed since only accessed in
1060 // Write/WritesDone which cannot be concurrent
1061
1062 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1063 grpc::internal::CallOpClientRecvStatus>
1064 finish_ops_;
1065 grpc::internal::CallbackWithSuccessTag finish_tag_;
1066 grpc::Status finish_status_;
1067
1068 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1069 grpc::internal::CallOpSendMessage,
1070 grpc::internal::CallOpClientSendClose>
1071 write_ops_;
1072 grpc::internal::CallbackWithSuccessTag write_tag_;
1073
1074 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1075 grpc::internal::CallOpClientSendClose>
1076 writes_done_ops_;
1077 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1078
1079 struct StartCallBacklog {
1080 bool write_ops = false;
1081 bool writes_done_ops = false;
1082 };
1083 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1084
1085 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1086 std::atomic<intptr_t> callbacks_outstanding_{3};
1087 std::atomic_bool started_{false};
1088 grpc::internal::Mutex start_mu_;
1089 };
1090
1091 template <class Request>
1092 class ClientCallbackWriterFactory {
1093 public:
1094 template <class Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1095 static void Create(grpc::ChannelInterface* channel,
1096 const grpc::internal::RpcMethod& method,
1097 grpc::ClientContext* context, Response* response,
1098 ClientWriteReactor<Request>* reactor) {
1099 grpc::internal::Call call =
1100 channel->CreateCall(method, context, channel->CallbackCQ());
1101
1102 grpc_call_ref(call.call());
1103 new (grpc_call_arena_alloc(call.call(),
1104 sizeof(ClientCallbackWriterImpl<Request>)))
1105 ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1106 }
1107 };
1108
1109 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1110 public:
1111 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1112 static void operator delete(void* /*ptr*/, std::size_t size) {
1113 GPR_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1114 }
1115
1116 // This operator should never be called as the memory should be freed as part
1117 // of the arena destruction. It only exists to provide a matching operator
1118 // delete to the operator new so that some compilers will not complain (see
1119 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1120 // there are no tests catching the compiler warning.
delete(void *,void *)1121 static void operator delete(void*, void*) { GPR_ASSERT(false); }
1122
StartCall()1123 void StartCall() override {
1124 // This call initiates two batches, each with a callback
1125 // 1. Send initial metadata + write + writes done + recv initial metadata
1126 // 2. Read message, recv trailing metadata
1127
1128 start_tag_.Set(
1129 call_.call(),
1130 [this](bool ok) {
1131 reactor_->OnReadInitialMetadataDone(
1132 ok && !reactor_->InternalTrailersOnly(call_.call()));
1133 MaybeFinish();
1134 },
1135 &start_ops_, /*can_inline=*/false);
1136 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1137 context_->initial_metadata_flags());
1138 start_ops_.RecvInitialMetadata(context_);
1139 start_ops_.set_core_cq_tag(&start_tag_);
1140 call_.PerformOps(&start_ops_);
1141
1142 finish_tag_.Set(
1143 call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1144 /*can_inline=*/false);
1145 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1146 finish_ops_.set_core_cq_tag(&finish_tag_);
1147 call_.PerformOps(&finish_ops_);
1148 }
1149
1150 private:
1151 friend class ClientCallbackUnaryFactory;
1152
1153 template <class Request, class Response>
ClientCallbackUnaryImpl(grpc::internal::Call call,grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1154 ClientCallbackUnaryImpl(grpc::internal::Call call,
1155 grpc::ClientContext* context, Request* request,
1156 Response* response, ClientUnaryReactor* reactor)
1157 : context_(context), call_(call), reactor_(reactor) {
1158 this->BindReactor(reactor);
1159 // TODO(vjpai): don't assert
1160 GPR_ASSERT(start_ops_.SendMessagePtr(request).ok());
1161 start_ops_.ClientSendClose();
1162 finish_ops_.RecvMessage(response);
1163 finish_ops_.AllowNoMessage();
1164 }
1165
1166 // In the unary case, MaybeFinish is only ever invoked from a
1167 // library-initiated reaction, so it will just directly call OnDone if this is
1168 // the last reaction for this RPC.
MaybeFinish()1169 void MaybeFinish() {
1170 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1171 1, std::memory_order_acq_rel) == 1)) {
1172 grpc::Status s = std::move(finish_status_);
1173 auto* reactor = reactor_;
1174 auto* call = call_.call();
1175 this->~ClientCallbackUnaryImpl();
1176 grpc_call_unref(call);
1177 reactor->OnDone(s);
1178 }
1179 }
1180
1181 grpc::ClientContext* const context_;
1182 grpc::internal::Call call_;
1183 ClientUnaryReactor* const reactor_;
1184
1185 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1186 grpc::internal::CallOpSendMessage,
1187 grpc::internal::CallOpClientSendClose,
1188 grpc::internal::CallOpRecvInitialMetadata>
1189 start_ops_;
1190 grpc::internal::CallbackWithSuccessTag start_tag_;
1191
1192 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1193 grpc::internal::CallOpClientRecvStatus>
1194 finish_ops_;
1195 grpc::internal::CallbackWithSuccessTag finish_tag_;
1196 grpc::Status finish_status_;
1197
1198 // This call will have 2 callbacks: start and finish
1199 std::atomic<intptr_t> callbacks_outstanding_{2};
1200 };
1201
1202 class ClientCallbackUnaryFactory {
1203 public:
1204 template <class Request, class Response, class BaseRequest = Request,
1205 class BaseResponse = Response>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1206 static void Create(grpc::ChannelInterface* channel,
1207 const grpc::internal::RpcMethod& method,
1208 grpc::ClientContext* context, const Request* request,
1209 Response* response, ClientUnaryReactor* reactor) {
1210 grpc::internal::Call call =
1211 channel->CreateCall(method, context, channel->CallbackCQ());
1212
1213 grpc_call_ref(call.call());
1214
1215 new (grpc_call_arena_alloc(call.call(), sizeof(ClientCallbackUnaryImpl)))
1216 ClientCallbackUnaryImpl(call, context,
1217 static_cast<const BaseRequest*>(request),
1218 static_cast<BaseResponse*>(response), reactor);
1219 }
1220 };
1221
1222 } // namespace internal
1223 } // namespace grpc
1224
1225 #endif // GRPCPP_SUPPORT_CLIENT_CALLBACK_H
1226