//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
#define GRPCPP_SUPPORT_SERVER_CALLBACK_H

#include <atomic>
#include <functional>
#include <type_traits>

#include "absl/functional/any_invocable.h"

#include <grpc/impl/call.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/sync.h>
#include <grpcpp/support/callback_common.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/message_allocator.h>
#include <grpcpp/support/status.h>

namespace grpc {

// Declare base class of all reactors as internal
namespace internal {

// Forward declarations
template <class Request, class Response>
class CallbackUnaryHandler;
template <class Request, class Response>
class CallbackClientStreamingHandler;
template <class Request, class Response>
class CallbackServerStreamingHandler;
template <class Request, class Response>
class CallbackBidiHandler;

// Untyped base class shared by every server-side reactor. User code interacts
// with the typed reactor classes declared further below; this base only
// carries the two lifecycle notifications (OnDone/OnCancel) that the library
// needs regardless of the RPC shape.
class ServerReactor {
 public:
  virtual ~ServerReactor() = default;
  virtual void OnDone() = 0;
  virtual void OnCancel() = 0;

  // The following is not API. It is for internal use only and specifies
  // whether all reactions of this Reactor can be run without an extra
  // executor scheduling. This should only be used for internally-defined
  // reactors with trivial reactions.
  virtual bool InternalInlineable() { return false; }

 private:
  template <class Request, class Response>
  friend class CallbackUnaryHandler;
  template <class Request, class Response>
  friend class CallbackClientStreamingHandler;
  template <class Request, class Response>
  friend class CallbackServerStreamingHandler;
  template <class Request, class Response>
  friend class CallbackBidiHandler;
};

/// The base class of ServerCallbackUnary etc.
class ServerCallbackCall {
 public:
  virtual ~ServerCallbackCall() {}

  // This object is responsible for tracking when it is safe to call OnDone and
  // OnCancel. OnDone should not be called until the method handler is complete,
  // Finish has been called, the ServerContext CompletionOp (which tracks
  // cancellation or successful completion) has completed, and all outstanding
  // Read/Write actions have seen their reactions. OnCancel should not be called
  // until after the method handler is done and the RPC has completed with a
  // cancellation. This is tracked by counting how many of these conditions have
  // been met and calling OnCancel when none remain unmet.

  // Public versions of MaybeDone: one where we don't know the reactor in
  // advance (used for the ServerContext CompletionOp), and one for where we
  // know the inlineability of the OnDone reaction. You should set the inline
  // flag to true if either the Reactor is InternalInlineable() or if this
  // callback is already being forced to run dispatched to an executor
  // (typically because it contains additional work than just the MaybeDone).

  void MaybeDone() {
    // Unref returns the pre-decrement value, so 1 means this was the last
    // outstanding callback and OnDone may now be scheduled.
    if (GPR_UNLIKELY(Unref() == 1)) {
      ScheduleOnDone(reactor()->InternalInlineable());
    }
  }

  void MaybeDone(bool inline_ondone) {
    if (GPR_UNLIKELY(Unref() == 1)) {
      ScheduleOnDone(inline_ondone);
    }
  }

  // Fast version called with known reactor passed in, used from derived
  // classes, typically in non-cancel case
  void MaybeCallOnCancel(ServerReactor* reactor) {
    if (GPR_UNLIKELY(UnblockCancellation())) {
      CallOnCancel(reactor);
    }
  }

  // Slower version called from object that doesn't know the reactor a priori
  // (such as the ServerContext CompletionOp which is formed before the
  // reactor). This is used in cancel cases only, so it's ok to be slower and
  // invoke a virtual function.
  void MaybeCallOnCancel() {
    if (GPR_UNLIKELY(UnblockCancellation())) {
      CallOnCancel(reactor());
    }
  }

 protected:
  /// Increases the reference count
  void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }

 private:
  virtual ServerReactor* reactor() = 0;

  virtual grpc_call* call() = 0;

  virtual void RunAsync(absl::AnyInvocable<void()> cb) {
    grpc_call_run_in_event_engine(call(), std::move(cb));
  }

  // CallOnDone performs the work required at completion of the RPC: invoking
  // the OnDone function and doing all necessary cleanup. This function is only
  // ever invoked on a fully-Unref'fed ServerCallbackCall.
  virtual void CallOnDone() = 0;

  // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
  // to an executor.
  void ScheduleOnDone(bool inline_ondone);

  // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
  // it to an executor.
  void CallOnCancel(ServerReactor* reactor);

  // Implement the cancellation constraint counter. Return true if OnCancel
  // should be called, false otherwise.
  bool UnblockCancellation() {
    return on_cancel_conditions_remaining_.fetch_sub(
               1, std::memory_order_acq_rel) == 1;
  }

  /// Decreases the reference count and returns the previous value
  int Unref() {
    return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
  }

  // Two conditions gate OnCancel (see class comment above): method-handler
  // completion and RPC completion-with-cancellation.
  std::atomic_int on_cancel_conditions_remaining_{2};
  std::atomic_int callbacks_outstanding_{
      3};  // reserve for start, Finish, and CompletionOp
};

// Default MessageHolder used when no custom message allocator is in play:
// the request and response objects live inline in this holder.
template <class Request, class Response>
class DefaultMessageHolder : public MessageHolder<Request, Response> {
 public:
  DefaultMessageHolder() {
    this->set_request(&request_obj_);
    this->set_response(&response_obj_);
  }
  void Release() override {
    // the object is allocated in the call arena, so run the destructor
    // explicitly rather than deleting (the arena owns the storage).
    this->~DefaultMessageHolder<Request, Response>();
  }

 private:
  Request request_obj_;
  Response response_obj_;
};

}  // namespace internal

// Forward declarations
class ServerUnaryReactor;
template <class Request>
class ServerReadReactor;
template <class Response>
class ServerWriteReactor;
template <class Request, class Response>
class ServerBidiReactor;

// NOTE: The actual call/stream object classes are provided as API only to
// support mocking. There are no implementations of these class interfaces in
// the API.
// Call object for a callback unary RPC: lets the reactor send initial
// metadata and finish the call.
class ServerCallbackUnary : public internal::ServerCallbackCall {
 public:
  ~ServerCallbackUnary() override {}
  virtual void Finish(grpc::Status s) = 0;
  virtual void SendInitialMetadata() = 0;

 protected:
  // Use a template rather than explicitly specifying ServerUnaryReactor to
  // delay binding and avoid a circular forward declaration issue
  template <class Reactor>
  void BindReactor(Reactor* reactor) {
    reactor->InternalBindCall(this);
  }
};

// Call object for a callback client-streaming RPC: adds Read to the unary
// operations.
template <class Request>
class ServerCallbackReader : public internal::ServerCallbackCall {
 public:
  ~ServerCallbackReader() override {}
  virtual void Finish(grpc::Status s) = 0;
  virtual void SendInitialMetadata() = 0;
  virtual void Read(Request* msg) = 0;

 protected:
  void BindReactor(ServerReadReactor<Request>* reactor) {
    reactor->InternalBindReader(this);
  }
};

// Call object for a callback server-streaming RPC: adds Write and the
// combined WriteAndFinish operation.
template <class Response>
class ServerCallbackWriter : public internal::ServerCallbackCall {
 public:
  ~ServerCallbackWriter() override {}

  virtual void Finish(grpc::Status s) = 0;
  virtual void SendInitialMetadata() = 0;
  virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
                              grpc::Status s) = 0;

 protected:
  void BindReactor(ServerWriteReactor<Response>* reactor) {
    reactor->InternalBindWriter(this);
  }
};

// Call object for a callback bidirectional-streaming RPC: supports both Read
// and Write in addition to the unary operations.
template <class Request, class Response>
class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
 public:
  ~ServerCallbackReaderWriter() override {}

  virtual void Finish(grpc::Status s) = 0;
  virtual void SendInitialMetadata() = 0;
  virtual void Read(Request* msg) = 0;
  virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
                              grpc::Status s) = 0;

 protected:
  void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
    reactor->InternalBindStream(this);
  }
};

// The following classes are the reactor interfaces that are to be implemented
// by the user, returned as the output parameter of the method handler for a
// callback method. Note that none of the classes are pure; all reactions have a
// default empty reaction so that the user class only needs to override those
// reactions that it cares about. The reaction methods will be invoked by the
// library in response to the completion of various operations. Reactions must
// not include blocking operations (such as blocking I/O, starting synchronous
// RPCs, or waiting on condition variables). Reactions may be invoked
// concurrently, except that OnDone is called after all others (assuming proper
// API usage). The reactor may not be deleted until OnDone is called.

