1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Reference implementation for health checking in gRPC Python.""" 15 16import collections 17import sys 18import threading 19 20import grpc 21from grpc_health.v1 import health_pb2 as _health_pb2 22from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc 23 24if sys.version_info[0] >= 3 and sys.version_info[1] >= 6: 25 # Exposes AsyncHealthServicer as public API. 26 from . import _async as aio # pylint: disable=unused-import 27 28# The service name of the health checking servicer. 29SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name["Health"].full_name 30# The entry of overall health for the entire server. 31OVERALL_HEALTH = "" 32 33 34class _Watcher: 35 def __init__(self): 36 self._condition = threading.Condition() 37 self._responses = collections.deque() 38 self._open = True 39 40 def __iter__(self): 41 return self 42 43 def _next(self): 44 with self._condition: 45 while not self._responses and self._open: 46 self._condition.wait() 47 if self._responses: 48 return self._responses.popleft() 49 else: 50 raise StopIteration() 51 52 def next(self): 53 return self._next() 54 55 def __next__(self): 56 return self._next() 57 58 def add(self, response): 59 with self._condition: 60 self._responses.append(response) 61 self._condition.notify() 62 63 def close(self): 64 with self._condition: 65 self._open = False 66 self._condition.notify() 67 68 69def _watcher_to_send_response_callback_adapter(watcher): 70 def send_response_callback(response): 71 if response is None: 72 watcher.close() 73 else: 74 watcher.add(response) 75 76 return send_response_callback 77 78 79class HealthServicer(_health_pb2_grpc.HealthServicer): 80 """Servicer handling RPCs for service statuses.""" 81 82 def __init__( 83 self, experimental_non_blocking=True, experimental_thread_pool=None 84 ): 85 self._lock = threading.RLock() 86 self._server_status = {"": _health_pb2.HealthCheckResponse.SERVING} 87 self._send_response_callbacks = {} 88 self.Watch.__func__.experimental_non_blocking = ( 89 experimental_non_blocking 90 ) 91 self.Watch.__func__.experimental_thread_pool = experimental_thread_pool 92 self._gracefully_shutting_down = False 93 94 def _on_close_callback(self, send_response_callback, service): 95 def callback(): 96 with self._lock: 97 self._send_response_callbacks[service].remove( 98 send_response_callback 99 ) 100 send_response_callback(None) 101 102 return callback 103 104 def Check(self, request, context): 105 with self._lock: 106 status = self._server_status.get(request.service) 107 if status is None: 108 context.set_code(grpc.StatusCode.NOT_FOUND) 109 return _health_pb2.HealthCheckResponse() 110 else: 111 return _health_pb2.HealthCheckResponse(status=status) 112 113 # pylint: disable=arguments-differ 114 def Watch(self, request, context, send_response_callback=None): 115 blocking_watcher = None 116 if send_response_callback is None: 117 # The server does not support the experimental_non_blocking 118 # parameter. For backwards compatibility, return a blocking response 119 # generator. 120 blocking_watcher = _Watcher() 121 send_response_callback = _watcher_to_send_response_callback_adapter( 122 blocking_watcher 123 ) 124 service = request.service 125 with self._lock: 126 status = self._server_status.get(service) 127 if status is None: 128 status = ( 129 _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN 130 ) # pylint: disable=no-member 131 send_response_callback( 132 _health_pb2.HealthCheckResponse(status=status) 133 ) 134 if service not in self._send_response_callbacks: 135 self._send_response_callbacks[service] = set() 136 self._send_response_callbacks[service].add(send_response_callback) 137 context.add_callback( 138 self._on_close_callback(send_response_callback, service) 139 ) 140 return blocking_watcher 141 142 def set(self, service, status): 143 """Sets the status of a service. 144 145 Args: 146 service: string, the name of the service. 147 status: HealthCheckResponse.status enum value indicating the status of 148 the service 149 """ 150 with self._lock: 151 if self._gracefully_shutting_down: 152 return 153 else: 154 self._server_status[service] = status 155 if service in self._send_response_callbacks: 156 for send_response_callback in self._send_response_callbacks[ 157 service 158 ]: 159 send_response_callback( 160 _health_pb2.HealthCheckResponse(status=status) 161 ) 162 163 def enter_graceful_shutdown(self): 164 """Permanently sets the status of all services to NOT_SERVING. 165 166 This should be invoked when the server is entering a graceful shutdown 167 period. After this method is invoked, future attempts to set the status 168 of a service will be ignored. 169 170 This is an EXPERIMENTAL API. 171 """ 172 with self._lock: 173 if self._gracefully_shutting_down: 174 return 175 else: 176 for service in self._server_status: 177 self.set( 178 service, _health_pb2.HealthCheckResponse.NOT_SERVING 179 ) # pylint: disable=no-member 180 self._gracefully_shutting_down = True 181