1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SERVER_INTERFACE_H 20 #define GRPCPP_SERVER_INTERFACE_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/impl/grpc_types.h> 24 #include <grpc/support/log.h> 25 #include <grpc/support/port_platform.h> 26 #include <grpc/support/time.h> 27 #include <grpcpp/impl/call.h> 28 #include <grpcpp/impl/call_hook.h> 29 #include <grpcpp/impl/codegen/interceptor_common.h> 30 #include <grpcpp/impl/completion_queue_tag.h> 31 #include <grpcpp/impl/rpc_service_method.h> 32 #include <grpcpp/server_context.h> 33 #include <grpcpp/support/byte_buffer.h> 34 35 namespace grpc { 36 37 class AsyncGenericService; 38 class Channel; 39 class CompletionQueue; 40 class GenericServerContext; 41 class ServerCompletionQueue; 42 class ServerCredentials; 43 class Service; 44 45 /// Models a gRPC server. 46 /// 47 /// Servers are configured and started via \a grpc::ServerBuilder. 48 namespace internal { 49 class ServerAsyncStreamingInterface; 50 } // namespace internal 51 52 class CallbackGenericService; 53 54 namespace experimental { 55 class ServerInterceptorFactoryInterface; 56 class ServerMetricRecorder; 57 } // namespace experimental 58 59 class ServerInterface : public internal::CallHook { 60 public: ~ServerInterface()61 ~ServerInterface() override {} 62 63 /// \a Shutdown does the following things: 64 /// 65 /// 1. Shutdown the server: deactivate all listening ports, mark it in 66 /// "shutdown mode" so that further call Request's or incoming RPC matches 67 /// are no longer allowed. Also return all Request'ed-but-not-yet-active 68 /// calls as failed (!ok). This refers to calls that have been requested 69 /// at the server by the server-side library or application code but that 70 /// have not yet been matched to incoming RPCs from the client. Note that 71 /// this would even include default calls added automatically by the gRPC 72 /// C++ API without the user's input (e.g., "Unimplemented RPC method") 73 /// 74 /// 2. Block until all rpc method handlers invoked automatically by the sync 75 /// API finish. 76 /// 77 /// 3. If all pending calls complete (and all their operations are 78 /// retrieved by Next) before \a deadline expires, this finishes 79 /// gracefully. Otherwise, forcefully cancel all pending calls associated 80 /// with the server after \a deadline expires. In the case of the sync API, 81 /// if the RPC function for a streaming call has already been started and 82 /// takes a week to complete, the RPC function won't be forcefully 83 /// terminated (since that would leave state corrupt and incomplete) and 84 /// the method handler will just keep running (which will prevent the 85 /// server from completing the "join" operation that it needs to do at 86 /// shutdown time). 87 /// 88 /// All completion queue associated with the server (for example, for async 89 /// serving) must be shutdown *after* this method has returned: 90 /// See \a ServerBuilder::AddCompletionQueue for details. 91 /// They must also be drained (by repeated Next) after being shutdown. 92 /// 93 /// \param deadline How long to wait until pending rpcs are forcefully 94 /// terminated. 95 template <class T> Shutdown(const T & deadline)96 void Shutdown(const T& deadline) { 97 ShutdownInternal(TimePoint<T>(deadline).raw_time()); 98 } 99 100 /// Shutdown the server without a deadline and forced cancellation. 101 /// 102 /// All completion queue associated with the server (for example, for async 103 /// serving) must be shutdown *after* this method has returned: 104 /// See \a ServerBuilder::AddCompletionQueue for details. Shutdown()105 void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } 106 107 /// Block waiting for all work to complete. 108 /// 109 /// \warning The server must be either shutting down or some other thread must 110 /// call \a Shutdown for this function to ever return. 111 virtual void Wait() = 0; 112 113 protected: 114 friend class grpc::Service; 115 116 /// Register a service. This call does not take ownership of the service. 117 /// The service must exist for the lifetime of the Server instance. 118 virtual bool RegisterService(const std::string* host, Service* service) = 0; 119 120 /// Register a generic service. This call does not take ownership of the 121 /// service. The service must exist for the lifetime of the Server instance. 122 virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; 123 124 /// Register a callback generic service. This call does not take ownership of 125 /// the service. The service must exist for the lifetime of the Server 126 /// instance. May not be abstract since this is a post-1.0 API addition. 127 RegisterCallbackGenericService(CallbackGenericService *)128 virtual void RegisterCallbackGenericService(CallbackGenericService* 129 /*service*/) {} 130 131 /// Tries to bind \a server to the given \a addr. 132 /// 133 /// It can be invoked multiple times. 134 /// 135 /// \param addr The address to try to bind to the server (eg, localhost:1234, 136 /// 192.168.1.1:31416, [::1]:27182, etc.). 137 /// \params creds The credentials associated with the server. 138 /// 139 /// \return bound port number on success, 0 on failure. 140 /// 141 /// \warning It's an error to call this method on an already started server. 142 virtual int AddListeningPort(const std::string& addr, 143 ServerCredentials* creds) = 0; 144 145 /// Start the server. 146 /// 147 /// \param cqs Completion queues for handling asynchronous services. The 148 /// caller is required to keep all completion queues live until the server is 149 /// destroyed. 150 /// \param num_cqs How many completion queues does \a cqs hold. 151 virtual void Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0; 152 153 virtual void ShutdownInternal(gpr_timespec deadline) = 0; 154 155 virtual int max_receive_message_size() const = 0; 156 157 virtual grpc_server* server() = 0; 158 159 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 160 internal::Call* call) override = 0; 161 162 class BaseAsyncRequest : public internal::CompletionQueueTag { 163 public: 164 BaseAsyncRequest(ServerInterface* server, grpc::ServerContext* context, 165 internal::ServerAsyncStreamingInterface* stream, 166 grpc::CompletionQueue* call_cq, 167 grpc::ServerCompletionQueue* notification_cq, void* tag, 168 bool delete_on_finalize); 169 ~BaseAsyncRequest() override; 170 171 bool FinalizeResult(void** tag, bool* status) override; 172 173 private: 174 void ContinueFinalizeResultAfterInterception(); 175 176 protected: 177 ServerInterface* const server_; 178 grpc::ServerContext* const context_; 179 internal::ServerAsyncStreamingInterface* const stream_; 180 grpc::CompletionQueue* const call_cq_; 181 grpc::ServerCompletionQueue* const notification_cq_; 182 void* const tag_; 183 const bool delete_on_finalize_; 184 grpc_call* call_; 185 internal::Call call_wrapper_; 186 internal::InterceptorBatchMethodsImpl interceptor_methods_; 187 bool done_intercepting_; 188 bool call_metric_recording_enabled_; 189 experimental::ServerMetricRecorder* server_metric_recorder_; 190 }; 191 192 /// RegisteredAsyncRequest is not part of the C++ API 193 class RegisteredAsyncRequest : public BaseAsyncRequest { 194 public: 195 RegisteredAsyncRequest(ServerInterface* server, 196 grpc::ServerContext* context, 197 internal::ServerAsyncStreamingInterface* stream, 198 grpc::CompletionQueue* call_cq, 199 grpc::ServerCompletionQueue* notification_cq, 200 void* tag, const char* name, 201 internal::RpcMethod::RpcType type); 202 FinalizeResult(void ** tag,bool * status)203 bool FinalizeResult(void** tag, bool* status) override { 204 // If we are done intercepting, then there is nothing more for us to do 205 if (done_intercepting_) { 206 return BaseAsyncRequest::FinalizeResult(tag, status); 207 } 208 call_wrapper_ = grpc::internal::Call( 209 call_, server_, call_cq_, server_->max_receive_message_size(), 210 context_->set_server_rpc_info(name_, type_, 211 *server_->interceptor_creators())); 212 return BaseAsyncRequest::FinalizeResult(tag, status); 213 } 214 215 protected: 216 void IssueRequest(void* registered_method, grpc_byte_buffer** payload, 217 grpc::ServerCompletionQueue* notification_cq); 218 const char* name_; 219 const internal::RpcMethod::RpcType type_; 220 }; 221 222 class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { 223 public: NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)224 NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 225 ServerInterface* server, grpc::ServerContext* context, 226 internal::ServerAsyncStreamingInterface* stream, 227 grpc::CompletionQueue* call_cq, 228 grpc::ServerCompletionQueue* notification_cq, 229 void* tag) 230 : RegisteredAsyncRequest( 231 server, context, stream, call_cq, notification_cq, tag, 232 registered_method->name(), registered_method->method_type()) { 233 IssueRequest(registered_method->server_tag(), nullptr, notification_cq); 234 } 235 236 // uses RegisteredAsyncRequest::FinalizeResult 237 }; 238 239 template <class Message> 240 class PayloadAsyncRequest final : public RegisteredAsyncRequest { 241 public: PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * request)242 PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 243 ServerInterface* server, grpc::ServerContext* context, 244 internal::ServerAsyncStreamingInterface* stream, 245 grpc::CompletionQueue* call_cq, 246 grpc::ServerCompletionQueue* notification_cq, void* tag, 247 Message* request) 248 : RegisteredAsyncRequest( 249 server, context, stream, call_cq, notification_cq, tag, 250 registered_method->name(), registered_method->method_type()), 251 registered_method_(registered_method), 252 request_(request) { 253 IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), 254 notification_cq); 255 } 256 ~PayloadAsyncRequest()257 ~PayloadAsyncRequest() override { 258 payload_.Release(); // We do not own the payload_ 259 } 260 FinalizeResult(void ** tag,bool * status)261 bool FinalizeResult(void** tag, bool* status) override { 262 // If we are done intercepting, then there is nothing more for us to do 263 if (done_intercepting_) { 264 return RegisteredAsyncRequest::FinalizeResult(tag, status); 265 } 266 if (*status) { 267 if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( 268 payload_.bbuf_ptr(), request_) 269 .ok()) { 270 // If deserialization fails, we cancel the call and instantiate 271 // a new instance of ourselves to request another call. We then 272 // return false, which prevents the call from being returned to 273 // the application. 274 grpc_call_cancel_with_status(call_, GRPC_STATUS_INTERNAL, 275 "Unable to parse request", nullptr); 276 grpc_call_unref(call_); 277 new PayloadAsyncRequest(registered_method_, server_, context_, 278 stream_, call_cq_, notification_cq_, tag_, 279 request_); 280 delete this; 281 return false; 282 } 283 } 284 // Set interception point for recv message 285 interceptor_methods_.AddInterceptionHookPoint( 286 experimental::InterceptionHookPoints::POST_RECV_MESSAGE); 287 interceptor_methods_.SetRecvMessage(request_, nullptr); 288 return RegisteredAsyncRequest::FinalizeResult(tag, status); 289 } 290 291 private: 292 internal::RpcServiceMethod* const registered_method_; 293 Message* const request_; 294 ByteBuffer payload_; 295 }; 296 297 class GenericAsyncRequest : public BaseAsyncRequest { 298 public: 299 GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, 300 internal::ServerAsyncStreamingInterface* stream, 301 grpc::CompletionQueue* call_cq, 302 grpc::ServerCompletionQueue* notification_cq, void* tag, 303 bool delete_on_finalize, bool issue_request = true); 304 305 bool FinalizeResult(void** tag, bool* status) override; 306 307 protected: 308 void IssueRequest(); 309 310 private: 311 grpc_call_details call_details_; 312 }; 313 314 template <class Message> RequestAsyncCall(internal::RpcServiceMethod * method,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * message)315 void RequestAsyncCall(internal::RpcServiceMethod* method, 316 grpc::ServerContext* context, 317 internal::ServerAsyncStreamingInterface* stream, 318 grpc::CompletionQueue* call_cq, 319 grpc::ServerCompletionQueue* notification_cq, void* tag, 320 Message* message) { 321 GPR_ASSERT(method); 322 new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, 323 notification_cq, tag, message); 324 } 325 RequestAsyncCall(internal::RpcServiceMethod * method,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)326 void RequestAsyncCall(internal::RpcServiceMethod* method, 327 grpc::ServerContext* context, 328 internal::ServerAsyncStreamingInterface* stream, 329 grpc::CompletionQueue* call_cq, 330 grpc::ServerCompletionQueue* notification_cq, 331 void* tag) { 332 GPR_ASSERT(method); 333 new NoPayloadAsyncRequest(method, this, context, stream, call_cq, 334 notification_cq, tag); 335 } 336 RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)337 void RequestAsyncGenericCall(GenericServerContext* context, 338 internal::ServerAsyncStreamingInterface* stream, 339 grpc::CompletionQueue* call_cq, 340 grpc::ServerCompletionQueue* notification_cq, 341 void* tag) { 342 new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, 343 tag, true); 344 } 345 346 private: 347 // EXPERIMENTAL 348 // Getter method for the vector of interceptor factory objects. 349 // Returns a nullptr (rather than being pure) since this is a post-1.0 method 350 // and adding a new pure method to an interface would be a breaking change 351 // (even though this is private and non-API) 352 virtual std::vector< 353 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()354 interceptor_creators() { 355 return nullptr; 356 } 357 358 // Whether per-call load reporting is enabled. 359 virtual bool call_metric_recording_enabled() const = 0; 360 361 // Interface to read or update server-wide metrics. Returns null when not set. 362 virtual experimental::ServerMetricRecorder* server_metric_recorder() 363 const = 0; 364 365 // A method to get the callbackable completion queue associated with this 366 // server. If the return value is nullptr, this server doesn't support 367 // callback operations. 368 // TODO(vjpai): Consider a better default like using a global CQ 369 // Returns nullptr (rather than being pure) since this is a post-1.0 method 370 // and adding a new pure method to an interface would be a breaking change 371 // (even though this is private and non-API) CallbackCQ()372 virtual grpc::CompletionQueue* CallbackCQ() { return nullptr; } 373 }; 374 375 } // namespace grpc 376 377 #endif // GRPCPP_SERVER_INTERFACE_H 378