/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
template <class Request, class Response>
class ServerBidiReactor : public internal::ServerReactor {
 public:
  // NOTE: Initializing stream_ as a constructor initializer rather than a
  // default initializer because gcc-4.x requires a copy constructor for
  // default initializing a templated member, which isn't ok for atomic.
  // TODO(vjpai): Switch to default constructor and default initializer when
  // gcc-4.x is no longer supported
  ServerBidiReactor() : stream_(nullptr) {}
  ~ServerBidiReactor() override = default;

  /// Send any initial metadata stored in the RPC context. If not invoked,
  /// any initial metadata will be passed along with the first Write or the
  /// Finish (if there are no writes).
  ///
  /// Each Start*/Finish method below uses the same double-checked pattern:
  /// a lock-free acquire-load of stream_ for the common (already-bound) case,
  /// falling back to the mutex to either re-check or record the request in
  /// backlog_ for replay when the stream is bound (see InternalBindStream).
  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
    ServerCallbackReaderWriter<Request, Response>* stream =
        stream_.load(std::memory_order_acquire);
    if (stream == nullptr) {
      grpc::internal::MutexLock l(&stream_mu_);
      stream = stream_.load(std::memory_order_relaxed);
      if (stream == nullptr) {
        backlog_.send_initial_metadata_wanted = true;
        return;
      }
    }
    stream->SendInitialMetadata();
  }

  /// Initiate a read operation.
  ///
  /// \param[out] req Where to eventually store the read message. Valid when
  ///                 the library calls OnReadDone
  void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
    ServerCallbackReaderWriter<Request, Response>* stream =
        stream_.load(std::memory_order_acquire);
    if (stream == nullptr) {
      grpc::internal::MutexLock l(&stream_mu_);
      stream = stream_.load(std::memory_order_relaxed);
      if (stream == nullptr) {
        backlog_.read_wanted = req;
        return;
      }
    }
    stream->Read(req);
  }

  /// Initiate a write operation.
  ///
  /// \param[in] resp The message to be written. The library does not take
  ///                 ownership but the caller must ensure that the message is
  ///                 not deleted or modified until OnWriteDone is called.
  void StartWrite(const Response* resp) {
    StartWrite(resp, grpc::WriteOptions());
  }

  /// Initiate a write operation with specified options.
  ///
  /// \param[in] resp The message to be written. The library does not take
  ///                 ownership but the caller must ensure that the message is
  ///                 not deleted or modified until OnWriteDone is called.
  /// \param[in] options The WriteOptions to use for writing this message
  void StartWrite(const Response* resp, grpc::WriteOptions options)
      ABSL_LOCKS_EXCLUDED(stream_mu_) {
    ServerCallbackReaderWriter<Request, Response>* stream =
        stream_.load(std::memory_order_acquire);
    if (stream == nullptr) {
      grpc::internal::MutexLock l(&stream_mu_);
      stream = stream_.load(std::memory_order_relaxed);
      if (stream == nullptr) {
        backlog_.write_wanted = resp;
        backlog_.write_options_wanted = options;
        return;
      }
    }
    stream->Write(resp, options);
  }

  /// Initiate a write operation with specified options and final RPC Status,
  /// which also causes any trailing metadata for this RPC to be sent out.
  /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
  /// single step. A key difference, though, is that this operation doesn't have
  /// an OnWriteDone reaction - it is considered complete only when OnDone is
  /// available. An RPC can either have StartWriteAndFinish or Finish, but not
  /// both.
  ///
  /// \param[in] resp The message to be written. The library does not take
  ///                 ownership but the caller must ensure that the message is
  ///                 not deleted or modified until OnDone is called.
  /// \param[in] options The WriteOptions to use for writing this message
  /// \param[in] s The status outcome of this RPC
  void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
                           grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
    ServerCallbackReaderWriter<Request, Response>* stream =
        stream_.load(std::memory_order_acquire);
    if (stream == nullptr) {
      grpc::internal::MutexLock l(&stream_mu_);
      stream = stream_.load(std::memory_order_relaxed);
      if (stream == nullptr) {
        backlog_.write_and_finish_wanted = true;
        backlog_.write_wanted = resp;
        backlog_.write_options_wanted = options;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    stream->WriteAndFinish(resp, options, std::move(s));
  }

  /// Inform system of a planned write operation with specified options, but
  /// allow the library to schedule the actual write coalesced with the writing
  /// of trailing metadata (which takes place on a Finish call).
  ///
  /// \param[in] resp The message to be written. The library does not take
  ///                 ownership but the caller must ensure that the message is
  ///                 not deleted or modified until OnWriteDone is called.
  /// \param[in] options The WriteOptions to use for writing this message
  void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
    StartWrite(resp, options.set_last_message());
  }

  /// Indicate that the stream is to be finished and the trailing metadata and
  /// RPC status are to be sent. Every RPC MUST be finished using either Finish
  /// or StartWriteAndFinish (but not both), even if the RPC is already
  /// cancelled.
  ///
  /// \param[in] s The status outcome of this RPC
  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
    ServerCallbackReaderWriter<Request, Response>* stream =
        stream_.load(std::memory_order_acquire);
    if (stream == nullptr) {
      grpc::internal::MutexLock l(&stream_mu_);
      stream = stream_.load(std::memory_order_relaxed);
      if (stream == nullptr) {
        backlog_.finish_wanted = true;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    stream->Finish(std::move(s));
  }

  /// Notifies the application that an explicit StartSendInitialMetadata
  /// operation completed. Not used when the sending of initial metadata
  /// piggybacks onto the first write.
  ///
  /// \param[in] ok Was it successful? If false, no further write-side operation
  ///               will succeed.
  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}

  /// Notifies the application that a StartRead operation completed.
  ///
  /// \param[in] ok Was it successful? If false, no further read-side operation
  ///               will succeed.
  virtual void OnReadDone(bool /*ok*/) {}

  /// Notifies the application that a StartWrite (or StartWriteLast) operation
  /// completed.
  ///
  /// \param[in] ok Was it successful? If false, no further write-side operation
  ///               will succeed.
  virtual void OnWriteDone(bool /*ok*/) {}

  /// Notifies the application that all operations associated with this RPC
  /// have completed. This is an override (from the internal base class) but
  /// still abstract, so derived classes MUST override it to be instantiated.
  void OnDone() override = 0;

  /// Notifies the application that this RPC has been cancelled. This is an
  /// override (from the internal base class) but not final, so derived classes
  /// should override it if they want to take action.
  void OnCancel() override {}

 private:
  friend class ServerCallbackReaderWriter<Request, Response>;
  // May be overridden by internal implementation details. This is not a public
  // customization point.
  // Replays any operations recorded in backlog_ before the stream existed,
  // then publishes the stream pointer with a release store.
  virtual void InternalBindStream(
      ServerCallbackReaderWriter<Request, Response>* stream) {
    grpc::internal::MutexLock l(&stream_mu_);

    if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
      stream->SendInitialMetadata();
    }
    if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
      stream->Read(backlog_.read_wanted);
    }
    if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
      stream->WriteAndFinish(backlog_.write_wanted,
                             std::move(backlog_.write_options_wanted),
                             std::move(backlog_.status_wanted));
    } else {
      if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
        stream->Write(backlog_.write_wanted,
                      std::move(backlog_.write_options_wanted));
      }
      if (GPR_UNLIKELY(backlog_.finish_wanted)) {
        stream->Finish(std::move(backlog_.status_wanted));
      }
    }
    // Set stream_ last so that other functions can use it lock-free
    stream_.store(stream, std::memory_order_release);
  }

  grpc::internal::Mutex stream_mu_;
  // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
  //              once C++17 or ABSL is supported since stream and backlog are
  //              mutually exclusive in this class. Do likewise with the
  //              remaining reactor classes and their backlogs as well.
  std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
  // Operations requested before the stream was bound; replayed (in order) by
  // InternalBindStream under stream_mu_.
  struct PreBindBacklog {
    bool send_initial_metadata_wanted = false;
    bool write_and_finish_wanted = false;
    bool finish_wanted = false;
    Request* read_wanted = nullptr;
    const Response* write_wanted = nullptr;
    grpc::WriteOptions write_options_wanted;
    grpc::Status status_wanted;
  };
  PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
};

