//
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H

#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/message_allocator.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>

namespace grpc {
namespace internal {

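// The handlers below adapt the callback (reactor-based) server API to the
// internal MethodHandler interface: one handler each for unary,
// client-streaming, server-streaming, and bidi-streaming methods.
// Applications do not use these classes directly; generated service code
// constructs them with a reactor factory and registers them on the service.
//
// Illustrative sketch only (the method index, message types, and method name
// are assumptions, not part of this header); generated code typically wires
// a unary method to CallbackUnaryHandler roughly like this:
//
//   ::grpc::Service::MarkMethodCallback(
//       /*method_index=*/0,
//       new grpc::internal::CallbackUnaryHandler<EchoRequest, EchoResponse>(
//           [this](grpc::CallbackServerContext* ctx, const EchoRequest* req,
//                  EchoResponse* resp) { return this->Echo(ctx, req, resp); }));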
template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackUnaryHandler(
      std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                        const RequestType*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void SetMessageAllocator(
      MessageAllocator<RequestType, ResponseType>* allocator) {
    allocator_ = allocator;
  }

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a controller structure (that includes request/response)
    grpc_call_ref(param.call->call());
    auto* allocator_state =
        static_cast<MessageHolder<RequestType, ResponseType>*>(
            param.internal_data);

    auto* call = new (grpc_call_arena_alloc(param.call->call(),
                                            sizeof(ServerCallbackUnaryImpl)))
        ServerCallbackUnaryImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, allocator_state, param.call_requester);
    param.server_context->BeginCompletionOp(
        param.call, [call](bool) { call->MaybeDone(); }, call);

    ServerUnaryReactor* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          call->request(), call->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(param.call->call(),
                                           sizeof(UnimplementedUnaryReactor)))
          UnimplementedUnaryReactor(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    /// Invoke SetupReactor as the last part of the handler
    call->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** handler_data) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    RequestType* request = nullptr;
    MessageHolder<RequestType, ResponseType>* allocator_state;
    if (allocator_ != nullptr) {
      allocator_state = allocator_->AllocateMessages();
    } else {
      allocator_state = new (grpc_call_arena_alloc(
          call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
          DefaultMessageHolder<RequestType, ResponseType>();
    }
    *handler_data = allocator_state;
    request = allocator_state->request();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    return nullptr;
  }

 private:
  std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                    const RequestType*, ResponseType*)>
      get_reactor_;
  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;

  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
   public:
    void Finish(grpc::Status s) override {
      // A callback that only contains a call to MaybeDone can be run as an
      // inline callback regardless of whether or not OnDone is inlineable
      // because if the actual OnDone callback needs to be scheduled, MaybeDone
      // is responsible for dispatching to an executor thread if needed. Thus,
      // when setting up the finish_tag_, we can set its own callback to
      // inlineable.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            this->MaybeDone(
                reactor_.load(std::memory_order_relaxed)->InternalInlineable());
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(response()));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be marked inline because it
      // is directly invoking a user-controlled reaction
      // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
      // thread. However, any OnDone needed after that can be inlined because
      // it is already running on an executor thread.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerUnaryReactor* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

   private:
    friend class CallbackUnaryHandler<RequestType, ResponseType>;

    ServerCallbackUnaryImpl(
        grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
        MessageHolder<RequestType, ResponseType>* allocator_state,
        std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          allocator_state_(allocator_state),
          call_requester_(std::move(call_requester)) {
      ctx_->set_message_allocator_state(allocator_state);
    }

    grpc_call* call() override { return call_.call(); }

    /// SetupReactor binds the reactor (which also releases any queued
    /// operations), maybe calls OnCancel if possible/needed, and maybe marks
    /// the completion of the RPC. This should be the last component of the
    /// handler.
    void SetupReactor(ServerUnaryReactor* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      this->MaybeDone(reactor->InternalInlineable());
    }

    const RequestType* request() { return allocator_state_->request(); }
    ResponseType* response() { return allocator_state_->response(); }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      allocator_state_->Release();
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackUnaryImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    MessageHolder<RequestType, ResponseType>* const allocator_state_;
    std::function<void()> call_requester_;
    // reactor_ can always be loaded/stored with relaxed memory ordering
    // because its value is only set once, independently of other data in the
    // object, and the loads that use it will always actually come provably
    // later even though they are from different threads since they are
    // triggered by actions initiated only by the setting up of the reactor_
    // variable. In a sense, it's a delayed "const": it gets its value from
    // the SetupReactor method (not the constructor, so it's not a true
    // const), but it doesn't change after that and it only gets used by
    // actions caused, directly or indirectly, by that setup. This comment
    // also applies to the reactor_ variables of the other streaming objects
    // in this file.
    std::atomic<ServerUnaryReactor*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for start, Finish, and CompletionOp
  };
};

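// For reference, a minimal sketch of the application-side unary reactor that
// the factory passed to CallbackUnaryHandler above is expected to return. The
// message types and method name are assumptions for illustration only:
//
//   grpc::ServerUnaryReactor* Echo(grpc::CallbackServerContext* ctx,
//                                  const EchoRequest* req,
//                                  EchoResponse* resp) {
//     // DefaultReactor() has an inlineable OnDone (see the comments below
//     // about the DefaultReactor being the only such reactor).
//     grpc::ServerUnaryReactor* reactor = ctx->DefaultReactor();
//     resp->set_message(req->message());
//     reactor->Finish(grpc::Status::OK);
//     return reactor;
//   }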
template <class RequestType, class ResponseType>
class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackClientStreamingHandler(
      std::function<ServerReadReactor<RequestType>*(
          grpc::CallbackServerContext*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}
  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a reader structure (that includes response)
    grpc_call_ref(param.call->call());

    auto* reader = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackReaderImpl)))
        ServerCallbackReaderImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because
    // there is no read reactor that has an inlineable OnDone; this only
    // applies to the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
        reader);

    ServerReadReactor<RequestType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerReadReactor<RequestType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          reader->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
          UnimplementedReadReactor<RequestType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    reader->SetupReactor(reactor);
  }

 private:
  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
                                                ResponseType*)>
      get_reactor_;

  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined even
      // if OnDone is not inlineable because this callback just checks a ref
      // and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no read reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(&resp_));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerReadReactor<RequestType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackClientStreamingHandler<RequestType, ResponseType>;

    ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call,
                             std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    grpc_call* call() override { return call_.call(); }

    void SetupReactor(ServerReadReactor<RequestType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no read
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackReaderImpl() {}

    ResponseType* response() { return &resp_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    ResponseType resp_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerReadReactor<RequestType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

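// For reference, a minimal sketch of a ServerReadReactor as produced by the
// factory given to CallbackClientStreamingHandler above. The message types
// and the accumulation logic are assumptions for illustration only:
//
//   class Accumulator : public grpc::ServerReadReactor<EchoRequest> {
//    public:
//     explicit Accumulator(EchoResponse* resp) : resp_(resp) {
//       StartRead(&req_);
//     }
//     void OnReadDone(bool ok) override {
//       if (!ok) {
//         // false means the client half-closed (or the read failed).
//         Finish(grpc::Status::OK);
//         return;
//       }
//       resp_->set_message(resp_->message() + req_.message());
//       StartRead(&req_);
//     }
//     void OnDone() override { delete this; }
//
//    private:
//     EchoRequest req_;
//     EchoResponse* resp_;
//   };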
template <class RequestType, class ResponseType>
class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackServerStreamingHandler(
      std::function<ServerWriteReactor<ResponseType>*(
          grpc::CallbackServerContext*, const RequestType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}
  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a writer structure
    grpc_call_ref(param.call->call());

    auto* writer = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackWriterImpl)))
        ServerCallbackWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, static_cast<RequestType*>(param.request),
            param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because
    // there is no write reactor that has an inlineable OnDone; this only
    // applies to the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
        writer);

    ServerWriteReactor<ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerWriteReactor<ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          writer->request());
    }
    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
          UnimplementedWriteReactor<ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    writer->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** /*handler_data*/) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    auto* request =
        new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    request->~RequestType();
    return nullptr;
  }

 private:
  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
                                                  const RequestType*)>
      get_reactor_;

  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined even
      // if OnDone is not inlineable because this callback just checks a ref
      // and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no write reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerWriteReactor<ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // This combines the write into the finish callback
      // TODO(vjpai): don't assert
      GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

   private:
    friend class CallbackServerStreamingHandler<RequestType, ResponseType>;

    ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call, const RequestType* req,
                             std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          req_(req),
          call_requester_(std::move(call_requester)) {}

    grpc_call* call() override { return call_.call(); }

    void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no write
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }
    ~ServerCallbackWriterImpl() {
      if (req_ != nullptr) {
        req_->~RequestType();
      }
    }

    const RequestType* request() { return req_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    const RequestType* req_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

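// For reference, a minimal sketch of a ServerWriteReactor as produced by the
// factory given to CallbackServerStreamingHandler above. The message types
// and the "send N copies" logic are assumptions for illustration only:
//
//   class Repeater : public grpc::ServerWriteReactor<EchoResponse> {
//    public:
//     Repeater(const EchoRequest* req, int count) : count_(count) {
//       resp_.set_message(req->message());
//       NextWrite();
//     }
//     void OnWriteDone(bool ok) override {
//       if (!ok) {
//         Finish(grpc::Status(grpc::StatusCode::UNKNOWN, "write failed"));
//         return;
//       }
//       NextWrite();
//     }
//     void OnDone() override { delete this; }
//
//    private:
//     void NextWrite() {
//       if (count_-- > 0) {
//         StartWrite(&resp_);
//       } else {
//         Finish(grpc::Status::OK);
//       }
//     }
//     EchoResponse resp_;
//     int count_;
//   };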
template <class RequestType, class ResponseType>
class CallbackBidiHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackBidiHandler(
      std::function<ServerBidiReactor<RequestType, ResponseType>*(
          grpc::CallbackServerContext*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}
  void RunHandler(const HandlerParameter& param) final {
    grpc_call_ref(param.call->call());

    auto* stream = new (grpc_call_arena_alloc(
        param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
        ServerCallbackReaderWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because
    // there is no bidi reactor that has an inlineable OnDone; this only
    // applies to the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
        stream);

    ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerBidiReactor<RequestType, ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context));
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(),
          sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
          UnimplementedBidiReactor<RequestType, ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    stream->SetupReactor(reactor);
  }

 private:
  std::function<ServerBidiReactor<RequestType, ResponseType>*(
      grpc::CallbackServerContext*)>
      get_reactor_;

  class ServerCallbackReaderWriterImpl
      : public ServerCallbackReaderWriter<RequestType, ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined even
      // if OnDone is not inlineable because this callback just checks a ref
      // and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no bidi reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it
      // invokes a user-controlled reaction, but any resulting OnDone can be
      // inlined in the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerBidiReactor<RequestType, ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // TODO(vjpai): don't assert
      GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackBidiHandler<RequestType, ResponseType>;

    ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
                                   grpc::internal::Call* call,
                                   std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    grpc_call* call() override { return call_.call(); }

    void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callbacks for these functions should not be inlined because they
      // invoke user-controlled reactions, but any resulting OnDones can be
      // inlined in the executor to which a callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no bidi
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

}  // namespace internal
}  // namespace grpc

#endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H