xref: /aosp_15_r20/external/grpc-grpc/src/cpp/server/server_context.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <assert.h>
20 
21 #include <atomic>
22 #include <cstdlib>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <new>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/strings/str_format.h"
32 #include "absl/strings/string_view.h"
33 
34 #include <grpc/compression.h>
35 #include <grpc/grpc.h>
36 #include <grpc/impl/compression_types.h>
37 #include <grpc/load_reporting.h>
38 #include <grpc/status.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 #include <grpc/support/time.h>
42 #include <grpcpp/completion_queue.h>
43 #include <grpcpp/ext/call_metric_recorder.h>
44 #include <grpcpp/ext/server_metric_recorder.h>
45 #include <grpcpp/impl/call.h>
46 #include <grpcpp/impl/call_op_set.h>
47 #include <grpcpp/impl/call_op_set_interface.h>
48 #include <grpcpp/impl/completion_queue_tag.h>
49 #include <grpcpp/impl/interceptor_common.h>
50 #include <grpcpp/impl/metadata_map.h>
51 #include <grpcpp/server_context.h>
52 #include <grpcpp/support/callback_common.h>
53 #include <grpcpp/support/interceptor.h>
54 #include <grpcpp/support/server_callback.h>
55 #include <grpcpp/support/server_interceptor.h>
56 #include <grpcpp/support/string_ref.h>
57 
58 #include "src/core/lib/channel/context.h"
59 #include "src/core/lib/gprpp/crash.h"
60 #include "src/core/lib/gprpp/ref_counted.h"
61 #include "src/core/lib/gprpp/sync.h"
62 #include "src/core/lib/resource_quota/arena.h"
63 #include "src/core/lib/surface/call.h"
64 #include "src/cpp/server/backend_metric_recorder.h"
65 
66 namespace grpc {
67 
68 // CompletionOp
69 
70 class ServerContextBase::CompletionOp final
71     : public internal::CallOpSetInterface {
72  public:
73   // initial refs: one in the server context, one in the cq
74   // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call * call,grpc::internal::ServerCallbackCall * callback_controller)75   CompletionOp(internal::Call* call,
76                grpc::internal::ServerCallbackCall* callback_controller)
77       : call_(*call),
78         callback_controller_(callback_controller),
79         has_tag_(false),
80         tag_(nullptr),
81         core_cq_tag_(this),
82         refs_(2),
83         finalized_(false),
84         cancelled_(0),
85         done_intercepting_(false) {}
86 
87   // CompletionOp isn't copyable or movable
88   CompletionOp(const CompletionOp&) = delete;
89   CompletionOp& operator=(const CompletionOp&) = delete;
90   CompletionOp(CompletionOp&&) = delete;
91   CompletionOp& operator=(CompletionOp&&) = delete;
92 
~CompletionOp()93   ~CompletionOp() override {
94     if (call_.server_rpc_info()) {
95       call_.server_rpc_info()->Unref();
96     }
97   }
98 
99   void FillOps(internal::Call* call) override;
100 
101   // This should always be arena allocated in the call, so override delete.
102   // But this class is not trivially destructible, so must actually call delete
103   // before allowing the arena to be freed
operator delete(void *,std::size_t size)104   static void operator delete(void* /*ptr*/, std::size_t size) {
105     // Use size to avoid unused-parameter warning since assert seems to be
106     // compiled out and treated as unused in some gcc optimized versions.
107     (void)size;
108     assert(size == sizeof(CompletionOp));
109   }
110 
111   // This operator should never be called as the memory should be freed as part
112   // of the arena destruction. It only exists to provide a matching operator
113   // delete to the operator new so that some compilers will not complain (see
114   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
115   // there are no tests catching the compiler warning.
operator delete(void *,void *)116   static void operator delete(void*, void*) { assert(0); }
117 
118   bool FinalizeResult(void** tag, bool* status) override;
119 
CheckCancelled(CompletionQueue * cq)120   bool CheckCancelled(CompletionQueue* cq) {
121     cq->TryPluck(this);
122     return CheckCancelledNoPluck();
123   }
CheckCancelledAsync()124   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
125 
set_tag(void * tag)126   void set_tag(void* tag) {
127     has_tag_ = true;
128     tag_ = tag;
129   }
130 
set_core_cq_tag(void * core_cq_tag)131   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
132 
core_cq_tag()133   void* core_cq_tag() override { return core_cq_tag_; }
134 
135   void Unref();
136 
137   // This will be called while interceptors are run if the RPC is a hijacked
138   // RPC. This should set hijacking state for each of the ops.
SetHijackingState()139   void SetHijackingState() override {
140     // Servers don't allow hijacking
141     grpc_core::Crash("unreachable");
142   }
143 
144   // Should be called after interceptors are done running
ContinueFillOpsAfterInterception()145   void ContinueFillOpsAfterInterception() override {}
146 
147   // Should be called after interceptors are done running on the finalize result
148   // path
ContinueFinalizeResultAfterInterception()149   void ContinueFinalizeResultAfterInterception() override {
150     done_intercepting_ = true;
151     if (!has_tag_) {
152       // We don't have a tag to return.
153       Unref();
154       // Unref can delete this, so do not access anything from this afterward.
155       return;
156     }
157     // Start a phony op so that we can return the tag
158     GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
159                                      nullptr) == GRPC_CALL_OK);
160   }
161 
162  private:
CheckCancelledNoPluck()163   bool CheckCancelledNoPluck() {
164     grpc_core::MutexLock lock(&mu_);
165     return finalized_ ? (cancelled_ != 0) : false;
166   }
167 
168   internal::Call call_;
169   grpc::internal::ServerCallbackCall* const callback_controller_;
170   bool has_tag_;
171   void* tag_;
172   void* core_cq_tag_;
173   grpc_core::RefCount refs_;
174   grpc_core::Mutex mu_;
175   bool finalized_;
176   int cancelled_;  // This is an int (not bool) because it is passed to core
177   bool done_intercepting_;
178   internal::InterceptorBatchMethodsImpl interceptor_methods_;
179 };
180 
Unref()181 void ServerContextBase::CompletionOp::Unref() {
182   if (refs_.Unref()) {
183     grpc_call* call = call_.call();
184     delete this;
185     grpc_call_unref(call);
186   }
187 }
188 
FillOps(internal::Call * call)189 void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
190   grpc_op ops;
191   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
192   ops.data.recv_close_on_server.cancelled = &cancelled_;
193   ops.flags = 0;
194   ops.reserved = nullptr;
195   interceptor_methods_.SetCall(&call_);
196   interceptor_methods_.SetReverse();
197   interceptor_methods_.SetCallOpSetInterface(this);
198   // The following call_start_batch is internally-generated so no need for an
199   // explanatory log on failure.
200   GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
201                                    nullptr) == GRPC_CALL_OK);
202   // No interceptors to run here
203 }
204 
FinalizeResult(void ** tag,bool * status)205 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
206   // Decide whether to do the unref or call the cancel callback within the lock
207   bool do_unref = false;
208   bool has_tag = false;
209   bool call_cancel = false;
210 
211   {
212     grpc_core::MutexLock lock(&mu_);
213     if (done_intercepting_) {
214       // We are done intercepting.
215       has_tag = has_tag_;
216       if (has_tag) {
217         *tag = tag_;
218       }
219       // Release the lock before unreffing as Unref may delete this object
220       do_unref = true;
221     } else {
222       finalized_ = true;
223 
224       // If for some reason the incoming status is false, mark that as a
225       // cancellation.
226       // TODO(vjpai): does this ever happen?
227       if (!*status) {
228         cancelled_ = 1;
229       }
230 
231       call_cancel = (cancelled_ != 0);
232       // Release the lock since we may call a callback and interceptors.
233     }
234   }
235 
236   if (do_unref) {
237     Unref();
238     // Unref can delete this, so do not access anything from this afterward.
239     return has_tag;
240   }
241   if (call_cancel && callback_controller_ != nullptr) {
242     callback_controller_->MaybeCallOnCancel();
243   }
244   // Add interception point and run through interceptors
245   interceptor_methods_.AddInterceptionHookPoint(
246       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
247   if (interceptor_methods_.RunInterceptors()) {
248     // No interceptors were run
249     bool has_tag = has_tag_;
250     if (has_tag) {
251       *tag = tag_;
252     }
253     Unref();
254     // Unref can delete this, so do not access anything from this afterward.
255     return has_tag;
256   }
257   // There are interceptors to be run. Return false for now.
258   return false;
259 }
260 
261 // ServerContextBase body
262 
ServerContextBase()263 ServerContextBase::ServerContextBase()
264     : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
265 
ServerContextBase(gpr_timespec deadline,grpc_metadata_array * arr)266 ServerContextBase::ServerContextBase(gpr_timespec deadline,
267                                      grpc_metadata_array* arr)
268     : deadline_(deadline) {
269   std::swap(*client_metadata_.arr(), *arr);
270 }
271 
BindDeadlineAndMetadata(gpr_timespec deadline,grpc_metadata_array * arr)272 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
273                                                 grpc_metadata_array* arr) {
274   deadline_ = deadline;
275   std::swap(*client_metadata_.arr(), *arr);
276 }
277 
~ServerContextBase()278 ServerContextBase::~ServerContextBase() {
279   if (completion_op_) {
280     completion_op_->Unref();
281     // Unref can delete completion_op_, so do not access it afterward.
282   }
283   if (rpc_info_) {
284     rpc_info_->Unref();
285   }
286   if (default_reactor_used_.load(std::memory_order_relaxed)) {
287     reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
288   }
289   if (call_metric_recorder_ != nullptr) {
290     call_metric_recorder_->~CallMetricRecorder();
291   }
292 }
293 
~CallWrapper()294 ServerContextBase::CallWrapper::~CallWrapper() {
295   if (call) {
296     // If the ServerContext is part of the call's arena, this could free the
297     // object itself.
298     grpc_call_unref(call);
299   }
300 }
301 
BeginCompletionOp(internal::Call * call,std::function<void (bool)> callback,grpc::internal::ServerCallbackCall * callback_controller)302 void ServerContextBase::BeginCompletionOp(
303     internal::Call* call, std::function<void(bool)> callback,
304     grpc::internal::ServerCallbackCall* callback_controller) {
305   GPR_ASSERT(!completion_op_);
306   if (rpc_info_) {
307     rpc_info_->Ref();
308   }
309   grpc_call_ref(call->call());
310   completion_op_ =
311       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
312           CompletionOp(call, callback_controller);
313   if (callback_controller != nullptr) {
314     completion_tag_.Set(call->call(), std::move(callback), completion_op_,
315                         true);
316     completion_op_->set_core_cq_tag(&completion_tag_);
317     completion_op_->set_tag(completion_op_);
318   } else if (has_notify_when_done_tag_) {
319     completion_op_->set_tag(async_notify_when_done_tag_);
320   }
321   call->PerformOps(completion_op_);
322 }
323 
GetCompletionOpTag()324 internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
325   return static_cast<internal::CompletionQueueTag*>(completion_op_);
326 }
327 
AddInitialMetadata(const std::string & key,const std::string & value)328 void ServerContextBase::AddInitialMetadata(const std::string& key,
329                                            const std::string& value) {
330   initial_metadata_.insert(std::make_pair(key, value));
331 }
332 
AddTrailingMetadata(const std::string & key,const std::string & value)333 void ServerContextBase::AddTrailingMetadata(const std::string& key,
334                                             const std::string& value) {
335   trailing_metadata_.insert(std::make_pair(key, value));
336 }
337 
TryCancel() const338 void ServerContextBase::TryCancel() const {
339   internal::CancelInterceptorBatchMethods cancel_methods;
340   if (rpc_info_) {
341     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
342       rpc_info_->RunInterceptor(&cancel_methods, i);
343     }
344   }
345   grpc_call_error err =
346       grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
347                                    "Cancelled on the server side", nullptr);
348   if (err != GRPC_CALL_OK) {
349     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
350   }
351 }
352 
IsCancelled() const353 bool ServerContextBase::IsCancelled() const {
354   if (completion_tag_) {
355     // When using callback API, this result is always valid.
356     return marked_cancelled_.load(std::memory_order_acquire) ||
357            completion_op_->CheckCancelledAsync();
358   } else if (has_notify_when_done_tag_) {
359     // When using async API, the result is only valid
360     // if the tag has already been delivered at the completion queue
361     return completion_op_ && completion_op_->CheckCancelledAsync();
362   } else {
363     // when using sync API, the result is always valid
364     return marked_cancelled_.load(std::memory_order_acquire) ||
365            (completion_op_ && completion_op_->CheckCancelled(cq_));
366   }
367 }
368 
set_compression_algorithm(grpc_compression_algorithm algorithm)369 void ServerContextBase::set_compression_algorithm(
370     grpc_compression_algorithm algorithm) {
371   compression_algorithm_ = algorithm;
372   const char* algorithm_name = nullptr;
373   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
374     grpc_core::Crash(absl::StrFormat(
375         "Name for compression algorithm '%d' unknown.", algorithm));
376   }
377   GPR_ASSERT(algorithm_name != nullptr);
378   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
379 }
380 
peer() const381 std::string ServerContextBase::peer() const {
382   std::string peer;
383   if (call_.call) {
384     char* c_peer = grpc_call_get_peer(call_.call);
385     peer = c_peer;
386     gpr_free(c_peer);
387   }
388   return peer;
389 }
390 
census_context() const391 const struct census_context* ServerContextBase::census_context() const {
392   return call_.call == nullptr ? nullptr
393                                : grpc_census_call_get_context(call_.call);
394 }
395 
SetLoadReportingCosts(const std::vector<std::string> & cost_data)396 void ServerContextBase::SetLoadReportingCosts(
397     const std::vector<std::string>& cost_data) {
398   if (call_.call == nullptr) return;
399   for (const auto& cost_datum : cost_data) {
400     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
401   }
402 }
403 
CreateCallMetricRecorder(experimental::ServerMetricRecorder * server_metric_recorder)404 void ServerContextBase::CreateCallMetricRecorder(
405     experimental::ServerMetricRecorder* server_metric_recorder) {
406   if (call_.call == nullptr) return;
407   GPR_ASSERT(call_metric_recorder_ == nullptr);
408   grpc_core::Arena* arena = grpc_call_get_arena(call_.call);
409   auto* backend_metric_state =
410       arena->New<BackendMetricState>(server_metric_recorder);
411   call_metric_recorder_ = backend_metric_state;
412   grpc_call_context_set(call_.call, GRPC_CONTEXT_BACKEND_METRIC_PROVIDER,
413                         backend_metric_state, nullptr);
414 }
415 
ExperimentalGetAuthority() const416 grpc::string_ref ServerContextBase::ExperimentalGetAuthority() const {
417   absl::string_view authority = grpc_call_server_authority(call_.call);
418   return grpc::string_ref(authority.data(), authority.size());
419 }
420 
421 }  // namespace grpc
422