/// \a ServerReadReactor is the interface for a client-streaming RPC.
template <class Request>
class ServerReadReactor : public internal::ServerReactor {
 public:
  ServerReadReactor() : reader_(nullptr) {}
  ~ServerReadReactor() override = default;

  /// The following operation initiations are exactly like ServerBidiReactor.
  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
    ServerCallbackReader<Request>* reader =
        reader_.load(std::memory_order_acquire);
    if (reader == nullptr) {
      grpc::internal::MutexLock l(&reader_mu_);
      reader = reader_.load(std::memory_order_relaxed);
      if (reader == nullptr) {
        backlog_.send_initial_metadata_wanted = true;
        return;
      }
    }
    reader->SendInitialMetadata();
  }
  void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
    ServerCallbackReader<Request>* reader =
        reader_.load(std::memory_order_acquire);
    if (reader == nullptr) {
      grpc::internal::MutexLock l(&reader_mu_);
      reader = reader_.load(std::memory_order_relaxed);
      if (reader == nullptr) {
        backlog_.read_wanted = req;
        return;
      }
    }
    reader->Read(req);
  }
  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
    ServerCallbackReader<Request>* reader =
        reader_.load(std::memory_order_acquire);
    if (reader == nullptr) {
      grpc::internal::MutexLock l(&reader_mu_);
      reader = reader_.load(std::memory_order_relaxed);
      if (reader == nullptr) {
        backlog_.finish_wanted = true;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    reader->Finish(std::move(s));
  }

  /// The following notifications are exactly like ServerBidiReactor.
  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  virtual void OnReadDone(bool /*ok*/) {}
  void OnDone() override = 0;
  void OnCancel() override {}

 private:
  friend class ServerCallbackReader<Request>;

  // May be overridden by internal implementation details. This is not a public
  // customization point.
  // Replays backlogged operations, then publishes reader_ (release store).
  virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
      ABSL_LOCKS_EXCLUDED(reader_mu_) {
    grpc::internal::MutexLock l(&reader_mu_);

    if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
      reader->SendInitialMetadata();
    }
    if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
      reader->Read(backlog_.read_wanted);
    }
    if (GPR_UNLIKELY(backlog_.finish_wanted)) {
      reader->Finish(std::move(backlog_.status_wanted));
    }
    // Set reader_ last so that other functions can use it lock-free
    reader_.store(reader, std::memory_order_release);
  }

  grpc::internal::Mutex reader_mu_;
  std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
  // Operations requested before the reader was bound; replayed by
  // InternalBindReader under reader_mu_.
  struct PreBindBacklog {
    bool send_initial_metadata_wanted = false;
    bool finish_wanted = false;
    Request* read_wanted = nullptr;
    grpc::Status status_wanted;
  };
  PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
};

