1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H 20 #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/support/log.h> 24 #include <grpcpp/client_context.h> 25 #include <grpcpp/impl/call.h> 26 #include <grpcpp/impl/call_op_set.h> 27 #include <grpcpp/impl/call_op_set_interface.h> 28 #include <grpcpp/impl/channel_interface.h> 29 #include <grpcpp/impl/service_type.h> 30 #include <grpcpp/server_context.h> 31 #include <grpcpp/support/status.h> 32 33 namespace grpc { 34 35 // Forward declaration for use in Helper class 36 template <class R> 37 class ClientAsyncResponseReader; 38 39 /// An interface relevant for async client side unary RPCs (which send 40 /// one request message to a server and receive one response message). 41 template <class R> 42 class ClientAsyncResponseReaderInterface { 43 public: ~ClientAsyncResponseReaderInterface()44 virtual ~ClientAsyncResponseReaderInterface() {} 45 46 /// Start the call that was set up by the constructor, but only if the 47 /// constructor was invoked through the "Prepare" API which doesn't actually 48 /// start the call 49 virtual void StartCall() = 0; 50 51 /// Request notification of the reading of initial metadata. Completion 52 /// will be notified by \a tag on the associated completion queue. 53 /// This call is optional, but if it is used, it cannot be used concurrently 54 /// with or after the \a Finish method. 55 /// 56 /// \param[in] tag Tag identifying this request. 57 virtual void ReadInitialMetadata(void* tag) = 0; 58 59 /// Request to receive the server's response \a msg and final \a status for 60 /// the call, and to notify \a tag on this call's completion queue when 61 /// finished. 62 /// 63 /// This function will return when either: 64 /// - when the server's response message and status have been received. 65 /// - when the server has returned a non-OK status (no message expected in 66 /// this case). 67 /// - when the call failed for some reason and the library generated a 68 /// non-OK status. 69 /// 70 /// \param[in] tag Tag identifying this request. 71 /// \param[out] status To be updated with the operation status. 72 /// \param[out] msg To be filled in with the server's response message. 73 virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0; 74 }; 75 76 namespace internal { 77 78 class ClientAsyncResponseReaderHelper { 79 public: 80 /// Start a call and write the request out if \a start is set. 81 /// \a tag will be notified on \a cq when the call has been started (i.e. 82 /// initial metadata sent) and \a request has been written out. 83 /// If \a start is not set, the actual call must be initiated by StartCall 84 /// Note that \a context will be used to fill in custom initial metadata 85 /// used to send to the server when starting the call. 86 /// 87 /// Optionally pass in a base class for request and response types so that the 88 /// internal functions and structs can be templated based on that, allowing 89 /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors 90 /// can't have an explicit template parameter, the last argument is an 91 /// extraneous parameter just to provide the needed type information. 92 template <class R, class W, class BaseR = R, class BaseW = W> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)93 static ClientAsyncResponseReader<R>* Create( 94 grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, 95 const grpc::internal::RpcMethod& method, grpc::ClientContext* context, 96 const W& request) /* __attribute__((noinline)) */ { 97 grpc::internal::Call call = channel->CreateCall(method, context, cq); 98 ClientAsyncResponseReader<R>* result = new (grpc_call_arena_alloc( 99 call.call(), sizeof(ClientAsyncResponseReader<R>))) 100 ClientAsyncResponseReader<R>(call, context); 101 SetupRequest<BaseR, BaseW>( 102 call.call(), &result->single_buf_, &result->read_initial_metadata_, 103 &result->finish_, static_cast<const BaseW&>(request)); 104 105 return result; 106 } 107 108 // Various helper functions to reduce templating use 109 110 template <class R, class W> SetupRequest(grpc_call * call,grpc::internal::CallOpSendInitialMetadata ** single_buf_ptr,std::function<void (ClientContext *,internal::Call *,internal::CallOpSendInitialMetadata *,void *)> * read_initial_metadata,std::function<void (ClientContext *,internal::Call *,bool initial_metadata_read,internal::CallOpSendInitialMetadata *,internal::CallOpSetInterface **,void *,Status *,void *)> * finish,const W & request)111 static void SetupRequest( 112 grpc_call* call, 113 grpc::internal::CallOpSendInitialMetadata** single_buf_ptr, 114 std::function<void(ClientContext*, internal::Call*, 115 internal::CallOpSendInitialMetadata*, void*)>* 116 read_initial_metadata, 117 std::function< 118 void(ClientContext*, internal::Call*, bool initial_metadata_read, 119 internal::CallOpSendInitialMetadata*, 120 internal::CallOpSetInterface**, void*, Status*, void*)>* finish, 121 const W& request) { 122 using SingleBufType = 123 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 124 grpc::internal::CallOpSendMessage, 125 grpc::internal::CallOpClientSendClose, 126 grpc::internal::CallOpRecvInitialMetadata, 127 grpc::internal::CallOpRecvMessage<R>, 128 grpc::internal::CallOpClientRecvStatus>; 129 SingleBufType* single_buf = 130 new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType; 131 *single_buf_ptr = single_buf; 132 // TODO(ctiller): don't assert 133 GPR_ASSERT(single_buf->SendMessage(request).ok()); 134 single_buf->ClientSendClose(); 135 136 // The purpose of the following functions is to type-erase the actual 137 // templated type of the CallOpSet being used by hiding that type inside the 138 // function definition rather than specifying it as an argument of the 139 // function or a member of the class. The type-erased CallOpSet will get 140 // static_cast'ed back to the real type so that it can be used properly. 141 *read_initial_metadata = 142 [](ClientContext* context, internal::Call* call, 143 internal::CallOpSendInitialMetadata* single_buf_view, void* tag) { 144 auto* single_buf = static_cast<SingleBufType*>(single_buf_view); 145 single_buf->set_output_tag(tag); 146 single_buf->RecvInitialMetadata(context); 147 call->PerformOps(single_buf); 148 }; 149 150 // Note that this function goes one step further than the previous one 151 // because it type-erases the message being written down to a void*. This 152 // will be static-cast'ed back to the class specified here by hiding that 153 // class information inside the function definition. Note that this feature 154 // expects the class being specified here for R to be a base-class of the 155 // "real" R without any multiple-inheritance (as applies in protobuf wrt 156 // MessageLite) 157 *finish = [](ClientContext* context, internal::Call* call, 158 bool initial_metadata_read, 159 internal::CallOpSendInitialMetadata* single_buf_view, 160 internal::CallOpSetInterface** finish_buf_ptr, void* msg, 161 Status* status, void* tag) { 162 if (initial_metadata_read) { 163 using FinishBufType = 164 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>, 165 grpc::internal::CallOpClientRecvStatus>; 166 FinishBufType* finish_buf = 167 new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType))) 168 FinishBufType; 169 *finish_buf_ptr = finish_buf; 170 finish_buf->set_output_tag(tag); 171 finish_buf->RecvMessage(static_cast<R*>(msg)); 172 finish_buf->AllowNoMessage(); 173 finish_buf->ClientRecvStatus(context, status); 174 call->PerformOps(finish_buf); 175 } else { 176 auto* single_buf = static_cast<SingleBufType*>(single_buf_view); 177 single_buf->set_output_tag(tag); 178 single_buf->RecvInitialMetadata(context); 179 single_buf->RecvMessage(static_cast<R*>(msg)); 180 single_buf->AllowNoMessage(); 181 single_buf->ClientRecvStatus(context, status); 182 call->PerformOps(single_buf); 183 } 184 }; 185 } 186 StartCall(grpc::ClientContext * context,grpc::internal::CallOpSendInitialMetadata * single_buf)187 static void StartCall(grpc::ClientContext* context, 188 grpc::internal::CallOpSendInitialMetadata* single_buf) { 189 single_buf->SendInitialMetadata(&context->send_initial_metadata_, 190 context->initial_metadata_flags()); 191 } 192 }; 193 194 // TODO(vjpai): This templated factory is deprecated and will be replaced by 195 //. the non-templated helper as soon as possible. 196 template <class R> 197 class ClientAsyncResponseReaderFactory { 198 public: 199 template <class W> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start)200 static ClientAsyncResponseReader<R>* Create( 201 grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, 202 const grpc::internal::RpcMethod& method, grpc::ClientContext* context, 203 const W& request, bool start) { 204 auto* result = ClientAsyncResponseReaderHelper::Create<R>( 205 channel, cq, method, context, request); 206 if (start) { 207 result->StartCall(); 208 } 209 return result; 210 } 211 }; 212 213 } // namespace internal 214 215 /// Async API for client-side unary RPCs, where the message response 216 /// received from the server is of type \a R. 217 template <class R> 218 class ClientAsyncResponseReader final 219 : public ClientAsyncResponseReaderInterface<R> { 220 public: 221 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)222 static void operator delete(void* /*ptr*/, std::size_t size) { 223 GPR_ASSERT(size == sizeof(ClientAsyncResponseReader)); 224 } 225 226 // This operator should never be called as the memory should be freed as part 227 // of the arena destruction. It only exists to provide a matching operator 228 // delete to the operator new so that some compilers will not complain (see 229 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 230 // there are no tests catching the compiler warning. delete(void *,void *)231 static void operator delete(void*, void*) { GPR_ASSERT(false); } 232 StartCall()233 void StartCall() override { 234 GPR_DEBUG_ASSERT(!started_); 235 started_ = true; 236 internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); 237 } 238 239 /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for 240 /// semantics. 241 /// 242 /// Side effect: 243 /// - the \a ClientContext associated with this call is updated with 244 /// possible initial and trailing metadata sent from the server. ReadInitialMetadata(void * tag)245 void ReadInitialMetadata(void* tag) override { 246 GPR_DEBUG_ASSERT(started_); 247 GPR_DEBUG_ASSERT(!context_->initial_metadata_received_); 248 read_initial_metadata_(context_, &call_, single_buf_, tag); 249 initial_metadata_read_ = true; 250 } 251 252 /// See \a ClientAsyncResponseReaderInterface::Finish for semantics. 253 /// 254 /// Side effect: 255 /// - the \a ClientContext associated with this call is updated with 256 /// possible initial and trailing metadata sent from the server. Finish(R * msg,grpc::Status * status,void * tag)257 void Finish(R* msg, grpc::Status* status, void* tag) override { 258 GPR_DEBUG_ASSERT(started_); 259 finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, 260 static_cast<void*>(msg), status, tag); 261 } 262 263 private: 264 friend class internal::ClientAsyncResponseReaderHelper; 265 grpc::ClientContext* const context_; 266 grpc::internal::Call call_; 267 bool started_ = false; 268 bool initial_metadata_read_ = false; 269 ClientAsyncResponseReader(grpc::internal::Call call,grpc::ClientContext * context)270 ClientAsyncResponseReader(grpc::internal::Call call, 271 grpc::ClientContext* context) 272 : context_(context), call_(call) {} 273 274 // disable operator new 275 static void* operator new(std::size_t size); new(std::size_t,void * p)276 static void* operator new(std::size_t /*size*/, void* p) { return p; } 277 278 internal::CallOpSendInitialMetadata* single_buf_; 279 internal::CallOpSetInterface* finish_buf_ = nullptr; 280 std::function<void(ClientContext*, internal::Call*, 281 internal::CallOpSendInitialMetadata*, void*)> 282 read_initial_metadata_; 283 std::function<void(ClientContext*, internal::Call*, 284 bool initial_metadata_read, 285 internal::CallOpSendInitialMetadata*, 286 internal::CallOpSetInterface**, void*, Status*, void*)> 287 finish_; 288 }; 289 290 /// Async server-side API for handling unary calls, where the single 291 /// response message sent to the client is of type \a W. 292 template <class W> 293 class ServerAsyncResponseWriter final 294 : public grpc::internal::ServerAsyncStreamingInterface { 295 public: ServerAsyncResponseWriter(grpc::ServerContext * ctx)296 explicit ServerAsyncResponseWriter(grpc::ServerContext* ctx) 297 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 298 299 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 300 /// 301 /// Side effect: 302 /// The initial metadata that will be sent to the client from this op will 303 /// be taken from the \a ServerContext associated with the call. 304 /// 305 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)306 void SendInitialMetadata(void* tag) override { 307 GPR_ASSERT(!ctx_->sent_initial_metadata_); 308 309 meta_buf_.set_output_tag(tag); 310 meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 311 ctx_->initial_metadata_flags()); 312 if (ctx_->compression_level_set()) { 313 meta_buf_.set_compression_level(ctx_->compression_level()); 314 } 315 ctx_->sent_initial_metadata_ = true; 316 call_.PerformOps(&meta_buf_); 317 } 318 319 /// Indicate that the stream is to be finished and request notification 320 /// when the server has sent the appropriate signals to the client to 321 /// end the call. Should not be used concurrently with other operations. 322 /// 323 /// \param[in] tag Tag identifying this request. 324 /// \param[in] status To be sent to the client as the result of the call. 325 /// \param[in] msg Message to be sent to the client. 326 /// 327 /// Side effect: 328 /// - also sends initial metadata if not already sent (using the 329 /// \a ServerContext associated with this call). 330 /// 331 /// Note: if \a status has a non-OK code, then \a msg will not be sent, 332 /// and the client will receive only the status with possible trailing 333 /// metadata. 334 /// 335 /// gRPC doesn't take ownership or a reference to msg and status, so it is 336 /// safe to deallocate them once the Finish operation is complete (i.e. a 337 /// result arrives in the completion queue). Finish(const W & msg,const grpc::Status & status,void * tag)338 void Finish(const W& msg, const grpc::Status& status, void* tag) { 339 finish_buf_.set_output_tag(tag); 340 finish_buf_.set_core_cq_tag(&finish_buf_); 341 if (!ctx_->sent_initial_metadata_) { 342 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 343 ctx_->initial_metadata_flags()); 344 if (ctx_->compression_level_set()) { 345 finish_buf_.set_compression_level(ctx_->compression_level()); 346 } 347 ctx_->sent_initial_metadata_ = true; 348 } 349 // The response is dropped if the status is not OK. 350 if (status.ok()) { 351 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, 352 finish_buf_.SendMessage(msg)); 353 } else { 354 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 355 } 356 call_.PerformOps(&finish_buf_); 357 } 358 359 /// Indicate that the stream is to be finished with a non-OK status, 360 /// and request notification for when the server has finished sending the 361 /// appropriate signals to the client to end the call. 362 /// Should not be used concurrently with other operations. 363 /// 364 /// \param[in] tag Tag identifying this request. 365 /// \param[in] status To be sent to the client as the result of the call. 366 /// - Note: \a status must have a non-OK code. 367 /// 368 /// Side effect: 369 /// - also sends initial metadata if not already sent (using the 370 /// \a ServerContext associated with this call). 371 /// 372 /// gRPC doesn't take ownership or a reference to status, so it is safe to 373 /// deallocate them once the Finish operation is complete (i.e. a result 374 /// arrives in the completion queue). FinishWithError(const grpc::Status & status,void * tag)375 void FinishWithError(const grpc::Status& status, void* tag) { 376 GPR_ASSERT(!status.ok()); 377 finish_buf_.set_output_tag(tag); 378 if (!ctx_->sent_initial_metadata_) { 379 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 380 ctx_->initial_metadata_flags()); 381 if (ctx_->compression_level_set()) { 382 finish_buf_.set_compression_level(ctx_->compression_level()); 383 } 384 ctx_->sent_initial_metadata_ = true; 385 } 386 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 387 call_.PerformOps(&finish_buf_); 388 } 389 390 private: BindCall(grpc::internal::Call * call)391 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 392 393 grpc::internal::Call call_; 394 grpc::ServerContext* ctx_; 395 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 396 meta_buf_; 397 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 398 grpc::internal::CallOpSendMessage, 399 grpc::internal::CallOpServerSendStatus> 400 finish_buf_; 401 }; 402 403 } // namespace grpc 404 405 namespace std { 406 template <class R> 407 class default_delete<grpc::ClientAsyncResponseReader<R>> { 408 public: operator()409 void operator()(void* /*p*/) {} 410 }; 411 template <class R> 412 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { 413 public: operator()414 void operator()(void* /*p*/) {} 415 }; 416 } // namespace std 417 418 #endif // GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H 419