xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/server_callback.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25 
26 #include "absl/functional/any_invocable.h"
27 
28 #include <grpc/impl/call.h>
29 #include <grpcpp/impl/call.h>
30 #include <grpcpp/impl/call_op_set.h>
31 #include <grpcpp/impl/sync.h>
32 #include <grpcpp/support/callback_common.h>
33 #include <grpcpp/support/config.h>
34 #include <grpcpp/support/message_allocator.h>
35 #include <grpcpp/support/status.h>
36 
37 namespace grpc {
38 
39 // Declare base class of all reactors as internal
40 namespace internal {
41 
42 // Forward declarations
43 template <class Request, class Response>
44 class CallbackUnaryHandler;
45 template <class Request, class Response>
46 class CallbackClientStreamingHandler;
47 template <class Request, class Response>
48 class CallbackServerStreamingHandler;
49 template <class Request, class Response>
50 class CallbackBidiHandler;
51 
/// Internal base class shared by every server-side reactor.
///
/// It declares the two reactions common to all reactor kinds (OnDone and
/// OnCancel) and grants the callback method handlers access to its private
/// section.
class ServerReactor {
 public:
  virtual ~ServerReactor() = default;

  /// Reaction declared here as pure virtual; concrete reactors implement it.
  virtual void OnDone() = 0;

  /// Reaction declared here as pure virtual; concrete reactors implement it.
  virtual void OnCancel() = 0;

  // NOT part of the API; internal use only. Reports whether all reactions of
  // this reactor can be run without an extra executor scheduling. Only
  // internally-defined reactors with trivial reactions should return true.
  // The default is false.
  virtual bool InternalInlineable() { return false; }

 private:
  // The callback method handlers are befriended as templates so that every
  // instantiation is covered.
  template <typename Req, typename Resp>
  friend class CallbackUnaryHandler;
  template <typename Req, typename Resp>
  friend class CallbackClientStreamingHandler;
  template <typename Req, typename Resp>
  friend class CallbackServerStreamingHandler;
  template <typename Req, typename Resp>
  friend class CallbackBidiHandler;
};
74 
75 /// The base class of ServerCallbackUnary etc.
76 class ServerCallbackCall {
77  public:
~ServerCallbackCall()78   virtual ~ServerCallbackCall() {}
79 
80   // This object is responsible for tracking when it is safe to call OnDone and
81   // OnCancel. OnDone should not be called until the method handler is complete,
82   // Finish has been called, the ServerContext CompletionOp (which tracks
83   // cancellation or successful completion) has completed, and all outstanding
84   // Read/Write actions have seen their reactions. OnCancel should not be called
85   // until after the method handler is done and the RPC has completed with a
86   // cancellation. This is tracked by counting how many of these conditions have
87   // been met and calling OnCancel when none remain unmet.
88 
89   // Public versions of MaybeDone: one where we don't know the reactor in
90   // advance (used for the ServerContext CompletionOp), and one for where we
91   // know the inlineability of the OnDone reaction. You should set the inline
92   // flag to true if either the Reactor is InternalInlineable() or if this
93   // callback is already being forced to run dispatched to an executor
94   // (typically because it contains additional work than just the MaybeDone).
95 
MaybeDone()96   void MaybeDone() {
97     if (GPR_UNLIKELY(Unref() == 1)) {
98       ScheduleOnDone(reactor()->InternalInlineable());
99     }
100   }
101 
MaybeDone(bool inline_ondone)102   void MaybeDone(bool inline_ondone) {
103     if (GPR_UNLIKELY(Unref() == 1)) {
104       ScheduleOnDone(inline_ondone);
105     }
106   }
107 
108   // Fast version called with known reactor passed in, used from derived
109   // classes, typically in non-cancel case
MaybeCallOnCancel(ServerReactor * reactor)110   void MaybeCallOnCancel(ServerReactor* reactor) {
111     if (GPR_UNLIKELY(UnblockCancellation())) {
112       CallOnCancel(reactor);
113     }
114   }
115 
116   // Slower version called from object that doesn't know the reactor a priori
117   // (such as the ServerContext CompletionOp which is formed before the
118   // reactor). This is used in cancel cases only, so it's ok to be slower and
119   // invoke a virtual function.
MaybeCallOnCancel()120   void MaybeCallOnCancel() {
121     if (GPR_UNLIKELY(UnblockCancellation())) {
122       CallOnCancel(reactor());
123     }
124   }
125 
126  protected:
127   /// Increases the reference count
Ref()128   void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
129 
130  private:
131   virtual ServerReactor* reactor() = 0;
132 
133   virtual grpc_call* call() = 0;
134 
RunAsync(absl::AnyInvocable<void ()> cb)135   virtual void RunAsync(absl::AnyInvocable<void()> cb) {
136     grpc_call_run_in_event_engine(call(), std::move(cb));
137   }
138 
139   // CallOnDone performs the work required at completion of the RPC: invoking
140   // the OnDone function and doing all necessary cleanup. This function is only
141   // ever invoked on a fully-Unref'fed ServerCallbackCall.
142   virtual void CallOnDone() = 0;
143 
144   // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
145   // to an executor.
146   void ScheduleOnDone(bool inline_ondone);
147 
148   // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
149   // it to an executor.
150   void CallOnCancel(ServerReactor* reactor);
151 
152   // Implement the cancellation constraint counter. Return true if OnCancel
153   // should be called, false otherwise.
UnblockCancellation()154   bool UnblockCancellation() {
155     return on_cancel_conditions_remaining_.fetch_sub(
156                1, std::memory_order_acq_rel) == 1;
157   }
158 
159   /// Decreases the reference count and returns the previous value
Unref()160   int Unref() {
161     return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
162   }
163 
164   std::atomic_int on_cancel_conditions_remaining_{2};
165   std::atomic_int callbacks_outstanding_{
166       3};  // reserve for start, Finish, and CompletionOp
167 };
168 
169 template <class Request, class Response>
170 class DefaultMessageHolder : public MessageHolder<Request, Response> {
171  public:
DefaultMessageHolder()172   DefaultMessageHolder() {
173     this->set_request(&request_obj_);
174     this->set_response(&response_obj_);
175   }
Release()176   void Release() override {
177     // the object is allocated in the call arena.
178     this->~DefaultMessageHolder<Request, Response>();
179   }
180 
181  private:
182   Request request_obj_;
183   Response response_obj_;
184 };
185 
186 }  // namespace internal
187 
188 // Forward declarations
189 class ServerUnaryReactor;
190 template <class Request>
191 class ServerReadReactor;
192 template <class Response>
193 class ServerWriteReactor;
194 template <class Request, class Response>
195 class ServerBidiReactor;
196 
197 // NOTE: The actual call/stream object classes are provided as API only to
198 // support mocking. There are no implementations of these class interfaces in
199 // the API.
200 class ServerCallbackUnary : public internal::ServerCallbackCall {
201  public:
~ServerCallbackUnary()202   ~ServerCallbackUnary() override {}
203   virtual void Finish(grpc::Status s) = 0;
204   virtual void SendInitialMetadata() = 0;
205 
206  protected:
207   // Use a template rather than explicitly specifying ServerUnaryReactor to
208   // delay binding and avoid a circular forward declaration issue
209   template <class Reactor>
BindReactor(Reactor * reactor)210   void BindReactor(Reactor* reactor) {
211     reactor->InternalBindCall(this);
212   }
213 };
214 
215 template <class Request>
216 class ServerCallbackReader : public internal::ServerCallbackCall {
217  public:
~ServerCallbackReader()218   ~ServerCallbackReader() override {}
219   virtual void Finish(grpc::Status s) = 0;
220   virtual void SendInitialMetadata() = 0;
221   virtual void Read(Request* msg) = 0;
222 
223  protected:
BindReactor(ServerReadReactor<Request> * reactor)224   void BindReactor(ServerReadReactor<Request>* reactor) {
225     reactor->InternalBindReader(this);
226   }
227 };
228 
229 template <class Response>
230 class ServerCallbackWriter : public internal::ServerCallbackCall {
231  public:
~ServerCallbackWriter()232   ~ServerCallbackWriter() override {}
233 
234   virtual void Finish(grpc::Status s) = 0;
235   virtual void SendInitialMetadata() = 0;
236   virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
237   virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
238                               grpc::Status s) = 0;
239 
240  protected:
BindReactor(ServerWriteReactor<Response> * reactor)241   void BindReactor(ServerWriteReactor<Response>* reactor) {
242     reactor->InternalBindWriter(this);
243   }
244 };
245 
246 template <class Request, class Response>
247 class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
248  public:
~ServerCallbackReaderWriter()249   ~ServerCallbackReaderWriter() override {}
250 
251   virtual void Finish(grpc::Status s) = 0;
252   virtual void SendInitialMetadata() = 0;
253   virtual void Read(Request* msg) = 0;
254   virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
255   virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
256                               grpc::Status s) = 0;
257 
258  protected:
BindReactor(ServerBidiReactor<Request,Response> * reactor)259   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
260     reactor->InternalBindStream(this);
261   }
262 };
263 
// The following classes are the reactor interfaces that are to be implemented
// by the user, returned as the output parameter of the method handler for a
// callback method. Every reaction except OnDone has a default empty
// implementation, so the user class only needs to override OnDone (which is
// pure virtual) plus those reactions that it cares about. The reaction methods
// will be invoked by the library in response to the completion of various
// operations. Reactions must not include blocking operations (such as blocking
// I/O, starting synchronous RPCs, or waiting on condition variables).
// Reactions may be invoked concurrently, except that OnDone is called after
// all others (assuming proper API usage). The reactor may not be deleted until
// OnDone is called.
274 
275 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
276 template <class Request, class Response>
277 class ServerBidiReactor : public internal::ServerReactor {
278  public:
279   // NOTE: Initializing stream_ as a constructor initializer rather than a
280   //       default initializer because gcc-4.x requires a copy constructor for
281   //       default initializing a templated member, which isn't ok for atomic.
282   // TODO(vjpai): Switch to default constructor and default initializer when
283   //              gcc-4.x is no longer supported
ServerBidiReactor()284   ServerBidiReactor() : stream_(nullptr) {}
285   ~ServerBidiReactor() override = default;
286 
287   /// Send any initial metadata stored in the RPC context. If not invoked,
288   /// any initial metadata will be passed along with the first Write or the
289   /// Finish (if there are no writes).
StartSendInitialMetadata()290   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
291     ServerCallbackReaderWriter<Request, Response>* stream =
292         stream_.load(std::memory_order_acquire);
293     if (stream == nullptr) {
294       grpc::internal::MutexLock l(&stream_mu_);
295       stream = stream_.load(std::memory_order_relaxed);
296       if (stream == nullptr) {
297         backlog_.send_initial_metadata_wanted = true;
298         return;
299       }
300     }
301     stream->SendInitialMetadata();
302   }
303 
304   /// Initiate a read operation.
305   ///
306   /// \param[out] req Where to eventually store the read message. Valid when
307   ///                 the library calls OnReadDone
StartRead(Request * req)308   void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
309     ServerCallbackReaderWriter<Request, Response>* stream =
310         stream_.load(std::memory_order_acquire);
311     if (stream == nullptr) {
312       grpc::internal::MutexLock l(&stream_mu_);
313       stream = stream_.load(std::memory_order_relaxed);
314       if (stream == nullptr) {
315         backlog_.read_wanted = req;
316         return;
317       }
318     }
319     stream->Read(req);
320   }
321 
322   /// Initiate a write operation.
323   ///
324   /// \param[in] resp The message to be written. The library does not take
325   ///                 ownership but the caller must ensure that the message is
326   ///                 not deleted or modified until OnWriteDone is called.
StartWrite(const Response * resp)327   void StartWrite(const Response* resp) {
328     StartWrite(resp, grpc::WriteOptions());
329   }
330 
331   /// Initiate a write operation with specified options.
332   ///
333   /// \param[in] resp The message to be written. The library does not take
334   ///                 ownership but the caller must ensure that the message is
335   ///                 not deleted or modified until OnWriteDone is called.
336   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Response * resp,grpc::WriteOptions options)337   void StartWrite(const Response* resp, grpc::WriteOptions options)
338       ABSL_LOCKS_EXCLUDED(stream_mu_) {
339     ServerCallbackReaderWriter<Request, Response>* stream =
340         stream_.load(std::memory_order_acquire);
341     if (stream == nullptr) {
342       grpc::internal::MutexLock l(&stream_mu_);
343       stream = stream_.load(std::memory_order_relaxed);
344       if (stream == nullptr) {
345         backlog_.write_wanted = resp;
346         backlog_.write_options_wanted = options;
347         return;
348       }
349     }
350     stream->Write(resp, options);
351   }
352 
353   /// Initiate a write operation with specified options and final RPC Status,
354   /// which also causes any trailing metadata for this RPC to be sent out.
355   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
356   /// single step. A key difference, though, is that this operation doesn't have
357   /// an OnWriteDone reaction - it is considered complete only when OnDone is
358   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
359   /// both.
360   ///
361   /// \param[in] resp The message to be written. The library does not take
362   ///                 ownership but the caller must ensure that the message is
363   ///                 not deleted or modified until OnDone is called.
364   /// \param[in] options The WriteOptions to use for writing this message
365   /// \param[in] s The status outcome of this RPC
StartWriteAndFinish(const Response * resp,grpc::WriteOptions options,grpc::Status s)366   void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
367                            grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
368     ServerCallbackReaderWriter<Request, Response>* stream =
369         stream_.load(std::memory_order_acquire);
370     if (stream == nullptr) {
371       grpc::internal::MutexLock l(&stream_mu_);
372       stream = stream_.load(std::memory_order_relaxed);
373       if (stream == nullptr) {
374         backlog_.write_and_finish_wanted = true;
375         backlog_.write_wanted = resp;
376         backlog_.write_options_wanted = options;
377         backlog_.status_wanted = std::move(s);
378         return;
379       }
380     }
381     stream->WriteAndFinish(resp, options, std::move(s));
382   }
383 
384   /// Inform system of a planned write operation with specified options, but
385   /// allow the library to schedule the actual write coalesced with the writing
386   /// of trailing metadata (which takes place on a Finish call).
387   ///
388   /// \param[in] resp The message to be written. The library does not take
389   ///                 ownership but the caller must ensure that the message is
390   ///                 not deleted or modified until OnWriteDone is called.
391   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Response * resp,grpc::WriteOptions options)392   void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
393     StartWrite(resp, options.set_last_message());
394   }
395 
396   /// Indicate that the stream is to be finished and the trailing metadata and
397   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
398   /// or StartWriteAndFinish (but not both), even if the RPC is already
399   /// cancelled.
400   ///
401   /// \param[in] s The status outcome of this RPC
Finish(grpc::Status s)402   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
403     ServerCallbackReaderWriter<Request, Response>* stream =
404         stream_.load(std::memory_order_acquire);
405     if (stream == nullptr) {
406       grpc::internal::MutexLock l(&stream_mu_);
407       stream = stream_.load(std::memory_order_relaxed);
408       if (stream == nullptr) {
409         backlog_.finish_wanted = true;
410         backlog_.status_wanted = std::move(s);
411         return;
412       }
413     }
414     stream->Finish(std::move(s));
415   }
416 
417   /// Notifies the application that an explicit StartSendInitialMetadata
418   /// operation completed. Not used when the sending of initial metadata
419   /// piggybacks onto the first write.
420   ///
421   /// \param[in] ok Was it successful? If false, no further write-side operation
422   ///               will succeed.
OnSendInitialMetadataDone(bool)423   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
424 
425   /// Notifies the application that a StartRead operation completed.
426   ///
427   /// \param[in] ok Was it successful? If false, no further read-side operation
428   ///               will succeed.
OnReadDone(bool)429   virtual void OnReadDone(bool /*ok*/) {}
430 
431   /// Notifies the application that a StartWrite (or StartWriteLast) operation
432   /// completed.
433   ///
434   /// \param[in] ok Was it successful? If false, no further write-side operation
435   ///               will succeed.
OnWriteDone(bool)436   virtual void OnWriteDone(bool /*ok*/) {}
437 
438   /// Notifies the application that all operations associated with this RPC
439   /// have completed. This is an override (from the internal base class) but
440   /// still abstract, so derived classes MUST override it to be instantiated.
441   void OnDone() override = 0;
442 
443   /// Notifies the application that this RPC has been cancelled. This is an
444   /// override (from the internal base class) but not final, so derived classes
445   /// should override it if they want to take action.
OnCancel()446   void OnCancel() override {}
447 
448  private:
449   friend class ServerCallbackReaderWriter<Request, Response>;
450   // May be overridden by internal implementation details. This is not a public
451   // customization point.
InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)452   virtual void InternalBindStream(
453       ServerCallbackReaderWriter<Request, Response>* stream) {
454     grpc::internal::MutexLock l(&stream_mu_);
455 
456     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
457       stream->SendInitialMetadata();
458     }
459     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
460       stream->Read(backlog_.read_wanted);
461     }
462     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
463       stream->WriteAndFinish(backlog_.write_wanted,
464                              std::move(backlog_.write_options_wanted),
465                              std::move(backlog_.status_wanted));
466     } else {
467       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
468         stream->Write(backlog_.write_wanted,
469                       std::move(backlog_.write_options_wanted));
470       }
471       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
472         stream->Finish(std::move(backlog_.status_wanted));
473       }
474     }
475     // Set stream_ last so that other functions can use it lock-free
476     stream_.store(stream, std::memory_order_release);
477   }
478 
479   grpc::internal::Mutex stream_mu_;
480   // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
481   //              once C++17 or ABSL is supported since stream and backlog are
482   //              mutually exclusive in this class. Do likewise with the
483   //              remaining reactor classes and their backlogs as well.
484   std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
485   struct PreBindBacklog {
486     bool send_initial_metadata_wanted = false;
487     bool write_and_finish_wanted = false;
488     bool finish_wanted = false;
489     Request* read_wanted = nullptr;
490     const Response* write_wanted = nullptr;
491     grpc::WriteOptions write_options_wanted;
492     grpc::Status status_wanted;
493   };
494   PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
495 };
496 
497 /// \a ServerReadReactor is the interface for a client-streaming RPC.
498 template <class Request>
499 class ServerReadReactor : public internal::ServerReactor {
500  public:
ServerReadReactor()501   ServerReadReactor() : reader_(nullptr) {}
502   ~ServerReadReactor() override = default;
503 
504   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()505   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
506     ServerCallbackReader<Request>* reader =
507         reader_.load(std::memory_order_acquire);
508     if (reader == nullptr) {
509       grpc::internal::MutexLock l(&reader_mu_);
510       reader = reader_.load(std::memory_order_relaxed);
511       if (reader == nullptr) {
512         backlog_.send_initial_metadata_wanted = true;
513         return;
514       }
515     }
516     reader->SendInitialMetadata();
517   }
StartRead(Request * req)518   void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
519     ServerCallbackReader<Request>* reader =
520         reader_.load(std::memory_order_acquire);
521     if (reader == nullptr) {
522       grpc::internal::MutexLock l(&reader_mu_);
523       reader = reader_.load(std::memory_order_relaxed);
524       if (reader == nullptr) {
525         backlog_.read_wanted = req;
526         return;
527       }
528     }
529     reader->Read(req);
530   }
Finish(grpc::Status s)531   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
532     ServerCallbackReader<Request>* reader =
533         reader_.load(std::memory_order_acquire);
534     if (reader == nullptr) {
535       grpc::internal::MutexLock l(&reader_mu_);
536       reader = reader_.load(std::memory_order_relaxed);
537       if (reader == nullptr) {
538         backlog_.finish_wanted = true;
539         backlog_.status_wanted = std::move(s);
540         return;
541       }
542     }
543     reader->Finish(std::move(s));
544   }
545 
546   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)547   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)548   virtual void OnReadDone(bool /*ok*/) {}
549   void OnDone() override = 0;
OnCancel()550   void OnCancel() override {}
551 
552  private:
553   friend class ServerCallbackReader<Request>;
554 
555   // May be overridden by internal implementation details. This is not a public
556   // customization point.
InternalBindReader(ServerCallbackReader<Request> * reader)557   virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
558       ABSL_LOCKS_EXCLUDED(reader_mu_) {
559     grpc::internal::MutexLock l(&reader_mu_);
560 
561     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
562       reader->SendInitialMetadata();
563     }
564     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
565       reader->Read(backlog_.read_wanted);
566     }
567     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
568       reader->Finish(std::move(backlog_.status_wanted));
569     }
570     // Set reader_ last so that other functions can use it lock-free
571     reader_.store(reader, std::memory_order_release);
572   }
573 
574   grpc::internal::Mutex reader_mu_;
575   std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
576   struct PreBindBacklog {
577     bool send_initial_metadata_wanted = false;
578     bool finish_wanted = false;
579     Request* read_wanted = nullptr;
580     grpc::Status status_wanted;
581   };
582   PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
583 };
584 
585 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
586 template <class Response>
587 class ServerWriteReactor : public internal::ServerReactor {
588  public:
ServerWriteReactor()589   ServerWriteReactor() : writer_(nullptr) {}
590   ~ServerWriteReactor() override = default;
591 
592   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()593   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
594     ServerCallbackWriter<Response>* writer =
595         writer_.load(std::memory_order_acquire);
596     if (writer == nullptr) {
597       grpc::internal::MutexLock l(&writer_mu_);
598       writer = writer_.load(std::memory_order_relaxed);
599       if (writer == nullptr) {
600         backlog_.send_initial_metadata_wanted = true;
601         return;
602       }
603     }
604     writer->SendInitialMetadata();
605   }
StartWrite(const Response * resp)606   void StartWrite(const Response* resp) {
607     StartWrite(resp, grpc::WriteOptions());
608   }
StartWrite(const Response * resp,grpc::WriteOptions options)609   void StartWrite(const Response* resp, grpc::WriteOptions options)
610       ABSL_LOCKS_EXCLUDED(writer_mu_) {
611     ServerCallbackWriter<Response>* writer =
612         writer_.load(std::memory_order_acquire);
613     if (writer == nullptr) {
614       grpc::internal::MutexLock l(&writer_mu_);
615       writer = writer_.load(std::memory_order_relaxed);
616       if (writer == nullptr) {
617         backlog_.write_wanted = resp;
618         backlog_.write_options_wanted = options;
619         return;
620       }
621     }
622     writer->Write(resp, options);
623   }
StartWriteAndFinish(const Response * resp,grpc::WriteOptions options,grpc::Status s)624   void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
625                            grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
626     ServerCallbackWriter<Response>* writer =
627         writer_.load(std::memory_order_acquire);
628     if (writer == nullptr) {
629       grpc::internal::MutexLock l(&writer_mu_);
630       writer = writer_.load(std::memory_order_relaxed);
631       if (writer == nullptr) {
632         backlog_.write_and_finish_wanted = true;
633         backlog_.write_wanted = resp;
634         backlog_.write_options_wanted = options;
635         backlog_.status_wanted = std::move(s);
636         return;
637       }
638     }
639     writer->WriteAndFinish(resp, options, std::move(s));
640   }
StartWriteLast(const Response * resp,grpc::WriteOptions options)641   void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
642     StartWrite(resp, options.set_last_message());
643   }
Finish(grpc::Status s)644   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
645     ServerCallbackWriter<Response>* writer =
646         writer_.load(std::memory_order_acquire);
647     if (writer == nullptr) {
648       grpc::internal::MutexLock l(&writer_mu_);
649       writer = writer_.load(std::memory_order_relaxed);
650       if (writer == nullptr) {
651         backlog_.finish_wanted = true;
652         backlog_.status_wanted = std::move(s);
653         return;
654       }
655     }
656     writer->Finish(std::move(s));
657   }
658 
659   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)660   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)661   virtual void OnWriteDone(bool /*ok*/) {}
662   void OnDone() override = 0;
OnCancel()663   void OnCancel() override {}
664 
665  private:
666   friend class ServerCallbackWriter<Response>;
667   // May be overridden by internal implementation details. This is not a public
668   // customization point.
InternalBindWriter(ServerCallbackWriter<Response> * writer)669   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
670       ABSL_LOCKS_EXCLUDED(writer_mu_) {
671     grpc::internal::MutexLock l(&writer_mu_);
672 
673     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
674       writer->SendInitialMetadata();
675     }
676     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
677       writer->WriteAndFinish(backlog_.write_wanted,
678                              std::move(backlog_.write_options_wanted),
679                              std::move(backlog_.status_wanted));
680     } else {
681       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
682         writer->Write(backlog_.write_wanted,
683                       std::move(backlog_.write_options_wanted));
684       }
685       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
686         writer->Finish(std::move(backlog_.status_wanted));
687       }
688     }
689     // Set writer_ last so that other functions can use it lock-free
690     writer_.store(writer, std::memory_order_release);
691   }
692 
693   grpc::internal::Mutex writer_mu_;
694   std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
695   struct PreBindBacklog {
696     bool send_initial_metadata_wanted = false;
697     bool write_and_finish_wanted = false;
698     bool finish_wanted = false;
699     const Response* write_wanted = nullptr;
700     grpc::WriteOptions write_options_wanted;
701     grpc::Status status_wanted;
702   };
703   PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
704 };
705 
706 class ServerUnaryReactor : public internal::ServerReactor {
707  public:
ServerUnaryReactor()708   ServerUnaryReactor() : call_(nullptr) {}
709   ~ServerUnaryReactor() override = default;
710 
711   /// StartSendInitialMetadata is exactly like ServerBidiReactor.
StartSendInitialMetadata()712   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
713     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
714     if (call == nullptr) {
715       grpc::internal::MutexLock l(&call_mu_);
716       call = call_.load(std::memory_order_relaxed);
717       if (call == nullptr) {
718         backlog_.send_initial_metadata_wanted = true;
719         return;
720       }
721     }
722     call->SendInitialMetadata();
723   }
724   /// Finish is similar to ServerBidiReactor except for one detail.
725   /// If the status is non-OK, any message will not be sent. Instead,
726   /// the client will only receive the status and any trailing metadata.
Finish(grpc::Status s)727   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
728     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
729     if (call == nullptr) {
730       grpc::internal::MutexLock l(&call_mu_);
731       call = call_.load(std::memory_order_relaxed);
732       if (call == nullptr) {
733         backlog_.finish_wanted = true;
734         backlog_.status_wanted = std::move(s);
735         return;
736       }
737     }
738     call->Finish(std::move(s));
739   }
740 
741   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)742   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
743   void OnDone() override = 0;
OnCancel()744   void OnCancel() override {}
745 
746  private:
747   friend class ServerCallbackUnary;
748   // May be overridden by internal implementation details. This is not a public
749   // customization point.
InternalBindCall(ServerCallbackUnary * call)750   virtual void InternalBindCall(ServerCallbackUnary* call)
751       ABSL_LOCKS_EXCLUDED(call_mu_) {
752     grpc::internal::MutexLock l(&call_mu_);
753 
754     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
755       call->SendInitialMetadata();
756     }
757     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
758       call->Finish(std::move(backlog_.status_wanted));
759     }
760     // Set call_ last so that other functions can use it lock-free
761     call_.store(call, std::memory_order_release);
762   }
763 
764   grpc::internal::Mutex call_mu_;
765   std::atomic<ServerCallbackUnary*> call_{nullptr};
766   struct PreBindBacklog {
767     bool send_initial_metadata_wanted = false;
768     bool finish_wanted = false;
769     grpc::Status status_wanted;
770   };
771   PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
772 };
773 
774 namespace internal {
775 
776 template <class Base>
777 class FinishOnlyReactor : public Base {
778  public:
FinishOnlyReactor(grpc::Status s)779   explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
OnDone()780   void OnDone() override { this->~FinishOnlyReactor(); }
781 };
782 
783 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
784 template <class Request>
785 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
786 template <class Response>
787 using UnimplementedWriteReactor =
788     FinishOnlyReactor<ServerWriteReactor<Response>>;
789 template <class Request, class Response>
790 using UnimplementedBidiReactor =
791     FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
792 
793 }  // namespace internal
794 
795 // TODO(vjpai): Remove namespace experimental when last known users are migrated
796 // off.
797 namespace experimental {
798 
799 template <class Request, class Response>
800 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
801 
802 }  // namespace experimental
803 
804 }  // namespace grpc
805 
806 #endif  // GRPCPP_SUPPORT_SERVER_CALLBACK_H
807