1*cc02d7e2SAndroid Build Coastguard Worker //
2*cc02d7e2SAndroid Build Coastguard Worker // Copyright 2015 gRPC authors.
3*cc02d7e2SAndroid Build Coastguard Worker //
4*cc02d7e2SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
5*cc02d7e2SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
6*cc02d7e2SAndroid Build Coastguard Worker // You may obtain a copy of the License at
7*cc02d7e2SAndroid Build Coastguard Worker //
8*cc02d7e2SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
9*cc02d7e2SAndroid Build Coastguard Worker //
10*cc02d7e2SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
11*cc02d7e2SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
12*cc02d7e2SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*cc02d7e2SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
14*cc02d7e2SAndroid Build Coastguard Worker // limitations under the License.
15*cc02d7e2SAndroid Build Coastguard Worker //
16*cc02d7e2SAndroid Build Coastguard Worker //
17*cc02d7e2SAndroid Build Coastguard Worker
18*cc02d7e2SAndroid Build Coastguard Worker #include <limits.h>
19*cc02d7e2SAndroid Build Coastguard Worker #include <string.h>
20*cc02d7e2SAndroid Build Coastguard Worker
21*cc02d7e2SAndroid Build Coastguard Worker #include <algorithm>
22*cc02d7e2SAndroid Build Coastguard Worker #include <atomic>
23*cc02d7e2SAndroid Build Coastguard Worker #include <cstdlib>
24*cc02d7e2SAndroid Build Coastguard Worker #include <memory>
25*cc02d7e2SAndroid Build Coastguard Worker #include <new>
26*cc02d7e2SAndroid Build Coastguard Worker #include <sstream>
27*cc02d7e2SAndroid Build Coastguard Worker #include <string>
28*cc02d7e2SAndroid Build Coastguard Worker #include <type_traits>
29*cc02d7e2SAndroid Build Coastguard Worker #include <utility>
30*cc02d7e2SAndroid Build Coastguard Worker #include <vector>
31*cc02d7e2SAndroid Build Coastguard Worker
32*cc02d7e2SAndroid Build Coastguard Worker #include "absl/status/status.h"
33*cc02d7e2SAndroid Build Coastguard Worker
34*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/byte_buffer.h>
35*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/grpc.h>
36*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/impl/channel_arg_names.h>
37*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/slice.h>
38*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/log.h>
39*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/sync.h>
40*cc02d7e2SAndroid Build Coastguard Worker #include <grpc/support/time.h>
41*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/channel.h>
42*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/completion_queue.h>
43*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/generic/async_generic_service.h>
44*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/health_check_service_interface.h>
45*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/call.h>
46*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/call_op_set.h>
47*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/call_op_set_interface.h>
48*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/completion_queue_tag.h>
49*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/interceptor_common.h>
50*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/metadata_map.h>
51*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/rpc_method.h>
52*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/rpc_service_method.h>
53*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/server_callback_handlers.h>
54*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/server_initializer.h>
55*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/service_type.h>
56*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/impl/sync.h>
57*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/security/server_credentials.h>
58*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server.h>
59*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_context.h>
60*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/server_interface.h>
61*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/byte_buffer.h>
62*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/channel_arguments.h>
63*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/client_interceptor.h>
64*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/interceptor.h>
65*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/method_handler.h>
66*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/server_interceptor.h>
67*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/slice.h>
68*cc02d7e2SAndroid Build Coastguard Worker #include <grpcpp/support/status.h>
69*cc02d7e2SAndroid Build Coastguard Worker
70*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/ext/transport/inproc/inproc_transport.h"
71*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/gprpp/manual_constructor.h"
72*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/iomgr/exec_ctx.h"
73*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/iomgr/iomgr.h"
74*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/resource_quota/api.h"
75*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/surface/completion_queue.h"
76*cc02d7e2SAndroid Build Coastguard Worker #include "src/core/lib/surface/server.h"
77*cc02d7e2SAndroid Build Coastguard Worker #include "src/cpp/client/create_channel_internal.h"
78*cc02d7e2SAndroid Build Coastguard Worker #include "src/cpp/server/external_connection_acceptor_impl.h"
79*cc02d7e2SAndroid Build Coastguard Worker #include "src/cpp/server/health/default_health_check_service.h"
80*cc02d7e2SAndroid Build Coastguard Worker #include "src/cpp/thread_manager/thread_manager.h"
81*cc02d7e2SAndroid Build Coastguard Worker
82*cc02d7e2SAndroid Build Coastguard Worker namespace grpc {
83*cc02d7e2SAndroid Build Coastguard Worker namespace {
84*cc02d7e2SAndroid Build Coastguard Worker
// The default value for the maximum number of threads that can be created in
// the sync server. INT_MAX is chosen to match the default behavior when no
// ResourceQuota is set. To modify the max number of threads in a sync server,
// pass a custom ResourceQuota object (with the desired number of max-threads
// set) to the server builder.
//
// This is a typed constant rather than a preprocessor macro so that it obeys
// normal scoping rules; the historical name is kept so existing uses compile
// unchanged.
constexpr int DEFAULT_MAX_SYNC_SERVER_THREADS = INT_MAX;
91*cc02d7e2SAndroid Build Coastguard Worker
// Status error message used when the resource is exhausted specifically
// because the server threadpool is full. Declared constexpr so the pointer
// itself cannot be reassigned at runtime.
constexpr const char* kServerThreadpoolExhausted = "Server Threadpool Exhausted";
95*cc02d7e2SAndroid Build Coastguard Worker
// Although we might like to give a useful status error message on
// unimplemented RPCs, it's not always possible since that also would need to
// be added across languages and isn't actually required by the spec, so the
// message is deliberately empty. Declared constexpr so the pointer itself
// cannot be reassigned at runtime.
constexpr const char* kUnknownRpcMethod = "";
100*cc02d7e2SAndroid Build Coastguard Worker
101*cc02d7e2SAndroid Build Coastguard Worker class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
102*cc02d7e2SAndroid Build Coastguard Worker public:
~DefaultGlobalCallbacks()103*cc02d7e2SAndroid Build Coastguard Worker ~DefaultGlobalCallbacks() override {}
PreSynchronousRequest(ServerContext *)104*cc02d7e2SAndroid Build Coastguard Worker void PreSynchronousRequest(ServerContext* /*context*/) override {}
PostSynchronousRequest(ServerContext *)105*cc02d7e2SAndroid Build Coastguard Worker void PostSynchronousRequest(ServerContext* /*context*/) override {}
106*cc02d7e2SAndroid Build Coastguard Worker };
107*cc02d7e2SAndroid Build Coastguard Worker
108*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
109*cc02d7e2SAndroid Build Coastguard Worker gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
110*cc02d7e2SAndroid Build Coastguard Worker
InitGlobalCallbacks()111*cc02d7e2SAndroid Build Coastguard Worker void InitGlobalCallbacks() {
112*cc02d7e2SAndroid Build Coastguard Worker if (!g_callbacks) {
113*cc02d7e2SAndroid Build Coastguard Worker g_callbacks.reset(new DefaultGlobalCallbacks());
114*cc02d7e2SAndroid Build Coastguard Worker }
115*cc02d7e2SAndroid Build Coastguard Worker }
116*cc02d7e2SAndroid Build Coastguard Worker
117*cc02d7e2SAndroid Build Coastguard Worker class ShutdownTag : public internal::CompletionQueueTag {
118*cc02d7e2SAndroid Build Coastguard Worker public:
FinalizeResult(void **,bool *)119*cc02d7e2SAndroid Build Coastguard Worker bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
120*cc02d7e2SAndroid Build Coastguard Worker return false;
121*cc02d7e2SAndroid Build Coastguard Worker }
122*cc02d7e2SAndroid Build Coastguard Worker };
123*cc02d7e2SAndroid Build Coastguard Worker
124*cc02d7e2SAndroid Build Coastguard Worker class PhonyTag : public internal::CompletionQueueTag {
125*cc02d7e2SAndroid Build Coastguard Worker public:
FinalizeResult(void **,bool *)126*cc02d7e2SAndroid Build Coastguard Worker bool FinalizeResult(void** /*tag*/, bool* /*status*/) override {
127*cc02d7e2SAndroid Build Coastguard Worker return true;
128*cc02d7e2SAndroid Build Coastguard Worker }
129*cc02d7e2SAndroid Build Coastguard Worker };
130*cc02d7e2SAndroid Build Coastguard Worker
131*cc02d7e2SAndroid Build Coastguard Worker class UnimplementedAsyncRequestContext {
132*cc02d7e2SAndroid Build Coastguard Worker protected:
UnimplementedAsyncRequestContext()133*cc02d7e2SAndroid Build Coastguard Worker UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
134*cc02d7e2SAndroid Build Coastguard Worker
135*cc02d7e2SAndroid Build Coastguard Worker GenericServerContext server_context_;
136*cc02d7e2SAndroid Build Coastguard Worker GenericServerAsyncReaderWriter generic_stream_;
137*cc02d7e2SAndroid Build Coastguard Worker };
138*cc02d7e2SAndroid Build Coastguard Worker
139*cc02d7e2SAndroid Build Coastguard Worker } // namespace
140*cc02d7e2SAndroid Build Coastguard Worker
BaseAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,bool delete_on_finalize)141*cc02d7e2SAndroid Build Coastguard Worker ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
142*cc02d7e2SAndroid Build Coastguard Worker ServerInterface* server, ServerContext* context,
143*cc02d7e2SAndroid Build Coastguard Worker internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
144*cc02d7e2SAndroid Build Coastguard Worker ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
145*cc02d7e2SAndroid Build Coastguard Worker : server_(server),
146*cc02d7e2SAndroid Build Coastguard Worker context_(context),
147*cc02d7e2SAndroid Build Coastguard Worker stream_(stream),
148*cc02d7e2SAndroid Build Coastguard Worker call_cq_(call_cq),
149*cc02d7e2SAndroid Build Coastguard Worker notification_cq_(notification_cq),
150*cc02d7e2SAndroid Build Coastguard Worker tag_(tag),
151*cc02d7e2SAndroid Build Coastguard Worker delete_on_finalize_(delete_on_finalize),
152*cc02d7e2SAndroid Build Coastguard Worker call_(nullptr),
153*cc02d7e2SAndroid Build Coastguard Worker done_intercepting_(false) {
154*cc02d7e2SAndroid Build Coastguard Worker // Set up interception state partially for the receive ops. call_wrapper_ is
155*cc02d7e2SAndroid Build Coastguard Worker // not filled at this point, but it will be filled before the interceptors are
156*cc02d7e2SAndroid Build Coastguard Worker // run.
157*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetCall(&call_wrapper_);
158*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetReverse();
159*cc02d7e2SAndroid Build Coastguard Worker call_cq_->RegisterAvalanching(); // This op will trigger more ops
160*cc02d7e2SAndroid Build Coastguard Worker call_metric_recording_enabled_ = server_->call_metric_recording_enabled();
161*cc02d7e2SAndroid Build Coastguard Worker server_metric_recorder_ = server_->server_metric_recorder();
162*cc02d7e2SAndroid Build Coastguard Worker }
163*cc02d7e2SAndroid Build Coastguard Worker
~BaseAsyncRequest()164*cc02d7e2SAndroid Build Coastguard Worker ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
165*cc02d7e2SAndroid Build Coastguard Worker call_cq_->CompleteAvalanching();
166*cc02d7e2SAndroid Build Coastguard Worker }
167*cc02d7e2SAndroid Build Coastguard Worker
FinalizeResult(void ** tag,bool * status)168*cc02d7e2SAndroid Build Coastguard Worker bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
169*cc02d7e2SAndroid Build Coastguard Worker bool* status) {
170*cc02d7e2SAndroid Build Coastguard Worker if (done_intercepting_) {
171*cc02d7e2SAndroid Build Coastguard Worker *tag = tag_;
172*cc02d7e2SAndroid Build Coastguard Worker if (delete_on_finalize_) {
173*cc02d7e2SAndroid Build Coastguard Worker delete this;
174*cc02d7e2SAndroid Build Coastguard Worker }
175*cc02d7e2SAndroid Build Coastguard Worker return true;
176*cc02d7e2SAndroid Build Coastguard Worker }
177*cc02d7e2SAndroid Build Coastguard Worker context_->set_call(call_, call_metric_recording_enabled_,
178*cc02d7e2SAndroid Build Coastguard Worker server_metric_recorder_);
179*cc02d7e2SAndroid Build Coastguard Worker context_->cq_ = call_cq_;
180*cc02d7e2SAndroid Build Coastguard Worker if (call_wrapper_.call() == nullptr) {
181*cc02d7e2SAndroid Build Coastguard Worker // Fill it since it is empty.
182*cc02d7e2SAndroid Build Coastguard Worker call_wrapper_ = internal::Call(
183*cc02d7e2SAndroid Build Coastguard Worker call_, server_, call_cq_, server_->max_receive_message_size(), nullptr);
184*cc02d7e2SAndroid Build Coastguard Worker }
185*cc02d7e2SAndroid Build Coastguard Worker
186*cc02d7e2SAndroid Build Coastguard Worker // just the pointers inside call are copied here
187*cc02d7e2SAndroid Build Coastguard Worker stream_->BindCall(&call_wrapper_);
188*cc02d7e2SAndroid Build Coastguard Worker
189*cc02d7e2SAndroid Build Coastguard Worker if (*status && call_ && call_wrapper_.server_rpc_info()) {
190*cc02d7e2SAndroid Build Coastguard Worker done_intercepting_ = true;
191*cc02d7e2SAndroid Build Coastguard Worker // Set interception point for RECV INITIAL METADATA
192*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.AddInterceptionHookPoint(
193*cc02d7e2SAndroid Build Coastguard Worker experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
194*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetRecvInitialMetadata(&context_->client_metadata_);
195*cc02d7e2SAndroid Build Coastguard Worker if (interceptor_methods_.RunInterceptors(
196*cc02d7e2SAndroid Build Coastguard Worker [this]() { ContinueFinalizeResultAfterInterception(); })) {
197*cc02d7e2SAndroid Build Coastguard Worker // There are no interceptors to run. Continue
198*cc02d7e2SAndroid Build Coastguard Worker } else {
199*cc02d7e2SAndroid Build Coastguard Worker // There were interceptors to be run, so
200*cc02d7e2SAndroid Build Coastguard Worker // ContinueFinalizeResultAfterInterception will be run when interceptors
201*cc02d7e2SAndroid Build Coastguard Worker // are done.
202*cc02d7e2SAndroid Build Coastguard Worker return false;
203*cc02d7e2SAndroid Build Coastguard Worker }
204*cc02d7e2SAndroid Build Coastguard Worker }
205*cc02d7e2SAndroid Build Coastguard Worker if (*status && call_) {
206*cc02d7e2SAndroid Build Coastguard Worker context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
207*cc02d7e2SAndroid Build Coastguard Worker }
208*cc02d7e2SAndroid Build Coastguard Worker *tag = tag_;
209*cc02d7e2SAndroid Build Coastguard Worker if (delete_on_finalize_) {
210*cc02d7e2SAndroid Build Coastguard Worker delete this;
211*cc02d7e2SAndroid Build Coastguard Worker }
212*cc02d7e2SAndroid Build Coastguard Worker return true;
213*cc02d7e2SAndroid Build Coastguard Worker }
214*cc02d7e2SAndroid Build Coastguard Worker
215*cc02d7e2SAndroid Build Coastguard Worker void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception()216*cc02d7e2SAndroid Build Coastguard Worker ContinueFinalizeResultAfterInterception() {
217*cc02d7e2SAndroid Build Coastguard Worker context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
218*cc02d7e2SAndroid Build Coastguard Worker // Queue a tag which will be returned immediately
219*cc02d7e2SAndroid Build Coastguard Worker grpc_core::ExecCtx exec_ctx;
220*cc02d7e2SAndroid Build Coastguard Worker grpc_cq_begin_op(notification_cq_->cq(), this);
221*cc02d7e2SAndroid Build Coastguard Worker grpc_cq_end_op(
222*cc02d7e2SAndroid Build Coastguard Worker notification_cq_->cq(), this, absl::OkStatus(),
223*cc02d7e2SAndroid Build Coastguard Worker [](void* /*arg*/, grpc_cq_completion* completion) { delete completion; },
224*cc02d7e2SAndroid Build Coastguard Worker nullptr, new grpc_cq_completion());
225*cc02d7e2SAndroid Build Coastguard Worker }
226*cc02d7e2SAndroid Build Coastguard Worker
RegisteredAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,const char * name,internal::RpcMethod::RpcType type)227*cc02d7e2SAndroid Build Coastguard Worker ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
228*cc02d7e2SAndroid Build Coastguard Worker ServerInterface* server, ServerContext* context,
229*cc02d7e2SAndroid Build Coastguard Worker internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
230*cc02d7e2SAndroid Build Coastguard Worker ServerCompletionQueue* notification_cq, void* tag, const char* name,
231*cc02d7e2SAndroid Build Coastguard Worker internal::RpcMethod::RpcType type)
232*cc02d7e2SAndroid Build Coastguard Worker : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
233*cc02d7e2SAndroid Build Coastguard Worker true),
234*cc02d7e2SAndroid Build Coastguard Worker name_(name),
235*cc02d7e2SAndroid Build Coastguard Worker type_(type) {}
236*cc02d7e2SAndroid Build Coastguard Worker
IssueRequest(void * registered_method,grpc_byte_buffer ** payload,ServerCompletionQueue * notification_cq)237*cc02d7e2SAndroid Build Coastguard Worker void ServerInterface::RegisteredAsyncRequest::IssueRequest(
238*cc02d7e2SAndroid Build Coastguard Worker void* registered_method, grpc_byte_buffer** payload,
239*cc02d7e2SAndroid Build Coastguard Worker ServerCompletionQueue* notification_cq) {
240*cc02d7e2SAndroid Build Coastguard Worker // The following call_start_batch is internally-generated so no need for an
241*cc02d7e2SAndroid Build Coastguard Worker // explanatory log on failure.
242*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(grpc_server_request_registered_call(
243*cc02d7e2SAndroid Build Coastguard Worker server_->server(), registered_method, &call_,
244*cc02d7e2SAndroid Build Coastguard Worker &context_->deadline_, context_->client_metadata_.arr(),
245*cc02d7e2SAndroid Build Coastguard Worker payload, call_cq_->cq(), notification_cq->cq(),
246*cc02d7e2SAndroid Build Coastguard Worker this) == GRPC_CALL_OK);
247*cc02d7e2SAndroid Build Coastguard Worker }
248*cc02d7e2SAndroid Build Coastguard Worker
GenericAsyncRequest(ServerInterface * server,GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,bool delete_on_finalize,bool issue_request)249*cc02d7e2SAndroid Build Coastguard Worker ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
250*cc02d7e2SAndroid Build Coastguard Worker ServerInterface* server, GenericServerContext* context,
251*cc02d7e2SAndroid Build Coastguard Worker internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
252*cc02d7e2SAndroid Build Coastguard Worker ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize,
253*cc02d7e2SAndroid Build Coastguard Worker bool issue_request)
254*cc02d7e2SAndroid Build Coastguard Worker : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag,
255*cc02d7e2SAndroid Build Coastguard Worker delete_on_finalize) {
256*cc02d7e2SAndroid Build Coastguard Worker grpc_call_details_init(&call_details_);
257*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(notification_cq);
258*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(call_cq);
259*cc02d7e2SAndroid Build Coastguard Worker if (issue_request) {
260*cc02d7e2SAndroid Build Coastguard Worker IssueRequest();
261*cc02d7e2SAndroid Build Coastguard Worker }
262*cc02d7e2SAndroid Build Coastguard Worker }
263*cc02d7e2SAndroid Build Coastguard Worker
FinalizeResult(void ** tag,bool * status)264*cc02d7e2SAndroid Build Coastguard Worker bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
265*cc02d7e2SAndroid Build Coastguard Worker bool* status) {
266*cc02d7e2SAndroid Build Coastguard Worker // If we are done intercepting, there is nothing more for us to do
267*cc02d7e2SAndroid Build Coastguard Worker if (done_intercepting_) {
268*cc02d7e2SAndroid Build Coastguard Worker return BaseAsyncRequest::FinalizeResult(tag, status);
269*cc02d7e2SAndroid Build Coastguard Worker }
270*cc02d7e2SAndroid Build Coastguard Worker // TODO(yangg) remove the copy here.
271*cc02d7e2SAndroid Build Coastguard Worker if (*status) {
272*cc02d7e2SAndroid Build Coastguard Worker static_cast<GenericServerContext*>(context_)->method_ =
273*cc02d7e2SAndroid Build Coastguard Worker StringFromCopiedSlice(call_details_.method);
274*cc02d7e2SAndroid Build Coastguard Worker static_cast<GenericServerContext*>(context_)->host_ =
275*cc02d7e2SAndroid Build Coastguard Worker StringFromCopiedSlice(call_details_.host);
276*cc02d7e2SAndroid Build Coastguard Worker context_->deadline_ = call_details_.deadline;
277*cc02d7e2SAndroid Build Coastguard Worker }
278*cc02d7e2SAndroid Build Coastguard Worker grpc_slice_unref(call_details_.method);
279*cc02d7e2SAndroid Build Coastguard Worker grpc_slice_unref(call_details_.host);
280*cc02d7e2SAndroid Build Coastguard Worker call_wrapper_ = internal::Call(
281*cc02d7e2SAndroid Build Coastguard Worker call_, server_, call_cq_, server_->max_receive_message_size(),
282*cc02d7e2SAndroid Build Coastguard Worker context_->set_server_rpc_info(
283*cc02d7e2SAndroid Build Coastguard Worker static_cast<GenericServerContext*>(context_)->method_.c_str(),
284*cc02d7e2SAndroid Build Coastguard Worker internal::RpcMethod::BIDI_STREAMING,
285*cc02d7e2SAndroid Build Coastguard Worker *server_->interceptor_creators()));
286*cc02d7e2SAndroid Build Coastguard Worker return BaseAsyncRequest::FinalizeResult(tag, status);
287*cc02d7e2SAndroid Build Coastguard Worker }
288*cc02d7e2SAndroid Build Coastguard Worker
IssueRequest()289*cc02d7e2SAndroid Build Coastguard Worker void ServerInterface::GenericAsyncRequest::IssueRequest() {
290*cc02d7e2SAndroid Build Coastguard Worker // The following call_start_batch is internally-generated so no need for an
291*cc02d7e2SAndroid Build Coastguard Worker // explanatory log on failure.
292*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(grpc_server_request_call(server_->server(), &call_, &call_details_,
293*cc02d7e2SAndroid Build Coastguard Worker context_->client_metadata_.arr(),
294*cc02d7e2SAndroid Build Coastguard Worker call_cq_->cq(), notification_cq_->cq(),
295*cc02d7e2SAndroid Build Coastguard Worker this) == GRPC_CALL_OK);
296*cc02d7e2SAndroid Build Coastguard Worker }
297*cc02d7e2SAndroid Build Coastguard Worker
298*cc02d7e2SAndroid Build Coastguard Worker namespace {
299*cc02d7e2SAndroid Build Coastguard Worker class ShutdownCallback : public grpc_completion_queue_functor {
300*cc02d7e2SAndroid Build Coastguard Worker public:
ShutdownCallback()301*cc02d7e2SAndroid Build Coastguard Worker ShutdownCallback() {
302*cc02d7e2SAndroid Build Coastguard Worker functor_run = &ShutdownCallback::Run;
303*cc02d7e2SAndroid Build Coastguard Worker // Set inlineable to true since this callback is trivial and thus does not
304*cc02d7e2SAndroid Build Coastguard Worker // need to be run from the executor (triggering a thread hop). This should
305*cc02d7e2SAndroid Build Coastguard Worker // only be used by internal callbacks like this and not by user application
306*cc02d7e2SAndroid Build Coastguard Worker // code.
307*cc02d7e2SAndroid Build Coastguard Worker inlineable = true;
308*cc02d7e2SAndroid Build Coastguard Worker }
309*cc02d7e2SAndroid Build Coastguard Worker // TakeCQ takes ownership of the cq into the shutdown callback
310*cc02d7e2SAndroid Build Coastguard Worker // so that the shutdown callback will be responsible for destroying it
TakeCQ(CompletionQueue * cq)311*cc02d7e2SAndroid Build Coastguard Worker void TakeCQ(CompletionQueue* cq) { cq_ = cq; }
312*cc02d7e2SAndroid Build Coastguard Worker
313*cc02d7e2SAndroid Build Coastguard Worker // The Run function will get invoked by the completion queue library
314*cc02d7e2SAndroid Build Coastguard Worker // when the shutdown is actually complete
Run(grpc_completion_queue_functor * cb,int)315*cc02d7e2SAndroid Build Coastguard Worker static void Run(grpc_completion_queue_functor* cb, int) {
316*cc02d7e2SAndroid Build Coastguard Worker auto* callback = static_cast<ShutdownCallback*>(cb);
317*cc02d7e2SAndroid Build Coastguard Worker delete callback->cq_;
318*cc02d7e2SAndroid Build Coastguard Worker delete callback;
319*cc02d7e2SAndroid Build Coastguard Worker }
320*cc02d7e2SAndroid Build Coastguard Worker
321*cc02d7e2SAndroid Build Coastguard Worker private:
322*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* cq_ = nullptr;
323*cc02d7e2SAndroid Build Coastguard Worker };
324*cc02d7e2SAndroid Build Coastguard Worker } // namespace
325*cc02d7e2SAndroid Build Coastguard Worker
326*cc02d7e2SAndroid Build Coastguard Worker /// Use private inheritance rather than composition only to establish order
327*cc02d7e2SAndroid Build Coastguard Worker /// of construction, since the public base class should be constructed after the
328*cc02d7e2SAndroid Build Coastguard Worker /// elements belonging to the private base class are constructed. This is not
329*cc02d7e2SAndroid Build Coastguard Worker /// possible using true composition.
330*cc02d7e2SAndroid Build Coastguard Worker class Server::UnimplementedAsyncRequest final
331*cc02d7e2SAndroid Build Coastguard Worker : private grpc::UnimplementedAsyncRequestContext,
332*cc02d7e2SAndroid Build Coastguard Worker public GenericAsyncRequest {
333*cc02d7e2SAndroid Build Coastguard Worker public:
UnimplementedAsyncRequest(ServerInterface * server,grpc::ServerCompletionQueue * cq)334*cc02d7e2SAndroid Build Coastguard Worker UnimplementedAsyncRequest(ServerInterface* server,
335*cc02d7e2SAndroid Build Coastguard Worker grpc::ServerCompletionQueue* cq)
336*cc02d7e2SAndroid Build Coastguard Worker : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
337*cc02d7e2SAndroid Build Coastguard Worker /*tag=*/nullptr, /*delete_on_finalize=*/false,
338*cc02d7e2SAndroid Build Coastguard Worker /*issue_request=*/false) {
339*cc02d7e2SAndroid Build Coastguard Worker // Issue request here instead of the base class to prevent race on vptr.
340*cc02d7e2SAndroid Build Coastguard Worker IssueRequest();
341*cc02d7e2SAndroid Build Coastguard Worker }
342*cc02d7e2SAndroid Build Coastguard Worker
343*cc02d7e2SAndroid Build Coastguard Worker bool FinalizeResult(void** tag, bool* status) override;
344*cc02d7e2SAndroid Build Coastguard Worker
context()345*cc02d7e2SAndroid Build Coastguard Worker grpc::ServerContext* context() { return &server_context_; }
stream()346*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
347*cc02d7e2SAndroid Build Coastguard Worker };
348*cc02d7e2SAndroid Build Coastguard Worker
349*cc02d7e2SAndroid Build Coastguard Worker /// UnimplementedAsyncResponse should not post user-visible completions to the
350*cc02d7e2SAndroid Build Coastguard Worker /// C++ completion queue, but is generated as a CQ event by the core
351*cc02d7e2SAndroid Build Coastguard Worker class Server::UnimplementedAsyncResponse final
352*cc02d7e2SAndroid Build Coastguard Worker : public grpc::internal::CallOpSet<
353*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::CallOpSendInitialMetadata,
354*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::CallOpServerSendStatus> {
355*cc02d7e2SAndroid Build Coastguard Worker public:
356*cc02d7e2SAndroid Build Coastguard Worker explicit UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
~UnimplementedAsyncResponse()357*cc02d7e2SAndroid Build Coastguard Worker ~UnimplementedAsyncResponse() override { delete request_; }
358*cc02d7e2SAndroid Build Coastguard Worker
FinalizeResult(void ** tag,bool * status)359*cc02d7e2SAndroid Build Coastguard Worker bool FinalizeResult(void** tag, bool* status) override {
360*cc02d7e2SAndroid Build Coastguard Worker if (grpc::internal::CallOpSet<
361*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::CallOpSendInitialMetadata,
362*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::CallOpServerSendStatus>::FinalizeResult(tag,
363*cc02d7e2SAndroid Build Coastguard Worker status)) {
364*cc02d7e2SAndroid Build Coastguard Worker delete this;
365*cc02d7e2SAndroid Build Coastguard Worker } else {
366*cc02d7e2SAndroid Build Coastguard Worker // The tag was swallowed due to interception. We will see it again.
367*cc02d7e2SAndroid Build Coastguard Worker }
368*cc02d7e2SAndroid Build Coastguard Worker return false;
369*cc02d7e2SAndroid Build Coastguard Worker }
370*cc02d7e2SAndroid Build Coastguard Worker
371*cc02d7e2SAndroid Build Coastguard Worker private:
372*cc02d7e2SAndroid Build Coastguard Worker UnimplementedAsyncRequest* const request_;
373*cc02d7e2SAndroid Build Coastguard Worker };
374*cc02d7e2SAndroid Build Coastguard Worker
375*cc02d7e2SAndroid Build Coastguard Worker class Server::SyncRequest final : public grpc::internal::CompletionQueueTag {
376*cc02d7e2SAndroid Build Coastguard Worker public:
SyncRequest(Server * server,grpc::internal::RpcServiceMethod * method,grpc_core::Server::RegisteredCallAllocation * data)377*cc02d7e2SAndroid Build Coastguard Worker SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
378*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::RegisteredCallAllocation* data)
379*cc02d7e2SAndroid Build Coastguard Worker : SyncRequest(server, method) {
380*cc02d7e2SAndroid Build Coastguard Worker CommonSetup(data);
381*cc02d7e2SAndroid Build Coastguard Worker data->deadline = &deadline_;
382*cc02d7e2SAndroid Build Coastguard Worker data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
383*cc02d7e2SAndroid Build Coastguard Worker }
384*cc02d7e2SAndroid Build Coastguard Worker
SyncRequest(Server * server,grpc::internal::RpcServiceMethod * method,grpc_core::Server::BatchCallAllocation * data)385*cc02d7e2SAndroid Build Coastguard Worker SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method,
386*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::BatchCallAllocation* data)
387*cc02d7e2SAndroid Build Coastguard Worker : SyncRequest(server, method) {
388*cc02d7e2SAndroid Build Coastguard Worker CommonSetup(data);
389*cc02d7e2SAndroid Build Coastguard Worker call_details_ = new grpc_call_details;
390*cc02d7e2SAndroid Build Coastguard Worker grpc_call_details_init(call_details_);
391*cc02d7e2SAndroid Build Coastguard Worker data->details = call_details_;
392*cc02d7e2SAndroid Build Coastguard Worker }
393*cc02d7e2SAndroid Build Coastguard Worker
~SyncRequest()394*cc02d7e2SAndroid Build Coastguard Worker ~SyncRequest() override {
395*cc02d7e2SAndroid Build Coastguard Worker // The destructor should only cleanup those objects created in the
396*cc02d7e2SAndroid Build Coastguard Worker // constructor, since some paths may or may not actually go through the
397*cc02d7e2SAndroid Build Coastguard Worker // Run stage where other objects are allocated.
398*cc02d7e2SAndroid Build Coastguard Worker if (has_request_payload_ && request_payload_) {
399*cc02d7e2SAndroid Build Coastguard Worker grpc_byte_buffer_destroy(request_payload_);
400*cc02d7e2SAndroid Build Coastguard Worker }
401*cc02d7e2SAndroid Build Coastguard Worker if (call_details_ != nullptr) {
402*cc02d7e2SAndroid Build Coastguard Worker grpc_call_details_destroy(call_details_);
403*cc02d7e2SAndroid Build Coastguard Worker delete call_details_;
404*cc02d7e2SAndroid Build Coastguard Worker }
405*cc02d7e2SAndroid Build Coastguard Worker grpc_metadata_array_destroy(&request_metadata_);
406*cc02d7e2SAndroid Build Coastguard Worker server_->UnrefWithPossibleNotify();
407*cc02d7e2SAndroid Build Coastguard Worker }
408*cc02d7e2SAndroid Build Coastguard Worker
FinalizeResult(void **,bool * status)409*cc02d7e2SAndroid Build Coastguard Worker bool FinalizeResult(void** /*tag*/, bool* status) override {
410*cc02d7e2SAndroid Build Coastguard Worker if (!*status) {
411*cc02d7e2SAndroid Build Coastguard Worker delete this;
412*cc02d7e2SAndroid Build Coastguard Worker return false;
413*cc02d7e2SAndroid Build Coastguard Worker }
414*cc02d7e2SAndroid Build Coastguard Worker if (call_details_) {
415*cc02d7e2SAndroid Build Coastguard Worker deadline_ = call_details_->deadline;
416*cc02d7e2SAndroid Build Coastguard Worker }
417*cc02d7e2SAndroid Build Coastguard Worker return true;
418*cc02d7e2SAndroid Build Coastguard Worker }
419*cc02d7e2SAndroid Build Coastguard Worker
Run(const std::shared_ptr<GlobalCallbacks> & global_callbacks,bool resources)420*cc02d7e2SAndroid Build Coastguard Worker void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
421*cc02d7e2SAndroid Build Coastguard Worker bool resources) {
422*cc02d7e2SAndroid Build Coastguard Worker ctx_.Init(deadline_, &request_metadata_);
423*cc02d7e2SAndroid Build Coastguard Worker wrapped_call_.Init(
424*cc02d7e2SAndroid Build Coastguard Worker call_, server_, &cq_, server_->max_receive_message_size(),
425*cc02d7e2SAndroid Build Coastguard Worker ctx_->ctx.set_server_rpc_info(method_->name(), method_->method_type(),
426*cc02d7e2SAndroid Build Coastguard Worker server_->interceptor_creators_));
427*cc02d7e2SAndroid Build Coastguard Worker ctx_->ctx.set_call(call_, server_->call_metric_recording_enabled(),
428*cc02d7e2SAndroid Build Coastguard Worker server_->server_metric_recorder());
429*cc02d7e2SAndroid Build Coastguard Worker ctx_->ctx.cq_ = &cq_;
430*cc02d7e2SAndroid Build Coastguard Worker request_metadata_.count = 0;
431*cc02d7e2SAndroid Build Coastguard Worker
432*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_ = global_callbacks;
433*cc02d7e2SAndroid Build Coastguard Worker resources_ = resources;
434*cc02d7e2SAndroid Build Coastguard Worker
435*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetCall(&*wrapped_call_);
436*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetReverse();
437*cc02d7e2SAndroid Build Coastguard Worker // Set interception point for RECV INITIAL METADATA
438*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.AddInterceptionHookPoint(
439*cc02d7e2SAndroid Build Coastguard Worker grpc::experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
440*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetRecvInitialMetadata(&ctx_->ctx.client_metadata_);
441*cc02d7e2SAndroid Build Coastguard Worker
442*cc02d7e2SAndroid Build Coastguard Worker if (has_request_payload_) {
443*cc02d7e2SAndroid Build Coastguard Worker // Set interception point for RECV MESSAGE
444*cc02d7e2SAndroid Build Coastguard Worker auto* handler = resources_ ? method_->handler()
445*cc02d7e2SAndroid Build Coastguard Worker : server_->resource_exhausted_handler_.get();
446*cc02d7e2SAndroid Build Coastguard Worker deserialized_request_ = handler->Deserialize(call_, request_payload_,
447*cc02d7e2SAndroid Build Coastguard Worker &request_status_, nullptr);
448*cc02d7e2SAndroid Build Coastguard Worker if (!request_status_.ok()) {
449*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_DEBUG, "Failed to deserialize message.");
450*cc02d7e2SAndroid Build Coastguard Worker }
451*cc02d7e2SAndroid Build Coastguard Worker request_payload_ = nullptr;
452*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.AddInterceptionHookPoint(
453*cc02d7e2SAndroid Build Coastguard Worker grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
454*cc02d7e2SAndroid Build Coastguard Worker interceptor_methods_.SetRecvMessage(deserialized_request_, nullptr);
455*cc02d7e2SAndroid Build Coastguard Worker }
456*cc02d7e2SAndroid Build Coastguard Worker
457*cc02d7e2SAndroid Build Coastguard Worker if (interceptor_methods_.RunInterceptors(
458*cc02d7e2SAndroid Build Coastguard Worker [this]() { ContinueRunAfterInterception(); })) {
459*cc02d7e2SAndroid Build Coastguard Worker ContinueRunAfterInterception();
460*cc02d7e2SAndroid Build Coastguard Worker } else {
461*cc02d7e2SAndroid Build Coastguard Worker // There were interceptors to be run, so ContinueRunAfterInterception
462*cc02d7e2SAndroid Build Coastguard Worker // will be run when interceptors are done.
463*cc02d7e2SAndroid Build Coastguard Worker }
464*cc02d7e2SAndroid Build Coastguard Worker }
465*cc02d7e2SAndroid Build Coastguard Worker
ContinueRunAfterInterception()466*cc02d7e2SAndroid Build Coastguard Worker void ContinueRunAfterInterception() {
467*cc02d7e2SAndroid Build Coastguard Worker ctx_->ctx.BeginCompletionOp(&*wrapped_call_, nullptr, nullptr);
468*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_->PreSynchronousRequest(&ctx_->ctx);
469*cc02d7e2SAndroid Build Coastguard Worker auto* handler = resources_ ? method_->handler()
470*cc02d7e2SAndroid Build Coastguard Worker : server_->resource_exhausted_handler_.get();
471*cc02d7e2SAndroid Build Coastguard Worker handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
472*cc02d7e2SAndroid Build Coastguard Worker &*wrapped_call_, &ctx_->ctx, deserialized_request_, request_status_,
473*cc02d7e2SAndroid Build Coastguard Worker nullptr, nullptr));
474*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_->PostSynchronousRequest(&ctx_->ctx);
475*cc02d7e2SAndroid Build Coastguard Worker
476*cc02d7e2SAndroid Build Coastguard Worker cq_.Shutdown();
477*cc02d7e2SAndroid Build Coastguard Worker
478*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::CompletionQueueTag* op_tag = ctx_->ctx.GetCompletionOpTag();
479*cc02d7e2SAndroid Build Coastguard Worker cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
480*cc02d7e2SAndroid Build Coastguard Worker
481*cc02d7e2SAndroid Build Coastguard Worker // Ensure the cq_ is shutdown
482*cc02d7e2SAndroid Build Coastguard Worker grpc::PhonyTag ignored_tag;
483*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
484*cc02d7e2SAndroid Build Coastguard Worker
485*cc02d7e2SAndroid Build Coastguard Worker // Cleanup structures allocated during Run/ContinueRunAfterInterception
486*cc02d7e2SAndroid Build Coastguard Worker wrapped_call_.Destroy();
487*cc02d7e2SAndroid Build Coastguard Worker ctx_.Destroy();
488*cc02d7e2SAndroid Build Coastguard Worker
489*cc02d7e2SAndroid Build Coastguard Worker delete this;
490*cc02d7e2SAndroid Build Coastguard Worker }
491*cc02d7e2SAndroid Build Coastguard Worker
492*cc02d7e2SAndroid Build Coastguard Worker // For requests that must be only cleaned up but not actually Run
Cleanup()493*cc02d7e2SAndroid Build Coastguard Worker void Cleanup() {
494*cc02d7e2SAndroid Build Coastguard Worker cq_.Shutdown();
495*cc02d7e2SAndroid Build Coastguard Worker grpc_call_unref(call_);
496*cc02d7e2SAndroid Build Coastguard Worker delete this;
497*cc02d7e2SAndroid Build Coastguard Worker }
498*cc02d7e2SAndroid Build Coastguard Worker
499*cc02d7e2SAndroid Build Coastguard Worker private:
SyncRequest(Server * server,grpc::internal::RpcServiceMethod * method)500*cc02d7e2SAndroid Build Coastguard Worker SyncRequest(Server* server, grpc::internal::RpcServiceMethod* method)
501*cc02d7e2SAndroid Build Coastguard Worker : server_(server),
502*cc02d7e2SAndroid Build Coastguard Worker method_(method),
503*cc02d7e2SAndroid Build Coastguard Worker has_request_payload_(method->method_type() ==
504*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcMethod::NORMAL_RPC ||
505*cc02d7e2SAndroid Build Coastguard Worker method->method_type() ==
506*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcMethod::SERVER_STREAMING),
507*cc02d7e2SAndroid Build Coastguard Worker cq_(grpc_completion_queue_create_for_pluck(nullptr)) {}
508*cc02d7e2SAndroid Build Coastguard Worker
509*cc02d7e2SAndroid Build Coastguard Worker template <class CallAllocation>
CommonSetup(CallAllocation * data)510*cc02d7e2SAndroid Build Coastguard Worker void CommonSetup(CallAllocation* data) {
511*cc02d7e2SAndroid Build Coastguard Worker server_->Ref();
512*cc02d7e2SAndroid Build Coastguard Worker grpc_metadata_array_init(&request_metadata_);
513*cc02d7e2SAndroid Build Coastguard Worker data->tag = static_cast<void*>(this);
514*cc02d7e2SAndroid Build Coastguard Worker data->call = &call_;
515*cc02d7e2SAndroid Build Coastguard Worker data->initial_metadata = &request_metadata_;
516*cc02d7e2SAndroid Build Coastguard Worker data->cq = cq_.cq();
517*cc02d7e2SAndroid Build Coastguard Worker }
518*cc02d7e2SAndroid Build Coastguard Worker
519*cc02d7e2SAndroid Build Coastguard Worker Server* const server_;
520*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcServiceMethod* const method_;
521*cc02d7e2SAndroid Build Coastguard Worker const bool has_request_payload_;
522*cc02d7e2SAndroid Build Coastguard Worker grpc_call* call_;
523*cc02d7e2SAndroid Build Coastguard Worker grpc_call_details* call_details_ = nullptr;
524*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec deadline_;
525*cc02d7e2SAndroid Build Coastguard Worker grpc_metadata_array request_metadata_;
526*cc02d7e2SAndroid Build Coastguard Worker grpc_byte_buffer* request_payload_ = nullptr;
527*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue cq_;
528*cc02d7e2SAndroid Build Coastguard Worker grpc::Status request_status_;
529*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<GlobalCallbacks> global_callbacks_;
530*cc02d7e2SAndroid Build Coastguard Worker bool resources_;
531*cc02d7e2SAndroid Build Coastguard Worker void* deserialized_request_ = nullptr;
532*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
533*cc02d7e2SAndroid Build Coastguard Worker
534*cc02d7e2SAndroid Build Coastguard Worker // ServerContextWrapper allows ManualConstructor while using a private
535*cc02d7e2SAndroid Build Coastguard Worker // contructor of ServerContext via this friend class.
536*cc02d7e2SAndroid Build Coastguard Worker struct ServerContextWrapper {
537*cc02d7e2SAndroid Build Coastguard Worker ServerContext ctx;
538*cc02d7e2SAndroid Build Coastguard Worker
ServerContextWrappergrpc::Server::SyncRequest::ServerContextWrapper539*cc02d7e2SAndroid Build Coastguard Worker ServerContextWrapper(gpr_timespec deadline, grpc_metadata_array* arr)
540*cc02d7e2SAndroid Build Coastguard Worker : ctx(deadline, arr) {}
541*cc02d7e2SAndroid Build Coastguard Worker };
542*cc02d7e2SAndroid Build Coastguard Worker
543*cc02d7e2SAndroid Build Coastguard Worker grpc_core::ManualConstructor<ServerContextWrapper> ctx_;
544*cc02d7e2SAndroid Build Coastguard Worker grpc_core::ManualConstructor<internal::Call> wrapped_call_;
545*cc02d7e2SAndroid Build Coastguard Worker };
546*cc02d7e2SAndroid Build Coastguard Worker
547*cc02d7e2SAndroid Build Coastguard Worker template <class ServerContextType>
548*cc02d7e2SAndroid Build Coastguard Worker class Server::CallbackRequest final
549*cc02d7e2SAndroid Build Coastguard Worker : public grpc::internal::CompletionQueueTag {
550*cc02d7e2SAndroid Build Coastguard Worker public:
551*cc02d7e2SAndroid Build Coastguard Worker static_assert(
552*cc02d7e2SAndroid Build Coastguard Worker std::is_base_of<grpc::CallbackServerContext, ServerContextType>::value,
553*cc02d7e2SAndroid Build Coastguard Worker "ServerContextType must be derived from CallbackServerContext");
554*cc02d7e2SAndroid Build Coastguard Worker
555*cc02d7e2SAndroid Build Coastguard Worker // For codegen services, the value of method represents the defined
556*cc02d7e2SAndroid Build Coastguard Worker // characteristics of the method being requested. For generic services, method
557*cc02d7e2SAndroid Build Coastguard Worker // is nullptr since these services don't have pre-defined methods.
CallbackRequest(Server * server,grpc::internal::RpcServiceMethod * method,grpc::CompletionQueue * cq,grpc_core::Server::RegisteredCallAllocation * data)558*cc02d7e2SAndroid Build Coastguard Worker CallbackRequest(Server* server, grpc::internal::RpcServiceMethod* method,
559*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue* cq,
560*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::RegisteredCallAllocation* data)
561*cc02d7e2SAndroid Build Coastguard Worker : server_(server),
562*cc02d7e2SAndroid Build Coastguard Worker method_(method),
563*cc02d7e2SAndroid Build Coastguard Worker has_request_payload_(method->method_type() ==
564*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcMethod::NORMAL_RPC ||
565*cc02d7e2SAndroid Build Coastguard Worker method->method_type() ==
566*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcMethod::SERVER_STREAMING),
567*cc02d7e2SAndroid Build Coastguard Worker cq_(cq),
568*cc02d7e2SAndroid Build Coastguard Worker tag_(this),
569*cc02d7e2SAndroid Build Coastguard Worker ctx_(server_->context_allocator() != nullptr
570*cc02d7e2SAndroid Build Coastguard Worker ? server_->context_allocator()->NewCallbackServerContext()
571*cc02d7e2SAndroid Build Coastguard Worker : nullptr) {
572*cc02d7e2SAndroid Build Coastguard Worker CommonSetup(server, data);
573*cc02d7e2SAndroid Build Coastguard Worker data->deadline = &deadline_;
574*cc02d7e2SAndroid Build Coastguard Worker data->optional_payload = has_request_payload_ ? &request_payload_ : nullptr;
575*cc02d7e2SAndroid Build Coastguard Worker }
576*cc02d7e2SAndroid Build Coastguard Worker
577*cc02d7e2SAndroid Build Coastguard Worker // For generic services, method is nullptr since these services don't have
578*cc02d7e2SAndroid Build Coastguard Worker // pre-defined methods.
CallbackRequest(Server * server,grpc::CompletionQueue * cq,grpc_core::Server::BatchCallAllocation * data)579*cc02d7e2SAndroid Build Coastguard Worker CallbackRequest(Server* server, grpc::CompletionQueue* cq,
580*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::BatchCallAllocation* data)
581*cc02d7e2SAndroid Build Coastguard Worker : server_(server),
582*cc02d7e2SAndroid Build Coastguard Worker method_(nullptr),
583*cc02d7e2SAndroid Build Coastguard Worker has_request_payload_(false),
584*cc02d7e2SAndroid Build Coastguard Worker call_details_(new grpc_call_details),
585*cc02d7e2SAndroid Build Coastguard Worker cq_(cq),
586*cc02d7e2SAndroid Build Coastguard Worker tag_(this),
587*cc02d7e2SAndroid Build Coastguard Worker ctx_(server_->context_allocator() != nullptr
588*cc02d7e2SAndroid Build Coastguard Worker ? server_->context_allocator()
589*cc02d7e2SAndroid Build Coastguard Worker ->NewGenericCallbackServerContext()
590*cc02d7e2SAndroid Build Coastguard Worker : nullptr) {
591*cc02d7e2SAndroid Build Coastguard Worker CommonSetup(server, data);
592*cc02d7e2SAndroid Build Coastguard Worker grpc_call_details_init(call_details_);
593*cc02d7e2SAndroid Build Coastguard Worker data->details = call_details_;
594*cc02d7e2SAndroid Build Coastguard Worker }
595*cc02d7e2SAndroid Build Coastguard Worker
~CallbackRequest()596*cc02d7e2SAndroid Build Coastguard Worker ~CallbackRequest() override {
597*cc02d7e2SAndroid Build Coastguard Worker delete call_details_;
598*cc02d7e2SAndroid Build Coastguard Worker grpc_metadata_array_destroy(&request_metadata_);
599*cc02d7e2SAndroid Build Coastguard Worker if (has_request_payload_ && request_payload_) {
600*cc02d7e2SAndroid Build Coastguard Worker grpc_byte_buffer_destroy(request_payload_);
601*cc02d7e2SAndroid Build Coastguard Worker }
602*cc02d7e2SAndroid Build Coastguard Worker if (ctx_alloc_by_default_ || server_->context_allocator() == nullptr) {
603*cc02d7e2SAndroid Build Coastguard Worker default_ctx_.Destroy();
604*cc02d7e2SAndroid Build Coastguard Worker }
605*cc02d7e2SAndroid Build Coastguard Worker server_->UnrefWithPossibleNotify();
606*cc02d7e2SAndroid Build Coastguard Worker }
607*cc02d7e2SAndroid Build Coastguard Worker
608*cc02d7e2SAndroid Build Coastguard Worker // Needs specialization to account for different processing of metadata
609*cc02d7e2SAndroid Build Coastguard Worker // in generic API
610*cc02d7e2SAndroid Build Coastguard Worker bool FinalizeResult(void** tag, bool* status) override;
611*cc02d7e2SAndroid Build Coastguard Worker
612*cc02d7e2SAndroid Build Coastguard Worker private:
613*cc02d7e2SAndroid Build Coastguard Worker // method_name needs to be specialized between named method and generic
614*cc02d7e2SAndroid Build Coastguard Worker const char* method_name() const;
615*cc02d7e2SAndroid Build Coastguard Worker
616*cc02d7e2SAndroid Build Coastguard Worker class CallbackCallTag : public grpc_completion_queue_functor {
617*cc02d7e2SAndroid Build Coastguard Worker public:
CallbackCallTag(Server::CallbackRequest<ServerContextType> * req)618*cc02d7e2SAndroid Build Coastguard Worker explicit CallbackCallTag(Server::CallbackRequest<ServerContextType>* req)
619*cc02d7e2SAndroid Build Coastguard Worker : req_(req) {
620*cc02d7e2SAndroid Build Coastguard Worker functor_run = &CallbackCallTag::StaticRun;
621*cc02d7e2SAndroid Build Coastguard Worker // Set inlineable to true since this callback is internally-controlled
622*cc02d7e2SAndroid Build Coastguard Worker // without taking any locks, and thus does not need to be run from the
623*cc02d7e2SAndroid Build Coastguard Worker // executor (which triggers a thread hop). This should only be used by
624*cc02d7e2SAndroid Build Coastguard Worker // internal callbacks like this and not by user application code. The work
625*cc02d7e2SAndroid Build Coastguard Worker // here is actually non-trivial, but there is no chance of having user
626*cc02d7e2SAndroid Build Coastguard Worker // locks conflict with each other so it's ok to run inlined.
627*cc02d7e2SAndroid Build Coastguard Worker inlineable = true;
628*cc02d7e2SAndroid Build Coastguard Worker }
629*cc02d7e2SAndroid Build Coastguard Worker
630*cc02d7e2SAndroid Build Coastguard Worker // force_run can not be performed on a tag if operations using this tag
631*cc02d7e2SAndroid Build Coastguard Worker // have been sent to PerformOpsOnCall. It is intended for error conditions
632*cc02d7e2SAndroid Build Coastguard Worker // that are detected before the operations are internally processed.
force_run(bool ok)633*cc02d7e2SAndroid Build Coastguard Worker void force_run(bool ok) { Run(ok); }
634*cc02d7e2SAndroid Build Coastguard Worker
635*cc02d7e2SAndroid Build Coastguard Worker private:
636*cc02d7e2SAndroid Build Coastguard Worker Server::CallbackRequest<ServerContextType>* req_;
637*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::Call* call_;
638*cc02d7e2SAndroid Build Coastguard Worker
StaticRun(grpc_completion_queue_functor * cb,int ok)639*cc02d7e2SAndroid Build Coastguard Worker static void StaticRun(grpc_completion_queue_functor* cb, int ok) {
640*cc02d7e2SAndroid Build Coastguard Worker static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok));
641*cc02d7e2SAndroid Build Coastguard Worker }
Run(bool ok)642*cc02d7e2SAndroid Build Coastguard Worker void Run(bool ok) {
643*cc02d7e2SAndroid Build Coastguard Worker void* ignored = req_;
644*cc02d7e2SAndroid Build Coastguard Worker bool new_ok = ok;
645*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
646*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(ignored == req_);
647*cc02d7e2SAndroid Build Coastguard Worker
648*cc02d7e2SAndroid Build Coastguard Worker if (!ok) {
649*cc02d7e2SAndroid Build Coastguard Worker // The call has been shutdown.
650*cc02d7e2SAndroid Build Coastguard Worker // Delete its contents to free up the request.
651*cc02d7e2SAndroid Build Coastguard Worker delete req_;
652*cc02d7e2SAndroid Build Coastguard Worker return;
653*cc02d7e2SAndroid Build Coastguard Worker }
654*cc02d7e2SAndroid Build Coastguard Worker
655*cc02d7e2SAndroid Build Coastguard Worker // Bind the call, deadline, and metadata from what we got
656*cc02d7e2SAndroid Build Coastguard Worker req_->ctx_->set_call(req_->call_,
657*cc02d7e2SAndroid Build Coastguard Worker req_->server_->call_metric_recording_enabled(),
658*cc02d7e2SAndroid Build Coastguard Worker req_->server_->server_metric_recorder());
659*cc02d7e2SAndroid Build Coastguard Worker req_->ctx_->cq_ = req_->cq_;
660*cc02d7e2SAndroid Build Coastguard Worker req_->ctx_->BindDeadlineAndMetadata(req_->deadline_,
661*cc02d7e2SAndroid Build Coastguard Worker &req_->request_metadata_);
662*cc02d7e2SAndroid Build Coastguard Worker req_->request_metadata_.count = 0;
663*cc02d7e2SAndroid Build Coastguard Worker
664*cc02d7e2SAndroid Build Coastguard Worker // Create a C++ Call to control the underlying core call
665*cc02d7e2SAndroid Build Coastguard Worker call_ =
666*cc02d7e2SAndroid Build Coastguard Worker new (grpc_call_arena_alloc(req_->call_, sizeof(grpc::internal::Call)))
667*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::Call(
668*cc02d7e2SAndroid Build Coastguard Worker req_->call_, req_->server_, req_->cq_,
669*cc02d7e2SAndroid Build Coastguard Worker req_->server_->max_receive_message_size(),
670*cc02d7e2SAndroid Build Coastguard Worker req_->ctx_->set_server_rpc_info(
671*cc02d7e2SAndroid Build Coastguard Worker req_->method_name(),
672*cc02d7e2SAndroid Build Coastguard Worker (req_->method_ != nullptr)
673*cc02d7e2SAndroid Build Coastguard Worker ? req_->method_->method_type()
674*cc02d7e2SAndroid Build Coastguard Worker : grpc::internal::RpcMethod::BIDI_STREAMING,
675*cc02d7e2SAndroid Build Coastguard Worker req_->server_->interceptor_creators_));
676*cc02d7e2SAndroid Build Coastguard Worker
677*cc02d7e2SAndroid Build Coastguard Worker req_->interceptor_methods_.SetCall(call_);
678*cc02d7e2SAndroid Build Coastguard Worker req_->interceptor_methods_.SetReverse();
679*cc02d7e2SAndroid Build Coastguard Worker // Set interception point for RECV INITIAL METADATA
680*cc02d7e2SAndroid Build Coastguard Worker req_->interceptor_methods_.AddInterceptionHookPoint(
681*cc02d7e2SAndroid Build Coastguard Worker grpc::experimental::InterceptionHookPoints::
682*cc02d7e2SAndroid Build Coastguard Worker POST_RECV_INITIAL_METADATA);
683*cc02d7e2SAndroid Build Coastguard Worker req_->interceptor_methods_.SetRecvInitialMetadata(
684*cc02d7e2SAndroid Build Coastguard Worker &req_->ctx_->client_metadata_);
685*cc02d7e2SAndroid Build Coastguard Worker
686*cc02d7e2SAndroid Build Coastguard Worker if (req_->has_request_payload_) {
687*cc02d7e2SAndroid Build Coastguard Worker // Set interception point for RECV MESSAGE
688*cc02d7e2SAndroid Build Coastguard Worker req_->request_ = req_->method_->handler()->Deserialize(
689*cc02d7e2SAndroid Build Coastguard Worker req_->call_, req_->request_payload_, &req_->request_status_,
690*cc02d7e2SAndroid Build Coastguard Worker &req_->handler_data_);
691*cc02d7e2SAndroid Build Coastguard Worker if (!(req_->request_status_.ok())) {
692*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_DEBUG, "Failed to deserialize message.");
693*cc02d7e2SAndroid Build Coastguard Worker }
694*cc02d7e2SAndroid Build Coastguard Worker req_->request_payload_ = nullptr;
695*cc02d7e2SAndroid Build Coastguard Worker req_->interceptor_methods_.AddInterceptionHookPoint(
696*cc02d7e2SAndroid Build Coastguard Worker grpc::experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
697*cc02d7e2SAndroid Build Coastguard Worker req_->interceptor_methods_.SetRecvMessage(req_->request_, nullptr);
698*cc02d7e2SAndroid Build Coastguard Worker }
699*cc02d7e2SAndroid Build Coastguard Worker
700*cc02d7e2SAndroid Build Coastguard Worker if (req_->interceptor_methods_.RunInterceptors(
701*cc02d7e2SAndroid Build Coastguard Worker [this] { ContinueRunAfterInterception(); })) {
702*cc02d7e2SAndroid Build Coastguard Worker ContinueRunAfterInterception();
703*cc02d7e2SAndroid Build Coastguard Worker } else {
704*cc02d7e2SAndroid Build Coastguard Worker // There were interceptors to be run, so ContinueRunAfterInterception
705*cc02d7e2SAndroid Build Coastguard Worker // will be run when interceptors are done.
706*cc02d7e2SAndroid Build Coastguard Worker }
707*cc02d7e2SAndroid Build Coastguard Worker }
ContinueRunAfterInterception()708*cc02d7e2SAndroid Build Coastguard Worker void ContinueRunAfterInterception() {
709*cc02d7e2SAndroid Build Coastguard Worker auto* handler = (req_->method_ != nullptr)
710*cc02d7e2SAndroid Build Coastguard Worker ? req_->method_->handler()
711*cc02d7e2SAndroid Build Coastguard Worker : req_->server_->generic_handler_.get();
712*cc02d7e2SAndroid Build Coastguard Worker handler->RunHandler(grpc::internal::MethodHandler::HandlerParameter(
713*cc02d7e2SAndroid Build Coastguard Worker call_, req_->ctx_, req_->request_, req_->request_status_,
714*cc02d7e2SAndroid Build Coastguard Worker req_->handler_data_, [this] { delete req_; }));
715*cc02d7e2SAndroid Build Coastguard Worker }
716*cc02d7e2SAndroid Build Coastguard Worker };
717*cc02d7e2SAndroid Build Coastguard Worker
718*cc02d7e2SAndroid Build Coastguard Worker template <class CallAllocation>
CommonSetup(Server * server,CallAllocation * data)719*cc02d7e2SAndroid Build Coastguard Worker void CommonSetup(Server* server, CallAllocation* data) {
720*cc02d7e2SAndroid Build Coastguard Worker server->Ref();
721*cc02d7e2SAndroid Build Coastguard Worker grpc_metadata_array_init(&request_metadata_);
722*cc02d7e2SAndroid Build Coastguard Worker data->tag = static_cast<void*>(&tag_);
723*cc02d7e2SAndroid Build Coastguard Worker data->call = &call_;
724*cc02d7e2SAndroid Build Coastguard Worker data->initial_metadata = &request_metadata_;
725*cc02d7e2SAndroid Build Coastguard Worker if (ctx_ == nullptr) {
726*cc02d7e2SAndroid Build Coastguard Worker default_ctx_.Init();
727*cc02d7e2SAndroid Build Coastguard Worker ctx_ = &*default_ctx_;
728*cc02d7e2SAndroid Build Coastguard Worker ctx_alloc_by_default_ = true;
729*cc02d7e2SAndroid Build Coastguard Worker }
730*cc02d7e2SAndroid Build Coastguard Worker ctx_->set_context_allocator(server->context_allocator());
731*cc02d7e2SAndroid Build Coastguard Worker data->cq = cq_->cq();
732*cc02d7e2SAndroid Build Coastguard Worker }
733*cc02d7e2SAndroid Build Coastguard Worker
734*cc02d7e2SAndroid Build Coastguard Worker Server* const server_;
735*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcServiceMethod* const method_;
736*cc02d7e2SAndroid Build Coastguard Worker const bool has_request_payload_;
737*cc02d7e2SAndroid Build Coastguard Worker grpc_byte_buffer* request_payload_ = nullptr;
738*cc02d7e2SAndroid Build Coastguard Worker void* request_ = nullptr;
739*cc02d7e2SAndroid Build Coastguard Worker void* handler_data_ = nullptr;
740*cc02d7e2SAndroid Build Coastguard Worker grpc::Status request_status_;
741*cc02d7e2SAndroid Build Coastguard Worker grpc_call_details* const call_details_ = nullptr;
742*cc02d7e2SAndroid Build Coastguard Worker grpc_call* call_;
743*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec deadline_;
744*cc02d7e2SAndroid Build Coastguard Worker grpc_metadata_array request_metadata_;
745*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue* const cq_;
746*cc02d7e2SAndroid Build Coastguard Worker bool ctx_alloc_by_default_ = false;
747*cc02d7e2SAndroid Build Coastguard Worker CallbackCallTag tag_;
748*cc02d7e2SAndroid Build Coastguard Worker ServerContextType* ctx_ = nullptr;
749*cc02d7e2SAndroid Build Coastguard Worker grpc_core::ManualConstructor<ServerContextType> default_ctx_;
750*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::InterceptorBatchMethodsImpl interceptor_methods_;
751*cc02d7e2SAndroid Build Coastguard Worker };
752*cc02d7e2SAndroid Build Coastguard Worker
753*cc02d7e2SAndroid Build Coastguard Worker template <>
FinalizeResult(void **,bool *)754*cc02d7e2SAndroid Build Coastguard Worker bool Server::CallbackRequest<grpc::CallbackServerContext>::FinalizeResult(
755*cc02d7e2SAndroid Build Coastguard Worker void** /*tag*/, bool* /*status*/) {
756*cc02d7e2SAndroid Build Coastguard Worker return false;
757*cc02d7e2SAndroid Build Coastguard Worker }
758*cc02d7e2SAndroid Build Coastguard Worker
759*cc02d7e2SAndroid Build Coastguard Worker template <>
760*cc02d7e2SAndroid Build Coastguard Worker bool Server::CallbackRequest<
FinalizeResult(void **,bool * status)761*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericCallbackServerContext>::FinalizeResult(void** /*tag*/,
762*cc02d7e2SAndroid Build Coastguard Worker bool* status) {
763*cc02d7e2SAndroid Build Coastguard Worker if (*status) {
764*cc02d7e2SAndroid Build Coastguard Worker deadline_ = call_details_->deadline;
765*cc02d7e2SAndroid Build Coastguard Worker // TODO(yangg) remove the copy here
766*cc02d7e2SAndroid Build Coastguard Worker ctx_->method_ = grpc::StringFromCopiedSlice(call_details_->method);
767*cc02d7e2SAndroid Build Coastguard Worker ctx_->host_ = grpc::StringFromCopiedSlice(call_details_->host);
768*cc02d7e2SAndroid Build Coastguard Worker }
769*cc02d7e2SAndroid Build Coastguard Worker grpc_slice_unref(call_details_->method);
770*cc02d7e2SAndroid Build Coastguard Worker grpc_slice_unref(call_details_->host);
771*cc02d7e2SAndroid Build Coastguard Worker return false;
772*cc02d7e2SAndroid Build Coastguard Worker }
773*cc02d7e2SAndroid Build Coastguard Worker
774*cc02d7e2SAndroid Build Coastguard Worker template <>
method_name() const775*cc02d7e2SAndroid Build Coastguard Worker const char* Server::CallbackRequest<grpc::CallbackServerContext>::method_name()
776*cc02d7e2SAndroid Build Coastguard Worker const {
777*cc02d7e2SAndroid Build Coastguard Worker return method_->name();
778*cc02d7e2SAndroid Build Coastguard Worker }
779*cc02d7e2SAndroid Build Coastguard Worker
780*cc02d7e2SAndroid Build Coastguard Worker template <>
781*cc02d7e2SAndroid Build Coastguard Worker const char* Server::CallbackRequest<
method_name() const782*cc02d7e2SAndroid Build Coastguard Worker grpc::GenericCallbackServerContext>::method_name() const {
783*cc02d7e2SAndroid Build Coastguard Worker return ctx_->method().c_str();
784*cc02d7e2SAndroid Build Coastguard Worker }
785*cc02d7e2SAndroid Build Coastguard Worker
786*cc02d7e2SAndroid Build Coastguard Worker // Implementation of ThreadManager. Each instance of SyncRequestThreadManager
787*cc02d7e2SAndroid Build Coastguard Worker // manages a pool of threads that poll for incoming Sync RPCs and call the
788*cc02d7e2SAndroid Build Coastguard Worker // appropriate RPC handlers
789*cc02d7e2SAndroid Build Coastguard Worker class Server::SyncRequestThreadManager : public grpc::ThreadManager {
790*cc02d7e2SAndroid Build Coastguard Worker public:
SyncRequestThreadManager(Server * server,grpc::CompletionQueue * server_cq,std::shared_ptr<GlobalCallbacks> global_callbacks,grpc_resource_quota * rq,int min_pollers,int max_pollers,int cq_timeout_msec)791*cc02d7e2SAndroid Build Coastguard Worker SyncRequestThreadManager(Server* server, grpc::CompletionQueue* server_cq,
792*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<GlobalCallbacks> global_callbacks,
793*cc02d7e2SAndroid Build Coastguard Worker grpc_resource_quota* rq, int min_pollers,
794*cc02d7e2SAndroid Build Coastguard Worker int max_pollers, int cq_timeout_msec)
795*cc02d7e2SAndroid Build Coastguard Worker : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
796*cc02d7e2SAndroid Build Coastguard Worker server_(server),
797*cc02d7e2SAndroid Build Coastguard Worker server_cq_(server_cq),
798*cc02d7e2SAndroid Build Coastguard Worker cq_timeout_msec_(cq_timeout_msec),
799*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_(std::move(global_callbacks)) {}
800*cc02d7e2SAndroid Build Coastguard Worker
PollForWork(void ** tag,bool * ok)801*cc02d7e2SAndroid Build Coastguard Worker WorkStatus PollForWork(void** tag, bool* ok) override {
802*cc02d7e2SAndroid Build Coastguard Worker *tag = nullptr;
803*cc02d7e2SAndroid Build Coastguard Worker // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
804*cc02d7e2SAndroid Build Coastguard Worker // right now
805*cc02d7e2SAndroid Build Coastguard Worker gpr_timespec deadline =
806*cc02d7e2SAndroid Build Coastguard Worker gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
807*cc02d7e2SAndroid Build Coastguard Worker gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
808*cc02d7e2SAndroid Build Coastguard Worker
809*cc02d7e2SAndroid Build Coastguard Worker switch (server_cq_->AsyncNext(tag, ok, deadline)) {
810*cc02d7e2SAndroid Build Coastguard Worker case grpc::CompletionQueue::TIMEOUT:
811*cc02d7e2SAndroid Build Coastguard Worker return TIMEOUT;
812*cc02d7e2SAndroid Build Coastguard Worker case grpc::CompletionQueue::SHUTDOWN:
813*cc02d7e2SAndroid Build Coastguard Worker return SHUTDOWN;
814*cc02d7e2SAndroid Build Coastguard Worker case grpc::CompletionQueue::GOT_EVENT:
815*cc02d7e2SAndroid Build Coastguard Worker return WORK_FOUND;
816*cc02d7e2SAndroid Build Coastguard Worker }
817*cc02d7e2SAndroid Build Coastguard Worker
818*cc02d7e2SAndroid Build Coastguard Worker GPR_UNREACHABLE_CODE(return TIMEOUT);
819*cc02d7e2SAndroid Build Coastguard Worker }
820*cc02d7e2SAndroid Build Coastguard Worker
DoWork(void * tag,bool ok,bool resources)821*cc02d7e2SAndroid Build Coastguard Worker void DoWork(void* tag, bool ok, bool resources) override {
822*cc02d7e2SAndroid Build Coastguard Worker (void)ok;
823*cc02d7e2SAndroid Build Coastguard Worker SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
824*cc02d7e2SAndroid Build Coastguard Worker
825*cc02d7e2SAndroid Build Coastguard Worker // Under the AllocatingRequestMatcher model we will never see an invalid tag
826*cc02d7e2SAndroid Build Coastguard Worker // here.
827*cc02d7e2SAndroid Build Coastguard Worker GPR_DEBUG_ASSERT(sync_req != nullptr);
828*cc02d7e2SAndroid Build Coastguard Worker GPR_DEBUG_ASSERT(ok);
829*cc02d7e2SAndroid Build Coastguard Worker
830*cc02d7e2SAndroid Build Coastguard Worker sync_req->Run(global_callbacks_, resources);
831*cc02d7e2SAndroid Build Coastguard Worker }
832*cc02d7e2SAndroid Build Coastguard Worker
AddSyncMethod(grpc::internal::RpcServiceMethod * method,void * tag)833*cc02d7e2SAndroid Build Coastguard Worker void AddSyncMethod(grpc::internal::RpcServiceMethod* method, void* tag) {
834*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::FromC(server_->server())
835*cc02d7e2SAndroid Build Coastguard Worker ->SetRegisteredMethodAllocator(server_cq_->cq(), tag, [this, method] {
836*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::RegisteredCallAllocation result;
837*cc02d7e2SAndroid Build Coastguard Worker new SyncRequest(server_, method, &result);
838*cc02d7e2SAndroid Build Coastguard Worker return result;
839*cc02d7e2SAndroid Build Coastguard Worker });
840*cc02d7e2SAndroid Build Coastguard Worker has_sync_method_ = true;
841*cc02d7e2SAndroid Build Coastguard Worker }
842*cc02d7e2SAndroid Build Coastguard Worker
AddUnknownSyncMethod()843*cc02d7e2SAndroid Build Coastguard Worker void AddUnknownSyncMethod() {
844*cc02d7e2SAndroid Build Coastguard Worker if (has_sync_method_) {
845*cc02d7e2SAndroid Build Coastguard Worker unknown_method_ = std::make_unique<grpc::internal::RpcServiceMethod>(
846*cc02d7e2SAndroid Build Coastguard Worker "unknown", grpc::internal::RpcMethod::BIDI_STREAMING,
847*cc02d7e2SAndroid Build Coastguard Worker new grpc::internal::UnknownMethodHandler(kUnknownRpcMethod));
848*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::FromC(server_->server())
849*cc02d7e2SAndroid Build Coastguard Worker ->SetBatchMethodAllocator(server_cq_->cq(), [this] {
850*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::BatchCallAllocation result;
851*cc02d7e2SAndroid Build Coastguard Worker new SyncRequest(server_, unknown_method_.get(), &result);
852*cc02d7e2SAndroid Build Coastguard Worker return result;
853*cc02d7e2SAndroid Build Coastguard Worker });
854*cc02d7e2SAndroid Build Coastguard Worker }
855*cc02d7e2SAndroid Build Coastguard Worker }
856*cc02d7e2SAndroid Build Coastguard Worker
Shutdown()857*cc02d7e2SAndroid Build Coastguard Worker void Shutdown() override {
858*cc02d7e2SAndroid Build Coastguard Worker ThreadManager::Shutdown();
859*cc02d7e2SAndroid Build Coastguard Worker server_cq_->Shutdown();
860*cc02d7e2SAndroid Build Coastguard Worker }
861*cc02d7e2SAndroid Build Coastguard Worker
Wait()862*cc02d7e2SAndroid Build Coastguard Worker void Wait() override {
863*cc02d7e2SAndroid Build Coastguard Worker ThreadManager::Wait();
864*cc02d7e2SAndroid Build Coastguard Worker // Drain any pending items from the queue
865*cc02d7e2SAndroid Build Coastguard Worker void* tag;
866*cc02d7e2SAndroid Build Coastguard Worker bool ok;
867*cc02d7e2SAndroid Build Coastguard Worker while (server_cq_->Next(&tag, &ok)) {
868*cc02d7e2SAndroid Build Coastguard Worker // This problem can arise if the server CQ gets a request queued to it
869*cc02d7e2SAndroid Build Coastguard Worker // before it gets shutdown but then pulls it after shutdown.
870*cc02d7e2SAndroid Build Coastguard Worker static_cast<SyncRequest*>(tag)->Cleanup();
871*cc02d7e2SAndroid Build Coastguard Worker }
872*cc02d7e2SAndroid Build Coastguard Worker }
873*cc02d7e2SAndroid Build Coastguard Worker
Start()874*cc02d7e2SAndroid Build Coastguard Worker void Start() {
875*cc02d7e2SAndroid Build Coastguard Worker if (has_sync_method_) {
876*cc02d7e2SAndroid Build Coastguard Worker Initialize(); // ThreadManager's Initialize()
877*cc02d7e2SAndroid Build Coastguard Worker }
878*cc02d7e2SAndroid Build Coastguard Worker }
879*cc02d7e2SAndroid Build Coastguard Worker
880*cc02d7e2SAndroid Build Coastguard Worker private:
881*cc02d7e2SAndroid Build Coastguard Worker Server* server_;
882*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue* server_cq_;
883*cc02d7e2SAndroid Build Coastguard Worker int cq_timeout_msec_;
884*cc02d7e2SAndroid Build Coastguard Worker bool has_sync_method_ = false;
885*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::internal::RpcServiceMethod> unknown_method_;
886*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
887*cc02d7e2SAndroid Build Coastguard Worker };
888*cc02d7e2SAndroid Build Coastguard Worker
Server(grpc::ChannelArguments * args,std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>> sync_server_cqs,int min_pollers,int max_pollers,int sync_cq_timeout_msec,std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> acceptors,grpc_server_config_fetcher * server_config_fetcher,grpc_resource_quota * server_rq,std::vector<std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> interceptor_creators,experimental::ServerMetricRecorder * server_metric_recorder)889*cc02d7e2SAndroid Build Coastguard Worker Server::Server(
890*cc02d7e2SAndroid Build Coastguard Worker grpc::ChannelArguments* args,
891*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>>
892*cc02d7e2SAndroid Build Coastguard Worker sync_server_cqs,
893*cc02d7e2SAndroid Build Coastguard Worker int min_pollers, int max_pollers, int sync_cq_timeout_msec,
894*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
895*cc02d7e2SAndroid Build Coastguard Worker acceptors,
896*cc02d7e2SAndroid Build Coastguard Worker grpc_server_config_fetcher* server_config_fetcher,
897*cc02d7e2SAndroid Build Coastguard Worker grpc_resource_quota* server_rq,
898*cc02d7e2SAndroid Build Coastguard Worker std::vector<
899*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
900*cc02d7e2SAndroid Build Coastguard Worker interceptor_creators,
901*cc02d7e2SAndroid Build Coastguard Worker experimental::ServerMetricRecorder* server_metric_recorder)
902*cc02d7e2SAndroid Build Coastguard Worker : acceptors_(std::move(acceptors)),
903*cc02d7e2SAndroid Build Coastguard Worker interceptor_creators_(std::move(interceptor_creators)),
904*cc02d7e2SAndroid Build Coastguard Worker max_receive_message_size_(INT_MIN),
905*cc02d7e2SAndroid Build Coastguard Worker sync_server_cqs_(std::move(sync_server_cqs)),
906*cc02d7e2SAndroid Build Coastguard Worker started_(false),
907*cc02d7e2SAndroid Build Coastguard Worker shutdown_(false),
908*cc02d7e2SAndroid Build Coastguard Worker shutdown_notified_(false),
909*cc02d7e2SAndroid Build Coastguard Worker server_(nullptr),
910*cc02d7e2SAndroid Build Coastguard Worker server_initializer_(new ServerInitializer(this)),
911*cc02d7e2SAndroid Build Coastguard Worker health_check_service_disabled_(false),
912*cc02d7e2SAndroid Build Coastguard Worker server_metric_recorder_(server_metric_recorder) {
913*cc02d7e2SAndroid Build Coastguard Worker gpr_once_init(&grpc::g_once_init_callbacks, grpc::InitGlobalCallbacks);
914*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_ = grpc::g_callbacks;
915*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_->UpdateArguments(args);
916*cc02d7e2SAndroid Build Coastguard Worker
917*cc02d7e2SAndroid Build Coastguard Worker if (sync_server_cqs_ != nullptr) {
918*cc02d7e2SAndroid Build Coastguard Worker bool default_rq_created = false;
919*cc02d7e2SAndroid Build Coastguard Worker if (server_rq == nullptr) {
920*cc02d7e2SAndroid Build Coastguard Worker server_rq = grpc_resource_quota_create("SyncServer-default-rq");
921*cc02d7e2SAndroid Build Coastguard Worker grpc_resource_quota_set_max_threads(server_rq,
922*cc02d7e2SAndroid Build Coastguard Worker DEFAULT_MAX_SYNC_SERVER_THREADS);
923*cc02d7e2SAndroid Build Coastguard Worker default_rq_created = true;
924*cc02d7e2SAndroid Build Coastguard Worker }
925*cc02d7e2SAndroid Build Coastguard Worker
926*cc02d7e2SAndroid Build Coastguard Worker for (const auto& it : *sync_server_cqs_) {
927*cc02d7e2SAndroid Build Coastguard Worker sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
928*cc02d7e2SAndroid Build Coastguard Worker this, it.get(), global_callbacks_, server_rq, min_pollers,
929*cc02d7e2SAndroid Build Coastguard Worker max_pollers, sync_cq_timeout_msec));
930*cc02d7e2SAndroid Build Coastguard Worker }
931*cc02d7e2SAndroid Build Coastguard Worker
932*cc02d7e2SAndroid Build Coastguard Worker if (default_rq_created) {
933*cc02d7e2SAndroid Build Coastguard Worker grpc_resource_quota_unref(server_rq);
934*cc02d7e2SAndroid Build Coastguard Worker }
935*cc02d7e2SAndroid Build Coastguard Worker }
936*cc02d7e2SAndroid Build Coastguard Worker
937*cc02d7e2SAndroid Build Coastguard Worker for (auto& acceptor : acceptors_) {
938*cc02d7e2SAndroid Build Coastguard Worker acceptor->SetToChannelArgs(args);
939*cc02d7e2SAndroid Build Coastguard Worker }
940*cc02d7e2SAndroid Build Coastguard Worker
941*cc02d7e2SAndroid Build Coastguard Worker grpc_channel_args channel_args;
942*cc02d7e2SAndroid Build Coastguard Worker args->SetChannelArgs(&channel_args);
943*cc02d7e2SAndroid Build Coastguard Worker
944*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < channel_args.num_args; i++) {
945*cc02d7e2SAndroid Build Coastguard Worker if (0 == strcmp(channel_args.args[i].key,
946*cc02d7e2SAndroid Build Coastguard Worker grpc::kHealthCheckServiceInterfaceArg)) {
947*cc02d7e2SAndroid Build Coastguard Worker if (channel_args.args[i].value.pointer.p == nullptr) {
948*cc02d7e2SAndroid Build Coastguard Worker health_check_service_disabled_ = true;
949*cc02d7e2SAndroid Build Coastguard Worker } else {
950*cc02d7e2SAndroid Build Coastguard Worker health_check_service_.reset(
951*cc02d7e2SAndroid Build Coastguard Worker static_cast<grpc::HealthCheckServiceInterface*>(
952*cc02d7e2SAndroid Build Coastguard Worker channel_args.args[i].value.pointer.p));
953*cc02d7e2SAndroid Build Coastguard Worker }
954*cc02d7e2SAndroid Build Coastguard Worker }
955*cc02d7e2SAndroid Build Coastguard Worker if (0 ==
956*cc02d7e2SAndroid Build Coastguard Worker strcmp(channel_args.args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)) {
957*cc02d7e2SAndroid Build Coastguard Worker max_receive_message_size_ = channel_args.args[i].value.integer;
958*cc02d7e2SAndroid Build Coastguard Worker }
959*cc02d7e2SAndroid Build Coastguard Worker if (0 == strcmp(channel_args.args[i].key,
960*cc02d7e2SAndroid Build Coastguard Worker GRPC_ARG_SERVER_CALL_METRIC_RECORDING)) {
961*cc02d7e2SAndroid Build Coastguard Worker call_metric_recording_enabled_ = channel_args.args[i].value.integer;
962*cc02d7e2SAndroid Build Coastguard Worker }
963*cc02d7e2SAndroid Build Coastguard Worker }
964*cc02d7e2SAndroid Build Coastguard Worker server_ = grpc_server_create(&channel_args, nullptr);
965*cc02d7e2SAndroid Build Coastguard Worker grpc_server_set_config_fetcher(server_, server_config_fetcher);
966*cc02d7e2SAndroid Build Coastguard Worker }
967*cc02d7e2SAndroid Build Coastguard Worker
~Server()968*cc02d7e2SAndroid Build Coastguard Worker Server::~Server() {
969*cc02d7e2SAndroid Build Coastguard Worker {
970*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::ReleasableMutexLock lock(&mu_);
971*cc02d7e2SAndroid Build Coastguard Worker if (started_ && !shutdown_) {
972*cc02d7e2SAndroid Build Coastguard Worker lock.Release();
973*cc02d7e2SAndroid Build Coastguard Worker Shutdown();
974*cc02d7e2SAndroid Build Coastguard Worker } else if (!started_) {
975*cc02d7e2SAndroid Build Coastguard Worker // Shutdown the completion queues
976*cc02d7e2SAndroid Build Coastguard Worker for (const auto& value : sync_req_mgrs_) {
977*cc02d7e2SAndroid Build Coastguard Worker value->Shutdown();
978*cc02d7e2SAndroid Build Coastguard Worker }
979*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* callback_cq =
980*cc02d7e2SAndroid Build Coastguard Worker callback_cq_.load(std::memory_order_relaxed);
981*cc02d7e2SAndroid Build Coastguard Worker if (callback_cq != nullptr) {
982*cc02d7e2SAndroid Build Coastguard Worker if (grpc_iomgr_run_in_background()) {
983*cc02d7e2SAndroid Build Coastguard Worker // gRPC-core provides the backing needed for the preferred CQ type
984*cc02d7e2SAndroid Build Coastguard Worker callback_cq->Shutdown();
985*cc02d7e2SAndroid Build Coastguard Worker } else {
986*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
987*cc02d7e2SAndroid Build Coastguard Worker }
988*cc02d7e2SAndroid Build Coastguard Worker callback_cq_.store(nullptr, std::memory_order_release);
989*cc02d7e2SAndroid Build Coastguard Worker }
990*cc02d7e2SAndroid Build Coastguard Worker }
991*cc02d7e2SAndroid Build Coastguard Worker }
992*cc02d7e2SAndroid Build Coastguard Worker // Destroy health check service before we destroy the C server so that
993*cc02d7e2SAndroid Build Coastguard Worker // it does not call grpc_server_request_registered_call() after the C
994*cc02d7e2SAndroid Build Coastguard Worker // server has been destroyed.
995*cc02d7e2SAndroid Build Coastguard Worker health_check_service_.reset();
996*cc02d7e2SAndroid Build Coastguard Worker grpc_server_destroy(server_);
997*cc02d7e2SAndroid Build Coastguard Worker }
998*cc02d7e2SAndroid Build Coastguard Worker
SetGlobalCallbacks(GlobalCallbacks * callbacks)999*cc02d7e2SAndroid Build Coastguard Worker void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
1000*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!grpc::g_callbacks);
1001*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(callbacks);
1002*cc02d7e2SAndroid Build Coastguard Worker grpc::g_callbacks.reset(callbacks);
1003*cc02d7e2SAndroid Build Coastguard Worker }
1004*cc02d7e2SAndroid Build Coastguard Worker
c_server()1005*cc02d7e2SAndroid Build Coastguard Worker grpc_server* Server::c_server() { return server_; }
1006*cc02d7e2SAndroid Build Coastguard Worker
InProcessChannel(const grpc::ChannelArguments & args)1007*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<grpc::Channel> Server::InProcessChannel(
1008*cc02d7e2SAndroid Build Coastguard Worker const grpc::ChannelArguments& args) {
1009*cc02d7e2SAndroid Build Coastguard Worker grpc_channel_args channel_args = args.c_channel_args();
1010*cc02d7e2SAndroid Build Coastguard Worker return grpc::CreateChannelInternal(
1011*cc02d7e2SAndroid Build Coastguard Worker "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr),
1012*cc02d7e2SAndroid Build Coastguard Worker std::vector<std::unique_ptr<
1013*cc02d7e2SAndroid Build Coastguard Worker grpc::experimental::ClientInterceptorFactoryInterface>>());
1014*cc02d7e2SAndroid Build Coastguard Worker }
1015*cc02d7e2SAndroid Build Coastguard Worker
1016*cc02d7e2SAndroid Build Coastguard Worker std::shared_ptr<grpc::Channel>
InProcessChannelWithInterceptors(const grpc::ChannelArguments & args,std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_creators)1017*cc02d7e2SAndroid Build Coastguard Worker Server::experimental_type::InProcessChannelWithInterceptors(
1018*cc02d7e2SAndroid Build Coastguard Worker const grpc::ChannelArguments& args,
1019*cc02d7e2SAndroid Build Coastguard Worker std::vector<
1020*cc02d7e2SAndroid Build Coastguard Worker std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
1021*cc02d7e2SAndroid Build Coastguard Worker interceptor_creators) {
1022*cc02d7e2SAndroid Build Coastguard Worker grpc_channel_args channel_args = args.c_channel_args();
1023*cc02d7e2SAndroid Build Coastguard Worker return grpc::CreateChannelInternal(
1024*cc02d7e2SAndroid Build Coastguard Worker "inproc",
1025*cc02d7e2SAndroid Build Coastguard Worker grpc_inproc_channel_create(server_->server_, &channel_args, nullptr),
1026*cc02d7e2SAndroid Build Coastguard Worker std::move(interceptor_creators));
1027*cc02d7e2SAndroid Build Coastguard Worker }
1028*cc02d7e2SAndroid Build Coastguard Worker
PayloadHandlingForMethod(grpc::internal::RpcServiceMethod * method)1029*cc02d7e2SAndroid Build Coastguard Worker static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
1030*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcServiceMethod* method) {
1031*cc02d7e2SAndroid Build Coastguard Worker switch (method->method_type()) {
1032*cc02d7e2SAndroid Build Coastguard Worker case grpc::internal::RpcMethod::NORMAL_RPC:
1033*cc02d7e2SAndroid Build Coastguard Worker case grpc::internal::RpcMethod::SERVER_STREAMING:
1034*cc02d7e2SAndroid Build Coastguard Worker return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
1035*cc02d7e2SAndroid Build Coastguard Worker case grpc::internal::RpcMethod::CLIENT_STREAMING:
1036*cc02d7e2SAndroid Build Coastguard Worker case grpc::internal::RpcMethod::BIDI_STREAMING:
1037*cc02d7e2SAndroid Build Coastguard Worker return GRPC_SRM_PAYLOAD_NONE;
1038*cc02d7e2SAndroid Build Coastguard Worker }
1039*cc02d7e2SAndroid Build Coastguard Worker GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
1040*cc02d7e2SAndroid Build Coastguard Worker }
1041*cc02d7e2SAndroid Build Coastguard Worker
RegisterService(const std::string * addr,grpc::Service * service)1042*cc02d7e2SAndroid Build Coastguard Worker bool Server::RegisterService(const std::string* addr, grpc::Service* service) {
1043*cc02d7e2SAndroid Build Coastguard Worker bool has_async_methods = service->has_async_methods();
1044*cc02d7e2SAndroid Build Coastguard Worker if (has_async_methods) {
1045*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(service->server_ == nullptr &&
1046*cc02d7e2SAndroid Build Coastguard Worker "Can only register an asynchronous service against one server.");
1047*cc02d7e2SAndroid Build Coastguard Worker service->server_ = this;
1048*cc02d7e2SAndroid Build Coastguard Worker }
1049*cc02d7e2SAndroid Build Coastguard Worker
1050*cc02d7e2SAndroid Build Coastguard Worker const char* method_name = nullptr;
1051*cc02d7e2SAndroid Build Coastguard Worker
1052*cc02d7e2SAndroid Build Coastguard Worker for (const auto& method : service->methods_) {
1053*cc02d7e2SAndroid Build Coastguard Worker if (method == nullptr) { // Handled by generic service if any.
1054*cc02d7e2SAndroid Build Coastguard Worker continue;
1055*cc02d7e2SAndroid Build Coastguard Worker }
1056*cc02d7e2SAndroid Build Coastguard Worker
1057*cc02d7e2SAndroid Build Coastguard Worker void* method_registration_tag = grpc_server_register_method(
1058*cc02d7e2SAndroid Build Coastguard Worker server_, method->name(), addr ? addr->c_str() : nullptr,
1059*cc02d7e2SAndroid Build Coastguard Worker PayloadHandlingForMethod(method.get()), 0);
1060*cc02d7e2SAndroid Build Coastguard Worker if (method_registration_tag == nullptr) {
1061*cc02d7e2SAndroid Build Coastguard Worker gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
1062*cc02d7e2SAndroid Build Coastguard Worker method->name());
1063*cc02d7e2SAndroid Build Coastguard Worker return false;
1064*cc02d7e2SAndroid Build Coastguard Worker }
1065*cc02d7e2SAndroid Build Coastguard Worker
1066*cc02d7e2SAndroid Build Coastguard Worker if (method->handler() == nullptr) { // Async method without handler
1067*cc02d7e2SAndroid Build Coastguard Worker method->set_server_tag(method_registration_tag);
1068*cc02d7e2SAndroid Build Coastguard Worker } else if (method->api_type() ==
1069*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcServiceMethod::ApiType::SYNC) {
1070*cc02d7e2SAndroid Build Coastguard Worker for (const auto& value : sync_req_mgrs_) {
1071*cc02d7e2SAndroid Build Coastguard Worker value->AddSyncMethod(method.get(), method_registration_tag);
1072*cc02d7e2SAndroid Build Coastguard Worker }
1073*cc02d7e2SAndroid Build Coastguard Worker } else {
1074*cc02d7e2SAndroid Build Coastguard Worker has_callback_methods_ = true;
1075*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::RpcServiceMethod* method_value = method.get();
1076*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue* cq = CallbackCQ();
1077*cc02d7e2SAndroid Build Coastguard Worker grpc_server_register_completion_queue(server_, cq->cq(), nullptr);
1078*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::FromC(server_)->SetRegisteredMethodAllocator(
1079*cc02d7e2SAndroid Build Coastguard Worker cq->cq(), method_registration_tag, [this, cq, method_value] {
1080*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::RegisteredCallAllocation result;
1081*cc02d7e2SAndroid Build Coastguard Worker new CallbackRequest<grpc::CallbackServerContext>(this, method_value,
1082*cc02d7e2SAndroid Build Coastguard Worker cq, &result);
1083*cc02d7e2SAndroid Build Coastguard Worker return result;
1084*cc02d7e2SAndroid Build Coastguard Worker });
1085*cc02d7e2SAndroid Build Coastguard Worker }
1086*cc02d7e2SAndroid Build Coastguard Worker
1087*cc02d7e2SAndroid Build Coastguard Worker method_name = method->name();
1088*cc02d7e2SAndroid Build Coastguard Worker }
1089*cc02d7e2SAndroid Build Coastguard Worker
1090*cc02d7e2SAndroid Build Coastguard Worker // Parse service name.
1091*cc02d7e2SAndroid Build Coastguard Worker if (method_name != nullptr) {
1092*cc02d7e2SAndroid Build Coastguard Worker std::stringstream ss(method_name);
1093*cc02d7e2SAndroid Build Coastguard Worker std::string service_name;
1094*cc02d7e2SAndroid Build Coastguard Worker if (std::getline(ss, service_name, '/') &&
1095*cc02d7e2SAndroid Build Coastguard Worker std::getline(ss, service_name, '/')) {
1096*cc02d7e2SAndroid Build Coastguard Worker services_.push_back(service_name);
1097*cc02d7e2SAndroid Build Coastguard Worker }
1098*cc02d7e2SAndroid Build Coastguard Worker }
1099*cc02d7e2SAndroid Build Coastguard Worker return true;
1100*cc02d7e2SAndroid Build Coastguard Worker }
1101*cc02d7e2SAndroid Build Coastguard Worker
RegisterAsyncGenericService(grpc::AsyncGenericService * service)1102*cc02d7e2SAndroid Build Coastguard Worker void Server::RegisterAsyncGenericService(grpc::AsyncGenericService* service) {
1103*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(service->server_ == nullptr &&
1104*cc02d7e2SAndroid Build Coastguard Worker "Can only register an async generic service against one server.");
1105*cc02d7e2SAndroid Build Coastguard Worker service->server_ = this;
1106*cc02d7e2SAndroid Build Coastguard Worker has_async_generic_service_ = true;
1107*cc02d7e2SAndroid Build Coastguard Worker }
1108*cc02d7e2SAndroid Build Coastguard Worker
RegisterCallbackGenericService(grpc::CallbackGenericService * service)1109*cc02d7e2SAndroid Build Coastguard Worker void Server::RegisterCallbackGenericService(
1110*cc02d7e2SAndroid Build Coastguard Worker grpc::CallbackGenericService* service) {
1111*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(
1112*cc02d7e2SAndroid Build Coastguard Worker service->server_ == nullptr &&
1113*cc02d7e2SAndroid Build Coastguard Worker "Can only register a callback generic service against one server.");
1114*cc02d7e2SAndroid Build Coastguard Worker service->server_ = this;
1115*cc02d7e2SAndroid Build Coastguard Worker has_callback_generic_service_ = true;
1116*cc02d7e2SAndroid Build Coastguard Worker generic_handler_.reset(service->Handler());
1117*cc02d7e2SAndroid Build Coastguard Worker
1118*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue* cq = CallbackCQ();
1119*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::FromC(server_)->SetBatchMethodAllocator(cq->cq(), [this,
1120*cc02d7e2SAndroid Build Coastguard Worker cq] {
1121*cc02d7e2SAndroid Build Coastguard Worker grpc_core::Server::BatchCallAllocation result;
1122*cc02d7e2SAndroid Build Coastguard Worker new CallbackRequest<grpc::GenericCallbackServerContext>(this, cq, &result);
1123*cc02d7e2SAndroid Build Coastguard Worker return result;
1124*cc02d7e2SAndroid Build Coastguard Worker });
1125*cc02d7e2SAndroid Build Coastguard Worker }
1126*cc02d7e2SAndroid Build Coastguard Worker
AddListeningPort(const std::string & addr,grpc::ServerCredentials * creds)1127*cc02d7e2SAndroid Build Coastguard Worker int Server::AddListeningPort(const std::string& addr,
1128*cc02d7e2SAndroid Build Coastguard Worker grpc::ServerCredentials* creds) {
1129*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!started_);
1130*cc02d7e2SAndroid Build Coastguard Worker int port = creds->AddPortToServer(addr, server_);
1131*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_->AddPort(this, addr, creds, port);
1132*cc02d7e2SAndroid Build Coastguard Worker return port;
1133*cc02d7e2SAndroid Build Coastguard Worker }
1134*cc02d7e2SAndroid Build Coastguard Worker
Ref()1135*cc02d7e2SAndroid Build Coastguard Worker void Server::Ref() {
1136*cc02d7e2SAndroid Build Coastguard Worker shutdown_refs_outstanding_.fetch_add(1, std::memory_order_relaxed);
1137*cc02d7e2SAndroid Build Coastguard Worker }
1138*cc02d7e2SAndroid Build Coastguard Worker
UnrefWithPossibleNotify()1139*cc02d7e2SAndroid Build Coastguard Worker void Server::UnrefWithPossibleNotify() {
1140*cc02d7e2SAndroid Build Coastguard Worker if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
1141*cc02d7e2SAndroid Build Coastguard Worker 1, std::memory_order_acq_rel) == 1)) {
1142*cc02d7e2SAndroid Build Coastguard Worker // No refs outstanding means that shutdown has been initiated and no more
1143*cc02d7e2SAndroid Build Coastguard Worker // callback requests are outstanding.
1144*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::MutexLock lock(&mu_);
1145*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(shutdown_);
1146*cc02d7e2SAndroid Build Coastguard Worker shutdown_done_ = true;
1147*cc02d7e2SAndroid Build Coastguard Worker shutdown_done_cv_.Signal();
1148*cc02d7e2SAndroid Build Coastguard Worker }
1149*cc02d7e2SAndroid Build Coastguard Worker }
1150*cc02d7e2SAndroid Build Coastguard Worker
UnrefAndWaitLocked()1151*cc02d7e2SAndroid Build Coastguard Worker void Server::UnrefAndWaitLocked() {
1152*cc02d7e2SAndroid Build Coastguard Worker if (GPR_UNLIKELY(shutdown_refs_outstanding_.fetch_sub(
1153*cc02d7e2SAndroid Build Coastguard Worker 1, std::memory_order_acq_rel) == 1)) {
1154*cc02d7e2SAndroid Build Coastguard Worker shutdown_done_ = true;
1155*cc02d7e2SAndroid Build Coastguard Worker return; // no need to wait on CV since done condition already set
1156*cc02d7e2SAndroid Build Coastguard Worker }
1157*cc02d7e2SAndroid Build Coastguard Worker while (!shutdown_done_) {
1158*cc02d7e2SAndroid Build Coastguard Worker shutdown_done_cv_.Wait(&mu_);
1159*cc02d7e2SAndroid Build Coastguard Worker }
1160*cc02d7e2SAndroid Build Coastguard Worker }
1161*cc02d7e2SAndroid Build Coastguard Worker
Start(grpc::ServerCompletionQueue ** cqs,size_t num_cqs)1162*cc02d7e2SAndroid Build Coastguard Worker void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
1163*cc02d7e2SAndroid Build Coastguard Worker GPR_ASSERT(!started_);
1164*cc02d7e2SAndroid Build Coastguard Worker global_callbacks_->PreServerStart(this);
1165*cc02d7e2SAndroid Build Coastguard Worker started_ = true;
1166*cc02d7e2SAndroid Build Coastguard Worker
1167*cc02d7e2SAndroid Build Coastguard Worker // Only create default health check service when user did not provide an
1168*cc02d7e2SAndroid Build Coastguard Worker // explicit one.
1169*cc02d7e2SAndroid Build Coastguard Worker if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
1170*cc02d7e2SAndroid Build Coastguard Worker grpc::DefaultHealthCheckServiceEnabled()) {
1171*cc02d7e2SAndroid Build Coastguard Worker auto default_hc_service = std::make_unique<DefaultHealthCheckService>();
1172*cc02d7e2SAndroid Build Coastguard Worker auto* hc_service_impl = default_hc_service->GetHealthCheckService();
1173*cc02d7e2SAndroid Build Coastguard Worker health_check_service_ = std::move(default_hc_service);
1174*cc02d7e2SAndroid Build Coastguard Worker RegisterService(nullptr, hc_service_impl);
1175*cc02d7e2SAndroid Build Coastguard Worker }
1176*cc02d7e2SAndroid Build Coastguard Worker
1177*cc02d7e2SAndroid Build Coastguard Worker for (auto& acceptor : acceptors_) {
1178*cc02d7e2SAndroid Build Coastguard Worker acceptor->GetCredentials()->AddPortToServer(acceptor->name(), server_);
1179*cc02d7e2SAndroid Build Coastguard Worker }
1180*cc02d7e2SAndroid Build Coastguard Worker
1181*cc02d7e2SAndroid Build Coastguard Worker #ifndef NDEBUG
1182*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_cqs; i++) {
1183*cc02d7e2SAndroid Build Coastguard Worker cq_list_.push_back(cqs[i]);
1184*cc02d7e2SAndroid Build Coastguard Worker }
1185*cc02d7e2SAndroid Build Coastguard Worker #endif
1186*cc02d7e2SAndroid Build Coastguard Worker
1187*cc02d7e2SAndroid Build Coastguard Worker // We must have exactly one generic service to handle requests for
1188*cc02d7e2SAndroid Build Coastguard Worker // unmatched method names (i.e., to return UNIMPLEMENTED for any RPC
1189*cc02d7e2SAndroid Build Coastguard Worker // method for which we don't have a registered implementation). This
1190*cc02d7e2SAndroid Build Coastguard Worker // service comes from one of the following places (first match wins):
1191*cc02d7e2SAndroid Build Coastguard Worker // - If the application supplied a generic service via either the async
1192*cc02d7e2SAndroid Build Coastguard Worker // or callback APIs, we use that.
1193*cc02d7e2SAndroid Build Coastguard Worker // - If there are callback methods, register a callback generic service.
1194*cc02d7e2SAndroid Build Coastguard Worker // - If there are sync methods, register a sync generic service.
1195*cc02d7e2SAndroid Build Coastguard Worker // (This must be done before server start to initialize an
1196*cc02d7e2SAndroid Build Coastguard Worker // AllocatingRequestMatcher.)
1197*cc02d7e2SAndroid Build Coastguard Worker // - Otherwise (we have only async methods), we wait until the server
1198*cc02d7e2SAndroid Build Coastguard Worker // is started and then start an UnimplementedAsyncRequest on each
1199*cc02d7e2SAndroid Build Coastguard Worker // async CQ, so that the requests will be moved along by polling
1200*cc02d7e2SAndroid Build Coastguard Worker // done in application threads.
1201*cc02d7e2SAndroid Build Coastguard Worker bool unknown_rpc_needed =
1202*cc02d7e2SAndroid Build Coastguard Worker !has_async_generic_service_ && !has_callback_generic_service_;
1203*cc02d7e2SAndroid Build Coastguard Worker if (unknown_rpc_needed && has_callback_methods_) {
1204*cc02d7e2SAndroid Build Coastguard Worker unimplemented_service_ = std::make_unique<grpc::CallbackGenericService>();
1205*cc02d7e2SAndroid Build Coastguard Worker RegisterCallbackGenericService(unimplemented_service_.get());
1206*cc02d7e2SAndroid Build Coastguard Worker unknown_rpc_needed = false;
1207*cc02d7e2SAndroid Build Coastguard Worker }
1208*cc02d7e2SAndroid Build Coastguard Worker if (unknown_rpc_needed && !sync_req_mgrs_.empty()) {
1209*cc02d7e2SAndroid Build Coastguard Worker sync_req_mgrs_[0]->AddUnknownSyncMethod();
1210*cc02d7e2SAndroid Build Coastguard Worker unknown_rpc_needed = false;
1211*cc02d7e2SAndroid Build Coastguard Worker }
1212*cc02d7e2SAndroid Build Coastguard Worker
1213*cc02d7e2SAndroid Build Coastguard Worker grpc_server_start(server_);
1214*cc02d7e2SAndroid Build Coastguard Worker
1215*cc02d7e2SAndroid Build Coastguard Worker if (unknown_rpc_needed) {
1216*cc02d7e2SAndroid Build Coastguard Worker for (size_t i = 0; i < num_cqs; i++) {
1217*cc02d7e2SAndroid Build Coastguard Worker if (cqs[i]->IsFrequentlyPolled()) {
1218*cc02d7e2SAndroid Build Coastguard Worker new UnimplementedAsyncRequest(this, cqs[i]);
1219*cc02d7e2SAndroid Build Coastguard Worker }
1220*cc02d7e2SAndroid Build Coastguard Worker }
1221*cc02d7e2SAndroid Build Coastguard Worker unknown_rpc_needed = false;
1222*cc02d7e2SAndroid Build Coastguard Worker }
1223*cc02d7e2SAndroid Build Coastguard Worker
1224*cc02d7e2SAndroid Build Coastguard Worker // If this server has any support for synchronous methods (has any sync
1225*cc02d7e2SAndroid Build Coastguard Worker // server CQs), make sure that we have a ResourceExhausted handler
1226*cc02d7e2SAndroid Build Coastguard Worker // to deal with the case of thread exhaustion
1227*cc02d7e2SAndroid Build Coastguard Worker if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
1228*cc02d7e2SAndroid Build Coastguard Worker resource_exhausted_handler_ =
1229*cc02d7e2SAndroid Build Coastguard Worker std::make_unique<grpc::internal::ResourceExhaustedHandler>(
1230*cc02d7e2SAndroid Build Coastguard Worker kServerThreadpoolExhausted);
1231*cc02d7e2SAndroid Build Coastguard Worker }
1232*cc02d7e2SAndroid Build Coastguard Worker
1233*cc02d7e2SAndroid Build Coastguard Worker for (const auto& value : sync_req_mgrs_) {
1234*cc02d7e2SAndroid Build Coastguard Worker value->Start();
1235*cc02d7e2SAndroid Build Coastguard Worker }
1236*cc02d7e2SAndroid Build Coastguard Worker
1237*cc02d7e2SAndroid Build Coastguard Worker for (auto& acceptor : acceptors_) {
1238*cc02d7e2SAndroid Build Coastguard Worker acceptor->Start();
1239*cc02d7e2SAndroid Build Coastguard Worker }
1240*cc02d7e2SAndroid Build Coastguard Worker }
1241*cc02d7e2SAndroid Build Coastguard Worker
ShutdownInternal(gpr_timespec deadline)1242*cc02d7e2SAndroid Build Coastguard Worker void Server::ShutdownInternal(gpr_timespec deadline) {
1243*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::MutexLock lock(&mu_);
1244*cc02d7e2SAndroid Build Coastguard Worker if (shutdown_) {
1245*cc02d7e2SAndroid Build Coastguard Worker return;
1246*cc02d7e2SAndroid Build Coastguard Worker }
1247*cc02d7e2SAndroid Build Coastguard Worker
1248*cc02d7e2SAndroid Build Coastguard Worker shutdown_ = true;
1249*cc02d7e2SAndroid Build Coastguard Worker
1250*cc02d7e2SAndroid Build Coastguard Worker for (auto& acceptor : acceptors_) {
1251*cc02d7e2SAndroid Build Coastguard Worker acceptor->Shutdown();
1252*cc02d7e2SAndroid Build Coastguard Worker }
1253*cc02d7e2SAndroid Build Coastguard Worker
1254*cc02d7e2SAndroid Build Coastguard Worker /// The completion queue to use for server shutdown completion notification
1255*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue shutdown_cq;
1256*cc02d7e2SAndroid Build Coastguard Worker grpc::ShutdownTag shutdown_tag; // Phony shutdown tag
1257*cc02d7e2SAndroid Build Coastguard Worker grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
1258*cc02d7e2SAndroid Build Coastguard Worker
1259*cc02d7e2SAndroid Build Coastguard Worker shutdown_cq.Shutdown();
1260*cc02d7e2SAndroid Build Coastguard Worker
1261*cc02d7e2SAndroid Build Coastguard Worker void* tag;
1262*cc02d7e2SAndroid Build Coastguard Worker bool ok;
1263*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue::NextStatus status =
1264*cc02d7e2SAndroid Build Coastguard Worker shutdown_cq.AsyncNext(&tag, &ok, deadline);
1265*cc02d7e2SAndroid Build Coastguard Worker
1266*cc02d7e2SAndroid Build Coastguard Worker // If this timed out, it means we are done with the grace period for a clean
1267*cc02d7e2SAndroid Build Coastguard Worker // shutdown. We should force a shutdown now by cancelling all inflight calls
1268*cc02d7e2SAndroid Build Coastguard Worker if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
1269*cc02d7e2SAndroid Build Coastguard Worker grpc_server_cancel_all_calls(server_);
1270*cc02d7e2SAndroid Build Coastguard Worker status =
1271*cc02d7e2SAndroid Build Coastguard Worker shutdown_cq.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_MONOTONIC));
1272*cc02d7e2SAndroid Build Coastguard Worker }
1273*cc02d7e2SAndroid Build Coastguard Worker // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
1274*cc02d7e2SAndroid Build Coastguard Worker // successfully shutdown
1275*cc02d7e2SAndroid Build Coastguard Worker
1276*cc02d7e2SAndroid Build Coastguard Worker // Drop the shutdown ref and wait for all other refs to drop as well.
1277*cc02d7e2SAndroid Build Coastguard Worker UnrefAndWaitLocked();
1278*cc02d7e2SAndroid Build Coastguard Worker
1279*cc02d7e2SAndroid Build Coastguard Worker // Shutdown all ThreadManagers. This will try to gracefully stop all the
1280*cc02d7e2SAndroid Build Coastguard Worker // threads in the ThreadManagers (once they process any inflight requests)
1281*cc02d7e2SAndroid Build Coastguard Worker for (const auto& value : sync_req_mgrs_) {
1282*cc02d7e2SAndroid Build Coastguard Worker value->Shutdown(); // ThreadManager's Shutdown()
1283*cc02d7e2SAndroid Build Coastguard Worker }
1284*cc02d7e2SAndroid Build Coastguard Worker
1285*cc02d7e2SAndroid Build Coastguard Worker // Wait for threads in all ThreadManagers to terminate
1286*cc02d7e2SAndroid Build Coastguard Worker for (const auto& value : sync_req_mgrs_) {
1287*cc02d7e2SAndroid Build Coastguard Worker value->Wait();
1288*cc02d7e2SAndroid Build Coastguard Worker }
1289*cc02d7e2SAndroid Build Coastguard Worker
1290*cc02d7e2SAndroid Build Coastguard Worker // Shutdown the callback CQ. The CQ is owned by its own shutdown tag, so it
1291*cc02d7e2SAndroid Build Coastguard Worker // will delete itself at true shutdown.
1292*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_relaxed);
1293*cc02d7e2SAndroid Build Coastguard Worker if (callback_cq != nullptr) {
1294*cc02d7e2SAndroid Build Coastguard Worker if (grpc_iomgr_run_in_background()) {
1295*cc02d7e2SAndroid Build Coastguard Worker // gRPC-core provides the backing needed for the preferred CQ type
1296*cc02d7e2SAndroid Build Coastguard Worker callback_cq->Shutdown();
1297*cc02d7e2SAndroid Build Coastguard Worker } else {
1298*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue::ReleaseCallbackAlternativeCQ(callback_cq);
1299*cc02d7e2SAndroid Build Coastguard Worker }
1300*cc02d7e2SAndroid Build Coastguard Worker callback_cq_.store(nullptr, std::memory_order_release);
1301*cc02d7e2SAndroid Build Coastguard Worker }
1302*cc02d7e2SAndroid Build Coastguard Worker
1303*cc02d7e2SAndroid Build Coastguard Worker // Drain the shutdown queue (if the previous call to AsyncNext() timed out
1304*cc02d7e2SAndroid Build Coastguard Worker // and we didn't remove the tag from the queue yet)
1305*cc02d7e2SAndroid Build Coastguard Worker while (shutdown_cq.Next(&tag, &ok)) {
1306*cc02d7e2SAndroid Build Coastguard Worker // Nothing to be done here. Just ignore ok and tag values
1307*cc02d7e2SAndroid Build Coastguard Worker }
1308*cc02d7e2SAndroid Build Coastguard Worker
1309*cc02d7e2SAndroid Build Coastguard Worker shutdown_notified_ = true;
1310*cc02d7e2SAndroid Build Coastguard Worker shutdown_cv_.SignalAll();
1311*cc02d7e2SAndroid Build Coastguard Worker
1312*cc02d7e2SAndroid Build Coastguard Worker #ifndef NDEBUG
1313*cc02d7e2SAndroid Build Coastguard Worker // Unregister this server with the CQs passed into it by the user so that
1314*cc02d7e2SAndroid Build Coastguard Worker // those can be checked for properly-ordered shutdown.
1315*cc02d7e2SAndroid Build Coastguard Worker for (auto* cq : cq_list_) {
1316*cc02d7e2SAndroid Build Coastguard Worker cq->UnregisterServer(this);
1317*cc02d7e2SAndroid Build Coastguard Worker }
1318*cc02d7e2SAndroid Build Coastguard Worker cq_list_.clear();
1319*cc02d7e2SAndroid Build Coastguard Worker #endif
1320*cc02d7e2SAndroid Build Coastguard Worker }
1321*cc02d7e2SAndroid Build Coastguard Worker
Wait()1322*cc02d7e2SAndroid Build Coastguard Worker void Server::Wait() {
1323*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::MutexLock lock(&mu_);
1324*cc02d7e2SAndroid Build Coastguard Worker while (started_ && !shutdown_notified_) {
1325*cc02d7e2SAndroid Build Coastguard Worker shutdown_cv_.Wait(&mu_);
1326*cc02d7e2SAndroid Build Coastguard Worker }
1327*cc02d7e2SAndroid Build Coastguard Worker }
1328*cc02d7e2SAndroid Build Coastguard Worker
PerformOpsOnCall(grpc::internal::CallOpSetInterface * ops,grpc::internal::Call * call)1329*cc02d7e2SAndroid Build Coastguard Worker void Server::PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
1330*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::Call* call) {
1331*cc02d7e2SAndroid Build Coastguard Worker ops->FillOps(call);
1332*cc02d7e2SAndroid Build Coastguard Worker }
1333*cc02d7e2SAndroid Build Coastguard Worker
FinalizeResult(void ** tag,bool * status)1334*cc02d7e2SAndroid Build Coastguard Worker bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
1335*cc02d7e2SAndroid Build Coastguard Worker bool* status) {
1336*cc02d7e2SAndroid Build Coastguard Worker if (GenericAsyncRequest::FinalizeResult(tag, status)) {
1337*cc02d7e2SAndroid Build Coastguard Worker // We either had no interceptors run or we are done intercepting
1338*cc02d7e2SAndroid Build Coastguard Worker if (*status) {
1339*cc02d7e2SAndroid Build Coastguard Worker // Create a new request/response pair using the server and CQ values
1340*cc02d7e2SAndroid Build Coastguard Worker // stored in this object's base class.
1341*cc02d7e2SAndroid Build Coastguard Worker new UnimplementedAsyncRequest(server_, notification_cq_);
1342*cc02d7e2SAndroid Build Coastguard Worker new UnimplementedAsyncResponse(this);
1343*cc02d7e2SAndroid Build Coastguard Worker } else {
1344*cc02d7e2SAndroid Build Coastguard Worker delete this;
1345*cc02d7e2SAndroid Build Coastguard Worker }
1346*cc02d7e2SAndroid Build Coastguard Worker } else {
1347*cc02d7e2SAndroid Build Coastguard Worker // The tag was swallowed due to interception. We will see it again.
1348*cc02d7e2SAndroid Build Coastguard Worker }
1349*cc02d7e2SAndroid Build Coastguard Worker return false;
1350*cc02d7e2SAndroid Build Coastguard Worker }
1351*cc02d7e2SAndroid Build Coastguard Worker
UnimplementedAsyncResponse(UnimplementedAsyncRequest * request)1352*cc02d7e2SAndroid Build Coastguard Worker Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
1353*cc02d7e2SAndroid Build Coastguard Worker UnimplementedAsyncRequest* request)
1354*cc02d7e2SAndroid Build Coastguard Worker : request_(request) {
1355*cc02d7e2SAndroid Build Coastguard Worker grpc::Status status(grpc::StatusCode::UNIMPLEMENTED, kUnknownRpcMethod);
1356*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::UnknownMethodHandler::FillOps(request_->context(),
1357*cc02d7e2SAndroid Build Coastguard Worker kUnknownRpcMethod, this);
1358*cc02d7e2SAndroid Build Coastguard Worker request_->stream()->call_.PerformOps(this);
1359*cc02d7e2SAndroid Build Coastguard Worker }
1360*cc02d7e2SAndroid Build Coastguard Worker
initializer()1361*cc02d7e2SAndroid Build Coastguard Worker grpc::ServerInitializer* Server::initializer() {
1362*cc02d7e2SAndroid Build Coastguard Worker return server_initializer_.get();
1363*cc02d7e2SAndroid Build Coastguard Worker }
1364*cc02d7e2SAndroid Build Coastguard Worker
CallbackCQ()1365*cc02d7e2SAndroid Build Coastguard Worker grpc::CompletionQueue* Server::CallbackCQ() {
1366*cc02d7e2SAndroid Build Coastguard Worker // TODO(vjpai): Consider using a single global CQ for the default CQ
1367*cc02d7e2SAndroid Build Coastguard Worker // if there is no explicit per-server CQ registered
1368*cc02d7e2SAndroid Build Coastguard Worker CompletionQueue* callback_cq = callback_cq_.load(std::memory_order_acquire);
1369*cc02d7e2SAndroid Build Coastguard Worker if (callback_cq != nullptr) {
1370*cc02d7e2SAndroid Build Coastguard Worker return callback_cq;
1371*cc02d7e2SAndroid Build Coastguard Worker }
1372*cc02d7e2SAndroid Build Coastguard Worker // The callback_cq_ wasn't already set, so grab a lock and set it up exactly
1373*cc02d7e2SAndroid Build Coastguard Worker // once for this server.
1374*cc02d7e2SAndroid Build Coastguard Worker grpc::internal::MutexLock l(&mu_);
1375*cc02d7e2SAndroid Build Coastguard Worker callback_cq = callback_cq_.load(std::memory_order_relaxed);
1376*cc02d7e2SAndroid Build Coastguard Worker if (callback_cq != nullptr) {
1377*cc02d7e2SAndroid Build Coastguard Worker return callback_cq;
1378*cc02d7e2SAndroid Build Coastguard Worker }
1379*cc02d7e2SAndroid Build Coastguard Worker if (grpc_iomgr_run_in_background()) {
1380*cc02d7e2SAndroid Build Coastguard Worker // gRPC-core provides the backing needed for the preferred CQ type
1381*cc02d7e2SAndroid Build Coastguard Worker auto* shutdown_callback = new grpc::ShutdownCallback;
1382*cc02d7e2SAndroid Build Coastguard Worker callback_cq = new grpc::CompletionQueue(grpc_completion_queue_attributes{
1383*cc02d7e2SAndroid Build Coastguard Worker GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING,
1384*cc02d7e2SAndroid Build Coastguard Worker shutdown_callback});
1385*cc02d7e2SAndroid Build Coastguard Worker
1386*cc02d7e2SAndroid Build Coastguard Worker // Transfer ownership of the new cq to its own shutdown callback
1387*cc02d7e2SAndroid Build Coastguard Worker shutdown_callback->TakeCQ(callback_cq);
1388*cc02d7e2SAndroid Build Coastguard Worker } else {
1389*cc02d7e2SAndroid Build Coastguard Worker // Otherwise we need to use the alternative CQ variant
1390*cc02d7e2SAndroid Build Coastguard Worker callback_cq = CompletionQueue::CallbackAlternativeCQ();
1391*cc02d7e2SAndroid Build Coastguard Worker }
1392*cc02d7e2SAndroid Build Coastguard Worker
1393*cc02d7e2SAndroid Build Coastguard Worker callback_cq_.store(callback_cq, std::memory_order_release);
1394*cc02d7e2SAndroid Build Coastguard Worker return callback_cq;
1395*cc02d7e2SAndroid Build Coastguard Worker }
1396*cc02d7e2SAndroid Build Coastguard Worker
1397*cc02d7e2SAndroid Build Coastguard Worker } // namespace grpc
1398