xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/server_interface.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SERVER_INTERFACE_H
20 #define GRPCPP_SERVER_INTERFACE_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/impl/grpc_types.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/port_platform.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/impl/call.h>
28 #include <grpcpp/impl/call_hook.h>
29 #include <grpcpp/impl/codegen/interceptor_common.h>
30 #include <grpcpp/impl/completion_queue_tag.h>
31 #include <grpcpp/impl/rpc_service_method.h>
32 #include <grpcpp/server_context.h>
33 #include <grpcpp/support/byte_buffer.h>
34 
35 namespace grpc {
36 
37 class AsyncGenericService;
38 class Channel;
39 class CompletionQueue;
40 class GenericServerContext;
41 class ServerCompletionQueue;
42 class ServerCredentials;
43 class Service;
44 
45 /// Models a gRPC server.
46 ///
47 /// Servers are configured and started via \a grpc::ServerBuilder.
48 namespace internal {
49 class ServerAsyncStreamingInterface;
50 }  // namespace internal
51 
52 class CallbackGenericService;
53 
54 namespace experimental {
55 class ServerInterceptorFactoryInterface;
56 class ServerMetricRecorder;
57 }  // namespace experimental
58 
59 class ServerInterface : public internal::CallHook {
60  public:
~ServerInterface()61   ~ServerInterface() override {}
62 
63   /// \a Shutdown does the following things:
64   ///
65   /// 1. Shutdown the server: deactivate all listening ports, mark it in
66   ///    "shutdown mode" so that further call Request's or incoming RPC matches
67   ///    are no longer allowed. Also return all Request'ed-but-not-yet-active
68   ///    calls as failed (!ok). This refers to calls that have been requested
69   ///    at the server by the server-side library or application code but that
70   ///    have not yet been matched to incoming RPCs from the client. Note that
71   ///    this would even include default calls added automatically by the gRPC
72   ///    C++ API without the user's input (e.g., "Unimplemented RPC method")
73   ///
74   /// 2. Block until all rpc method handlers invoked automatically by the sync
75   ///    API finish.
76   ///
77   /// 3. If all pending calls complete (and all their operations are
78   ///    retrieved by Next) before \a deadline expires, this finishes
79   ///    gracefully. Otherwise, forcefully cancel all pending calls associated
80   ///    with the server after \a deadline expires. In the case of the sync API,
81   ///    if the RPC function for a streaming call has already been started and
82   ///    takes a week to complete, the RPC function won't be forcefully
83   ///    terminated (since that would leave state corrupt and incomplete) and
84   ///    the method handler will just keep running (which will prevent the
85   ///    server from completing the "join" operation that it needs to do at
86   ///    shutdown time).
87   ///
88   /// All completion queue associated with the server (for example, for async
89   /// serving) must be shutdown *after* this method has returned:
90   /// See \a ServerBuilder::AddCompletionQueue for details.
91   /// They must also be drained (by repeated Next) after being shutdown.
92   ///
93   /// \param deadline How long to wait until pending rpcs are forcefully
94   /// terminated.
95   template <class T>
Shutdown(const T & deadline)96   void Shutdown(const T& deadline) {
97     ShutdownInternal(TimePoint<T>(deadline).raw_time());
98   }
99 
100   /// Shutdown the server without a deadline and forced cancellation.
101   ///
102   /// All completion queue associated with the server (for example, for async
103   /// serving) must be shutdown *after* this method has returned:
104   /// See \a ServerBuilder::AddCompletionQueue for details.
Shutdown()105   void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
106 
  /// Blocks, waiting for all server work to complete.
  ///
  /// \warning This function only ever returns if the server is already
  /// shutting down or some other thread calls \a Shutdown.
  virtual void Wait() = 0;
112 
113  protected:
114   friend class grpc::Service;
115 
  /// Registers a service. This call does not take ownership of the service.
  /// The service must exist for the lifetime of the Server instance.
  ///
  /// \param host NOTE(review): presumably the host name the service is
  /// registered for, with a null pointer meaning "any host" -- confirm
  /// against the implementations.
  /// \param service The service to register; not owned by the server.
  /// \return NOTE(review): presumably true when registration succeeded; the
  /// meaning of the result is not visible in this header.
  virtual bool RegisterService(const std::string* host, Service* service) = 0;
119 
  /// Registers a generic (async) service. This call does not take ownership
  /// of the service. The service must exist for the lifetime of the Server
  /// instance.
  ///
  /// \param service The generic service to register; not owned by the server.
  virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
123 
  /// Registers a callback generic service. This call does not take ownership
  /// of the service. The service must exist for the lifetime of the Server
  /// instance. Deliberately not pure virtual, since this is a post-1.0 API
  /// addition: the default implementation ignores the service and does
  /// nothing.
  virtual void RegisterCallbackGenericService(CallbackGenericService*
                                              /*service*/) {}
130 
  /// Tries to bind this server to the given \a addr.
  ///
  /// It can be invoked multiple times.
  ///
  /// \param addr The address to try to bind to the server (e.g.,
  /// localhost:1234, 192.168.1.1:31416, [::1]:27182, etc.).
  /// \param creds The credentials associated with the server.
  ///
  /// \return bound port number on success, 0 on failure.
  ///
  /// \warning It's an error to call this method on an already started server.
  virtual int AddListeningPort(const std::string& addr,
                               ServerCredentials* creds) = 0;
144 
  /// Starts the server.
  ///
  /// \param cqs Completion queues for handling asynchronous services. The
  /// caller is required to keep all completion queues live until the server is
  /// destroyed.
  /// \param num_cqs Number of completion queues that \a cqs holds.
  virtual void Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0;
152 
  /// Implementation hook behind both public \a Shutdown overloads.
  ///
  /// \param deadline Deadline already converted to a gpr_timespec. The
  /// no-argument \a Shutdown passes an infinite-future value.
  virtual void ShutdownInternal(gpr_timespec deadline) = 0;

  /// Receive message size limit that RegisteredAsyncRequest::FinalizeResult
  /// hands to the internal::Call it builds.
  /// NOTE(review): units (presumably bytes) and the meaning of non-positive
  /// values are not visible in this header -- confirm.
  virtual int max_receive_message_size() const = 0;

  /// Returns the grpc_server handle associated with this server.
  /// NOTE(review): ownership presumably stays with the server -- confirm.
  virtual grpc_server* server() = 0;

  /// Re-declares the internal::CallHook operation as pure virtual here, so
  /// concrete servers have to provide it.
  void PerformOpsOnCall(internal::CallOpSetInterface* ops,
                        internal::Call* call) override = 0;
161 
  /// Common base of the completion-queue tags used by the async request
  /// classes below. It stores the objects supplied when a call is requested.
  /// The constructor, destructor and FinalizeResult are only declared here;
  /// their definitions live outside this header.
  class BaseAsyncRequest : public internal::CompletionQueueTag {
   public:
    /// \param delete_on_finalize NOTE(review): presumably makes
    /// FinalizeResult delete this object once the request is finalized --
    /// confirm against the out-of-line definition.
    BaseAsyncRequest(ServerInterface* server, grpc::ServerContext* context,
                     internal::ServerAsyncStreamingInterface* stream,
                     grpc::CompletionQueue* call_cq,
                     grpc::ServerCompletionQueue* notification_cq, void* tag,
                     bool delete_on_finalize);
    ~BaseAsyncRequest() override;

    /// Implements internal::CompletionQueueTag::FinalizeResult. Derived
    /// classes finish their own work and then delegate to this function.
    bool FinalizeResult(void** tag, bool* status) override;

   private:
    void ContinueFinalizeResultAfterInterception();

   protected:
    // Values captured at construction; none of the pointers can be reseated.
    ServerInterface* const server_;
    grpc::ServerContext* const context_;
    internal::ServerAsyncStreamingInterface* const stream_;
    grpc::CompletionQueue* const call_cq_;
    grpc::ServerCompletionQueue* const notification_cq_;
    void* const tag_;
    const bool delete_on_finalize_;
    // Core call handle; derived classes pass it to internal::Call and, on a
    // parse failure, cancel and unref it.
    grpc_call* call_;
    // C++ wrapper around call_, assigned in the derived FinalizeResult.
    internal::Call call_wrapper_;
    // Collects the interception hook points and the received message.
    internal::InterceptorBatchMethodsImpl interceptor_methods_;
    // Once true, derived FinalizeResult overrides skip their own work and
    // forward straight to the base implementation.
    bool done_intercepting_;
    // NOTE(review): presumably cached from the server's
    // call_metric_recording_enabled() / server_metric_recorder() -- confirm.
    bool call_metric_recording_enabled_;
    experimental::ServerMetricRecorder* server_metric_recorder_;
  };
191 
192   /// RegisteredAsyncRequest is not part of the C++ API
193   class RegisteredAsyncRequest : public BaseAsyncRequest {
194    public:
195     RegisteredAsyncRequest(ServerInterface* server,
196                            grpc::ServerContext* context,
197                            internal::ServerAsyncStreamingInterface* stream,
198                            grpc::CompletionQueue* call_cq,
199                            grpc::ServerCompletionQueue* notification_cq,
200                            void* tag, const char* name,
201                            internal::RpcMethod::RpcType type);
202 
FinalizeResult(void ** tag,bool * status)203     bool FinalizeResult(void** tag, bool* status) override {
204       // If we are done intercepting, then there is nothing more for us to do
205       if (done_intercepting_) {
206         return BaseAsyncRequest::FinalizeResult(tag, status);
207       }
208       call_wrapper_ = grpc::internal::Call(
209           call_, server_, call_cq_, server_->max_receive_message_size(),
210           context_->set_server_rpc_info(name_, type_,
211                                         *server_->interceptor_creators()));
212       return BaseAsyncRequest::FinalizeResult(tag, status);
213     }
214 
215    protected:
216     void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
217                       grpc::ServerCompletionQueue* notification_cq);
218     const char* name_;
219     const internal::RpcMethod::RpcType type_;
220   };
221 
222   class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
223    public:
NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)224     NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
225                           ServerInterface* server, grpc::ServerContext* context,
226                           internal::ServerAsyncStreamingInterface* stream,
227                           grpc::CompletionQueue* call_cq,
228                           grpc::ServerCompletionQueue* notification_cq,
229                           void* tag)
230         : RegisteredAsyncRequest(
231               server, context, stream, call_cq, notification_cq, tag,
232               registered_method->name(), registered_method->method_type()) {
233       IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
234     }
235 
236     // uses RegisteredAsyncRequest::FinalizeResult
237   };
238 
239   template <class Message>
240   class PayloadAsyncRequest final : public RegisteredAsyncRequest {
241    public:
PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * request)242     PayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
243                         ServerInterface* server, grpc::ServerContext* context,
244                         internal::ServerAsyncStreamingInterface* stream,
245                         grpc::CompletionQueue* call_cq,
246                         grpc::ServerCompletionQueue* notification_cq, void* tag,
247                         Message* request)
248         : RegisteredAsyncRequest(
249               server, context, stream, call_cq, notification_cq, tag,
250               registered_method->name(), registered_method->method_type()),
251           registered_method_(registered_method),
252           request_(request) {
253       IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
254                    notification_cq);
255     }
256 
~PayloadAsyncRequest()257     ~PayloadAsyncRequest() override {
258       payload_.Release();  // We do not own the payload_
259     }
260 
FinalizeResult(void ** tag,bool * status)261     bool FinalizeResult(void** tag, bool* status) override {
262       // If we are done intercepting, then there is nothing more for us to do
263       if (done_intercepting_) {
264         return RegisteredAsyncRequest::FinalizeResult(tag, status);
265       }
266       if (*status) {
267         if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
268                                       payload_.bbuf_ptr(), request_)
269                                       .ok()) {
270           // If deserialization fails, we cancel the call and instantiate
271           // a new instance of ourselves to request another call.  We then
272           // return false, which prevents the call from being returned to
273           // the application.
274           grpc_call_cancel_with_status(call_, GRPC_STATUS_INTERNAL,
275                                        "Unable to parse request", nullptr);
276           grpc_call_unref(call_);
277           new PayloadAsyncRequest(registered_method_, server_, context_,
278                                   stream_, call_cq_, notification_cq_, tag_,
279                                   request_);
280           delete this;
281           return false;
282         }
283       }
284       // Set interception point for recv message
285       interceptor_methods_.AddInterceptionHookPoint(
286           experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
287       interceptor_methods_.SetRecvMessage(request_, nullptr);
288       return RegisteredAsyncRequest::FinalizeResult(tag, status);
289     }
290 
291    private:
292     internal::RpcServiceMethod* const registered_method_;
293     Message* const request_;
294     ByteBuffer payload_;
295   };
296 
  /// Async request used for generic calls (see RequestAsyncGenericCall).
  /// All member functions are defined outside this header.
  class GenericAsyncRequest : public BaseAsyncRequest {
   public:
    /// \param delete_on_finalize Forwarded meaning as in BaseAsyncRequest.
    /// \param issue_request NOTE(review): presumably controls whether the
    /// constructor calls IssueRequest() itself -- confirm against the
    /// out-of-line definition.
    GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
                        internal::ServerAsyncStreamingInterface* stream,
                        grpc::CompletionQueue* call_cq,
                        grpc::ServerCompletionQueue* notification_cq, void* tag,
                        bool delete_on_finalize, bool issue_request = true);

    bool FinalizeResult(void** tag, bool* status) override;

   protected:
    void IssueRequest();

   private:
    // Call details structure from the gRPC core API.
    // NOTE(review): presumably filled in when a generic call is matched.
    grpc_call_details call_details_;
  };
