xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/async_stream.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H
20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/call.h>
25 #include <grpcpp/impl/channel_interface.h>
26 #include <grpcpp/impl/service_type.h>
27 #include <grpcpp/server_context.h>
28 #include <grpcpp/support/status.h>
29 
30 namespace grpc {
31 
32 namespace internal {
33 /// Common interface for all client side asynchronous streaming.
34 class ClientAsyncStreamingInterface {
35  public:
~ClientAsyncStreamingInterface()36   virtual ~ClientAsyncStreamingInterface() {}
37 
38   /// Start the call that was set up by the constructor, but only if the
39   /// constructor was invoked through the "Prepare" API which doesn't actually
40   /// start the call
41   virtual void StartCall(void* tag) = 0;
42 
43   /// Request notification of the reading of the initial metadata. Completion
44   /// will be notified by \a tag on the associated completion queue.
45   /// This call is optional, but if it is used, it cannot be used concurrently
46   /// with or after the \a AsyncReaderInterface::Read method.
47   ///
48   /// \param[in] tag Tag identifying this request.
49   virtual void ReadInitialMetadata(void* tag) = 0;
50 
51   /// Indicate that the stream is to be finished and request notification for
52   /// when the call has been ended.
53   /// Should not be used concurrently with other operations.
54   ///
55   /// It is appropriate to call this method exactly once when both:
56   ///   * the client side has no more message to send
57   ///     (this can be declared implicitly by calling this method, or
58   ///     explicitly through an earlier call to the <i>WritesDone</i> method
59   ///     of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or
60   ///     \a ClientAsyncReaderWriterInterface::WritesDone).
61   ///   * there are no more messages to be received from the server (this can
62   ///     be known implicitly by the calling code, or explicitly from an
63   ///     earlier call to \a AsyncReaderInterface::Read that yielded a failed
64   ///     result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
65   ///
66   /// The tag will be returned when either:
67   /// - all incoming messages have been read and the server has returned
68   ///   a status.
69   /// - the server has returned a non-OK status.
70   /// - the call failed for some reason and the library generated a
71   ///   status.
72   ///
73   /// Note that implementations of this method attempt to receive initial
74   /// metadata from the server if initial metadata hasn't yet been received.
75   ///
76   /// \param[in] tag Tag identifying this request.
77   /// \param[out] status To be updated with the operation status.
78   virtual void Finish(grpc::Status* status, void* tag) = 0;
79 };
80 
/// Interface of an object that produces a sequence of messages of type \a R.
template <class R>
class AsyncReaderInterface {
 public:
  virtual ~AsyncReaderInterface() = default;

  /// Reads one message of type \a R into \a msg. Completion is reported
  /// through \a tag on the associated completion queue.
  /// The method is thread-safe with respect to the \a Write and \a WritesDone
  /// methods, but it should not be called concurrently with any other
  /// streaming API on the same stream. Issuing it concurrently with another
  /// \a AsyncReaderInterface::Read on the same stream is not meaningful,
  /// because reads on one stream are delivered in order.
  ///
  /// \param[out] msg Where to eventually store the read message.
  /// \param[in] tag The tag identifying the operation.
  ///
  /// Side effect: this method attempts to receive the initial metadata of the
  /// stream if it has not been received yet.
  virtual void Read(R* msg, void* tag) = 0;
};
102 
103 /// An interface that can be fed a sequence of messages of type \a W.
104 template <class W>
105 class AsyncWriterInterface {
106  public:
~AsyncWriterInterface()107   virtual ~AsyncWriterInterface() {}
108 
109   /// Request the writing of \a msg with identifying tag \a tag.
110   ///
111   /// Only one write may be outstanding at any given time. This means that
112   /// after calling Write, one must wait to receive \a tag from the completion
113   /// queue BEFORE calling Write again.
114   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
115   ///
116   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
117   /// to deallocate once Write returns.
118   ///
119   /// \param[in] msg The message to be written.
120   /// \param[in] tag The tag identifying the operation.
121   virtual void Write(const W& msg, void* tag) = 0;
122 
123   /// Request the writing of \a msg using WriteOptions \a options with
124   /// identifying tag \a tag.
125   ///
126   /// Only one write may be outstanding at any given time. This means that
127   /// after calling Write, one must wait to receive \a tag from the completion
128   /// queue BEFORE calling Write again.
129   /// WriteOptions \a options is used to set the write options of this message.
130   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
131   ///
132   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
133   /// to deallocate once Write returns.
134   ///
135   /// \param[in] msg The message to be written.
136   /// \param[in] options The WriteOptions to be used to write this message.
137   /// \param[in] tag The tag identifying the operation.
138   virtual void Write(const W& msg, grpc::WriteOptions options, void* tag) = 0;
139 
140   /// Request the writing of \a msg and coalesce it with the writing
141   /// of trailing metadata, using WriteOptions \a options with
142   /// identifying tag \a tag.
143   ///
144   /// For client, WriteLast is equivalent of performing Write and
145   /// WritesDone in a single step.
146   /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
147   /// until Finish is called, where \a msg and trailing metadata are coalesced
148   /// and write is initiated. Note that WriteLast can only buffer \a msg up to
149   /// the flow control window size. If \a msg size is larger than the window
150   /// size, it will be sent on wire without buffering.
151   ///
152   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
153   /// to deallocate once Write returns.
154   ///
155   /// \param[in] msg The message to be written.
156   /// \param[in] options The WriteOptions to be used to write this message.
157   /// \param[in] tag The tag identifying the operation.
WriteLast(const W & msg,grpc::WriteOptions options,void * tag)158   void WriteLast(const W& msg, grpc::WriteOptions options, void* tag) {
159     Write(msg, options.set_last_message(), tag);
160   }
161 };
162 
163 }  // namespace internal
164 
/// Common interface for client side asynchronous reading. It combines
/// \a internal::ClientAsyncStreamingInterface with
/// \a internal::AsyncReaderInterface for messages of type \a R and declares
/// no members of its own.
template <class R>
class ClientAsyncReaderInterface
    : public internal::ClientAsyncStreamingInterface,
      public internal::AsyncReaderInterface<R> {};
169 
170 namespace internal {
171 template <class R>
172 class ClientAsyncReaderFactory {
173  public:
174   /// Create a stream object.
175   /// Write the first request out if \a start is set.
176   /// \a tag will be notified on \a cq when the call has been started and
177   /// \a request has been written out. If \a start is not set, \a tag must be
178   /// nullptr and the actual call must be initiated by StartCall
179   /// Note that \a context will be used to fill in custom initial metadata
180   /// used to send to the server when starting the call.
181   template <class W>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start,void * tag)182   static ClientAsyncReader<R>* Create(grpc::ChannelInterface* channel,
183                                       grpc::CompletionQueue* cq,
184                                       const grpc::internal::RpcMethod& method,
185                                       grpc::ClientContext* context,
186                                       const W& request, bool start, void* tag) {
187     grpc::internal::Call call = channel->CreateCall(method, context, cq);
188     return new (
189         grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncReader<R>)))
190         ClientAsyncReader<R>(call, context, request, start, tag);
191   }
192 };
193 }  // namespace internal
194 
195 /// Async client-side API for doing server-streaming RPCs,
196 /// where the incoming message stream coming from the server has
197 /// messages of type \a R.
198 template <class R>
199 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
200  public:
201   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)202   static void operator delete(void* /*ptr*/, std::size_t size) {
203     GPR_ASSERT(size == sizeof(ClientAsyncReader));
204   }
205 
206   // This operator should never be called as the memory should be freed as part
207   // of the arena destruction. It only exists to provide a matching operator
208   // delete to the operator new so that some compilers will not complain (see
209   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
210   // there are no tests catching the compiler warning.
delete(void *,void *)211   static void operator delete(void*, void*) { GPR_ASSERT(false); }
212 
StartCall(void * tag)213   void StartCall(void* tag) override {
214     GPR_ASSERT(!started_);
215     started_ = true;
216     StartCallInternal(tag);
217   }
218 
219   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata
220   /// method for semantics.
221   ///
222   /// Side effect:
223   ///   - upon receiving initial metadata from the server,
224   ///     the \a ClientContext associated with this call is updated, and the
225   ///     calling code can access the received metadata through the
226   ///     \a ClientContext.
ReadInitialMetadata(void * tag)227   void ReadInitialMetadata(void* tag) override {
228     GPR_ASSERT(started_);
229     GPR_ASSERT(!context_->initial_metadata_received_);
230 
231     meta_ops_.set_output_tag(tag);
232     meta_ops_.RecvInitialMetadata(context_);
233     call_.PerformOps(&meta_ops_);
234   }
235 
Read(R * msg,void * tag)236   void Read(R* msg, void* tag) override {
237     GPR_ASSERT(started_);
238     read_ops_.set_output_tag(tag);
239     if (!context_->initial_metadata_received_) {
240       read_ops_.RecvInitialMetadata(context_);
241     }
242     read_ops_.RecvMessage(msg);
243     call_.PerformOps(&read_ops_);
244   }
245 
246   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
247   ///
248   /// Side effect:
249   ///   - the \a ClientContext associated with this call is updated with
250   ///     possible initial and trailing metadata received from the server.
Finish(grpc::Status * status,void * tag)251   void Finish(grpc::Status* status, void* tag) override {
252     GPR_ASSERT(started_);
253     finish_ops_.set_output_tag(tag);
254     if (!context_->initial_metadata_received_) {
255       finish_ops_.RecvInitialMetadata(context_);
256     }
257     finish_ops_.ClientRecvStatus(context_, status);
258     call_.PerformOps(&finish_ops_);
259   }
260 
261  private:
262   friend class internal::ClientAsyncReaderFactory<R>;
263   template <class W>
ClientAsyncReader(grpc::internal::Call call,grpc::ClientContext * context,const W & request,bool start,void * tag)264   ClientAsyncReader(grpc::internal::Call call, grpc::ClientContext* context,
265                     const W& request, bool start, void* tag)
266       : context_(context), call_(call), started_(start) {
267     // TODO(ctiller): don't assert
268     GPR_ASSERT(init_ops_.SendMessage(request).ok());
269     init_ops_.ClientSendClose();
270     if (start) {
271       StartCallInternal(tag);
272     } else {
273       GPR_ASSERT(tag == nullptr);
274     }
275   }
276 
StartCallInternal(void * tag)277   void StartCallInternal(void* tag) {
278     init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
279                                   context_->initial_metadata_flags());
280     init_ops_.set_output_tag(tag);
281     call_.PerformOps(&init_ops_);
282   }
283 
284   grpc::ClientContext* context_;
285   grpc::internal::Call call_;
286   bool started_;
287   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
288                             grpc::internal::CallOpSendMessage,
289                             grpc::internal::CallOpClientSendClose>
290       init_ops_;
291   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata>
292       meta_ops_;
293   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
294                             grpc::internal::CallOpRecvMessage<R>>
295       read_ops_;
296   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
297                             grpc::internal::CallOpClientRecvStatus>
298       finish_ops_;
299 };
300 
/// Common interface for client side asynchronous writing. It combines
/// \a internal::ClientAsyncStreamingInterface with
/// \a internal::AsyncWriterInterface for messages of type \a W and adds
/// \a WritesDone.
template <class W>
class ClientAsyncWriterInterface
    : public internal::ClientAsyncStreamingInterface,
      public internal::AsyncWriterInterface<W> {
 public:
  /// Signals that the client is done with its writes (half-closes the client
  /// stream).
  /// Thread-safe with respect to \a AsyncReaderInterface::Read.
  ///
  /// \param[in] tag The tag identifying the operation.
  virtual void WritesDone(void* tag) = 0;
};
313 
314 namespace internal {
315 template <class W>
316 class ClientAsyncWriterFactory {
317  public:
318   /// Create a stream object.
319   /// Start the RPC if \a start is set
320   /// \a tag will be notified on \a cq when the call has been started (i.e.
321   /// initial metadata sent) and \a request has been written out.
322   /// If \a start is not set, \a tag must be nullptr and the actual call
323   /// must be initiated by StartCall
324   /// Note that \a context will be used to fill in custom initial metadata
325   /// used to send to the server when starting the call.
326   /// \a response will be filled in with the single expected response
327   /// message from the server upon a successful call to the \a Finish
328   /// method of this instance.
329   template <class R>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response,bool start,void * tag)330   static ClientAsyncWriter<W>* Create(grpc::ChannelInterface* channel,
331                                       grpc::CompletionQueue* cq,
332                                       const grpc::internal::RpcMethod& method,
333                                       grpc::ClientContext* context, R* response,
334                                       bool start, void* tag) {
335     grpc::internal::Call call = channel->CreateCall(method, context, cq);
336     return new (
337         grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncWriter<W>)))
338         ClientAsyncWriter<W>(call, context, response, start, tag);
339   }
340 };
341 }  // namespace internal
342 
343 /// Async API on the client side for doing client-streaming RPCs,
344 /// where the outgoing message stream going to the server contains
345 /// messages of type \a W.
346 template <class W>
347 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
348  public:
349   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)350   static void operator delete(void* /*ptr*/, std::size_t size) {
351     GPR_ASSERT(size == sizeof(ClientAsyncWriter));
352   }
353 
354   // This operator should never be called as the memory should be freed as part
355   // of the arena destruction. It only exists to provide a matching operator
356   // delete to the operator new so that some compilers will not complain (see
357   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
358   // there are no tests catching the compiler warning.
delete(void *,void *)359   static void operator delete(void*, void*) { GPR_ASSERT(false); }
360 
StartCall(void * tag)361   void StartCall(void* tag) override {
362     GPR_ASSERT(!started_);
363     started_ = true;
364     StartCallInternal(tag);
365   }
366 
367   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for
368   /// semantics.
369   ///
370   /// Side effect:
371   ///   - upon receiving initial metadata from the server, the \a ClientContext
372   ///     associated with this call is updated, and the calling code can access
373   ///     the received metadata through the \a ClientContext.
ReadInitialMetadata(void * tag)374   void ReadInitialMetadata(void* tag) override {
375     GPR_ASSERT(started_);
376     GPR_ASSERT(!context_->initial_metadata_received_);
377 
378     meta_ops_.set_output_tag(tag);
379     meta_ops_.RecvInitialMetadata(context_);
380     call_.PerformOps(&meta_ops_);
381   }
382 
Write(const W & msg,void * tag)383   void Write(const W& msg, void* tag) override {
384     GPR_ASSERT(started_);
385     write_ops_.set_output_tag(tag);
386     // TODO(ctiller): don't assert
387     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
388     call_.PerformOps(&write_ops_);
389   }
390 
Write(const W & msg,grpc::WriteOptions options,void * tag)391   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
392     GPR_ASSERT(started_);
393     write_ops_.set_output_tag(tag);
394     if (options.is_last_message()) {
395       options.set_buffer_hint();
396       write_ops_.ClientSendClose();
397     }
398     // TODO(ctiller): don't assert
399     GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
400     call_.PerformOps(&write_ops_);
401   }
402 
WritesDone(void * tag)403   void WritesDone(void* tag) override {
404     GPR_ASSERT(started_);
405     write_ops_.set_output_tag(tag);
406     write_ops_.ClientSendClose();
407     call_.PerformOps(&write_ops_);
408   }
409 
410   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
411   ///
412   /// Side effect:
413   ///   - the \a ClientContext associated with this call is updated with
414   ///     possible initial and trailing metadata received from the server.
415   ///   - attempts to fill in the \a response parameter passed to this class's
416   ///     constructor with the server's response message.
Finish(grpc::Status * status,void * tag)417   void Finish(grpc::Status* status, void* tag) override {
418     GPR_ASSERT(started_);
419     finish_ops_.set_output_tag(tag);
420     if (!context_->initial_metadata_received_) {
421       finish_ops_.RecvInitialMetadata(context_);
422     }
423     finish_ops_.ClientRecvStatus(context_, status);
424     call_.PerformOps(&finish_ops_);
425   }
426 
427  private:
428   friend class internal::ClientAsyncWriterFactory<W>;
429   template <class R>
ClientAsyncWriter(grpc::internal::Call call,grpc::ClientContext * context,R * response,bool start,void * tag)430   ClientAsyncWriter(grpc::internal::Call call, grpc::ClientContext* context,
431                     R* response, bool start, void* tag)
432       : context_(context), call_(call), started_(start) {
433     finish_ops_.RecvMessage(response);
434     finish_ops_.AllowNoMessage();
435     if (start) {
436       StartCallInternal(tag);
437     } else {
438       GPR_ASSERT(tag == nullptr);
439     }
440   }
441 
StartCallInternal(void * tag)442   void StartCallInternal(void* tag) {
443     write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
444                                    context_->initial_metadata_flags());
445     // if corked bit is set in context, we just keep the initial metadata
446     // buffered up to coalesce with later message send. No op is performed.
447     if (!context_->initial_metadata_corked_) {
448       write_ops_.set_output_tag(tag);
449       call_.PerformOps(&write_ops_);
450     }
451   }
452 
453   grpc::ClientContext* context_;
454   grpc::internal::Call call_;
455   bool started_;
456   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata>
457       meta_ops_;
458   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
459                             grpc::internal::CallOpSendMessage,
460                             grpc::internal::CallOpClientSendClose>
461       write_ops_;
462   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
463                             grpc::internal::CallOpGenericRecvMessage,
464                             grpc::internal::CallOpClientRecvStatus>
465       finish_ops_;
466 };
467 
/// Async client-side interface for bi-directional streaming, where the
/// client-to-server message stream has messages of type \a W and the
/// server-to-client message stream has messages of type \a R.
/// It combines \a internal::ClientAsyncStreamingInterface,
/// \a internal::AsyncWriterInterface and \a internal::AsyncReaderInterface,
/// and adds \a WritesDone.
template <class W, class R>
class ClientAsyncReaderWriterInterface
    : public internal::ClientAsyncStreamingInterface,
      public internal::AsyncWriterInterface<W>,
      public internal::AsyncReaderInterface<R> {
 public:
  /// Signals that the client is done with its writes (half-closes the client
  /// stream).
  /// Thread-safe with respect to \a AsyncReaderInterface::Read.
  ///
  /// \param[in] tag The tag identifying the operation.
  virtual void WritesDone(void* tag) = 0;
};
483 
484 namespace internal {
485 template <class W, class R>
486 class ClientAsyncReaderWriterFactory {
487  public:
488   /// Create a stream object.
489   /// Start the RPC request if \a start is set.
490   /// \a tag will be notified on \a cq when the call has been started (i.e.
491   /// initial metadata sent). If \a start is not set, \a tag must be
492   /// nullptr and the actual call must be initiated by StartCall
493   /// Note that \a context will be used to fill in custom initial metadata
494   /// used to send to the server when starting the call.
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,bool start,void * tag)495   static ClientAsyncReaderWriter<W, R>* Create(
496       grpc::ChannelInterface* channel, grpc::CompletionQueue* cq,
497       const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
498       bool start, void* tag) {
499     grpc::internal::Call call = channel->CreateCall(method, context, cq);
500 
501     return new (grpc_call_arena_alloc(call.call(),
502                                       sizeof(ClientAsyncReaderWriter<W, R>)))
503         ClientAsyncReaderWriter<W, R>(call, context, start, tag);
504   }
505 };
506 }  // namespace internal
507 
508 /// Async client-side interface for bi-directional streaming,
509 /// where the outgoing message stream going to the server
510 /// has messages of type \a W,  and the incoming message stream coming
511 /// from the server has messages of type \a R.
512 template <class W, class R>
513 class ClientAsyncReaderWriter final
514     : public ClientAsyncReaderWriterInterface<W, R> {
515  public:
516   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)517   static void operator delete(void* /*ptr*/, std::size_t size) {
518     GPR_ASSERT(size == sizeof(ClientAsyncReaderWriter));
519   }
520 
521   // This operator should never be called as the memory should be freed as part
522   // of the arena destruction. It only exists to provide a matching operator
523   // delete to the operator new so that some compilers will not complain (see
524   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
525   // there are no tests catching the compiler warning.
delete(void *,void *)526   static void operator delete(void*, void*) { GPR_ASSERT(false); }
527 
StartCall(void * tag)528   void StartCall(void* tag) override {
529     GPR_ASSERT(!started_);
530     started_ = true;
531     StartCallInternal(tag);
532   }
533 
534   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method
535   /// for semantics of this method.
536   ///
537   /// Side effect:
538   ///   - upon receiving initial metadata from the server, the \a ClientContext
539   ///     is updated with it, and then the receiving initial metadata can
540   ///     be accessed through this \a ClientContext.
ReadInitialMetadata(void * tag)541   void ReadInitialMetadata(void* tag) override {
542     GPR_ASSERT(started_);
543     GPR_ASSERT(!context_->initial_metadata_received_);
544 
545     meta_ops_.set_output_tag(tag);
546     meta_ops_.RecvInitialMetadata(context_);
547     call_.PerformOps(&meta_ops_);
548   }
549 
Read(R * msg,void * tag)550   void Read(R* msg, void* tag) override {
551     GPR_ASSERT(started_);
552     read_ops_.set_output_tag(tag);
553     if (!context_->initial_metadata_received_) {
554       read_ops_.RecvInitialMetadata(context_);
555     }
556     read_ops_.RecvMessage(msg);
557     call_.PerformOps(&read_ops_);
558   }
559 
Write(const W & msg,void * tag)560   void Write(const W& msg, void* tag) override {
561     GPR_ASSERT(started_);
562     write_ops_.set_output_tag(tag);
563     // TODO(ctiller): don't assert
564     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
565     call_.PerformOps(&write_ops_);
566   }
567 
Write(const W & msg,grpc::WriteOptions options,void * tag)568   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
569     GPR_ASSERT(started_);
570     write_ops_.set_output_tag(tag);
571     if (options.is_last_message()) {
572       options.set_buffer_hint();
573       write_ops_.ClientSendClose();
574     }
575     // TODO(ctiller): don't assert
576     GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
577     call_.PerformOps(&write_ops_);
578   }
579 
WritesDone(void * tag)580   void WritesDone(void* tag) override {
581     GPR_ASSERT(started_);
582     write_ops_.set_output_tag(tag);
583     write_ops_.ClientSendClose();
584     call_.PerformOps(&write_ops_);
585   }
586 
587   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
588   /// Side effect
589   ///   - the \a ClientContext associated with this call is updated with
590   ///     possible initial and trailing metadata sent from the server.
Finish(grpc::Status * status,void * tag)591   void Finish(grpc::Status* status, void* tag) override {
592     GPR_ASSERT(started_);
593     finish_ops_.set_output_tag(tag);
594     if (!context_->initial_metadata_received_) {
595       finish_ops_.RecvInitialMetadata(context_);
596     }
597     finish_ops_.ClientRecvStatus(context_, status);
598     call_.PerformOps(&finish_ops_);
599   }
600 
601  private:
602   friend class internal::ClientAsyncReaderWriterFactory<W, R>;
ClientAsyncReaderWriter(grpc::internal::Call call,grpc::ClientContext * context,bool start,void * tag)603   ClientAsyncReaderWriter(grpc::internal::Call call,
604                           grpc::ClientContext* context, bool start, void* tag)
605       : context_(context), call_(call), started_(start) {
606     if (start) {
607       StartCallInternal(tag);
608     } else {
609       GPR_ASSERT(tag == nullptr);
610     }
611   }
612 
StartCallInternal(void * tag)613   void StartCallInternal(void* tag) {
614     write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
615                                    context_->initial_metadata_flags());
616     // if corked bit is set in context, we just keep the initial metadata
617     // buffered up to coalesce with later message send. No op is performed.
618     if (!context_->initial_metadata_corked_) {
619       write_ops_.set_output_tag(tag);
620       call_.PerformOps(&write_ops_);
621     }
622   }
623 
624   grpc::ClientContext* context_;
625   grpc::internal::Call call_;
626   bool started_;
627   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata>
628       meta_ops_;
629   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
630                             grpc::internal::CallOpRecvMessage<R>>
631       read_ops_;
632   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
633                             grpc::internal::CallOpSendMessage,
634                             grpc::internal::CallOpClientSendClose>
635       write_ops_;
636   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
637                             grpc::internal::CallOpClientRecvStatus>
638       finish_ops_;
639 };
640 
/// Server-side asynchronous reading interface: combines
/// \a grpc::internal::ServerAsyncStreamingInterface with
/// \a internal::AsyncReaderInterface for incoming messages of type \a R, and
/// adds the methods that finish the call, optionally with a response of type
/// \a W.
template <class W, class R>
class ServerAsyncReaderInterface
    : public grpc::internal::ServerAsyncStreamingInterface,
      public internal::AsyncReaderInterface<R> {
 public:
  /// Indicate that the stream is to be finished with a certain status code
  /// and also send out the \a msg response to the client.
  /// Request notification for when the server has sent the response and the
  /// appropriate signals to the client to end the call.
  /// Should not be used concurrently with other operations.
  ///
  /// It is appropriate to call this method when:
  ///   * all messages from the client have been received (either known
  ///     implicitly, or explicitly because a previous
  ///     \a AsyncReaderInterface::Read operation completed with a non-ok
  ///     result, e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
  ///
  /// This operation will end when the server has finished sending out initial
  /// metadata (if not sent already), response message, and status, or if
  /// some failure occurred when trying to do so.
  ///
  /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it
  /// is safe to deallocate them once Finish returns.
  ///
  /// \param[in] msg To be sent to the client as the response for this call.
  /// \param[in] status To be sent to the client as the result of this call.
  /// \param[in] tag Tag identifying this request.
  virtual void Finish(const W& msg, const grpc::Status& status, void* tag) = 0;

  /// Indicate that the stream is to be finished with a certain
  /// non-OK status code.
  /// Request notification for when the server has sent the appropriate
  /// signals to the client to end the call.
  /// Should not be used concurrently with other operations.
  ///
  /// This call is meant to end the call with some error, and can be called at
  /// any point that the server would like to "fail" the call (though note
  /// this shouldn't be called concurrently with any other "sending" call, like
  /// \a AsyncWriterInterface::Write).
  ///
  /// This operation will end when the server has finished sending out initial
  /// metadata (if not sent already), and status, or if some failure occurred
  /// when trying to do so.
  ///
  /// gRPC doesn't take ownership or a reference to \a status, so it is safe
  /// to deallocate it once FinishWithError returns.
  ///
  /// \param[in] status To be sent to the client as the result of this call.
  ///     - Note: \a status must have a non-OK code.
  /// \param[in] tag Tag identifying this request.
  virtual void FinishWithError(const grpc::Status& status, void* tag) = 0;
};
693 
694 /// Async server-side API for doing client-streaming RPCs,
695 /// where the incoming message stream from the client has messages of type \a R,
696 /// and the single response message sent from the server is type \a W.
697 template <class W, class R>
698 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
699  public:
ServerAsyncReader(grpc::ServerContext * ctx)700   explicit ServerAsyncReader(grpc::ServerContext* ctx)
701       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
702 
703   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
704   ///
705   /// Implicit input parameter:
706   ///   - The initial metadata that will be sent to the client from this op will
707   ///     be taken from the \a ServerContext associated with the call.
SendInitialMetadata(void * tag)708   void SendInitialMetadata(void* tag) override {
709     GPR_ASSERT(!ctx_->sent_initial_metadata_);
710 
711     meta_ops_.set_output_tag(tag);
712     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
713                                   ctx_->initial_metadata_flags());
714     if (ctx_->compression_level_set()) {
715       meta_ops_.set_compression_level(ctx_->compression_level());
716     }
717     ctx_->sent_initial_metadata_ = true;
718     call_.PerformOps(&meta_ops_);
719   }
720 
Read(R * msg,void * tag)721   void Read(R* msg, void* tag) override {
722     read_ops_.set_output_tag(tag);
723     read_ops_.RecvMessage(msg);
724     call_.PerformOps(&read_ops_);
725   }
726 
727   /// See the \a ServerAsyncReaderInterface.Read method for semantics
728   ///
729   /// Side effect:
730   ///   - also sends initial metadata if not already sent.
731   ///   - uses the \a ServerContext associated with this call to send possible
732   ///     initial and trailing metadata.
733   ///
734   /// Note: \a msg is not sent if \a status has a non-OK code.
735   ///
736   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
737   /// is safe to deallocate once Finish returns.
Finish(const W & msg,const grpc::Status & status,void * tag)738   void Finish(const W& msg, const grpc::Status& status, void* tag) override {
739     finish_ops_.set_output_tag(tag);
740     if (!ctx_->sent_initial_metadata_) {
741       finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
742                                       ctx_->initial_metadata_flags());
743       if (ctx_->compression_level_set()) {
744         finish_ops_.set_compression_level(ctx_->compression_level());
745       }
746       ctx_->sent_initial_metadata_ = true;
747     }
748     // The response is dropped if the status is not OK.
749     if (status.ok()) {
750       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
751                                    finish_ops_.SendMessage(msg));
752     } else {
753       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
754     }
755     call_.PerformOps(&finish_ops_);
756   }
757 
758   /// See the \a ServerAsyncReaderInterface.Read method for semantics
759   ///
760   /// Side effect:
761   ///   - also sends initial metadata if not already sent.
762   ///   - uses the \a ServerContext associated with this call to send possible
763   ///     initial and trailing metadata.
764   ///
765   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
766   /// to deallocate once FinishWithError returns.
FinishWithError(const grpc::Status & status,void * tag)767   void FinishWithError(const grpc::Status& status, void* tag) override {
768     GPR_ASSERT(!status.ok());
769     finish_ops_.set_output_tag(tag);
770     if (!ctx_->sent_initial_metadata_) {
771       finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
772                                       ctx_->initial_metadata_flags());
773       if (ctx_->compression_level_set()) {
774         finish_ops_.set_compression_level(ctx_->compression_level());
775       }
776       ctx_->sent_initial_metadata_ = true;
777     }
778     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
779     call_.PerformOps(&finish_ops_);
780   }
781 
782  private:
BindCall(grpc::internal::Call * call)783   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
784 
785   grpc::internal::Call call_;
786   grpc::ServerContext* ctx_;
787   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
788       meta_ops_;
789   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_;
790   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
791                             grpc::internal::CallOpSendMessage,
792                             grpc::internal::CallOpServerSendStatus>
793       finish_ops_;
794 };
795 
796 template <class W>
797 class ServerAsyncWriterInterface
798     : public grpc::internal::ServerAsyncStreamingInterface,
799       public internal::AsyncWriterInterface<W> {
800  public:
801   /// Indicate that the stream is to be finished with a certain status code.
802   /// Request notification for when the server has sent the appropriate
803   /// signals to the client to end the call.
804   /// Should not be used concurrently with other operations.
805   ///
806   /// It is appropriate to call this method when either:
807   ///   * all messages from the client have been received (either known
808   ///     implicitly, or explicitly because a previous \a
809   ///     AsyncReaderInterface::Read operation with a non-ok
810   ///     result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
811   ///   * it is desired to end the call early with some non-OK status code.
812   ///
813   /// This operation will end when the server has finished sending out initial
814   /// metadata (if not sent already), response message, and status, or if
815   /// some failure occurred when trying to do so.
816   ///
817   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
818   /// to deallocate once Finish returns.
819   ///
820   /// \param[in] tag Tag identifying this request.
821   /// \param[in] status To be sent to the client as the result of this call.
822   virtual void Finish(const grpc::Status& status, void* tag) = 0;
823 
824   /// Request the writing of \a msg and coalesce it with trailing metadata which
825   /// contains \a status, using WriteOptions options with
826   /// identifying tag \a tag.
827   ///
828   /// WriteAndFinish is equivalent of performing WriteLast and Finish
829   /// in a single step.
830   ///
831   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
832   /// is safe to deallocate once WriteAndFinish returns.
833   ///
834   /// \param[in] msg The message to be written.
835   /// \param[in] options The WriteOptions to be used to write this message.
836   /// \param[in] status The Status that server returns to client.
837   /// \param[in] tag The tag identifying the operation.
838   virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options,
839                               const grpc::Status& status, void* tag) = 0;
840 };
841 
842 /// Async server-side API for doing server streaming RPCs,
843 /// where the outgoing message stream from the server has messages of type \a W.
844 template <class W>
845 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
846  public:
ServerAsyncWriter(grpc::ServerContext * ctx)847   explicit ServerAsyncWriter(grpc::ServerContext* ctx)
848       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
849 
850   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
851   ///
852   /// Implicit input parameter:
853   ///   - The initial metadata that will be sent to the client from this op will
854   ///     be taken from the \a ServerContext associated with the call.
855   ///
856   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)857   void SendInitialMetadata(void* tag) override {
858     GPR_ASSERT(!ctx_->sent_initial_metadata_);
859 
860     meta_ops_.set_output_tag(tag);
861     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
862                                   ctx_->initial_metadata_flags());
863     if (ctx_->compression_level_set()) {
864       meta_ops_.set_compression_level(ctx_->compression_level());
865     }
866     ctx_->sent_initial_metadata_ = true;
867     call_.PerformOps(&meta_ops_);
868   }
869 
Write(const W & msg,void * tag)870   void Write(const W& msg, void* tag) override {
871     write_ops_.set_output_tag(tag);
872     EnsureInitialMetadataSent(&write_ops_);
873     // TODO(ctiller): don't assert
874     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
875     call_.PerformOps(&write_ops_);
876   }
877 
Write(const W & msg,grpc::WriteOptions options,void * tag)878   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
879     write_ops_.set_output_tag(tag);
880     if (options.is_last_message()) {
881       options.set_buffer_hint();
882     }
883 
884     EnsureInitialMetadataSent(&write_ops_);
885     // TODO(ctiller): don't assert
886     GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
887     call_.PerformOps(&write_ops_);
888   }
889 
890   /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics.
891   ///
892   /// Implicit input parameter:
893   ///   - the \a ServerContext associated with this call is used
894   ///     for sending trailing (and initial) metadata to the client.
895   ///
896   /// Note: \a status must have an OK code.
897   ///
898   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
899   /// is safe to deallocate once WriteAndFinish returns.
WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)900   void WriteAndFinish(const W& msg, grpc::WriteOptions options,
901                       const grpc::Status& status, void* tag) override {
902     write_ops_.set_output_tag(tag);
903     EnsureInitialMetadataSent(&write_ops_);
904     options.set_buffer_hint();
905     GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
906     write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
907     call_.PerformOps(&write_ops_);
908   }
909 
910   /// See the \a ServerAsyncWriterInterface.Finish method for semantics.
911   ///
912   /// Implicit input parameter:
913   ///   - the \a ServerContext associated with this call is used for sending
914   ///     trailing (and initial if not already sent) metadata to the client.
915   ///
916   /// Note: there are no restrictions are the code of
917   /// \a status,it may be non-OK
918   ///
919   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
920   /// to deallocate once Finish returns.
Finish(const grpc::Status & status,void * tag)921   void Finish(const grpc::Status& status, void* tag) override {
922     finish_ops_.set_output_tag(tag);
923     EnsureInitialMetadataSent(&finish_ops_);
924     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
925     call_.PerformOps(&finish_ops_);
926   }
927 
928  private:
BindCall(grpc::internal::Call * call)929   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
930 
931   template <class T>
EnsureInitialMetadataSent(T * ops)932   void EnsureInitialMetadataSent(T* ops) {
933     if (!ctx_->sent_initial_metadata_) {
934       ops->SendInitialMetadata(&ctx_->initial_metadata_,
935                                ctx_->initial_metadata_flags());
936       if (ctx_->compression_level_set()) {
937         ops->set_compression_level(ctx_->compression_level());
938       }
939       ctx_->sent_initial_metadata_ = true;
940     }
941   }
942 
943   grpc::internal::Call call_;
944   grpc::ServerContext* ctx_;
945   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
946       meta_ops_;
947   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
948                             grpc::internal::CallOpSendMessage,
949                             grpc::internal::CallOpServerSendStatus>
950       write_ops_;
951   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
952                             grpc::internal::CallOpServerSendStatus>
953       finish_ops_;
954 };
955 
956 /// Server-side interface for asynchronous bi-directional streaming.
957 template <class W, class R>
958 class ServerAsyncReaderWriterInterface
959     : public grpc::internal::ServerAsyncStreamingInterface,
960       public internal::AsyncWriterInterface<W>,
961       public internal::AsyncReaderInterface<R> {
962  public:
963   /// Indicate that the stream is to be finished with a certain status code.
964   /// Request notification for when the server has sent the appropriate
965   /// signals to the client to end the call.
966   /// Should not be used concurrently with other operations.
967   ///
968   /// It is appropriate to call this method when either:
969   ///   * all messages from the client have been received (either known
970   ///     implicitly, or explicitly because a previous \a
971   ///     AsyncReaderInterface::Read operation
972   ///     with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
973   ///     with 'false'.
974   ///   * it is desired to end the call early with some non-OK status code.
975   ///
976   /// This operation will end when the server has finished sending out initial
977   /// metadata (if not sent already), response message, and status, or if some
978   /// failure occurred when trying to do so.
979   ///
980   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
981   /// to deallocate once Finish returns.
982   ///
983   /// \param[in] tag Tag identifying this request.
984   /// \param[in] status To be sent to the client as the result of this call.
985   virtual void Finish(const grpc::Status& status, void* tag) = 0;
986 
987   /// Request the writing of \a msg and coalesce it with trailing metadata which
988   /// contains \a status, using WriteOptions options with
989   /// identifying tag \a tag.
990   ///
991   /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
992   /// single step.
993   ///
994   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
995   /// is safe to deallocate once WriteAndFinish returns.
996   ///
997   /// \param[in] msg The message to be written.
998   /// \param[in] options The WriteOptions to be used to write this message.
999   /// \param[in] status The Status that server returns to client.
1000   /// \param[in] tag The tag identifying the operation.
1001   virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options,
1002                               const grpc::Status& status, void* tag) = 0;
1003 };
1004 
1005 /// Async server-side API for doing bidirectional streaming RPCs,
1006 /// where the incoming message stream coming from the client has messages of
1007 /// type \a R, and the outgoing message stream coming from the server has
1008 /// messages of type \a W.
1009 template <class W, class R>
1010 class ServerAsyncReaderWriter final
1011     : public ServerAsyncReaderWriterInterface<W, R> {
1012  public:
ServerAsyncReaderWriter(grpc::ServerContext * ctx)1013   explicit ServerAsyncReaderWriter(grpc::ServerContext* ctx)
1014       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1015 
1016   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
1017   ///
1018   /// Implicit input parameter:
1019   ///   - The initial metadata that will be sent to the client from this op will
1020   ///     be taken from the \a ServerContext associated with the call.
1021   ///
1022   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)1023   void SendInitialMetadata(void* tag) override {
1024     GPR_ASSERT(!ctx_->sent_initial_metadata_);
1025 
1026     meta_ops_.set_output_tag(tag);
1027     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1028                                   ctx_->initial_metadata_flags());
1029     if (ctx_->compression_level_set()) {
1030       meta_ops_.set_compression_level(ctx_->compression_level());
1031     }
1032     ctx_->sent_initial_metadata_ = true;
1033     call_.PerformOps(&meta_ops_);
1034   }
1035 
Read(R * msg,void * tag)1036   void Read(R* msg, void* tag) override {
1037     read_ops_.set_output_tag(tag);
1038     read_ops_.RecvMessage(msg);
1039     call_.PerformOps(&read_ops_);
1040   }
1041 
Write(const W & msg,void * tag)1042   void Write(const W& msg, void* tag) override {
1043     write_ops_.set_output_tag(tag);
1044     EnsureInitialMetadataSent(&write_ops_);
1045     // TODO(ctiller): don't assert
1046     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
1047     call_.PerformOps(&write_ops_);
1048   }
1049 
Write(const W & msg,grpc::WriteOptions options,void * tag)1050   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
1051     write_ops_.set_output_tag(tag);
1052     if (options.is_last_message()) {
1053       options.set_buffer_hint();
1054     }
1055     EnsureInitialMetadataSent(&write_ops_);
1056     GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
1057     call_.PerformOps(&write_ops_);
1058   }
1059 
1060   /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish
1061   /// method for semantics.
1062   ///
1063   /// Implicit input parameter:
1064   ///   - the \a ServerContext associated with this call is used
1065   ///     for sending trailing (and initial) metadata to the client.
1066   ///
1067   /// Note: \a status must have an OK code.
1068   //
1069   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
1070   /// is safe to deallocate once WriteAndFinish returns.
WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)1071   void WriteAndFinish(const W& msg, grpc::WriteOptions options,
1072                       const grpc::Status& status, void* tag) override {
1073     write_ops_.set_output_tag(tag);
1074     EnsureInitialMetadataSent(&write_ops_);
1075     options.set_buffer_hint();
1076     GPR_ASSERT(write_ops_.SendMessage(msg, options).ok());
1077     write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1078     call_.PerformOps(&write_ops_);
1079   }
1080 
1081   /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics.
1082   ///
1083   /// Implicit input parameter:
1084   ///   - the \a ServerContext associated with this call is used for sending
1085   ///     trailing (and initial if not already sent) metadata to the client.
1086   ///
1087   /// Note: there are no restrictions are the code of \a status,
1088   /// it may be non-OK
1089   //
1090   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
1091   /// to deallocate once Finish returns.
Finish(const grpc::Status & status,void * tag)1092   void Finish(const grpc::Status& status, void* tag) override {
1093     finish_ops_.set_output_tag(tag);
1094     EnsureInitialMetadataSent(&finish_ops_);
1095 
1096     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1097     call_.PerformOps(&finish_ops_);
1098   }
1099 
1100  private:
1101   friend class grpc::Server;
1102 
BindCall(grpc::internal::Call * call)1103   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
1104 
1105   template <class T>
EnsureInitialMetadataSent(T * ops)1106   void EnsureInitialMetadataSent(T* ops) {
1107     if (!ctx_->sent_initial_metadata_) {
1108       ops->SendInitialMetadata(&ctx_->initial_metadata_,
1109                                ctx_->initial_metadata_flags());
1110       if (ctx_->compression_level_set()) {
1111         ops->set_compression_level(ctx_->compression_level());
1112       }
1113       ctx_->sent_initial_metadata_ = true;
1114     }
1115   }
1116 
1117   grpc::internal::Call call_;
1118   grpc::ServerContext* ctx_;
1119   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
1120       meta_ops_;
1121   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_;
1122   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1123                             grpc::internal::CallOpSendMessage,
1124                             grpc::internal::CallOpServerSendStatus>
1125       write_ops_;
1126   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1127                             grpc::internal::CallOpServerSendStatus>
1128       finish_ops_;
1129 };
1130 
1131 }  // namespace grpc
1132 
1133 #endif  // GRPCPP_SUPPORT_ASYNC_STREAM_H
1134