xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_health_checking/grpc_health/v1/health.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Reference implementation for health checking in gRPC Python."""
15
16import collections
17import sys
18import threading
19
20import grpc
21from grpc_health.v1 import health_pb2 as _health_pb2
22from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
23
# Compare the version as a tuple so that every interpreter at or above 3.6
# qualifies, including a later major version whose minor number is below 6.
if sys.version_info >= (3, 6):
    # Exposes AsyncHealthServicer as public API.
    from . import _async as aio  # pylint: disable=unused-import
27
# Fully-qualified name of the health checking service, read from the "Health"
# entry of the generated health_pb2 descriptor.
SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name["Health"].full_name
# Service key that stands for the overall health of the entire server (the
# empty string).
OVERALL_HEALTH = ""
32
33
34class _Watcher:
35    def __init__(self):
36        self._condition = threading.Condition()
37        self._responses = collections.deque()
38        self._open = True
39
40    def __iter__(self):
41        return self
42
43    def _next(self):
44        with self._condition:
45            while not self._responses and self._open:
46                self._condition.wait()
47            if self._responses:
48                return self._responses.popleft()
49            else:
50                raise StopIteration()
51
52    def next(self):
53        return self._next()
54
55    def __next__(self):
56        return self._next()
57
58    def add(self, response):
59        with self._condition:
60            self._responses.append(response)
61            self._condition.notify()
62
63    def close(self):
64        with self._condition:
65            self._open = False
66            self._condition.notify()
67
68
69def _watcher_to_send_response_callback_adapter(watcher):
70    def send_response_callback(response):
71        if response is None:
72            watcher.close()
73        else:
74            watcher.add(response)
75
76    return send_response_callback
77
78
class HealthServicer(_health_pb2_grpc.HealthServicer):
    """Servicer handling RPCs for service statuses.

    Statuses live in a dict keyed by service name; the empty-string key is
    seeded with SERVING at construction. Every open ``Watch`` registers a
    callback under its service name so that ``set`` can push status changes
    to it.
    """

    def __init__(
        self, experimental_non_blocking=True, experimental_thread_pool=None
    ):
        """Creates a servicer whose only known entry ("") is SERVING.

        Args:
          experimental_non_blocking: value stored as an attribute of the
            ``Watch`` function object.
          experimental_thread_pool: value stored as an attribute of the
            ``Watch`` function object.
        """
        # Re-entrant: enter_graceful_shutdown() calls set() while it already
        # holds the lock.
        self._lock = threading.RLock()
        self._server_status = {"": _health_pb2.HealthCheckResponse.SERVING}
        # Service name -> set of callbacks belonging to the open watches.
        self._send_response_callbacks = {}
        # NOTE: __func__ is the plain function behind the bound method, so
        # these attributes are shared by every instance using that function;
        # the most recently constructed servicer wins. Presumably the server
        # reads them when dispatching Watch -- not visible in this file.
        self.Watch.__func__.experimental_non_blocking = (
            experimental_non_blocking
        )
        self.Watch.__func__.experimental_thread_pool = experimental_thread_pool
        self._gracefully_shutting_down = False

    def _on_close_callback(self, send_response_callback, service):
        """Builds the cleanup hook for one watch.

        The returned callable unregisters ``send_response_callback`` from
        ``service`` and then signals end-of-stream by calling it with None.
        """

        def callback():
            with self._lock:
                callbacks = self._send_response_callbacks.get(service)
                if callbacks is not None:
                    callbacks.discard(send_response_callback)
                    # Drop the per-service set once its last watcher is gone
                    # so that service names clients once watched do not pile
                    # up in the dict forever.
                    if not callbacks:
                        del self._send_response_callbacks[service]
            # Called outside the lock, as the stream is simply being ended.
            send_response_callback(None)

        return callback

    def Check(self, request, context):
        """Returns the current status of ``request.service``.

        For a service that has no recorded status the RPC code is set to
        NOT_FOUND and an empty response is returned.
        """
        with self._lock:
            status = self._server_status.get(request.service)
            if status is None:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                return _health_pb2.HealthCheckResponse()
            return _health_pb2.HealthCheckResponse(status=status)

    # pylint: disable=arguments-differ
    def Watch(self, request, context, send_response_callback=None):
        """Streams the status of ``request.service`` and its later changes.

        The current status (SERVICE_UNKNOWN when none is recorded) is sent
        immediately; further updates are sent from ``set``.

        Args:
          request: message whose ``service`` field names the service.
          context: RPC context; its ``add_callback`` is used to clean up when
            the RPC ends.
          send_response_callback: callable taking a response, or None to end
            the stream. When omitted, a blocking iterator is used instead.

        Returns:
          A blocking response iterator when ``send_response_callback`` was not
          supplied, otherwise None.
        """
        blocking_watcher = None
        if send_response_callback is None:
            # The server does not support the experimental_non_blocking
            # parameter. For backwards compatibility, return a blocking response
            # generator.
            blocking_watcher = _Watcher()
            send_response_callback = _watcher_to_send_response_callback_adapter(
                blocking_watcher
            )
        service = request.service
        on_close = self._on_close_callback(send_response_callback, service)
        with self._lock:
            status = self._server_status.get(service)
            if status is None:
                status = (
                    _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
                )  # pylint: disable=no-member
            send_response_callback(
                _health_pb2.HealthCheckResponse(status=status)
            )
            self._send_response_callbacks.setdefault(service, set()).add(
                send_response_callback
            )
            registered = context.add_callback(on_close)
        # add_callback() returns False when the RPC has already terminated and
        # the callback will never be invoked. Clean up right away in that case;
        # otherwise the callback would stay registered forever and a blocking
        # watcher would never be closed. The explicit ``is False`` keeps the
        # old behaviour for contexts that return nothing from add_callback().
        if registered is False:
            on_close()
        return blocking_watcher

    def set(self, service, status):
        """Sets the status of a service.

        Every watcher currently registered for ``service`` is sent the new
        status. The call is ignored once graceful shutdown has begun.

        Args:
          service: string, the name of the service.
          status: HealthCheckResponse.status enum value indicating the status of
            the service
        """
        with self._lock:
            if self._gracefully_shutting_down:
                return
            self._server_status[service] = status
            for send_response_callback in self._send_response_callbacks.get(
                service, ()
            ):
                send_response_callback(
                    _health_pb2.HealthCheckResponse(status=status)
                )

    def enter_graceful_shutdown(self):
        """Permanently sets the status of all services to NOT_SERVING.

        This should be invoked when the server is entering a graceful shutdown
        period. After this method is invoked, future attempts to set the status
        of a service will be ignored.

        This is an EXPERIMENTAL API.
        """
        with self._lock:
            if self._gracefully_shutting_down:
                return
            # set() only overwrites keys that already exist here, so the dict
            # does not change size while it is being iterated.
            for service in self._server_status:
                self.set(
                    service, _health_pb2.HealthCheckResponse.NOT_SERVING
                )  # pylint: disable=no-member
            # Flip the flag last: set() becomes a no-op once it is True.
            self._gracefully_shutting_down = True
181