1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SERVER_H 20 #define GRPCPP_SERVER_H 21 22 #include <list> 23 #include <memory> 24 #include <vector> 25 26 #include <grpc/compression.h> 27 #include <grpc/support/atm.h> 28 #include <grpc/support/port_platform.h> 29 #include <grpcpp/channel.h> 30 #include <grpcpp/completion_queue.h> 31 #include <grpcpp/health_check_service_interface.h> 32 #include <grpcpp/impl/call.h> 33 #include <grpcpp/impl/grpc_library.h> 34 #include <grpcpp/impl/rpc_service_method.h> 35 #include <grpcpp/security/server_credentials.h> 36 #include <grpcpp/server_interface.h> 37 #include <grpcpp/support/channel_arguments.h> 38 #include <grpcpp/support/client_interceptor.h> 39 #include <grpcpp/support/config.h> 40 #include <grpcpp/support/status.h> 41 42 struct grpc_server; 43 44 namespace grpc { 45 class AsyncGenericService; 46 class ServerContext; 47 class ServerInitializer; 48 49 namespace internal { 50 class ExternalConnectionAcceptorImpl; 51 } // namespace internal 52 53 /// Represents a gRPC server. 54 /// 55 /// Use a \a grpc::ServerBuilder to create, configure, and start 56 /// \a Server instances. 57 class Server : public ServerInterface, private internal::GrpcLibrary { 58 public: 59 ~Server() ABSL_LOCKS_EXCLUDED(mu_) override; 60 61 /// Block until the server shuts down. 62 /// 63 /// \warning The server must be either shutting down or some other thread must 64 /// call \a Shutdown for this function to ever return. 65 void Wait() ABSL_LOCKS_EXCLUDED(mu_) override; 66 67 /// Global callbacks are a set of hooks that are called when server 68 /// events occur. \a SetGlobalCallbacks method is used to register 69 /// the hooks with gRPC. Note that 70 /// the \a GlobalCallbacks instance will be shared among all 71 /// \a Server instances in an application and can be set exactly 72 /// once per application. 73 class GlobalCallbacks { 74 public: ~GlobalCallbacks()75 virtual ~GlobalCallbacks() {} 76 /// Called before server is created. UpdateArguments(ChannelArguments *)77 virtual void UpdateArguments(ChannelArguments* /*args*/) {} 78 /// Called before application callback for each synchronous server request 79 virtual void PreSynchronousRequest(ServerContext* context) = 0; 80 /// Called after application callback for each synchronous server request 81 virtual void PostSynchronousRequest(ServerContext* context) = 0; 82 /// Called before server is started. PreServerStart(Server *)83 virtual void PreServerStart(Server* /*server*/) {} 84 /// Called after a server port is added. AddPort(Server *,const std::string &,ServerCredentials *,int)85 virtual void AddPort(Server* /*server*/, const std::string& /*addr*/, 86 ServerCredentials* /*creds*/, int /*port*/) {} 87 }; 88 /// Set the global callback object. Can only be called once per application. 89 /// Does not take ownership of callbacks, and expects the pointed to object 90 /// to be alive until all server objects in the process have been destroyed. 91 /// The same \a GlobalCallbacks object will be used throughout the 92 /// application and is shared among all \a Server objects. 93 static void SetGlobalCallbacks(GlobalCallbacks* callbacks); 94 95 /// Returns a \em raw pointer to the underlying \a grpc_server instance. 96 /// EXPERIMENTAL: for internal/test use only 97 grpc_server* c_server(); 98 99 /// Returns the health check service. GetHealthCheckService()100 HealthCheckServiceInterface* GetHealthCheckService() const { 101 return health_check_service_.get(); 102 } 103 104 /// Establish a channel for in-process communication 105 std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); 106 107 /// NOTE: class experimental_type is not part of the public API of this class. 108 /// TODO(yashykt): Integrate into public API when this is no longer 109 /// experimental. 110 class experimental_type { 111 public: experimental_type(Server * server)112 explicit experimental_type(Server* server) : server_(server) {} 113 114 /// Establish a channel for in-process communication with client 115 /// interceptors 116 std::shared_ptr<Channel> InProcessChannelWithInterceptors( 117 const ChannelArguments& args, 118 std::vector< 119 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> 120 interceptor_creators); 121 122 private: 123 Server* server_; 124 }; 125 126 /// NOTE: The function experimental() is not stable public API. It is a view 127 /// to the experimental components of this class. It may be changed or removed 128 /// at any time. experimental()129 experimental_type experimental() { return experimental_type(this); } 130 131 protected: 132 /// Register a service. This call does not take ownership of the service. 133 /// The service must exist for the lifetime of the Server instance. 134 bool RegisterService(const std::string* addr, Service* service) override; 135 136 /// Try binding the server to the given \a addr endpoint 137 /// (port, and optionally including IP address to bind to). 138 /// 139 /// It can be invoked multiple times. Should be used before 140 /// starting the server. 141 /// 142 /// \param addr The address to try to bind to the server (eg, localhost:1234, 143 /// 192.168.1.1:31416, [::1]:27182, etc.). 144 /// \param creds The credentials associated with the server. 145 /// 146 /// \return bound port number on success, 0 on failure. 147 /// 148 /// \warning It is an error to call this method on an already started server. 149 int AddListeningPort(const std::string& addr, 150 ServerCredentials* creds) override; 151 152 /// NOTE: This is *NOT* a public API. The server constructors are supposed to 153 /// be used by \a ServerBuilder class only. The constructor will be made 154 /// 'private' very soon. 155 /// 156 /// Server constructors. To be used by \a ServerBuilder only. 157 /// 158 /// \param args The channel args 159 /// 160 /// \param sync_server_cqs The completion queues to use if the server is a 161 /// synchronous server (or a hybrid server). The server polls for new RPCs on 162 /// these queues 163 /// 164 /// \param min_pollers The minimum number of polling threads per server 165 /// completion queue (in param sync_server_cqs) to use for listening to 166 /// incoming requests (used only in case of sync server) 167 /// 168 /// \param max_pollers The maximum number of polling threads per server 169 /// completion queue (in param sync_server_cqs) to use for listening to 170 /// incoming requests (used only in case of sync server) 171 /// 172 /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on 173 /// server completion queues passed via sync_server_cqs param. 174 Server(ChannelArguments* args, 175 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 176 sync_server_cqs, 177 int min_pollers, int max_pollers, int sync_cq_timeout_msec, 178 std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>> 179 acceptors, 180 grpc_server_config_fetcher* server_config_fetcher = nullptr, 181 grpc_resource_quota* server_rq = nullptr, 182 std::vector< 183 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> 184 interceptor_creators = std::vector<std::unique_ptr< 185 experimental::ServerInterceptorFactoryInterface>>(), 186 experimental::ServerMetricRecorder* server_metric_recorder = nullptr); 187 188 /// Start the server. 189 /// 190 /// \param cqs Completion queues for handling asynchronous services. The 191 /// caller is required to keep all completion queues live until the server is 192 /// destroyed. 193 /// \param num_cqs How many completion queues does \a cqs hold. 194 void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; 195 server()196 grpc_server* server() override { return server_; } 197 198 /// NOTE: This method is not part of the public API for this class. set_health_check_service(std::unique_ptr<HealthCheckServiceInterface> service)199 void set_health_check_service( 200 std::unique_ptr<HealthCheckServiceInterface> service) { 201 health_check_service_ = std::move(service); 202 } 203 context_allocator()204 ContextAllocator* context_allocator() { return context_allocator_.get(); } 205 206 /// NOTE: This method is not part of the public API for this class. health_check_service_disabled()207 bool health_check_service_disabled() const { 208 return health_check_service_disabled_; 209 } 210 211 private: 212 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()213 interceptor_creators() override { 214 return &interceptor_creators_; 215 } 216 217 friend class AsyncGenericService; 218 friend class ServerBuilder; 219 friend class ServerInitializer; 220 221 class SyncRequest; 222 class CallbackRequestBase; 223 template <class ServerContextType> 224 class CallbackRequest; 225 class UnimplementedAsyncRequest; 226 class UnimplementedAsyncResponse; 227 228 /// SyncRequestThreadManager is an implementation of ThreadManager. This class 229 /// is responsible for polling for incoming RPCs and calling the RPC handlers. 230 /// This is only used in case of a Sync server (i.e a server exposing a sync 231 /// interface) 232 class SyncRequestThreadManager; 233 234 /// Register a generic service. This call does not take ownership of the 235 /// service. The service must exist for the lifetime of the Server instance. 236 void RegisterAsyncGenericService(AsyncGenericService* service) override; 237 238 /// Register a callback-based generic service. This call does not take 239 /// ownership of theservice. The service must exist for the lifetime of the 240 /// Server instance. 241 void RegisterCallbackGenericService(CallbackGenericService* service) override; 242 RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)243 void RegisterContextAllocator( 244 std::unique_ptr<ContextAllocator> context_allocator) { 245 context_allocator_ = std::move(context_allocator); 246 } 247 248 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 249 internal::Call* call) override; 250 251 void ShutdownInternal(gpr_timespec deadline) 252 ABSL_LOCKS_EXCLUDED(mu_) override; 253 max_receive_message_size()254 int max_receive_message_size() const override { 255 return max_receive_message_size_; 256 } 257 call_metric_recording_enabled()258 bool call_metric_recording_enabled() const override { 259 return call_metric_recording_enabled_; 260 } 261 server_metric_recorder()262 experimental::ServerMetricRecorder* server_metric_recorder() const override { 263 return server_metric_recorder_; 264 } 265 266 CompletionQueue* CallbackCQ() ABSL_LOCKS_EXCLUDED(mu_) override; 267 268 ServerInitializer* initializer(); 269 270 // Functions to manage the server shutdown ref count. Things that increase 271 // the ref count are the running state of the server (take a ref at start and 272 // drop it at shutdown) and each running callback RPC. 273 void Ref(); 274 void UnrefWithPossibleNotify() ABSL_LOCKS_EXCLUDED(mu_); 275 void UnrefAndWaitLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 276 277 std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>> 278 acceptors_; 279 280 // A vector of interceptor factory objects. 281 // This should be destroyed after health_check_service_ and this requirement 282 // is satisfied by declaring interceptor_creators_ before 283 // health_check_service_. (C++ mandates that member objects be destroyed in 284 // the reverse order of initialization.) 285 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> 286 interceptor_creators_; 287 288 int max_receive_message_size_; 289 290 /// The following completion queues are ONLY used in case of Sync API 291 /// i.e. if the server has any services with sync methods. The server uses 292 /// these completion queues to poll for new RPCs 293 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 294 sync_server_cqs_; 295 296 /// List of \a ThreadManager instances (one for each cq in 297 /// the \a sync_server_cqs) 298 std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; 299 300 // Server status 301 internal::Mutex mu_; 302 bool started_; 303 bool shutdown_ ABSL_GUARDED_BY(mu_); 304 bool shutdown_notified_ 305 ABSL_GUARDED_BY(mu_); // Was notify called on the shutdown_cv_ 306 internal::CondVar shutdown_done_cv_; 307 bool shutdown_done_ ABSL_GUARDED_BY(mu_) = false; 308 std::atomic_int shutdown_refs_outstanding_{1}; 309 310 internal::CondVar shutdown_cv_; 311 312 std::shared_ptr<GlobalCallbacks> global_callbacks_; 313 314 std::vector<std::string> services_; 315 bool has_async_generic_service_ = false; 316 bool has_callback_generic_service_ = false; 317 bool has_callback_methods_ = false; 318 319 // Pointer to the wrapped grpc_server. 320 grpc_server* server_; 321 322 std::unique_ptr<ServerInitializer> server_initializer_; 323 324 std::unique_ptr<ContextAllocator> context_allocator_; 325 326 std::unique_ptr<HealthCheckServiceInterface> health_check_service_; 327 bool health_check_service_disabled_; 328 329 // When appropriate, use a default callback generic service to handle 330 // unimplemented methods 331 std::unique_ptr<CallbackGenericService> unimplemented_service_; 332 333 // A special handler for resource exhausted in sync case 334 std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; 335 336 // Handler for callback generic service, if any 337 std::unique_ptr<internal::MethodHandler> generic_handler_; 338 339 // callback_cq_ references the callbackable completion queue associated 340 // with this server (if any). It is set on the first call to CallbackCQ(). 341 // It is _not owned_ by the server; ownership belongs with its internal 342 // shutdown callback tag (invoked when the CQ is fully shutdown). 343 std::atomic<CompletionQueue*> callback_cq_{nullptr}; 344 345 // List of CQs passed in by user that must be Shutdown only after Server is 346 // Shutdown. Even though this is only used with NDEBUG, instantiate it in all 347 // cases since otherwise the size will be inconsistent. 348 std::vector<CompletionQueue*> cq_list_; 349 350 // Whetner per-call load reporting is enabled. 351 bool call_metric_recording_enabled_ = false; 352 353 // Interface to read or update server-wide metrics. Optional. 354 experimental::ServerMetricRecorder* server_metric_recorder_ = nullptr; 355 }; 356 357 } // namespace grpc 358 359 #endif // GRPCPP_SERVER_H 360