1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <assert.h>
20
21 #include <atomic>
22 #include <cstdlib>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <new>
27 #include <string>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/string_view.h"
33
34 #include <grpc/compression.h>
35 #include <grpc/grpc.h>
36 #include <grpc/impl/compression_types.h>
37 #include <grpc/load_reporting.h>
38 #include <grpc/status.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/time.h>
42 #include <grpcpp/completion_queue.h>
43 #include <grpcpp/ext/call_metric_recorder.h>
44 #include <grpcpp/ext/server_metric_recorder.h>
45 #include <grpcpp/impl/call.h>
46 #include <grpcpp/impl/call_op_set.h>
47 #include <grpcpp/impl/call_op_set_interface.h>
48 #include <grpcpp/impl/completion_queue_tag.h>
49 #include <grpcpp/impl/interceptor_common.h>
50 #include <grpcpp/impl/metadata_map.h>
51 #include <grpcpp/server_context.h>
52 #include <grpcpp/support/callback_common.h>
53 #include <grpcpp/support/interceptor.h>
54 #include <grpcpp/support/server_callback.h>
55 #include <grpcpp/support/server_interceptor.h>
56 #include <grpcpp/support/string_ref.h>
57
58 #include "src/core/lib/channel/context.h"
59 #include "src/core/lib/gprpp/crash.h"
60 #include "src/core/lib/gprpp/ref_counted.h"
61 #include "src/core/lib/gprpp/sync.h"
62 #include "src/core/lib/resource_quota/arena.h"
63 #include "src/core/lib/surface/call.h"
64 #include "src/cpp/server/backend_metric_recorder.h"
65
66 namespace grpc {
67
68 // CompletionOp
69
70 class ServerContextBase::CompletionOp final
71 : public internal::CallOpSetInterface {
72 public:
73 // initial refs: one in the server context, one in the cq
74 // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call * call,grpc::internal::ServerCallbackCall * callback_controller)75 CompletionOp(internal::Call* call,
76 grpc::internal::ServerCallbackCall* callback_controller)
77 : call_(*call),
78 callback_controller_(callback_controller),
79 has_tag_(false),
80 tag_(nullptr),
81 core_cq_tag_(this),
82 refs_(2),
83 finalized_(false),
84 cancelled_(0),
85 done_intercepting_(false) {}
86
87 // CompletionOp isn't copyable or movable
88 CompletionOp(const CompletionOp&) = delete;
89 CompletionOp& operator=(const CompletionOp&) = delete;
90 CompletionOp(CompletionOp&&) = delete;
91 CompletionOp& operator=(CompletionOp&&) = delete;
92
~CompletionOp()93 ~CompletionOp() override {
94 if (call_.server_rpc_info()) {
95 call_.server_rpc_info()->Unref();
96 }
97 }
98
99 void FillOps(internal::Call* call) override;
100
101 // This should always be arena allocated in the call, so override delete.
102 // But this class is not trivially destructible, so must actually call delete
103 // before allowing the arena to be freed
operator delete(void *,std::size_t size)104 static void operator delete(void* /*ptr*/, std::size_t size) {
105 // Use size to avoid unused-parameter warning since assert seems to be
106 // compiled out and treated as unused in some gcc optimized versions.
107 (void)size;
108 assert(size == sizeof(CompletionOp));
109 }
110
111 // This operator should never be called as the memory should be freed as part
112 // of the arena destruction. It only exists to provide a matching operator
113 // delete to the operator new so that some compilers will not complain (see
114 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
115 // there are no tests catching the compiler warning.
operator delete(void *,void *)116 static void operator delete(void*, void*) { assert(0); }
117
118 bool FinalizeResult(void** tag, bool* status) override;
119
CheckCancelled(CompletionQueue * cq)120 bool CheckCancelled(CompletionQueue* cq) {
121 cq->TryPluck(this);
122 return CheckCancelledNoPluck();
123 }
CheckCancelledAsync()124 bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
125
set_tag(void * tag)126 void set_tag(void* tag) {
127 has_tag_ = true;
128 tag_ = tag;
129 }
130
set_core_cq_tag(void * core_cq_tag)131 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
132
core_cq_tag()133 void* core_cq_tag() override { return core_cq_tag_; }
134
135 void Unref();
136
137 // This will be called while interceptors are run if the RPC is a hijacked
138 // RPC. This should set hijacking state for each of the ops.
SetHijackingState()139 void SetHijackingState() override {
140 // Servers don't allow hijacking
141 grpc_core::Crash("unreachable");
142 }
143
144 // Should be called after interceptors are done running
ContinueFillOpsAfterInterception()145 void ContinueFillOpsAfterInterception() override {}
146
147 // Should be called after interceptors are done running on the finalize result
148 // path
ContinueFinalizeResultAfterInterception()149 void ContinueFinalizeResultAfterInterception() override {
150 done_intercepting_ = true;
151 if (!has_tag_) {
152 // We don't have a tag to return.
153 Unref();
154 // Unref can delete this, so do not access anything from this afterward.
155 return;
156 }
157 // Start a phony op so that we can return the tag
158 GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
159 nullptr) == GRPC_CALL_OK);
160 }
161
162 private:
CheckCancelledNoPluck()163 bool CheckCancelledNoPluck() {
164 grpc_core::MutexLock lock(&mu_);
165 return finalized_ ? (cancelled_ != 0) : false;
166 }
167
168 internal::Call call_;
169 grpc::internal::ServerCallbackCall* const callback_controller_;
170 bool has_tag_;
171 void* tag_;
172 void* core_cq_tag_;
173 grpc_core::RefCount refs_;
174 grpc_core::Mutex mu_;
175 bool finalized_;
176 int cancelled_; // This is an int (not bool) because it is passed to core
177 bool done_intercepting_;
178 internal::InterceptorBatchMethodsImpl interceptor_methods_;
179 };
180
Unref()181 void ServerContextBase::CompletionOp::Unref() {
182 if (refs_.Unref()) {
183 grpc_call* call = call_.call();
184 delete this;
185 grpc_call_unref(call);
186 }
187 }
188
FillOps(internal::Call * call)189 void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
190 grpc_op ops;
191 ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
192 ops.data.recv_close_on_server.cancelled = &cancelled_;
193 ops.flags = 0;
194 ops.reserved = nullptr;
195 interceptor_methods_.SetCall(&call_);
196 interceptor_methods_.SetReverse();
197 interceptor_methods_.SetCallOpSetInterface(this);
198 // The following call_start_batch is internally-generated so no need for an
199 // explanatory log on failure.
200 GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
201 nullptr) == GRPC_CALL_OK);
202 // No interceptors to run here
203 }
204
FinalizeResult(void ** tag,bool * status)205 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
206 // Decide whether to do the unref or call the cancel callback within the lock
207 bool do_unref = false;
208 bool has_tag = false;
209 bool call_cancel = false;
210
211 {
212 grpc_core::MutexLock lock(&mu_);
213 if (done_intercepting_) {
214 // We are done intercepting.
215 has_tag = has_tag_;
216 if (has_tag) {
217 *tag = tag_;
218 }
219 // Release the lock before unreffing as Unref may delete this object
220 do_unref = true;
221 } else {
222 finalized_ = true;
223
224 // If for some reason the incoming status is false, mark that as a
225 // cancellation.
226 // TODO(vjpai): does this ever happen?
227 if (!*status) {
228 cancelled_ = 1;
229 }
230
231 call_cancel = (cancelled_ != 0);
232 // Release the lock since we may call a callback and interceptors.
233 }
234 }
235
236 if (do_unref) {
237 Unref();
238 // Unref can delete this, so do not access anything from this afterward.
239 return has_tag;
240 }
241 if (call_cancel && callback_controller_ != nullptr) {
242 callback_controller_->MaybeCallOnCancel();
243 }
244 // Add interception point and run through interceptors
245 interceptor_methods_.AddInterceptionHookPoint(
246 experimental::InterceptionHookPoints::POST_RECV_CLOSE);
247 if (interceptor_methods_.RunInterceptors()) {
248 // No interceptors were run
249 bool has_tag = has_tag_;
250 if (has_tag) {
251 *tag = tag_;
252 }
253 Unref();
254 // Unref can delete this, so do not access anything from this afterward.
255 return has_tag;
256 }
257 // There are interceptors to be run. Return false for now.
258 return false;
259 }
260
261 // ServerContextBase body
262
ServerContextBase()263 ServerContextBase::ServerContextBase()
264 : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
265
ServerContextBase(gpr_timespec deadline,grpc_metadata_array * arr)266 ServerContextBase::ServerContextBase(gpr_timespec deadline,
267 grpc_metadata_array* arr)
268 : deadline_(deadline) {
269 std::swap(*client_metadata_.arr(), *arr);
270 }
271
BindDeadlineAndMetadata(gpr_timespec deadline,grpc_metadata_array * arr)272 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
273 grpc_metadata_array* arr) {
274 deadline_ = deadline;
275 std::swap(*client_metadata_.arr(), *arr);
276 }
277
~ServerContextBase()278 ServerContextBase::~ServerContextBase() {
279 if (completion_op_) {
280 completion_op_->Unref();
281 // Unref can delete completion_op_, so do not access it afterward.
282 }
283 if (rpc_info_) {
284 rpc_info_->Unref();
285 }
286 if (default_reactor_used_.load(std::memory_order_relaxed)) {
287 reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
288 }
289 if (call_metric_recorder_ != nullptr) {
290 call_metric_recorder_->~CallMetricRecorder();
291 }
292 }
293
~CallWrapper()294 ServerContextBase::CallWrapper::~CallWrapper() {
295 if (call) {
296 // If the ServerContext is part of the call's arena, this could free the
297 // object itself.
298 grpc_call_unref(call);
299 }
300 }
301
BeginCompletionOp(internal::Call * call,std::function<void (bool)> callback,grpc::internal::ServerCallbackCall * callback_controller)302 void ServerContextBase::BeginCompletionOp(
303 internal::Call* call, std::function<void(bool)> callback,
304 grpc::internal::ServerCallbackCall* callback_controller) {
305 GPR_ASSERT(!completion_op_);
306 if (rpc_info_) {
307 rpc_info_->Ref();
308 }
309 grpc_call_ref(call->call());
310 completion_op_ =
311 new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
312 CompletionOp(call, callback_controller);
313 if (callback_controller != nullptr) {
314 completion_tag_.Set(call->call(), std::move(callback), completion_op_,
315 true);
316 completion_op_->set_core_cq_tag(&completion_tag_);
317 completion_op_->set_tag(completion_op_);
318 } else if (has_notify_when_done_tag_) {
319 completion_op_->set_tag(async_notify_when_done_tag_);
320 }
321 call->PerformOps(completion_op_);
322 }
323
GetCompletionOpTag()324 internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
325 return static_cast<internal::CompletionQueueTag*>(completion_op_);
326 }
327
AddInitialMetadata(const std::string & key,const std::string & value)328 void ServerContextBase::AddInitialMetadata(const std::string& key,
329 const std::string& value) {
330 initial_metadata_.insert(std::make_pair(key, value));
331 }
332
AddTrailingMetadata(const std::string & key,const std::string & value)333 void ServerContextBase::AddTrailingMetadata(const std::string& key,
334 const std::string& value) {
335 trailing_metadata_.insert(std::make_pair(key, value));
336 }
337
TryCancel() const338 void ServerContextBase::TryCancel() const {
339 internal::CancelInterceptorBatchMethods cancel_methods;
340 if (rpc_info_) {
341 for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
342 rpc_info_->RunInterceptor(&cancel_methods, i);
343 }
344 }
345 grpc_call_error err =
346 grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
347 "Cancelled on the server side", nullptr);
348 if (err != GRPC_CALL_OK) {
349 gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
350 }
351 }
352
IsCancelled() const353 bool ServerContextBase::IsCancelled() const {
354 if (completion_tag_) {
355 // When using callback API, this result is always valid.
356 return marked_cancelled_.load(std::memory_order_acquire) ||
357 completion_op_->CheckCancelledAsync();
358 } else if (has_notify_when_done_tag_) {
359 // When using async API, the result is only valid
360 // if the tag has already been delivered at the completion queue
361 return completion_op_ && completion_op_->CheckCancelledAsync();
362 } else {
363 // when using sync API, the result is always valid
364 return marked_cancelled_.load(std::memory_order_acquire) ||
365 (completion_op_ && completion_op_->CheckCancelled(cq_));
366 }
367 }
368
set_compression_algorithm(grpc_compression_algorithm algorithm)369 void ServerContextBase::set_compression_algorithm(
370 grpc_compression_algorithm algorithm) {
371 compression_algorithm_ = algorithm;
372 const char* algorithm_name = nullptr;
373 if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
374 grpc_core::Crash(absl::StrFormat(
375 "Name for compression algorithm '%d' unknown.", algorithm));
376 }
377 GPR_ASSERT(algorithm_name != nullptr);
378 AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
379 }
380
peer() const381 std::string ServerContextBase::peer() const {
382 std::string peer;
383 if (call_.call) {
384 char* c_peer = grpc_call_get_peer(call_.call);
385 peer = c_peer;
386 gpr_free(c_peer);
387 }
388 return peer;
389 }
390
census_context() const391 const struct census_context* ServerContextBase::census_context() const {
392 return call_.call == nullptr ? nullptr
393 : grpc_census_call_get_context(call_.call);
394 }
395
SetLoadReportingCosts(const std::vector<std::string> & cost_data)396 void ServerContextBase::SetLoadReportingCosts(
397 const std::vector<std::string>& cost_data) {
398 if (call_.call == nullptr) return;
399 for (const auto& cost_datum : cost_data) {
400 AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
401 }
402 }
403
CreateCallMetricRecorder(experimental::ServerMetricRecorder * server_metric_recorder)404 void ServerContextBase::CreateCallMetricRecorder(
405 experimental::ServerMetricRecorder* server_metric_recorder) {
406 if (call_.call == nullptr) return;
407 GPR_ASSERT(call_metric_recorder_ == nullptr);
408 grpc_core::Arena* arena = grpc_call_get_arena(call_.call);
409 auto* backend_metric_state =
410 arena->New<BackendMetricState>(server_metric_recorder);
411 call_metric_recorder_ = backend_metric_state;
412 grpc_call_context_set(call_.call, GRPC_CONTEXT_BACKEND_METRIC_PROVIDER,
413 backend_metric_state, nullptr);
414 }
415
ExperimentalGetAuthority() const416 grpc::string_ref ServerContextBase::ExperimentalGetAuthority() const {
417 absl::string_view authority = grpc_call_server_authority(call_.call);
418 return grpc::string_ref(authority.data(), authority.size());
419 }
420
421 } // namespace grpc
422