xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/async_unary_call.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
20 #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/impl/call.h>
26 #include <grpcpp/impl/call_op_set.h>
27 #include <grpcpp/impl/call_op_set_interface.h>
28 #include <grpcpp/impl/channel_interface.h>
29 #include <grpcpp/impl/service_type.h>
30 #include <grpcpp/server_context.h>
31 #include <grpcpp/support/status.h>
32 
33 namespace grpc {
34 
35 // Forward declaration for use in Helper class
36 template <class R>
37 class ClientAsyncResponseReader;
38 
39 /// An interface relevant for async client side unary RPCs (which send
40 /// one request message to a server and receive one response message).
41 template <class R>
42 class ClientAsyncResponseReaderInterface {
43  public:
~ClientAsyncResponseReaderInterface()44   virtual ~ClientAsyncResponseReaderInterface() {}
45 
46   /// Start the call that was set up by the constructor, but only if the
47   /// constructor was invoked through the "Prepare" API which doesn't actually
48   /// start the call
49   virtual void StartCall() = 0;
50 
51   /// Request notification of the reading of initial metadata. Completion
52   /// will be notified by \a tag on the associated completion queue.
53   /// This call is optional, but if it is used, it cannot be used concurrently
54   /// with or after the \a Finish method.
55   ///
56   /// \param[in] tag Tag identifying this request.
57   virtual void ReadInitialMetadata(void* tag) = 0;
58 
59   /// Request to receive the server's response \a msg and final \a status for
60   /// the call, and to notify \a tag on this call's completion queue when
61   /// finished.
62   ///
63   /// This function will return when either:
64   /// - when the server's response message and status have been received.
65   /// - when the server has returned a non-OK status (no message expected in
66   ///   this case).
67   /// - when the call failed for some reason and the library generated a
68   ///   non-OK status.
69   ///
70   /// \param[in] tag Tag identifying this request.
71   /// \param[out] status To be updated with the operation status.
72   /// \param[out] msg To be filled in with the server's response message.
73   virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0;
74 };
75 
76 namespace internal {
77 
78 class ClientAsyncResponseReaderHelper {
79  public:
80   /// Start a call and write the request out if \a start is set.
81   /// \a tag will be notified on \a cq when the call has been started (i.e.
82   /// initial metadata sent) and \a request has been written out.
83   /// If \a start is not set, the actual call must be initiated by StartCall
84   /// Note that \a context will be used to fill in custom initial metadata
85   /// used to send to the server when starting the call.
86   ///
87   /// Optionally pass in a base class for request and response types so that the
88   /// internal functions and structs can be templated based on that, allowing
89   /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors
90   /// can't have an explicit template parameter, the last argument is an
91   /// extraneous parameter just to provide the needed type information.
92   template <class R, class W, class BaseR = R, class BaseW = W>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)93   static ClientAsyncResponseReader<R>* Create(
94       grpc::ChannelInterface* channel, grpc::CompletionQueue* cq,
95       const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
96       const W& request) /* __attribute__((noinline)) */ {
97     grpc::internal::Call call = channel->CreateCall(method, context, cq);
98     ClientAsyncResponseReader<R>* result = new (grpc_call_arena_alloc(
99         call.call(), sizeof(ClientAsyncResponseReader<R>)))
100         ClientAsyncResponseReader<R>(call, context);
101     SetupRequest<BaseR, BaseW>(
102         call.call(), &result->single_buf_, &result->read_initial_metadata_,
103         &result->finish_, static_cast<const BaseW&>(request));
104 
105     return result;
106   }
107 
108   // Various helper functions to reduce templating use
109 
110   template <class R, class W>
SetupRequest(grpc_call * call,grpc::internal::CallOpSendInitialMetadata ** single_buf_ptr,std::function<void (ClientContext *,internal::Call *,internal::CallOpSendInitialMetadata *,void *)> * read_initial_metadata,std::function<void (ClientContext *,internal::Call *,bool initial_metadata_read,internal::CallOpSendInitialMetadata *,internal::CallOpSetInterface **,void *,Status *,void *)> * finish,const W & request)111   static void SetupRequest(
112       grpc_call* call,
113       grpc::internal::CallOpSendInitialMetadata** single_buf_ptr,
114       std::function<void(ClientContext*, internal::Call*,
115                          internal::CallOpSendInitialMetadata*, void*)>*
116           read_initial_metadata,
117       std::function<
118           void(ClientContext*, internal::Call*, bool initial_metadata_read,
119                internal::CallOpSendInitialMetadata*,
120                internal::CallOpSetInterface**, void*, Status*, void*)>* finish,
121       const W& request) {
122     using SingleBufType =
123         grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
124                                   grpc::internal::CallOpSendMessage,
125                                   grpc::internal::CallOpClientSendClose,
126                                   grpc::internal::CallOpRecvInitialMetadata,
127                                   grpc::internal::CallOpRecvMessage<R>,
128                                   grpc::internal::CallOpClientRecvStatus>;
129     SingleBufType* single_buf =
130         new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType;
131     *single_buf_ptr = single_buf;
132     // TODO(ctiller): don't assert
133     GPR_ASSERT(single_buf->SendMessage(request).ok());
134     single_buf->ClientSendClose();
135 
136     // The purpose of the following functions is to type-erase the actual
137     // templated type of the CallOpSet being used by hiding that type inside the
138     // function definition rather than specifying it as an argument of the
139     // function or a member of the class. The type-erased CallOpSet will get
140     // static_cast'ed back to the real type so that it can be used properly.
141     *read_initial_metadata =
142         [](ClientContext* context, internal::Call* call,
143            internal::CallOpSendInitialMetadata* single_buf_view, void* tag) {
144           auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
145           single_buf->set_output_tag(tag);
146           single_buf->RecvInitialMetadata(context);
147           call->PerformOps(single_buf);
148         };
149 
150     // Note that this function goes one step further than the previous one
151     // because it type-erases the message being written down to a void*. This
152     // will be static-cast'ed back to the class specified here by hiding that
153     // class information inside the function definition. Note that this feature
154     // expects the class being specified here for R to be a base-class of the
155     // "real" R without any multiple-inheritance (as applies in protobuf wrt
156     // MessageLite)
157     *finish = [](ClientContext* context, internal::Call* call,
158                  bool initial_metadata_read,
159                  internal::CallOpSendInitialMetadata* single_buf_view,
160                  internal::CallOpSetInterface** finish_buf_ptr, void* msg,
161                  Status* status, void* tag) {
162       if (initial_metadata_read) {
163         using FinishBufType =
164             grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>,
165                                       grpc::internal::CallOpClientRecvStatus>;
166         FinishBufType* finish_buf =
167             new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType)))
168                 FinishBufType;
169         *finish_buf_ptr = finish_buf;
170         finish_buf->set_output_tag(tag);
171         finish_buf->RecvMessage(static_cast<R*>(msg));
172         finish_buf->AllowNoMessage();
173         finish_buf->ClientRecvStatus(context, status);
174         call->PerformOps(finish_buf);
175       } else {
176         auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
177         single_buf->set_output_tag(tag);
178         single_buf->RecvInitialMetadata(context);
179         single_buf->RecvMessage(static_cast<R*>(msg));
180         single_buf->AllowNoMessage();
181         single_buf->ClientRecvStatus(context, status);
182         call->PerformOps(single_buf);
183       }
184     };
185   }
186 
StartCall(grpc::ClientContext * context,grpc::internal::CallOpSendInitialMetadata * single_buf)187   static void StartCall(grpc::ClientContext* context,
188                         grpc::internal::CallOpSendInitialMetadata* single_buf) {
189     single_buf->SendInitialMetadata(&context->send_initial_metadata_,
190                                     context->initial_metadata_flags());
191   }
192 };
193 
194 // TODO(vjpai): This templated factory is deprecated and will be replaced by
195 //.             the non-templated helper as soon as possible.
196 template <class R>
197 class ClientAsyncResponseReaderFactory {
198  public:
199   template <class W>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start)200   static ClientAsyncResponseReader<R>* Create(
201       grpc::ChannelInterface* channel, grpc::CompletionQueue* cq,
202       const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
203       const W& request, bool start) {
204     auto* result = ClientAsyncResponseReaderHelper::Create<R>(
205         channel, cq, method, context, request);
206     if (start) {
207       result->StartCall();
208     }
209     return result;
210   }
211 };
212 
213 }  // namespace internal
214 
215 /// Async API for client-side unary RPCs, where the message response
216 /// received from the server is of type \a R.
217 template <class R>
218 class ClientAsyncResponseReader final
219     : public ClientAsyncResponseReaderInterface<R> {
220  public:
221   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)222   static void operator delete(void* /*ptr*/, std::size_t size) {
223     GPR_ASSERT(size == sizeof(ClientAsyncResponseReader));
224   }
225 
226   // This operator should never be called as the memory should be freed as part
227   // of the arena destruction. It only exists to provide a matching operator
228   // delete to the operator new so that some compilers will not complain (see
229   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
230   // there are no tests catching the compiler warning.
delete(void *,void *)231   static void operator delete(void*, void*) { GPR_ASSERT(false); }
232 
StartCall()233   void StartCall() override {
234     GPR_DEBUG_ASSERT(!started_);
235     started_ = true;
236     internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_);
237   }
238 
239   /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
240   /// semantics.
241   ///
242   /// Side effect:
243   ///   - the \a ClientContext associated with this call is updated with
244   ///     possible initial and trailing metadata sent from the server.
ReadInitialMetadata(void * tag)245   void ReadInitialMetadata(void* tag) override {
246     GPR_DEBUG_ASSERT(started_);
247     GPR_DEBUG_ASSERT(!context_->initial_metadata_received_);
248     read_initial_metadata_(context_, &call_, single_buf_, tag);
249     initial_metadata_read_ = true;
250   }
251 
252   /// See \a ClientAsyncResponseReaderInterface::Finish for semantics.
253   ///
254   /// Side effect:
255   ///   - the \a ClientContext associated with this call is updated with
256   ///     possible initial and trailing metadata sent from the server.
Finish(R * msg,grpc::Status * status,void * tag)257   void Finish(R* msg, grpc::Status* status, void* tag) override {
258     GPR_DEBUG_ASSERT(started_);
259     finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_,
260             static_cast<void*>(msg), status, tag);
261   }
262 
263  private:
264   friend class internal::ClientAsyncResponseReaderHelper;
265   grpc::ClientContext* const context_;
266   grpc::internal::Call call_;
267   bool started_ = false;
268   bool initial_metadata_read_ = false;
269 
ClientAsyncResponseReader(grpc::internal::Call call,grpc::ClientContext * context)270   ClientAsyncResponseReader(grpc::internal::Call call,
271                             grpc::ClientContext* context)
272       : context_(context), call_(call) {}
273 
274   // disable operator new
275   static void* operator new(std::size_t size);
new(std::size_t,void * p)276   static void* operator new(std::size_t /*size*/, void* p) { return p; }
277 
278   internal::CallOpSendInitialMetadata* single_buf_;
279   internal::CallOpSetInterface* finish_buf_ = nullptr;
280   std::function<void(ClientContext*, internal::Call*,
281                      internal::CallOpSendInitialMetadata*, void*)>
282       read_initial_metadata_;
283   std::function<void(ClientContext*, internal::Call*,
284                      bool initial_metadata_read,
285                      internal::CallOpSendInitialMetadata*,
286                      internal::CallOpSetInterface**, void*, Status*, void*)>
287       finish_;
288 };
289 
290 /// Async server-side API for handling unary calls, where the single
291 /// response message sent to the client is of type \a W.
292 template <class W>
293 class ServerAsyncResponseWriter final
294     : public grpc::internal::ServerAsyncStreamingInterface {
295  public:
ServerAsyncResponseWriter(grpc::ServerContext * ctx)296   explicit ServerAsyncResponseWriter(grpc::ServerContext* ctx)
297       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
298 
299   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
300   ///
301   /// Side effect:
302   ///   The initial metadata that will be sent to the client from this op will
303   ///   be taken from the \a ServerContext associated with the call.
304   ///
305   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)306   void SendInitialMetadata(void* tag) override {
307     GPR_ASSERT(!ctx_->sent_initial_metadata_);
308 
309     meta_buf_.set_output_tag(tag);
310     meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
311                                   ctx_->initial_metadata_flags());
312     if (ctx_->compression_level_set()) {
313       meta_buf_.set_compression_level(ctx_->compression_level());
314     }
315     ctx_->sent_initial_metadata_ = true;
316     call_.PerformOps(&meta_buf_);
317   }
318 
319   /// Indicate that the stream is to be finished and request notification
320   /// when the server has sent the appropriate signals to the client to
321   /// end the call. Should not be used concurrently with other operations.
322   ///
323   /// \param[in] tag Tag identifying this request.
324   /// \param[in] status To be sent to the client as the result of the call.
325   /// \param[in] msg Message to be sent to the client.
326   ///
327   /// Side effect:
328   ///   - also sends initial metadata if not already sent (using the
329   ///     \a ServerContext associated with this call).
330   ///
331   /// Note: if \a status has a non-OK code, then \a msg will not be sent,
332   /// and the client will receive only the status with possible trailing
333   /// metadata.
334   ///
335   /// gRPC doesn't take ownership or a reference to msg and status, so it is
336   /// safe to deallocate them once the Finish operation is complete (i.e. a
337   /// result arrives in the completion queue).
Finish(const W & msg,const grpc::Status & status,void * tag)338   void Finish(const W& msg, const grpc::Status& status, void* tag) {
339     finish_buf_.set_output_tag(tag);
340     finish_buf_.set_core_cq_tag(&finish_buf_);
341     if (!ctx_->sent_initial_metadata_) {
342       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
343                                       ctx_->initial_metadata_flags());
344       if (ctx_->compression_level_set()) {
345         finish_buf_.set_compression_level(ctx_->compression_level());
346       }
347       ctx_->sent_initial_metadata_ = true;
348     }
349     // The response is dropped if the status is not OK.
350     if (status.ok()) {
351       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
352                                    finish_buf_.SendMessage(msg));
353     } else {
354       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
355     }
356     call_.PerformOps(&finish_buf_);
357   }
358 
359   /// Indicate that the stream is to be finished with a non-OK status,
360   /// and request notification for when the server has finished sending the
361   /// appropriate signals to the client to end the call.
362   /// Should not be used concurrently with other operations.
363   ///
364   /// \param[in] tag Tag identifying this request.
365   /// \param[in] status To be sent to the client as the result of the call.
366   ///   - Note: \a status must have a non-OK code.
367   ///
368   /// Side effect:
369   ///   - also sends initial metadata if not already sent (using the
370   ///     \a ServerContext associated with this call).
371   ///
372   /// gRPC doesn't take ownership or a reference to status, so it is safe to
373   /// deallocate them once the Finish operation is complete (i.e. a result
374   /// arrives in the completion queue).
FinishWithError(const grpc::Status & status,void * tag)375   void FinishWithError(const grpc::Status& status, void* tag) {
376     GPR_ASSERT(!status.ok());
377     finish_buf_.set_output_tag(tag);
378     if (!ctx_->sent_initial_metadata_) {
379       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
380                                       ctx_->initial_metadata_flags());
381       if (ctx_->compression_level_set()) {
382         finish_buf_.set_compression_level(ctx_->compression_level());
383       }
384       ctx_->sent_initial_metadata_ = true;
385     }
386     finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
387     call_.PerformOps(&finish_buf_);
388   }
389 
390  private:
BindCall(grpc::internal::Call * call)391   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
392 
393   grpc::internal::Call call_;
394   grpc::ServerContext* ctx_;
395   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
396       meta_buf_;
397   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
398                             grpc::internal::CallOpSendMessage,
399                             grpc::internal::CallOpServerSendStatus>
400       finish_buf_;
401 };
402 
403 }  // namespace grpc
404 
405 namespace std {
406 template <class R>
407 class default_delete<grpc::ClientAsyncResponseReader<R>> {
408  public:
operator()409   void operator()(void* /*p*/) {}
410 };
411 template <class R>
412 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> {
413  public:
operator()414   void operator()(void* /*p*/) {}
415 };
416 }  // namespace std
417 
418 #endif  // GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
419