1# Copyright 2017 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import logging 16import threading 17 18import grpc 19 20_NOT_YET_OBSERVED = object() 21logging.basicConfig() 22_LOGGER = logging.getLogger(__name__) 23 24 25def _cancel(handler): 26 return handler.cancel(grpc.StatusCode.CANCELLED, "Locally cancelled!") 27 28 29def _is_active(handler): 30 return handler.is_active() 31 32 33def _time_remaining(unused_handler): 34 raise NotImplementedError() 35 36 37def _add_callback(handler, callback): 38 return handler.add_callback(callback) 39 40 41def _initial_metadata(handler): 42 return handler.initial_metadata() 43 44 45def _trailing_metadata(handler): 46 trailing_metadata, unused_code, unused_details = handler.termination() 47 return trailing_metadata 48 49 50def _code(handler): 51 unused_trailing_metadata, code, unused_details = handler.termination() 52 return code 53 54 55def _details(handler): 56 unused_trailing_metadata, unused_code, details = handler.termination() 57 return details 58 59 60class _Call(grpc.Call): 61 def __init__(self, handler): 62 self._handler = handler 63 64 def cancel(self): 65 _cancel(self._handler) 66 67 def is_active(self): 68 return _is_active(self._handler) 69 70 def time_remaining(self): 71 return _time_remaining(self._handler) 72 73 def add_callback(self, callback): 74 return _add_callback(self._handler, callback) 75 76 def initial_metadata(self): 77 return _initial_metadata(self._handler) 78 79 def trailing_metadata(self): 80 return _trailing_metadata(self._handler) 81 82 def code(self): 83 return _code(self._handler) 84 85 def details(self): 86 return _details(self._handler) 87 88 89class _RpcErrorCall(grpc.RpcError, grpc.Call): 90 def __init__(self, handler): 91 self._handler = handler 92 93 def cancel(self): 94 _cancel(self._handler) 95 96 def is_active(self): 97 return _is_active(self._handler) 98 99 def time_remaining(self): 100 return _time_remaining(self._handler) 101 102 def add_callback(self, callback): 103 return _add_callback(self._handler, callback) 104 105 def initial_metadata(self): 106 return _initial_metadata(self._handler) 107 108 def trailing_metadata(self): 109 return _trailing_metadata(self._handler) 110 111 def code(self): 112 return _code(self._handler) 113 114 def details(self): 115 return _details(self._handler) 116 117 118def _next(handler): 119 read = handler.take_response() 120 if read.code is None: 121 return read.response 122 elif read.code is grpc.StatusCode.OK: 123 raise StopIteration() 124 else: 125 raise _RpcErrorCall(handler) 126 127 128class _HandlerExtras(object): 129 def __init__(self): 130 self.condition = threading.Condition() 131 self.unary_response = _NOT_YET_OBSERVED 132 self.cancelled = False 133 134 135def _with_extras_cancel(handler, extras): 136 with extras.condition: 137 if handler.cancel(grpc.StatusCode.CANCELLED, "Locally cancelled!"): 138 extras.cancelled = True 139 return True 140 else: 141 return False 142 143 144def _extras_without_cancelled(extras): 145 with extras.condition: 146 return extras.cancelled 147 148 149def _running(handler): 150 return handler.is_active() 151 152 153def _done(handler): 154 return not handler.is_active() 155 156 157def _with_extras_unary_response(handler, extras): 158 with extras.condition: 159 if extras.unary_response is _NOT_YET_OBSERVED: 160 read = handler.take_response() 161 if read.code is None: 162 extras.unary_response = read.response 163 return read.response 164 else: 165 raise _RpcErrorCall(handler) 166 else: 167 return extras.unary_response 168 169 170def _exception(unused_handler): 171 raise NotImplementedError("TODO!") 172 173 174def _traceback(unused_handler): 175 raise NotImplementedError("TODO!") 176 177 178def _add_done_callback(handler, callback, future): 179 adapted_callback = lambda: callback(future) 180 if not handler.add_callback(adapted_callback): 181 callback(future) 182 183 184class _FutureCall(grpc.Future, grpc.Call): 185 def __init__(self, handler, extras): 186 self._handler = handler 187 self._extras = extras 188 189 def cancel(self): 190 return _with_extras_cancel(self._handler, self._extras) 191 192 def cancelled(self): 193 return _extras_without_cancelled(self._extras) 194 195 def running(self): 196 return _running(self._handler) 197 198 def done(self): 199 return _done(self._handler) 200 201 def result(self): 202 return _with_extras_unary_response(self._handler, self._extras) 203 204 def exception(self): 205 return _exception(self._handler) 206 207 def traceback(self): 208 return _traceback(self._handler) 209 210 def add_done_callback(self, fn): 211 _add_done_callback(self._handler, fn, self) 212 213 def is_active(self): 214 return _is_active(self._handler) 215 216 def time_remaining(self): 217 return _time_remaining(self._handler) 218 219 def add_callback(self, callback): 220 return _add_callback(self._handler, callback) 221 222 def initial_metadata(self): 223 return _initial_metadata(self._handler) 224 225 def trailing_metadata(self): 226 return _trailing_metadata(self._handler) 227 228 def code(self): 229 return _code(self._handler) 230 231 def details(self): 232 return _details(self._handler) 233 234 235def consume_requests(request_iterator, handler): 236 def _consume(): 237 while True: 238 try: 239 request = next(request_iterator) 240 added = handler.add_request(request) 241 if not added: 242 break 243 except StopIteration: 244 handler.close_requests() 245 break 246 except Exception: # pylint: disable=broad-except 247 details = "Exception iterating requests!" 248 _LOGGER.exception(details) 249 handler.cancel(grpc.StatusCode.UNKNOWN, details) 250 251 consumption = threading.Thread(target=_consume) 252 consumption.start() 253 254 255def blocking_unary_response(handler): 256 read = handler.take_response() 257 if read.code is None: 258 unused_trailing_metadata, code, unused_details = handler.termination() 259 if code is grpc.StatusCode.OK: 260 return read.response 261 else: 262 raise _RpcErrorCall(handler) 263 else: 264 raise _RpcErrorCall(handler) 265 266 267def blocking_unary_response_with_call(handler): 268 read = handler.take_response() 269 if read.code is None: 270 unused_trailing_metadata, code, unused_details = handler.termination() 271 if code is grpc.StatusCode.OK: 272 return read.response, _Call(handler) 273 else: 274 raise _RpcErrorCall(handler) 275 else: 276 raise _RpcErrorCall(handler) 277 278 279def future_call(handler): 280 return _FutureCall(handler, _HandlerExtras()) 281 282 283class ResponseIteratorCall(grpc.Call): 284 def __init__(self, handler): 285 self._handler = handler 286 287 def __iter__(self): 288 return self 289 290 def __next__(self): 291 return _next(self._handler) 292 293 def next(self): 294 return _next(self._handler) 295 296 def cancel(self): 297 _cancel(self._handler) 298 299 def is_active(self): 300 return _is_active(self._handler) 301 302 def time_remaining(self): 303 return _time_remaining(self._handler) 304 305 def add_callback(self, callback): 306 return _add_callback(self._handler, callback) 307 308 def initial_metadata(self): 309 return _initial_metadata(self._handler) 310 311 def trailing_metadata(self): 312 return _trailing_metadata(self._handler) 313 314 def code(self): 315 return _code(self._handler) 316 317 def details(self): 318 return _details(self._handler) 319