313 
  /// Requests an incoming call for \a method whose request message is to be
  /// deserialized into \a message.
  ///
  /// Asserts that \a method is non-null, then allocates a
  /// PayloadAsyncRequest, whose constructor issues the request. The pointer
  /// is intentionally not kept here.
  /// NOTE(review): the request object presumably deletes itself when it is
  /// finalized; only the parse-failure `delete this` is visible in this
  /// header.
  template <class Message>
  void RequestAsyncCall(internal::RpcServiceMethod* method,
                        grpc::ServerContext* context,
                        internal::ServerAsyncStreamingInterface* stream,
                        grpc::CompletionQueue* call_cq,
                        grpc::ServerCompletionQueue* notification_cq, void* tag,
                        Message* message) {
    GPR_ASSERT(method);
    new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
                                     notification_cq, tag, message);
  }
325 
  /// Requests an incoming call for \a method without a request message.
  ///
  /// Asserts that \a method is non-null, then allocates a
  /// NoPayloadAsyncRequest, whose constructor issues the request. The pointer
  /// is intentionally not kept here.
  /// NOTE(review): the request object presumably deletes itself when it is
  /// finalized -- the deletion is not visible in this header.
  void RequestAsyncCall(internal::RpcServiceMethod* method,
                        grpc::ServerContext* context,
                        internal::ServerAsyncStreamingInterface* stream,
                        grpc::CompletionQueue* call_cq,
                        grpc::ServerCompletionQueue* notification_cq,
                        void* tag) {
    GPR_ASSERT(method);
    new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
                              notification_cq, tag);
  }
