xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/impl/server_callback_handlers.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/rpc_service_method.h>
25 #include <grpcpp/server_context.h>
26 #include <grpcpp/support/message_allocator.h>
27 #include <grpcpp/support/server_callback.h>
28 #include <grpcpp/support/status.h>
29 
30 namespace grpc {
31 namespace internal {
32 
33 template <class RequestType, class ResponseType>
34 class CallbackUnaryHandler : public grpc::internal::MethodHandler {
35  public:
CallbackUnaryHandler(std::function<ServerUnaryReactor * (grpc::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)36   explicit CallbackUnaryHandler(
37       std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
38                                         const RequestType*, ResponseType*)>
39           get_reactor)
40       : get_reactor_(std::move(get_reactor)) {}
41 
SetMessageAllocator(MessageAllocator<RequestType,ResponseType> * allocator)42   void SetMessageAllocator(
43       MessageAllocator<RequestType, ResponseType>* allocator) {
44     allocator_ = allocator;
45   }
46 
RunHandler(const HandlerParameter & param)47   void RunHandler(const HandlerParameter& param) final {
48     // Arena allocate a controller structure (that includes request/response)
49     grpc_call_ref(param.call->call());
50     auto* allocator_state =
51         static_cast<MessageHolder<RequestType, ResponseType>*>(
52             param.internal_data);
53 
54     auto* call = new (grpc_call_arena_alloc(param.call->call(),
55                                             sizeof(ServerCallbackUnaryImpl)))
56         ServerCallbackUnaryImpl(
57             static_cast<grpc::CallbackServerContext*>(param.server_context),
58             param.call, allocator_state, param.call_requester);
59     param.server_context->BeginCompletionOp(
60         param.call, [call](bool) { call->MaybeDone(); }, call);
61 
62     ServerUnaryReactor* reactor = nullptr;
63     if (param.status.ok()) {
64       reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
65           get_reactor_,
66           static_cast<grpc::CallbackServerContext*>(param.server_context),
67           call->request(), call->response());
68     }
69 
70     if (reactor == nullptr) {
71       // if deserialization or reactor creator failed, we need to fail the call
72       reactor = new (grpc_call_arena_alloc(param.call->call(),
73                                            sizeof(UnimplementedUnaryReactor)))
74           UnimplementedUnaryReactor(
75               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
76     }
77 
78     /// Invoke SetupReactor as the last part of the handler
79     call->SetupReactor(reactor);
80   }
81 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void ** handler_data)82   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
83                     grpc::Status* status, void** handler_data) final {
84     grpc::ByteBuffer buf;
85     buf.set_buffer(req);
86     RequestType* request = nullptr;
87     MessageHolder<RequestType, ResponseType>* allocator_state;
88     if (allocator_ != nullptr) {
89       allocator_state = allocator_->AllocateMessages();
90     } else {
91       allocator_state = new (grpc_call_arena_alloc(
92           call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
93           DefaultMessageHolder<RequestType, ResponseType>();
94     }
95     *handler_data = allocator_state;
96     request = allocator_state->request();
97     *status =
98         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
99     buf.Release();
100     if (status->ok()) {
101       return request;
102     }
103     return nullptr;
104   }
105 
106  private:
107   std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
108                                     const RequestType*, ResponseType*)>
109       get_reactor_;
110   MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
111 
112   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
113    public:
Finish(grpc::Status s)114     void Finish(grpc::Status s) override {
115       // A callback that only contains a call to MaybeDone can be run as an
116       // inline callback regardless of whether or not OnDone is inlineable
117       // because if the actual OnDone callback needs to be scheduled, MaybeDone
118       // is responsible for dispatching to an executor thread if needed. Thus,
119       // when setting up the finish_tag_, we can set its own callback to
120       // inlineable.
121       finish_tag_.Set(
122           call_.call(),
123           [this](bool) {
124             this->MaybeDone(
125                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
126           },
127           &finish_ops_, /*can_inline=*/true);
128       finish_ops_.set_core_cq_tag(&finish_tag_);
129 
130       if (!ctx_->sent_initial_metadata_) {
131         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
132                                         ctx_->initial_metadata_flags());
133         if (ctx_->compression_level_set()) {
134           finish_ops_.set_compression_level(ctx_->compression_level());
135         }
136         ctx_->sent_initial_metadata_ = true;
137       }
138       // The response is dropped if the status is not OK.
139       if (s.ok()) {
140         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
141                                      finish_ops_.SendMessagePtr(response()));
142       } else {
143         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
144       }
145       finish_ops_.set_core_cq_tag(&finish_tag_);
146       call_.PerformOps(&finish_ops_);
147     }
148 
SendInitialMetadata()149     void SendInitialMetadata() override {
150       GPR_ASSERT(!ctx_->sent_initial_metadata_);
151       this->Ref();
152       // The callback for this function should not be marked inline because it
153       // is directly invoking a user-controlled reaction
154       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
155       // thread. However, any OnDone needed after that can be inlined because it
156       // is already running on an executor thread.
157       meta_tag_.Set(
158           call_.call(),
159           [this](bool ok) {
160             ServerUnaryReactor* reactor =
161                 reactor_.load(std::memory_order_relaxed);
162             reactor->OnSendInitialMetadataDone(ok);
163             this->MaybeDone(/*inlineable_ondone=*/true);
164           },
165           &meta_ops_, /*can_inline=*/false);
166       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
167                                     ctx_->initial_metadata_flags());
168       if (ctx_->compression_level_set()) {
169         meta_ops_.set_compression_level(ctx_->compression_level());
170       }
171       ctx_->sent_initial_metadata_ = true;
172       meta_ops_.set_core_cq_tag(&meta_tag_);
173       call_.PerformOps(&meta_ops_);
174     }
175 
176    private:
177     friend class CallbackUnaryHandler<RequestType, ResponseType>;
178 
ServerCallbackUnaryImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)179     ServerCallbackUnaryImpl(
180         grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
181         MessageHolder<RequestType, ResponseType>* allocator_state,
182         std::function<void()> call_requester)
183         : ctx_(ctx),
184           call_(*call),
185           allocator_state_(allocator_state),
186           call_requester_(std::move(call_requester)) {
187       ctx_->set_message_allocator_state(allocator_state);
188     }
189 
call()190     grpc_call* call() override { return call_.call(); }
191 
192     /// SetupReactor binds the reactor (which also releases any queued
193     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
194     /// the completion of the RPC. This should be the last component of the
195     /// handler.
SetupReactor(ServerUnaryReactor * reactor)196     void SetupReactor(ServerUnaryReactor* reactor) {
197       reactor_.store(reactor, std::memory_order_relaxed);
198       this->BindReactor(reactor);
199       this->MaybeCallOnCancel(reactor);
200       this->MaybeDone(reactor->InternalInlineable());
201     }
202 
request()203     const RequestType* request() { return allocator_state_->request(); }
response()204     ResponseType* response() { return allocator_state_->response(); }
205 
CallOnDone()206     void CallOnDone() override {
207       reactor_.load(std::memory_order_relaxed)->OnDone();
208       grpc_call* call = call_.call();
209       auto call_requester = std::move(call_requester_);
210       allocator_state_->Release();
211       if (ctx_->context_allocator() != nullptr) {
212         ctx_->context_allocator()->Release(ctx_);
213       }
214       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
215       grpc_call_unref(call);
216       call_requester();
217     }
218 
reactor()219     ServerReactor* reactor() override {
220       return reactor_.load(std::memory_order_relaxed);
221     }
222 
223     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
224         meta_ops_;
225     grpc::internal::CallbackWithSuccessTag meta_tag_;
226     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
227                               grpc::internal::CallOpSendMessage,
228                               grpc::internal::CallOpServerSendStatus>
229         finish_ops_;
230     grpc::internal::CallbackWithSuccessTag finish_tag_;
231 
232     grpc::CallbackServerContext* const ctx_;
233     grpc::internal::Call call_;
234     MessageHolder<RequestType, ResponseType>* const allocator_state_;
235     std::function<void()> call_requester_;
236     // reactor_ can always be loaded/stored with relaxed memory ordering because
237     // its value is only set once, independently of other data in the object,
238     // and the loads that use it will always actually come provably later even
239     // though they are from different threads since they are triggered by
240     // actions initiated only by the setting up of the reactor_ variable. In
241     // a sense, it's a delayed "const": it gets its value from the SetupReactor
242     // method (not the constructor, so it's not a true const), but it doesn't
243     // change after that and it only gets used by actions caused, directly or
244     // indirectly, by that setup. This comment also applies to the reactor_
245     // variables of the other streaming objects in this file.
246     std::atomic<ServerUnaryReactor*> reactor_;
247     // callbacks_outstanding_ follows a refcount pattern
248     std::atomic<intptr_t> callbacks_outstanding_{
249         3};  // reserve for start, Finish, and CompletionOp
250   };
251 };
252 
253 template <class RequestType, class ResponseType>
254 class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
255  public:
CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (grpc::CallbackServerContext *,ResponseType *)> get_reactor)256   explicit CallbackClientStreamingHandler(
257       std::function<ServerReadReactor<RequestType>*(
258           grpc::CallbackServerContext*, ResponseType*)>
259           get_reactor)
260       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)261   void RunHandler(const HandlerParameter& param) final {
262     // Arena allocate a reader structure (that includes response)
263     grpc_call_ref(param.call->call());
264 
265     auto* reader = new (grpc_call_arena_alloc(param.call->call(),
266                                               sizeof(ServerCallbackReaderImpl)))
267         ServerCallbackReaderImpl(
268             static_cast<grpc::CallbackServerContext*>(param.server_context),
269             param.call, param.call_requester);
270     // Inlineable OnDone can be false in the CompletionOp callback because there
271     // is no read reactor that has an inlineable OnDone; this only applies to
272     // the DefaultReactor (which is unary).
273     param.server_context->BeginCompletionOp(
274         param.call,
275         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
276         reader);
277 
278     ServerReadReactor<RequestType>* reactor = nullptr;
279     if (param.status.ok()) {
280       reactor =
281           grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
282               get_reactor_,
283               static_cast<grpc::CallbackServerContext*>(param.server_context),
284               reader->response());
285     }
286 
287     if (reactor == nullptr) {
288       // if deserialization or reactor creator failed, we need to fail the call
289       reactor = new (grpc_call_arena_alloc(
290           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
291           UnimplementedReadReactor<RequestType>(
292               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
293     }
294 
295     reader->SetupReactor(reactor);
296   }
297 
298  private:
299   std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
300                                                 ResponseType*)>
301       get_reactor_;
302 
303   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
304    public:
Finish(grpc::Status s)305     void Finish(grpc::Status s) override {
306       // A finish tag with only MaybeDone can have its callback inlined
307       // regardless even if OnDone is not inlineable because this callback just
308       // checks a ref and then decides whether or not to dispatch OnDone.
309       finish_tag_.Set(
310           call_.call(),
311           [this](bool) {
312             // Inlineable OnDone can be false here because there is
313             // no read reactor that has an inlineable OnDone; this
314             // only applies to the DefaultReactor (which is unary).
315             this->MaybeDone(/*inlineable_ondone=*/false);
316           },
317           &finish_ops_, /*can_inline=*/true);
318       if (!ctx_->sent_initial_metadata_) {
319         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
320                                         ctx_->initial_metadata_flags());
321         if (ctx_->compression_level_set()) {
322           finish_ops_.set_compression_level(ctx_->compression_level());
323         }
324         ctx_->sent_initial_metadata_ = true;
325       }
326       // The response is dropped if the status is not OK.
327       if (s.ok()) {
328         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
329                                      finish_ops_.SendMessagePtr(&resp_));
330       } else {
331         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
332       }
333       finish_ops_.set_core_cq_tag(&finish_tag_);
334       call_.PerformOps(&finish_ops_);
335     }
336 
SendInitialMetadata()337     void SendInitialMetadata() override {
338       GPR_ASSERT(!ctx_->sent_initial_metadata_);
339       this->Ref();
340       // The callback for this function should not be inlined because it invokes
341       // a user-controlled reaction, but any resulting OnDone can be inlined in
342       // the executor to which this callback is dispatched.
343       meta_tag_.Set(
344           call_.call(),
345           [this](bool ok) {
346             ServerReadReactor<RequestType>* reactor =
347                 reactor_.load(std::memory_order_relaxed);
348             reactor->OnSendInitialMetadataDone(ok);
349             this->MaybeDone(/*inlineable_ondone=*/true);
350           },
351           &meta_ops_, /*can_inline=*/false);
352       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
353                                     ctx_->initial_metadata_flags());
354       if (ctx_->compression_level_set()) {
355         meta_ops_.set_compression_level(ctx_->compression_level());
356       }
357       ctx_->sent_initial_metadata_ = true;
358       meta_ops_.set_core_cq_tag(&meta_tag_);
359       call_.PerformOps(&meta_ops_);
360     }
361 
Read(RequestType * req)362     void Read(RequestType* req) override {
363       this->Ref();
364       read_ops_.RecvMessage(req);
365       call_.PerformOps(&read_ops_);
366     }
367 
368    private:
369     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
370 
ServerCallbackReaderImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,std::function<void ()> call_requester)371     ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
372                              grpc::internal::Call* call,
373                              std::function<void()> call_requester)
374         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
375 
call()376     grpc_call* call() override { return call_.call(); }
377 
SetupReactor(ServerReadReactor<RequestType> * reactor)378     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
379       reactor_.store(reactor, std::memory_order_relaxed);
380       // The callback for this function should not be inlined because it invokes
381       // a user-controlled reaction, but any resulting OnDone can be inlined in
382       // the executor to which this callback is dispatched.
383       read_tag_.Set(
384           call_.call(),
385           [this, reactor](bool ok) {
386             if (GPR_UNLIKELY(!ok)) {
387               ctx_->MaybeMarkCancelledOnRead();
388             }
389             reactor->OnReadDone(ok);
390             this->MaybeDone(/*inlineable_ondone=*/true);
391           },
392           &read_ops_, /*can_inline=*/false);
393       read_ops_.set_core_cq_tag(&read_tag_);
394       this->BindReactor(reactor);
395       this->MaybeCallOnCancel(reactor);
396       // Inlineable OnDone can be false here because there is no read
397       // reactor that has an inlineable OnDone; this only applies to the
398       // DefaultReactor (which is unary).
399       this->MaybeDone(/*inlineable_ondone=*/false);
400     }
401 
~ServerCallbackReaderImpl()402     ~ServerCallbackReaderImpl() {}
403 
response()404     ResponseType* response() { return &resp_; }
405 
CallOnDone()406     void CallOnDone() override {
407       reactor_.load(std::memory_order_relaxed)->OnDone();
408       grpc_call* call = call_.call();
409       auto call_requester = std::move(call_requester_);
410       if (ctx_->context_allocator() != nullptr) {
411         ctx_->context_allocator()->Release(ctx_);
412       }
413       this->~ServerCallbackReaderImpl();  // explicitly call destructor
414       grpc_call_unref(call);
415       call_requester();
416     }
417 
reactor()418     ServerReactor* reactor() override {
419       return reactor_.load(std::memory_order_relaxed);
420     }
421 
422     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
423         meta_ops_;
424     grpc::internal::CallbackWithSuccessTag meta_tag_;
425     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
426                               grpc::internal::CallOpSendMessage,
427                               grpc::internal::CallOpServerSendStatus>
428         finish_ops_;
429     grpc::internal::CallbackWithSuccessTag finish_tag_;
430     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
431         read_ops_;
432     grpc::internal::CallbackWithSuccessTag read_tag_;
433 
434     grpc::CallbackServerContext* const ctx_;
435     grpc::internal::Call call_;
436     ResponseType resp_;
437     std::function<void()> call_requester_;
438     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
439     std::atomic<ServerReadReactor<RequestType>*> reactor_;
440     // callbacks_outstanding_ follows a refcount pattern
441     std::atomic<intptr_t> callbacks_outstanding_{
442         3};  // reserve for OnStarted, Finish, and CompletionOp
443   };
444 };
445 
446 template <class RequestType, class ResponseType>
447 class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
448  public:
CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (grpc::CallbackServerContext *,const RequestType *)> get_reactor)449   explicit CallbackServerStreamingHandler(
450       std::function<ServerWriteReactor<ResponseType>*(
451           grpc::CallbackServerContext*, const RequestType*)>
452           get_reactor)
453       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)454   void RunHandler(const HandlerParameter& param) final {
455     // Arena allocate a writer structure
456     grpc_call_ref(param.call->call());
457 
458     auto* writer = new (grpc_call_arena_alloc(param.call->call(),
459                                               sizeof(ServerCallbackWriterImpl)))
460         ServerCallbackWriterImpl(
461             static_cast<grpc::CallbackServerContext*>(param.server_context),
462             param.call, static_cast<RequestType*>(param.request),
463             param.call_requester);
464     // Inlineable OnDone can be false in the CompletionOp callback because there
465     // is no write reactor that has an inlineable OnDone; this only applies to
466     // the DefaultReactor (which is unary).
467     param.server_context->BeginCompletionOp(
468         param.call,
469         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
470         writer);
471 
472     ServerWriteReactor<ResponseType>* reactor = nullptr;
473     if (param.status.ok()) {
474       reactor = grpc::internal::CatchingReactorGetter<
475           ServerWriteReactor<ResponseType>>(
476           get_reactor_,
477           static_cast<grpc::CallbackServerContext*>(param.server_context),
478           writer->request());
479     }
480     if (reactor == nullptr) {
481       // if deserialization or reactor creator failed, we need to fail the call
482       reactor = new (grpc_call_arena_alloc(
483           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
484           UnimplementedWriteReactor<ResponseType>(
485               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
486     }
487 
488     writer->SetupReactor(reactor);
489   }
490 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)491   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
492                     grpc::Status* status, void** /*handler_data*/) final {
493     grpc::ByteBuffer buf;
494     buf.set_buffer(req);
495     auto* request =
496         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
497     *status =
498         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
499     buf.Release();
500     if (status->ok()) {
501       return request;
502     }
503     request->~RequestType();
504     return nullptr;
505   }
506 
507  private:
508   std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
509                                                   const RequestType*)>
510       get_reactor_;
511 
512   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
513    public:
Finish(grpc::Status s)514     void Finish(grpc::Status s) override {
515       // A finish tag with only MaybeDone can have its callback inlined
516       // regardless even if OnDone is not inlineable because this callback just
517       // checks a ref and then decides whether or not to dispatch OnDone.
518       finish_tag_.Set(
519           call_.call(),
520           [this](bool) {
521             // Inlineable OnDone can be false here because there is
522             // no write reactor that has an inlineable OnDone; this
523             // only applies to the DefaultReactor (which is unary).
524             this->MaybeDone(/*inlineable_ondone=*/false);
525           },
526           &finish_ops_, /*can_inline=*/true);
527       finish_ops_.set_core_cq_tag(&finish_tag_);
528 
529       if (!ctx_->sent_initial_metadata_) {
530         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
531                                         ctx_->initial_metadata_flags());
532         if (ctx_->compression_level_set()) {
533           finish_ops_.set_compression_level(ctx_->compression_level());
534         }
535         ctx_->sent_initial_metadata_ = true;
536       }
537       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
538       call_.PerformOps(&finish_ops_);
539     }
540 
SendInitialMetadata()541     void SendInitialMetadata() override {
542       GPR_ASSERT(!ctx_->sent_initial_metadata_);
543       this->Ref();
544       // The callback for this function should not be inlined because it invokes
545       // a user-controlled reaction, but any resulting OnDone can be inlined in
546       // the executor to which this callback is dispatched.
547       meta_tag_.Set(
548           call_.call(),
549           [this](bool ok) {
550             ServerWriteReactor<ResponseType>* reactor =
551                 reactor_.load(std::memory_order_relaxed);
552             reactor->OnSendInitialMetadataDone(ok);
553             this->MaybeDone(/*inlineable_ondone=*/true);
554           },
555           &meta_ops_, /*can_inline=*/false);
556       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
557                                     ctx_->initial_metadata_flags());
558       if (ctx_->compression_level_set()) {
559         meta_ops_.set_compression_level(ctx_->compression_level());
560       }
561       ctx_->sent_initial_metadata_ = true;
562       meta_ops_.set_core_cq_tag(&meta_tag_);
563       call_.PerformOps(&meta_ops_);
564     }
565 
Write(const ResponseType * resp,grpc::WriteOptions options)566     void Write(const ResponseType* resp, grpc::WriteOptions options) override {
567       this->Ref();
568       if (options.is_last_message()) {
569         options.set_buffer_hint();
570       }
571       if (!ctx_->sent_initial_metadata_) {
572         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
573                                        ctx_->initial_metadata_flags());
574         if (ctx_->compression_level_set()) {
575           write_ops_.set_compression_level(ctx_->compression_level());
576         }
577         ctx_->sent_initial_metadata_ = true;
578       }
579       // TODO(vjpai): don't assert
580       GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
581       call_.PerformOps(&write_ops_);
582     }
583 
WriteAndFinish(const ResponseType * resp,grpc::WriteOptions options,grpc::Status s)584     void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
585                         grpc::Status s) override {
586       // This combines the write into the finish callback
587       // TODO(vjpai): don't assert
588       GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
589       Finish(std::move(s));
590     }
591 
592    private:
593     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
594 
ServerCallbackWriterImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)595     ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
596                              grpc::internal::Call* call, const RequestType* req,
597                              std::function<void()> call_requester)
598         : ctx_(ctx),
599           call_(*call),
600           req_(req),
601           call_requester_(std::move(call_requester)) {}
602 
call()603     grpc_call* call() override { return call_.call(); }
604 
SetupReactor(ServerWriteReactor<ResponseType> * reactor)605     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
606       reactor_.store(reactor, std::memory_order_relaxed);
607       // The callback for this function should not be inlined because it invokes
608       // a user-controlled reaction, but any resulting OnDone can be inlined in
609       // the executor to which this callback is dispatched.
610       write_tag_.Set(
611           call_.call(),
612           [this, reactor](bool ok) {
613             reactor->OnWriteDone(ok);
614             this->MaybeDone(/*inlineable_ondone=*/true);
615           },
616           &write_ops_, /*can_inline=*/false);
617       write_ops_.set_core_cq_tag(&write_tag_);
618       this->BindReactor(reactor);
619       this->MaybeCallOnCancel(reactor);
620       // Inlineable OnDone can be false here because there is no write
621       // reactor that has an inlineable OnDone; this only applies to the
622       // DefaultReactor (which is unary).
623       this->MaybeDone(/*inlineable_ondone=*/false);
624     }
~ServerCallbackWriterImpl()625     ~ServerCallbackWriterImpl() {
626       if (req_ != nullptr) {
627         req_->~RequestType();
628       }
629     }
630 
request()631     const RequestType* request() { return req_; }
632 
CallOnDone()633     void CallOnDone() override {
634       reactor_.load(std::memory_order_relaxed)->OnDone();
635       grpc_call* call = call_.call();
636       auto call_requester = std::move(call_requester_);
637       if (ctx_->context_allocator() != nullptr) {
638         ctx_->context_allocator()->Release(ctx_);
639       }
640       this->~ServerCallbackWriterImpl();  // explicitly call destructor
641       grpc_call_unref(call);
642       call_requester();
643     }
644 
reactor()645     ServerReactor* reactor() override {
646       return reactor_.load(std::memory_order_relaxed);
647     }
648 
649     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
650         meta_ops_;
651     grpc::internal::CallbackWithSuccessTag meta_tag_;
652     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
653                               grpc::internal::CallOpSendMessage,
654                               grpc::internal::CallOpServerSendStatus>
655         finish_ops_;
656     grpc::internal::CallbackWithSuccessTag finish_tag_;
657     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
658                               grpc::internal::CallOpSendMessage>
659         write_ops_;
660     grpc::internal::CallbackWithSuccessTag write_tag_;
661 
662     grpc::CallbackServerContext* const ctx_;
663     grpc::internal::Call call_;
664     const RequestType* req_;
665     std::function<void()> call_requester_;
666     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
667     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
668     // callbacks_outstanding_ follows a refcount pattern
669     std::atomic<intptr_t> callbacks_outstanding_{
670         3};  // reserve for OnStarted, Finish, and CompletionOp
671   };
672 };
673 
674 template <class RequestType, class ResponseType>
675 class CallbackBidiHandler : public grpc::internal::MethodHandler {
676  public:
CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (grpc::CallbackServerContext *)> get_reactor)677   explicit CallbackBidiHandler(
678       std::function<ServerBidiReactor<RequestType, ResponseType>*(
679           grpc::CallbackServerContext*)>
680           get_reactor)
681       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)682   void RunHandler(const HandlerParameter& param) final {
683     grpc_call_ref(param.call->call());
684 
685     auto* stream = new (grpc_call_arena_alloc(
686         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
687         ServerCallbackReaderWriterImpl(
688             static_cast<grpc::CallbackServerContext*>(param.server_context),
689             param.call, param.call_requester);
690     // Inlineable OnDone can be false in the CompletionOp callback because there
691     // is no bidi reactor that has an inlineable OnDone; this only applies to
692     // the DefaultReactor (which is unary).
693     param.server_context->BeginCompletionOp(
694         param.call,
695         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
696         stream);
697 
698     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
699     if (param.status.ok()) {
700       reactor = grpc::internal::CatchingReactorGetter<
701           ServerBidiReactor<RequestType, ResponseType>>(
702           get_reactor_,
703           static_cast<grpc::CallbackServerContext*>(param.server_context));
704     }
705 
706     if (reactor == nullptr) {
707       // if deserialization or reactor creator failed, we need to fail the call
708       reactor = new (grpc_call_arena_alloc(
709           param.call->call(),
710           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
711           UnimplementedBidiReactor<RequestType, ResponseType>(
712               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
713     }
714 
715     stream->SetupReactor(reactor);
716   }
717 
718  private:
719   std::function<ServerBidiReactor<RequestType, ResponseType>*(
720       grpc::CallbackServerContext*)>
721       get_reactor_;
722 
723   class ServerCallbackReaderWriterImpl
724       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
725    public:
Finish(grpc::Status s)726     void Finish(grpc::Status s) override {
727       // A finish tag with only MaybeDone can have its callback inlined
728       // regardless even if OnDone is not inlineable because this callback just
729       // checks a ref and then decides whether or not to dispatch OnDone.
730       finish_tag_.Set(
731           call_.call(),
732           [this](bool) {
733             // Inlineable OnDone can be false here because there is
734             // no bidi reactor that has an inlineable OnDone; this
735             // only applies to the DefaultReactor (which is unary).
736             this->MaybeDone(/*inlineable_ondone=*/false);
737           },
738           &finish_ops_, /*can_inline=*/true);
739       finish_ops_.set_core_cq_tag(&finish_tag_);
740 
741       if (!ctx_->sent_initial_metadata_) {
742         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743                                         ctx_->initial_metadata_flags());
744         if (ctx_->compression_level_set()) {
745           finish_ops_.set_compression_level(ctx_->compression_level());
746         }
747         ctx_->sent_initial_metadata_ = true;
748       }
749       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
750       call_.PerformOps(&finish_ops_);
751     }
752 
SendInitialMetadata()753     void SendInitialMetadata() override {
754       GPR_ASSERT(!ctx_->sent_initial_metadata_);
755       this->Ref();
756       // The callback for this function should not be inlined because it invokes
757       // a user-controlled reaction, but any resulting OnDone can be inlined in
758       // the executor to which this callback is dispatched.
759       meta_tag_.Set(
760           call_.call(),
761           [this](bool ok) {
762             ServerBidiReactor<RequestType, ResponseType>* reactor =
763                 reactor_.load(std::memory_order_relaxed);
764             reactor->OnSendInitialMetadataDone(ok);
765             this->MaybeDone(/*inlineable_ondone=*/true);
766           },
767           &meta_ops_, /*can_inline=*/false);
768       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
769                                     ctx_->initial_metadata_flags());
770       if (ctx_->compression_level_set()) {
771         meta_ops_.set_compression_level(ctx_->compression_level());
772       }
773       ctx_->sent_initial_metadata_ = true;
774       meta_ops_.set_core_cq_tag(&meta_tag_);
775       call_.PerformOps(&meta_ops_);
776     }
777 
Write(const ResponseType * resp,grpc::WriteOptions options)778     void Write(const ResponseType* resp, grpc::WriteOptions options) override {
779       this->Ref();
780       if (options.is_last_message()) {
781         options.set_buffer_hint();
782       }
783       if (!ctx_->sent_initial_metadata_) {
784         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
785                                        ctx_->initial_metadata_flags());
786         if (ctx_->compression_level_set()) {
787           write_ops_.set_compression_level(ctx_->compression_level());
788         }
789         ctx_->sent_initial_metadata_ = true;
790       }
791       // TODO(vjpai): don't assert
792       GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
793       call_.PerformOps(&write_ops_);
794     }
795 
WriteAndFinish(const ResponseType * resp,grpc::WriteOptions options,grpc::Status s)796     void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
797                         grpc::Status s) override {
798       // TODO(vjpai): don't assert
799       GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
800       Finish(std::move(s));
801     }
802 
Read(RequestType * req)803     void Read(RequestType* req) override {
804       this->Ref();
805       read_ops_.RecvMessage(req);
806       call_.PerformOps(&read_ops_);
807     }
808 
809    private:
810     friend class CallbackBidiHandler<RequestType, ResponseType>;
811 
ServerCallbackReaderWriterImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,std::function<void ()> call_requester)812     ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
813                                    grpc::internal::Call* call,
814                                    std::function<void()> call_requester)
815         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
816 
call()817     grpc_call* call() override { return call_.call(); }
818 
SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)819     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
820       reactor_.store(reactor, std::memory_order_relaxed);
821       // The callbacks for these functions should not be inlined because they
822       // invoke user-controlled reactions, but any resulting OnDones can be
823       // inlined in the executor to which a callback is dispatched.
824       write_tag_.Set(
825           call_.call(),
826           [this, reactor](bool ok) {
827             reactor->OnWriteDone(ok);
828             this->MaybeDone(/*inlineable_ondone=*/true);
829           },
830           &write_ops_, /*can_inline=*/false);
831       write_ops_.set_core_cq_tag(&write_tag_);
832       read_tag_.Set(
833           call_.call(),
834           [this, reactor](bool ok) {
835             if (GPR_UNLIKELY(!ok)) {
836               ctx_->MaybeMarkCancelledOnRead();
837             }
838             reactor->OnReadDone(ok);
839             this->MaybeDone(/*inlineable_ondone=*/true);
840           },
841           &read_ops_, /*can_inline=*/false);
842       read_ops_.set_core_cq_tag(&read_tag_);
843       this->BindReactor(reactor);
844       this->MaybeCallOnCancel(reactor);
845       // Inlineable OnDone can be false here because there is no bidi
846       // reactor that has an inlineable OnDone; this only applies to the
847       // DefaultReactor (which is unary).
848       this->MaybeDone(/*inlineable_ondone=*/false);
849     }
850 
CallOnDone()851     void CallOnDone() override {
852       reactor_.load(std::memory_order_relaxed)->OnDone();
853       grpc_call* call = call_.call();
854       auto call_requester = std::move(call_requester_);
855       if (ctx_->context_allocator() != nullptr) {
856         ctx_->context_allocator()->Release(ctx_);
857       }
858       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
859       grpc_call_unref(call);
860       call_requester();
861     }
862 
reactor()863     ServerReactor* reactor() override {
864       return reactor_.load(std::memory_order_relaxed);
865     }
866 
867     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
868         meta_ops_;
869     grpc::internal::CallbackWithSuccessTag meta_tag_;
870     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
871                               grpc::internal::CallOpSendMessage,
872                               grpc::internal::CallOpServerSendStatus>
873         finish_ops_;
874     grpc::internal::CallbackWithSuccessTag finish_tag_;
875     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
876                               grpc::internal::CallOpSendMessage>
877         write_ops_;
878     grpc::internal::CallbackWithSuccessTag write_tag_;
879     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
880         read_ops_;
881     grpc::internal::CallbackWithSuccessTag read_tag_;
882 
883     grpc::CallbackServerContext* const ctx_;
884     grpc::internal::Call call_;
885     std::function<void()> call_requester_;
886     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
887     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
888     // callbacks_outstanding_ follows a refcount pattern
889     std::atomic<intptr_t> callbacks_outstanding_{
890         3};  // reserve for OnStarted, Finish, and CompletionOp
891   };
892 };
893 
894 }  // namespace internal
895 }  // namespace grpc
896 
897 #endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
898