1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H 20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/support/log.h> 24 #include <grpcpp/impl/call.h> 25 #include <grpcpp/impl/channel_interface.h> 26 #include <grpcpp/impl/service_type.h> 27 #include <grpcpp/server_context.h> 28 #include <grpcpp/support/status.h> 29 30 namespace grpc { 31 32 namespace internal { 33 /// Common interface for all client side asynchronous streaming. 34 class ClientAsyncStreamingInterface { 35 public: ~ClientAsyncStreamingInterface()36 virtual ~ClientAsyncStreamingInterface() {} 37 38 /// Start the call that was set up by the constructor, but only if the 39 /// constructor was invoked through the "Prepare" API which doesn't actually 40 /// start the call 41 virtual void StartCall(void* tag) = 0; 42 43 /// Request notification of the reading of the initial metadata. Completion 44 /// will be notified by \a tag on the associated completion queue. 45 /// This call is optional, but if it is used, it cannot be used concurrently 46 /// with or after the \a AsyncReaderInterface::Read method. 47 /// 48 /// \param[in] tag Tag identifying this request. 49 virtual void ReadInitialMetadata(void* tag) = 0; 50 51 /// Indicate that the stream is to be finished and request notification for 52 /// when the call has been ended. 53 /// Should not be used concurrently with other operations. 54 /// 55 /// It is appropriate to call this method exactly once when both: 56 /// * the client side has no more message to send 57 /// (this can be declared implicitly by calling this method, or 58 /// explicitly through an earlier call to the <i>WritesDone</i> method 59 /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or 60 /// \a ClientAsyncReaderWriterInterface::WritesDone). 61 /// * there are no more messages to be received from the server (this can 62 /// be known implicitly by the calling code, or explicitly from an 63 /// earlier call to \a AsyncReaderInterface::Read that yielded a failed 64 /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 65 /// 66 /// The tag will be returned when either: 67 /// - all incoming messages have been read and the server has returned 68 /// a status. 69 /// - the server has returned a non-OK status. 70 /// - the call failed for some reason and the library generated a 71 /// status. 72 /// 73 /// Note that implementations of this method attempt to receive initial 74 /// metadata from the server if initial metadata hasn't yet been received. 75 /// 76 /// \param[in] tag Tag identifying this request. 77 /// \param[out] status To be updated with the operation status. 78 virtual void Finish(grpc::Status* status, void* tag) = 0; 79 }; 80 81 /// An interface that yields a sequence of messages of type \a R. 82 template <class R> 83 class AsyncReaderInterface { 84 public: ~AsyncReaderInterface()85 virtual ~AsyncReaderInterface() {} 86 87 /// Read a message of type \a R into \a msg. Completion will be notified by \a 88 /// tag on the associated completion queue. 89 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 90 /// should not be called concurrently with other streaming APIs 91 /// on the same stream. It is not meaningful to call it concurrently 92 /// with another \a AsyncReaderInterface::Read on the same stream since reads 93 /// on the same stream are delivered in order. 94 /// 95 /// \param[out] msg Where to eventually store the read message. 96 /// \param[in] tag The tag identifying the operation. 97 /// 98 /// Side effect: note that this method attempt to receive initial metadata for 99 /// a stream if it hasn't yet been received. 100 virtual void Read(R* msg, void* tag) = 0; 101 }; 102 103 /// An interface that can be fed a sequence of messages of type \a W. 104 template <class W> 105 class AsyncWriterInterface { 106 public: ~AsyncWriterInterface()107 virtual ~AsyncWriterInterface() {} 108 109 /// Request the writing of \a msg with identifying tag \a tag. 110 /// 111 /// Only one write may be outstanding at any given time. This means that 112 /// after calling Write, one must wait to receive \a tag from the completion 113 /// queue BEFORE calling Write again. 114 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 115 /// 116 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 117 /// to deallocate once Write returns. 118 /// 119 /// \param[in] msg The message to be written. 120 /// \param[in] tag The tag identifying the operation. 121 virtual void Write(const W& msg, void* tag) = 0; 122 123 /// Request the writing of \a msg using WriteOptions \a options with 124 /// identifying tag \a tag. 125 /// 126 /// Only one write may be outstanding at any given time. This means that 127 /// after calling Write, one must wait to receive \a tag from the completion 128 /// queue BEFORE calling Write again. 129 /// WriteOptions \a options is used to set the write options of this message. 130 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 131 /// 132 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 133 /// to deallocate once Write returns. 134 /// 135 /// \param[in] msg The message to be written. 136 /// \param[in] options The WriteOptions to be used to write this message. 137 /// \param[in] tag The tag identifying the operation. 138 virtual void Write(const W& msg, grpc::WriteOptions options, void* tag) = 0; 139 140 /// Request the writing of \a msg and coalesce it with the writing 141 /// of trailing metadata, using WriteOptions \a options with 142 /// identifying tag \a tag. 143 /// 144 /// For client, WriteLast is equivalent of performing Write and 145 /// WritesDone in a single step. 146 /// For server, WriteLast buffers the \a msg. The writing of \a msg is held 147 /// until Finish is called, where \a msg and trailing metadata are coalesced 148 /// and write is initiated. Note that WriteLast can only buffer \a msg up to 149 /// the flow control window size. If \a msg size is larger than the window 150 /// size, it will be sent on wire without buffering. 151 /// 152 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 153 /// to deallocate once Write returns. 154 /// 155 /// \param[in] msg The message to be written. 156 /// \param[in] options The WriteOptions to be used to write this message. 157 /// \param[in] tag The tag identifying the operation. WriteLast(const W & msg,grpc::WriteOptions options,void * tag)158 void WriteLast(const W& msg, grpc::WriteOptions options, void* tag) { 159 Write(msg, options.set_last_message(), tag); 160 } 161 }; 162 163 } // namespace internal 164 165 template <class R> 166 class ClientAsyncReaderInterface 167 : public internal::ClientAsyncStreamingInterface, 168 public internal::AsyncReaderInterface<R> {}; 169 170 namespace internal { 171 template <class R> 172 class ClientAsyncReaderFactory { 173 public: 174 /// Create a stream object. 175 /// Write the first request out if \a start is set. 176 /// \a tag will be notified on \a cq when the call has been started and 177 /// \a request has been written out. If \a start is not set, \a tag must be 178 /// nullptr and the actual call must be initiated by StartCall 179 /// Note that \a context will be used to fill in custom initial metadata 180 /// used to send to the server when starting the call. 181 template <class W> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start,void * tag)182 static ClientAsyncReader<R>* Create(grpc::ChannelInterface* channel, 183 grpc::CompletionQueue* cq, 184 const grpc::internal::RpcMethod& method, 185 grpc::ClientContext* context, 186 const W& request, bool start, void* tag) { 187 grpc::internal::Call call = channel->CreateCall(method, context, cq); 188 return new ( 189 grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncReader<R>))) 190 ClientAsyncReader<R>(call, context, request, start, tag); 191 } 192 }; 193 } // namespace internal 194 195 /// Async client-side API for doing server-streaming RPCs, 196 /// where the incoming message stream coming from the server has 197 /// messages of type \a R. 198 template <class R> 199 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { 200 public: 201 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)202 static void operator delete(void* /*ptr*/, std::size_t size) { 203 GPR_ASSERT(size == sizeof(ClientAsyncReader)); 204 } 205 206 // This operator should never be called as the memory should be freed as part 207 // of the arena destruction. It only exists to provide a matching operator 208 // delete to the operator new so that some compilers will not complain (see 209 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 210 // there are no tests catching the compiler warning. delete(void *,void *)211 static void operator delete(void*, void*) { GPR_ASSERT(false); } 212 StartCall(void * tag)213 void StartCall(void* tag) override { 214 GPR_ASSERT(!started_); 215 started_ = true; 216 StartCallInternal(tag); 217 } 218 219 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata 220 /// method for semantics. 221 /// 222 /// Side effect: 223 /// - upon receiving initial metadata from the server, 224 /// the \a ClientContext associated with this call is updated, and the 225 /// calling code can access the received metadata through the 226 /// \a ClientContext. ReadInitialMetadata(void * tag)227 void ReadInitialMetadata(void* tag) override { 228 GPR_ASSERT(started_); 229 GPR_ASSERT(!context_->initial_metadata_received_); 230 231 meta_ops_.set_output_tag(tag); 232 meta_ops_.RecvInitialMetadata(context_); 233 call_.PerformOps(&meta_ops_); 234 } 235 Read(R * msg,void * tag)236 void Read(R* msg, void* tag) override { 237 GPR_ASSERT(started_); 238 read_ops_.set_output_tag(tag); 239 if (!context_->initial_metadata_received_) { 240 read_ops_.RecvInitialMetadata(context_); 241 } 242 read_ops_.RecvMessage(msg); 243 call_.PerformOps(&read_ops_); 244 } 245 246 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 247 /// 248 /// Side effect: 249 /// - the \a ClientContext associated with this call is updated with 250 /// possible initial and trailing metadata received from the server. Finish(grpc::Status * status,void * tag)251 void Finish(grpc::Status* status, void* tag) override { 252 GPR_ASSERT(started_); 253 finish_ops_.set_output_tag(tag); 254 if (!context_->initial_metadata_received_) { 255 finish_ops_.RecvInitialMetadata(context_); 256 } 257 finish_ops_.ClientRecvStatus(context_, status); 258 call_.PerformOps(&finish_ops_); 259 } 260 261 private: 262 friend class internal::ClientAsyncReaderFactory<R>; 263 template <class W> ClientAsyncReader(grpc::internal::Call call,grpc::ClientContext * context,const W & request,bool start,void * tag)264 ClientAsyncReader(grpc::internal::Call call, grpc::ClientContext* context, 265 const W& request, bool start, void* tag) 266 : context_(context), call_(call), started_(start) { 267 // TODO(ctiller): don't assert 268 GPR_ASSERT(init_ops_.SendMessage(request).ok()); 269 init_ops_.ClientSendClose(); 270 if (start) { 271 StartCallInternal(tag); 272 } else { 273 GPR_ASSERT(tag == nullptr); 274 } 275 } 276 StartCallInternal(void * tag)277 void StartCallInternal(void* tag) { 278 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 279 context_->initial_metadata_flags()); 280 init_ops_.set_output_tag(tag); 281 call_.PerformOps(&init_ops_); 282 } 283 284 grpc::ClientContext* context_; 285 grpc::internal::Call call_; 286 bool started_; 287 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 288 grpc::internal::CallOpSendMessage, 289 grpc::internal::CallOpClientSendClose> 290 init_ops_; 291 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> 292 meta_ops_; 293 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 294 grpc::internal::CallOpRecvMessage<R>> 295 read_ops_; 296 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 297 grpc::internal::CallOpClientRecvStatus> 298 finish_ops_; 299 }; 300 301 /// Common interface for client side asynchronous writing. 302 template <class W> 303 class ClientAsyncWriterInterface 304 : public internal::ClientAsyncStreamingInterface, 305 public internal::AsyncWriterInterface<W> { 306 public: 307 /// Signal the client is done with the writes (half-close the client stream). 308 /// Thread-safe with respect to \a AsyncReaderInterface::Read 309 /// 310 /// \param[in] tag The tag identifying the operation. 311 virtual void WritesDone(void* tag) = 0; 312 }; 313 314 namespace internal { 315 template <class W> 316 class ClientAsyncWriterFactory { 317 public: 318 /// Create a stream object. 319 /// Start the RPC if \a start is set 320 /// \a tag will be notified on \a cq when the call has been started (i.e. 321 /// initial metadata sent) and \a request has been written out. 322 /// If \a start is not set, \a tag must be nullptr and the actual call 323 /// must be initiated by StartCall 324 /// Note that \a context will be used to fill in custom initial metadata 325 /// used to send to the server when starting the call. 326 /// \a response will be filled in with the single expected response 327 /// message from the server upon a successful call to the \a Finish 328 /// method of this instance. 329 template <class R> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response,bool start,void * tag)330 static ClientAsyncWriter<W>* Create(grpc::ChannelInterface* channel, 331 grpc::CompletionQueue* cq, 332 const grpc::internal::RpcMethod& method, 333 grpc::ClientContext* context, R* response, 334 bool start, void* tag) { 335 grpc::internal::Call call = channel->CreateCall(method, context, cq); 336 return new ( 337 grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncWriter<W>))) 338 ClientAsyncWriter<W>(call, context, response, start, tag); 339 } 340 }; 341 } // namespace internal 342 343 /// Async API on the client side for doing client-streaming RPCs, 344 /// where the outgoing message stream going to the server contains 345 /// messages of type \a W. 346 template <class W> 347 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { 348 public: 349 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)350 static void operator delete(void* /*ptr*/, std::size_t size) { 351 GPR_ASSERT(size == sizeof(ClientAsyncWriter)); 352 } 353 354 // This operator should never be called as the memory should be freed as part 355 // of the arena destruction. It only exists to provide a matching operator 356 // delete to the operator new so that some compilers will not complain (see 357 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 358 // there are no tests catching the compiler warning. delete(void *,void *)359 static void operator delete(void*, void*) { GPR_ASSERT(false); } 360 StartCall(void * tag)361 void StartCall(void* tag) override { 362 GPR_ASSERT(!started_); 363 started_ = true; 364 StartCallInternal(tag); 365 } 366 367 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for 368 /// semantics. 369 /// 370 /// Side effect: 371 /// - upon receiving initial metadata from the server, the \a ClientContext 372 /// associated with this call is updated, and the calling code can access 373 /// the received metadata through the \a ClientContext. ReadInitialMetadata(void * tag)374 void ReadInitialMetadata(void* tag) override { 375 GPR_ASSERT(started_); 376 GPR_ASSERT(!context_->initial_metadata_received_); 377 378 meta_ops_.set_output_tag(tag); 379 meta_ops_.RecvInitialMetadata(context_); 380 call_.PerformOps(&meta_ops_); 381 } 382 Write(const W & msg,void * tag)383 void Write(const W& msg, void* tag) override { 384 GPR_ASSERT(started_); 385 write_ops_.set_output_tag(tag); 386 // TODO(ctiller): don't assert 387 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); 388 call_.PerformOps(&write_ops_); 389 } 390 Write(const W & msg,grpc::WriteOptions options,void * tag)391 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 392 GPR_ASSERT(started_); 393 write_ops_.set_output_tag(tag); 394 if (options.is_last_message()) { 395 options.set_buffer_hint(); 396 write_ops_.ClientSendClose(); 397 } 398 // TODO(ctiller): don't assert 399 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); 400 call_.PerformOps(&write_ops_); 401 } 402 WritesDone(void * tag)403 void WritesDone(void* tag) override { 404 GPR_ASSERT(started_); 405 write_ops_.set_output_tag(tag); 406 write_ops_.ClientSendClose(); 407 call_.PerformOps(&write_ops_); 408 } 409 410 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 411 /// 412 /// Side effect: 413 /// - the \a ClientContext associated with this call is updated with 414 /// possible initial and trailing metadata received from the server. 415 /// - attempts to fill in the \a response parameter passed to this class's 416 /// constructor with the server's response message. Finish(grpc::Status * status,void * tag)417 void Finish(grpc::Status* status, void* tag) override { 418 GPR_ASSERT(started_); 419 finish_ops_.set_output_tag(tag); 420 if (!context_->initial_metadata_received_) { 421 finish_ops_.RecvInitialMetadata(context_); 422 } 423 finish_ops_.ClientRecvStatus(context_, status); 424 call_.PerformOps(&finish_ops_); 425 } 426 427 private: 428 friend class internal::ClientAsyncWriterFactory<W>; 429 template <class R> ClientAsyncWriter(grpc::internal::Call call,grpc::ClientContext * context,R * response,bool start,void * tag)430 ClientAsyncWriter(grpc::internal::Call call, grpc::ClientContext* context, 431 R* response, bool start, void* tag) 432 : context_(context), call_(call), started_(start) { 433 finish_ops_.RecvMessage(response); 434 finish_ops_.AllowNoMessage(); 435 if (start) { 436 StartCallInternal(tag); 437 } else { 438 GPR_ASSERT(tag == nullptr); 439 } 440 } 441 StartCallInternal(void * tag)442 void StartCallInternal(void* tag) { 443 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 444 context_->initial_metadata_flags()); 445 // if corked bit is set in context, we just keep the initial metadata 446 // buffered up to coalesce with later message send. No op is performed. 447 if (!context_->initial_metadata_corked_) { 448 write_ops_.set_output_tag(tag); 449 call_.PerformOps(&write_ops_); 450 } 451 } 452 453 grpc::ClientContext* context_; 454 grpc::internal::Call call_; 455 bool started_; 456 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> 457 meta_ops_; 458 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 459 grpc::internal::CallOpSendMessage, 460 grpc::internal::CallOpClientSendClose> 461 write_ops_; 462 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 463 grpc::internal::CallOpGenericRecvMessage, 464 grpc::internal::CallOpClientRecvStatus> 465 finish_ops_; 466 }; 467 468 /// Async client-side interface for bi-directional streaming, 469 /// where the client-to-server message stream has messages of type \a W, 470 /// and the server-to-client message stream has messages of type \a R. 471 template <class W, class R> 472 class ClientAsyncReaderWriterInterface 473 : public internal::ClientAsyncStreamingInterface, 474 public internal::AsyncWriterInterface<W>, 475 public internal::AsyncReaderInterface<R> { 476 public: 477 /// Signal the client is done with the writes (half-close the client stream). 478 /// Thread-safe with respect to \a AsyncReaderInterface::Read 479 /// 480 /// \param[in] tag The tag identifying the operation. 481 virtual void WritesDone(void* tag) = 0; 482 }; 483 484 namespace internal { 485 template <class W, class R> 486 class ClientAsyncReaderWriterFactory { 487 public: 488 /// Create a stream object. 489 /// Start the RPC request if \a start is set. 490 /// \a tag will be notified on \a cq when the call has been started (i.e. 491 /// initial metadata sent). If \a start is not set, \a tag must be 492 /// nullptr and the actual call must be initiated by StartCall 493 /// Note that \a context will be used to fill in custom initial metadata 494 /// used to send to the server when starting the call. Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,bool start,void * tag)495 static ClientAsyncReaderWriter<W, R>* Create( 496 grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, 497 const grpc::internal::RpcMethod& method, grpc::ClientContext* context, 498 bool start, void* tag) { 499 grpc::internal::Call call = channel->CreateCall(method, context, cq); 500 501 return new (grpc_call_arena_alloc(call.call(), 502 sizeof(ClientAsyncReaderWriter<W, R>))) 503 ClientAsyncReaderWriter<W, R>(call, context, start, tag); 504 } 505 }; 506 } // namespace internal 507 508 /// Async client-side interface for bi-directional streaming, 509 /// where the outgoing message stream going to the server 510 /// has messages of type \a W, and the incoming message stream coming 511 /// from the server has messages of type \a R. 512 template <class W, class R> 513 class ClientAsyncReaderWriter final 514 : public ClientAsyncReaderWriterInterface<W, R> { 515 public: 516 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)517 static void operator delete(void* /*ptr*/, std::size_t size) { 518 GPR_ASSERT(size == sizeof(ClientAsyncReaderWriter)); 519 } 520 521 // This operator should never be called as the memory should be freed as part 522 // of the arena destruction. It only exists to provide a matching operator 523 // delete to the operator new so that some compilers will not complain (see 524 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 525 // there are no tests catching the compiler warning. delete(void *,void *)526 static void operator delete(void*, void*) { GPR_ASSERT(false); } 527 StartCall(void * tag)528 void StartCall(void* tag) override { 529 GPR_ASSERT(!started_); 530 started_ = true; 531 StartCallInternal(tag); 532 } 533 534 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method 535 /// for semantics of this method. 536 /// 537 /// Side effect: 538 /// - upon receiving initial metadata from the server, the \a ClientContext 539 /// is updated with it, and then the receiving initial metadata can 540 /// be accessed through this \a ClientContext. ReadInitialMetadata(void * tag)541 void ReadInitialMetadata(void* tag) override { 542 GPR_ASSERT(started_); 543 GPR_ASSERT(!context_->initial_metadata_received_); 544 545 meta_ops_.set_output_tag(tag); 546 meta_ops_.RecvInitialMetadata(context_); 547 call_.PerformOps(&meta_ops_); 548 } 549 Read(R * msg,void * tag)550 void Read(R* msg, void* tag) override { 551 GPR_ASSERT(started_); 552 read_ops_.set_output_tag(tag); 553 if (!context_->initial_metadata_received_) { 554 read_ops_.RecvInitialMetadata(context_); 555 } 556 read_ops_.RecvMessage(msg); 557 call_.PerformOps(&read_ops_); 558 } 559 Write(const W & msg,void * tag)560 void Write(const W& msg, void* tag) override { 561 GPR_ASSERT(started_); 562 write_ops_.set_output_tag(tag); 563 // TODO(ctiller): don't assert 564 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); 565 call_.PerformOps(&write_ops_); 566 } 567 Write(const W & msg,grpc::WriteOptions options,void * tag)568 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 569 GPR_ASSERT(started_); 570 write_ops_.set_output_tag(tag); 571 if (options.is_last_message()) { 572 options.set_buffer_hint(); 573 write_ops_.ClientSendClose(); 574 } 575 // TODO(ctiller): don't assert 576 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); 577 call_.PerformOps(&write_ops_); 578 } 579 WritesDone(void * tag)580 void WritesDone(void* tag) override { 581 GPR_ASSERT(started_); 582 write_ops_.set_output_tag(tag); 583 write_ops_.ClientSendClose(); 584 call_.PerformOps(&write_ops_); 585 } 586 587 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 588 /// Side effect 589 /// - the \a ClientContext associated with this call is updated with 590 /// possible initial and trailing metadata sent from the server. Finish(grpc::Status * status,void * tag)591 void Finish(grpc::Status* status, void* tag) override { 592 GPR_ASSERT(started_); 593 finish_ops_.set_output_tag(tag); 594 if (!context_->initial_metadata_received_) { 595 finish_ops_.RecvInitialMetadata(context_); 596 } 597 finish_ops_.ClientRecvStatus(context_, status); 598 call_.PerformOps(&finish_ops_); 599 } 600 601 private: 602 friend class internal::ClientAsyncReaderWriterFactory<W, R>; ClientAsyncReaderWriter(grpc::internal::Call call,grpc::ClientContext * context,bool start,void * tag)603 ClientAsyncReaderWriter(grpc::internal::Call call, 604 grpc::ClientContext* context, bool start, void* tag) 605 : context_(context), call_(call), started_(start) { 606 if (start) { 607 StartCallInternal(tag); 608 } else { 609 GPR_ASSERT(tag == nullptr); 610 } 611 } 612 StartCallInternal(void * tag)613 void StartCallInternal(void* tag) { 614 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 615 context_->initial_metadata_flags()); 616 // if corked bit is set in context, we just keep the initial metadata 617 // buffered up to coalesce with later message send. No op is performed. 618 if (!context_->initial_metadata_corked_) { 619 write_ops_.set_output_tag(tag); 620 call_.PerformOps(&write_ops_); 621 } 622 } 623 624 grpc::ClientContext* context_; 625 grpc::internal::Call call_; 626 bool started_; 627 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> 628 meta_ops_; 629 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 630 grpc::internal::CallOpRecvMessage<R>> 631 read_ops_; 632 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 633 grpc::internal::CallOpSendMessage, 634 grpc::internal::CallOpClientSendClose> 635 write_ops_; 636 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 637 grpc::internal::CallOpClientRecvStatus> 638 finish_ops_; 639 }; 640 641 template <class W, class R> 642 class ServerAsyncReaderInterface 643 : public grpc::internal::ServerAsyncStreamingInterface, 644 public internal::AsyncReaderInterface<R> { 645 public: 646 /// Indicate that the stream is to be finished with a certain status code 647 /// and also send out \a msg response to the client. 648 /// Request notification for when the server has sent the response and the 649 /// appropriate signals to the client to end the call. 650 /// Should not be used concurrently with other operations. 651 /// 652 /// It is appropriate to call this method when: 653 /// * all messages from the client have been received (either known 654 /// implicitly, or explicitly because a previous 655 /// \a AsyncReaderInterface::Read operation with a non-ok result, 656 /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 657 /// 658 /// This operation will end when the server has finished sending out initial 659 /// metadata (if not sent already), response message, and status, or if 660 /// some failure occurred when trying to do so. 661 /// 662 /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it 663 /// is safe to deallocate once Finish returns. 664 /// 665 /// \param[in] tag Tag identifying this request. 666 /// \param[in] status To be sent to the client as the result of this call. 667 /// \param[in] msg To be sent to the client as the response for this call. 668 virtual void Finish(const W& msg, const grpc::Status& status, void* tag) = 0; 669 670 /// Indicate that the stream is to be finished with a certain 671 /// non-OK status code. 672 /// Request notification for when the server has sent the appropriate 673 /// signals to the client to end the call. 674 /// Should not be used concurrently with other operations. 675 /// 676 /// This call is meant to end the call with some error, and can be called at 677 /// any point that the server would like to "fail" the call (though note 678 /// this shouldn't be called concurrently with any other "sending" call, like 679 /// \a AsyncWriterInterface::Write). 680 /// 681 /// This operation will end when the server has finished sending out initial 682 /// metadata (if not sent already), and status, or if some failure occurred 683 /// when trying to do so. 684 /// 685 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 686 /// to deallocate once FinishWithError returns. 687 /// 688 /// \param[in] tag Tag identifying this request. 689 /// \param[in] status To be sent to the client as the result of this call. 690 /// - Note: \a status must have a non-OK code. 691 virtual void FinishWithError(const grpc::Status& status, void* tag) = 0; 692 }; 693 694 /// Async server-side API for doing client-streaming RPCs, 695 /// where the incoming message stream from the client has messages of type \a R, 696 /// and the single response message sent from the server is type \a W. 697 template <class W, class R> 698 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { 699 public: ServerAsyncReader(grpc::ServerContext * ctx)700 explicit ServerAsyncReader(grpc::ServerContext* ctx) 701 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 702 703 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 704 /// 705 /// Implicit input parameter: 706 /// - The initial metadata that will be sent to the client from this op will 707 /// be taken from the \a ServerContext associated with the call. SendInitialMetadata(void * tag)708 void SendInitialMetadata(void* tag) override { 709 GPR_ASSERT(!ctx_->sent_initial_metadata_); 710 711 meta_ops_.set_output_tag(tag); 712 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 713 ctx_->initial_metadata_flags()); 714 if (ctx_->compression_level_set()) { 715 meta_ops_.set_compression_level(ctx_->compression_level()); 716 } 717 ctx_->sent_initial_metadata_ = true; 718 call_.PerformOps(&meta_ops_); 719 } 720 Read(R * msg,void * tag)721 void Read(R* msg, void* tag) override { 722 read_ops_.set_output_tag(tag); 723 read_ops_.RecvMessage(msg); 724 call_.PerformOps(&read_ops_); 725 } 726 727 /// See the \a ServerAsyncReaderInterface.Read method for semantics 728 /// 729 /// Side effect: 730 /// - also sends initial metadata if not already sent. 731 /// - uses the \a ServerContext associated with this call to send possible 732 /// initial and trailing metadata. 733 /// 734 /// Note: \a msg is not sent if \a status has a non-OK code. 735 /// 736 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 737 /// is safe to deallocate once Finish returns. Finish(const W & msg,const grpc::Status & status,void * tag)738 void Finish(const W& msg, const grpc::Status& status, void* tag) override { 739 finish_ops_.set_output_tag(tag); 740 if (!ctx_->sent_initial_metadata_) { 741 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 742 ctx_->initial_metadata_flags()); 743 if (ctx_->compression_level_set()) { 744 finish_ops_.set_compression_level(ctx_->compression_level()); 745 } 746 ctx_->sent_initial_metadata_ = true; 747 } 748 // The response is dropped if the status is not OK. 749 if (status.ok()) { 750 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 751 finish_ops_.SendMessage(msg)); 752 } else { 753 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 754 } 755 call_.PerformOps(&finish_ops_); 756 } 757 758 /// See the \a ServerAsyncReaderInterface.Read method for semantics 759 /// 760 /// Side effect: 761 /// - also sends initial metadata if not already sent. 762 /// - uses the \a ServerContext associated with this call to send possible 763 /// initial and trailing metadata. 764 /// 765 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 766 /// to deallocate once FinishWithError returns. FinishWithError(const grpc::Status & status,void * tag)767 void FinishWithError(const grpc::Status& status, void* tag) override { 768 GPR_ASSERT(!status.ok()); 769 finish_ops_.set_output_tag(tag); 770 if (!ctx_->sent_initial_metadata_) { 771 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 772 ctx_->initial_metadata_flags()); 773 if (ctx_->compression_level_set()) { 774 finish_ops_.set_compression_level(ctx_->compression_level()); 775 } 776 ctx_->sent_initial_metadata_ = true; 777 } 778 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 779 call_.PerformOps(&finish_ops_); 780 } 781 782 private: BindCall(grpc::internal::Call * call)783 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 784 785 grpc::internal::Call call_; 786 grpc::ServerContext* ctx_; 787 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 788 meta_ops_; 789 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; 790 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 791 grpc::internal::CallOpSendMessage, 792 grpc::internal::CallOpServerSendStatus> 793 finish_ops_; 794 }; 795 796 template <class W> 797 class ServerAsyncWriterInterface 798 : public grpc::internal::ServerAsyncStreamingInterface, 799 public internal::AsyncWriterInterface<W> { 800 public: 801 /// Indicate that the stream is to be finished with a certain status code. 802 /// Request notification for when the server has sent the appropriate 803 /// signals to the client to end the call. 804 /// Should not be used concurrently with other operations. 805 /// 806 /// It is appropriate to call this method when either: 807 /// * all messages from the client have been received (either known 808 /// implicitly, or explicitly because a previous \a 809 /// AsyncReaderInterface::Read operation with a non-ok 810 /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. 811 /// * it is desired to end the call early with some non-OK status code. 812 /// 813 /// This operation will end when the server has finished sending out initial 814 /// metadata (if not sent already), response message, and status, or if 815 /// some failure occurred when trying to do so. 816 /// 817 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 818 /// to deallocate once Finish returns. 819 /// 820 /// \param[in] tag Tag identifying this request. 821 /// \param[in] status To be sent to the client as the result of this call. 822 virtual void Finish(const grpc::Status& status, void* tag) = 0; 823 824 /// Request the writing of \a msg and coalesce it with trailing metadata which 825 /// contains \a status, using WriteOptions options with 826 /// identifying tag \a tag. 827 /// 828 /// WriteAndFinish is equivalent of performing WriteLast and Finish 829 /// in a single step. 830 /// 831 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 832 /// is safe to deallocate once WriteAndFinish returns. 833 /// 834 /// \param[in] msg The message to be written. 835 /// \param[in] options The WriteOptions to be used to write this message. 836 /// \param[in] status The Status that server returns to client. 837 /// \param[in] tag The tag identifying the operation. 838 virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, 839 const grpc::Status& status, void* tag) = 0; 840 }; 841 842 /// Async server-side API for doing server streaming RPCs, 843 /// where the outgoing message stream from the server has messages of type \a W. 844 template <class W> 845 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { 846 public: ServerAsyncWriter(grpc::ServerContext * ctx)847 explicit ServerAsyncWriter(grpc::ServerContext* ctx) 848 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 849 850 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 851 /// 852 /// Implicit input parameter: 853 /// - The initial metadata that will be sent to the client from this op will 854 /// be taken from the \a ServerContext associated with the call. 855 /// 856 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)857 void SendInitialMetadata(void* tag) override { 858 GPR_ASSERT(!ctx_->sent_initial_metadata_); 859 860 meta_ops_.set_output_tag(tag); 861 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 862 ctx_->initial_metadata_flags()); 863 if (ctx_->compression_level_set()) { 864 meta_ops_.set_compression_level(ctx_->compression_level()); 865 } 866 ctx_->sent_initial_metadata_ = true; 867 call_.PerformOps(&meta_ops_); 868 } 869 Write(const W & msg,void * tag)870 void Write(const W& msg, void* tag) override { 871 write_ops_.set_output_tag(tag); 872 EnsureInitialMetadataSent(&write_ops_); 873 // TODO(ctiller): don't assert 874 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); 875 call_.PerformOps(&write_ops_); 876 } 877 Write(const W & msg,grpc::WriteOptions options,void * tag)878 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 879 write_ops_.set_output_tag(tag); 880 if (options.is_last_message()) { 881 options.set_buffer_hint(); 882 } 883 884 EnsureInitialMetadataSent(&write_ops_); 885 // TODO(ctiller): don't assert 886 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); 887 call_.PerformOps(&write_ops_); 888 } 889 890 /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics. 891 /// 892 /// Implicit input parameter: 893 /// - the \a ServerContext associated with this call is used 894 /// for sending trailing (and initial) metadata to the client. 895 /// 896 /// Note: \a status must have an OK code. 897 /// 898 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 899 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)900 void WriteAndFinish(const W& msg, grpc::WriteOptions options, 901 const grpc::Status& status, void* tag) override { 902 write_ops_.set_output_tag(tag); 903 EnsureInitialMetadataSent(&write_ops_); 904 options.set_buffer_hint(); 905 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); 906 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 907 call_.PerformOps(&write_ops_); 908 } 909 910 /// See the \a ServerAsyncWriterInterface.Finish method for semantics. 911 /// 912 /// Implicit input parameter: 913 /// - the \a ServerContext associated with this call is used for sending 914 /// trailing (and initial if not already sent) metadata to the client. 915 /// 916 /// Note: there are no restrictions are the code of 917 /// \a status,it may be non-OK 918 /// 919 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 920 /// to deallocate once Finish returns. Finish(const grpc::Status & status,void * tag)921 void Finish(const grpc::Status& status, void* tag) override { 922 finish_ops_.set_output_tag(tag); 923 EnsureInitialMetadataSent(&finish_ops_); 924 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 925 call_.PerformOps(&finish_ops_); 926 } 927 928 private: BindCall(grpc::internal::Call * call)929 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 930 931 template <class T> EnsureInitialMetadataSent(T * ops)932 void EnsureInitialMetadataSent(T* ops) { 933 if (!ctx_->sent_initial_metadata_) { 934 ops->SendInitialMetadata(&ctx_->initial_metadata_, 935 ctx_->initial_metadata_flags()); 936 if (ctx_->compression_level_set()) { 937 ops->set_compression_level(ctx_->compression_level()); 938 } 939 ctx_->sent_initial_metadata_ = true; 940 } 941 } 942 943 grpc::internal::Call call_; 944 grpc::ServerContext* ctx_; 945 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 946 meta_ops_; 947 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 948 grpc::internal::CallOpSendMessage, 949 grpc::internal::CallOpServerSendStatus> 950 write_ops_; 951 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 952 grpc::internal::CallOpServerSendStatus> 953 finish_ops_; 954 }; 955 956 /// Server-side interface for asynchronous bi-directional streaming. 957 template <class W, class R> 958 class ServerAsyncReaderWriterInterface 959 : public grpc::internal::ServerAsyncStreamingInterface, 960 public internal::AsyncWriterInterface<W>, 961 public internal::AsyncReaderInterface<R> { 962 public: 963 /// Indicate that the stream is to be finished with a certain status code. 964 /// Request notification for when the server has sent the appropriate 965 /// signals to the client to end the call. 966 /// Should not be used concurrently with other operations. 967 /// 968 /// It is appropriate to call this method when either: 969 /// * all messages from the client have been received (either known 970 /// implicitly, or explicitly because a previous \a 971 /// AsyncReaderInterface::Read operation 972 /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' 973 /// with 'false'. 974 /// * it is desired to end the call early with some non-OK status code. 975 /// 976 /// This operation will end when the server has finished sending out initial 977 /// metadata (if not sent already), response message, and status, or if some 978 /// failure occurred when trying to do so. 979 /// 980 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 981 /// to deallocate once Finish returns. 982 /// 983 /// \param[in] tag Tag identifying this request. 984 /// \param[in] status To be sent to the client as the result of this call. 985 virtual void Finish(const grpc::Status& status, void* tag) = 0; 986 987 /// Request the writing of \a msg and coalesce it with trailing metadata which 988 /// contains \a status, using WriteOptions options with 989 /// identifying tag \a tag. 990 /// 991 /// WriteAndFinish is equivalent of performing WriteLast and Finish in a 992 /// single step. 993 /// 994 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 995 /// is safe to deallocate once WriteAndFinish returns. 996 /// 997 /// \param[in] msg The message to be written. 998 /// \param[in] options The WriteOptions to be used to write this message. 999 /// \param[in] status The Status that server returns to client. 1000 /// \param[in] tag The tag identifying the operation. 1001 virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, 1002 const grpc::Status& status, void* tag) = 0; 1003 }; 1004 1005 /// Async server-side API for doing bidirectional streaming RPCs, 1006 /// where the incoming message stream coming from the client has messages of 1007 /// type \a R, and the outgoing message stream coming from the server has 1008 /// messages of type \a W. 1009 template <class W, class R> 1010 class ServerAsyncReaderWriter final 1011 : public ServerAsyncReaderWriterInterface<W, R> { 1012 public: ServerAsyncReaderWriter(grpc::ServerContext * ctx)1013 explicit ServerAsyncReaderWriter(grpc::ServerContext* ctx) 1014 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 1015 1016 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 1017 /// 1018 /// Implicit input parameter: 1019 /// - The initial metadata that will be sent to the client from this op will 1020 /// be taken from the \a ServerContext associated with the call. 1021 /// 1022 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)1023 void SendInitialMetadata(void* tag) override { 1024 GPR_ASSERT(!ctx_->sent_initial_metadata_); 1025 1026 meta_ops_.set_output_tag(tag); 1027 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 1028 ctx_->initial_metadata_flags()); 1029 if (ctx_->compression_level_set()) { 1030 meta_ops_.set_compression_level(ctx_->compression_level()); 1031 } 1032 ctx_->sent_initial_metadata_ = true; 1033 call_.PerformOps(&meta_ops_); 1034 } 1035 Read(R * msg,void * tag)1036 void Read(R* msg, void* tag) override { 1037 read_ops_.set_output_tag(tag); 1038 read_ops_.RecvMessage(msg); 1039 call_.PerformOps(&read_ops_); 1040 } 1041 Write(const W & msg,void * tag)1042 void Write(const W& msg, void* tag) override { 1043 write_ops_.set_output_tag(tag); 1044 EnsureInitialMetadataSent(&write_ops_); 1045 // TODO(ctiller): don't assert 1046 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); 1047 call_.PerformOps(&write_ops_); 1048 } 1049 Write(const W & msg,grpc::WriteOptions options,void * tag)1050 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 1051 write_ops_.set_output_tag(tag); 1052 if (options.is_last_message()) { 1053 options.set_buffer_hint(); 1054 } 1055 EnsureInitialMetadataSent(&write_ops_); 1056 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1057 call_.PerformOps(&write_ops_); 1058 } 1059 1060 /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish 1061 /// method for semantics. 1062 /// 1063 /// Implicit input parameter: 1064 /// - the \a ServerContext associated with this call is used 1065 /// for sending trailing (and initial) metadata to the client. 1066 /// 1067 /// Note: \a status must have an OK code. 1068 // 1069 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 1070 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)1071 void WriteAndFinish(const W& msg, grpc::WriteOptions options, 1072 const grpc::Status& status, void* tag) override { 1073 write_ops_.set_output_tag(tag); 1074 EnsureInitialMetadataSent(&write_ops_); 1075 options.set_buffer_hint(); 1076 GPR_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1077 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1078 call_.PerformOps(&write_ops_); 1079 } 1080 1081 /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. 1082 /// 1083 /// Implicit input parameter: 1084 /// - the \a ServerContext associated with this call is used for sending 1085 /// trailing (and initial if not already sent) metadata to the client. 1086 /// 1087 /// Note: there are no restrictions are the code of \a status, 1088 /// it may be non-OK 1089 // 1090 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 1091 /// to deallocate once Finish returns. Finish(const grpc::Status & status,void * tag)1092 void Finish(const grpc::Status& status, void* tag) override { 1093 finish_ops_.set_output_tag(tag); 1094 EnsureInitialMetadataSent(&finish_ops_); 1095 1096 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1097 call_.PerformOps(&finish_ops_); 1098 } 1099 1100 private: 1101 friend class grpc::Server; 1102 BindCall(grpc::internal::Call * call)1103 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 1104 1105 template <class T> EnsureInitialMetadataSent(T * ops)1106 void EnsureInitialMetadataSent(T* ops) { 1107 if (!ctx_->sent_initial_metadata_) { 1108 ops->SendInitialMetadata(&ctx_->initial_metadata_, 1109 ctx_->initial_metadata_flags()); 1110 if (ctx_->compression_level_set()) { 1111 ops->set_compression_level(ctx_->compression_level()); 1112 } 1113 ctx_->sent_initial_metadata_ = true; 1114 } 1115 } 1116 1117 grpc::internal::Call call_; 1118 grpc::ServerContext* ctx_; 1119 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 1120 meta_ops_; 1121 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; 1122 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 1123 grpc::internal::CallOpSendMessage, 1124 grpc::internal::CallOpServerSendStatus> 1125 write_ops_; 1126 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 1127 grpc::internal::CallOpServerSendStatus> 1128 finish_ops_; 1129 }; 1130 1131 } // namespace grpc 1132 1133 #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H 1134