336 
  /// Requests an incoming generic call by allocating a GenericAsyncRequest.
  /// The pointer is intentionally not kept here; the request is created with
  /// delete_on_finalize set to true and the default issue_request value.
  void RequestAsyncGenericCall(GenericServerContext* context,
                               internal::ServerAsyncStreamingInterface* stream,
                               grpc::CompletionQueue* call_cq,
                               grpc::ServerCompletionQueue* notification_cq,
                               void* tag) {
    // The trailing `true` is the delete_on_finalize argument.
    new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
                            tag, true);
  }
345 
346  private:
  // EXPERIMENTAL
  // Getter method for the vector of interceptor factory objects.
  // Returns a nullptr (rather than being pure) since this is a post-1.0 method
  // and adding a new pure method to an interface would be a breaking change
  // (even though this is private and non-API).
  // Note: RegisteredAsyncRequest::FinalizeResult dereferences the returned
  // pointer, so a server used with registered async requests must override
  // this to return a non-null vector.
  virtual std::vector<
      std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
  interceptor_creators() {
    return nullptr;
  }
357 
  // Whether per-call load reporting is enabled for this server.
  virtual bool call_metric_recording_enabled() const = 0;

  // Interface to read or update server-wide metrics. Returns null when no
  // recorder has been set.
  virtual experimental::ServerMetricRecorder* server_metric_recorder()
      const = 0;
364 
  // Returns the callbackable completion queue associated with this server.
  // If the return value is nullptr, this server doesn't support callback
  // operations; the default implementation returns nullptr.
  // TODO(vjpai): Consider a better default like using a global CQ
  // Returns nullptr (rather than being pure) since this is a post-1.0 method
  // and adding a new pure method to an interface would be a breaking change
  // (even though this is private and non-API).
  virtual grpc::CompletionQueue* CallbackCQ() { return nullptr; }
373 };
374 
375 }  // namespace grpc
376 
377 #endif  // GRPCPP_SERVER_INTERFACE_H
378