/// \a ServerWriteReactor is the interface for a server-streaming RPC.
template <class Response>
class ServerWriteReactor : public internal::ServerReactor {
 public:
  ServerWriteReactor() : writer_(nullptr) {}
  ~ServerWriteReactor() override = default;

  /// The following operation initiations are exactly like ServerBidiReactor.
  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.send_initial_metadata_wanted = true;
        return;
      }
    }
    writer->SendInitialMetadata();
  }
  void StartWrite(const Response* resp) {
    StartWrite(resp, grpc::WriteOptions());
  }
  void StartWrite(const Response* resp, grpc::WriteOptions options)
      ABSL_LOCKS_EXCLUDED(writer_mu_) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.write_wanted = resp;
        backlog_.write_options_wanted = options;
        return;
      }
    }
    writer->Write(resp, options);
  }
  void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
                           grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.write_and_finish_wanted = true;
        backlog_.write_wanted = resp;
        backlog_.write_options_wanted = options;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    writer->WriteAndFinish(resp, options, std::move(s));
  }
  void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
    StartWrite(resp, options.set_last_message());
  }
  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.finish_wanted = true;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    writer->Finish(std::move(s));
  }

  /// The following notifications are exactly like ServerBidiReactor.
  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  virtual void OnWriteDone(bool /*ok*/) {}
  void OnDone() override = 0;
  void OnCancel() override {}

 private:
  friend class ServerCallbackWriter<Response>;
  // May be overridden by internal implementation details. This is not a public
  // customization point.
  // Replays backlogged operations, then publishes writer_ (release store).
  virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
      ABSL_LOCKS_EXCLUDED(writer_mu_) {
    grpc::internal::MutexLock l(&writer_mu_);

    if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
      writer->SendInitialMetadata();
    }
    if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
      writer->WriteAndFinish(backlog_.write_wanted,
                             std::move(backlog_.write_options_wanted),
                             std::move(backlog_.status_wanted));
    } else {
      if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
        writer->Write(backlog_.write_wanted,
                      std::move(backlog_.write_options_wanted));
      }
      if (GPR_UNLIKELY(backlog_.finish_wanted)) {
        writer->Finish(std::move(backlog_.status_wanted));
      }
    }
    // Set writer_ last so that other functions can use it lock-free
    writer_.store(writer, std::memory_order_release);
  }

  grpc::internal::Mutex writer_mu_;
  std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
  // Operations requested before the writer was bound; replayed by
  // InternalBindWriter under writer_mu_.
  struct PreBindBacklog {
    bool send_initial_metadata_wanted = false;
    bool write_and_finish_wanted = false;
    bool finish_wanted = false;
    const Response* write_wanted = nullptr;
    grpc::WriteOptions write_options_wanted;
    grpc::Status status_wanted;
  };
  PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
};

/// \a ServerUnaryReactor is the interface for a unary (single
/// request/response) callback RPC.
class ServerUnaryReactor : public internal::ServerReactor {
 public:
  ServerUnaryReactor() : call_(nullptr) {}
  ~ServerUnaryReactor() override = default;

  /// StartSendInitialMetadata is exactly like ServerBidiReactor.
  void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
    ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
    if (call == nullptr) {
      grpc::internal::MutexLock l(&call_mu_);
      call = call_.load(std::memory_order_relaxed);
      if (call == nullptr) {
        backlog_.send_initial_metadata_wanted = true;
        return;
      }
    }
    call->SendInitialMetadata();
  }
  /// Finish is similar to ServerBidiReactor except for one detail.
  /// If the status is non-OK, any message will not be sent. Instead,
  /// the client will only receive the status and any trailing metadata.
  void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
    ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
    if (call == nullptr) {
      grpc::internal::MutexLock l(&call_mu_);
      call = call_.load(std::memory_order_relaxed);
      if (call == nullptr) {
        backlog_.finish_wanted = true;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    call->Finish(std::move(s));
  }

  /// The following notifications are exactly like ServerBidiReactor.
  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  void OnDone() override = 0;
  void OnCancel() override {}

 private:
  friend class ServerCallbackUnary;
  // May be overridden by internal implementation details. This is not a public
  // customization point.
  // Replays backlogged operations, then publishes call_ (release store).
  virtual void InternalBindCall(ServerCallbackUnary* call)
      ABSL_LOCKS_EXCLUDED(call_mu_) {
    grpc::internal::MutexLock l(&call_mu_);

    if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
      call->SendInitialMetadata();
    }
    if (GPR_UNLIKELY(backlog_.finish_wanted)) {
      call->Finish(std::move(backlog_.status_wanted));
    }
    // Set call_ last so that other functions can use it lock-free
    call_.store(call, std::memory_order_release);
  }

  grpc::internal::Mutex call_mu_;
  std::atomic<ServerCallbackUnary*> call_{nullptr};
  // Operations requested before the call was bound; replayed by
  // InternalBindCall under call_mu_.
  struct PreBindBacklog {
    bool send_initial_metadata_wanted = false;
    bool finish_wanted = false;
    grpc::Status status_wanted;
  };
  PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
};

namespace internal {

// A reactor that does nothing except immediately Finish with the given
// status; used for the Unimplemented* aliases below. It destroys itself in
// OnDone (so it is expected to be arena- or otherwise externally allocated).
template <class Base>
class FinishOnlyReactor : public Base {
 public:
  explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
  void OnDone() override { this->~FinishOnlyReactor(); }
};

using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
template <class Request>
using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
template <class Response>
using UnimplementedWriteReactor =
    FinishOnlyReactor<ServerWriteReactor<Response>>;
template <class Request, class Response>
using UnimplementedBidiReactor =
    FinishOnlyReactor<ServerBidiReactor<Request, Response>>;

}  // namespace internal

// TODO(vjpai): Remove namespace experimental when last known users are migrated
// off.
namespace experimental {

template <class Request, class Response>
using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;

}  // namespace experimental

}  // namespace grpc

#endif  // GRPCPP_SUPPORT_SERVER_CALLBACK_H