# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import threading

import grpc
from grpc_testing import _common

# Sentinel stored in ``_Handler._code`` by ``cancel()``. It is compared by
# identity so it can never collide with a real status code.
_CLIENT_INACTIVE = object()


class Handler(_common.ServerRpcHandler):
    """Abstract RPC handler extending ``_common.ServerRpcHandler``.

    Declares the accessors implemented by ``_Handler`` for feeding requests
    into an RPC and reading its metadata, responses and termination.
    """

    @abc.abstractmethod
    def initial_metadata(self):
        """Return the RPC's initial metadata."""
        raise NotImplementedError()

    @abc.abstractmethod
    def add_request(self, request):
        """Add a request to the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def take_response(self):
        """Take the next response of the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def requests_closed(self):
        """Mark the request side of the RPC as closed."""
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancel the RPC."""
        raise NotImplementedError()

    @abc.abstractmethod
    def unary_response_termination(self):
        """Return the single response together with the termination values."""
        raise NotImplementedError()

    @abc.abstractmethod
    def stream_response_termination(self):
        """Return the termination values of a response-streaming RPC."""
        raise NotImplementedError()


class _Handler(Handler):
    """Condition-guarded implementation of ``Handler``.

    All state is read and written while holding ``self._condition``; every
    state change notifies all waiters so that blocked readers re-check.
    ``self._code`` doubles as the liveness flag: ``None`` while the RPC is
    active, ``_CLIENT_INACTIVE`` after ``cancel()``, otherwise the status
    code supplied by ``send_termination()`` or set by ``expire()``.
    """

    def __init__(self, requests_closed):
        """Create a handler.

        Args:
            requests_closed: initial value of the "no more requests" flag.
        """
        self._condition = threading.Condition()
        self._requests = []
        self._requests_closed = requests_closed
        self._initial_metadata = None
        self._responses = []
        self._trailing_metadata = None
        self._code = None
        self._details = None
        self._unary_response = None
        self._expiration_future = None
        self._termination_callbacks = []

    def send_initial_metadata(self, initial_metadata):
        """Record the initial metadata and wake any waiters."""
        with self._condition:
            self._initial_metadata = initial_metadata
            self._condition.notify_all()

    def take_request(self):
        """Block until a request, request-closure or termination is available.

        Returns:
            ``_common.ServerRpcRead(request, False, False)`` for the oldest
            queued request, ``_common.REQUESTS_CLOSED`` when the queue is
            empty and requests are closed, or ``_common.TERMINATED`` once a
            code has been set.
        """
        with self._condition:
            while True:
                if self._code is None:
                    if self._requests:
                        request = self._requests.pop(0)
                        self._condition.notify_all()
                        return _common.ServerRpcRead(request, False, False)
                    elif self._requests_closed:
                        return _common.REQUESTS_CLOSED
                    else:
                        self._condition.wait()
                else:
                    return _common.TERMINATED

    def is_active(self):
        """Return True while no code has been set on the RPC."""
        with self._condition:
            return self._code is None

    def add_response(self, response):
        """Queue a response and wake any waiters."""
        with self._condition:
            self._responses.append(response)
            self._condition.notify_all()

    def send_termination(self, trailing_metadata, code, details):
        """Record the termination values and cancel any pending expiration."""
        with self._condition:
            self._trailing_metadata = trailing_metadata
            self._code = code
            self._details = details
            if self._expiration_future is not None:
                self._expiration_future.cancel()
            self._condition.notify_all()

    def add_termination_callback(self, callback):
        """Register ``callback`` to be called on cancellation or expiration.

        Returns:
            True if the callback was registered, False if a code has already
            been set (in which case the callback is not stored).
        """
        with self._condition:
            if self._code is None:
                self._termination_callbacks.append(callback)
                return True
            else:
                return False

    def initial_metadata(self):
        """Block until initial metadata is available and return it.

        Raises:
            ValueError: if a code has been set but no initial metadata was
                ever recorded.
        """
        with self._condition:
            while True:
                if self._initial_metadata is None:
                    if self._code is None:
                        self._condition.wait()
                    else:
                        raise ValueError(
                            "No initial metadata despite status code!"
                        )
                else:
                    return self._initial_metadata

    def add_request(self, request):
        """Queue a request and wake any waiters."""
        with self._condition:
            self._requests.append(request)
            self._condition.notify_all()

    def take_response(self):
        """Block until a response is available and return the oldest one.

        Raises:
            ValueError: if a code has been set and no responses remain.
        """
        with self._condition:
            while True:
                if self._responses:
                    response = self._responses.pop(0)
                    self._condition.notify_all()
                    return response
                elif self._code is None:
                    self._condition.wait()
                else:
                    raise ValueError("No more responses!")

    def requests_closed(self):
        """Mark the request side as closed and wake any waiters."""
        with self._condition:
            self._requests_closed = True
            self._condition.notify_all()

    def cancel(self):
        """Cancel the RPC if it is still active; otherwise do nothing.

        On an active RPC this sets the code to ``_CLIENT_INACTIVE``, cancels
        any pending expiration future, wakes waiters and then invokes the
        registered termination callbacks.
        """
        # Bound up front so that cancelling an RPC that already has a code
        # is a no-op instead of raising UnboundLocalError in the loop below.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                self._code = _CLIENT_INACTIVE
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                if self._expiration_future is not None:
                    self._expiration_future.cancel()
                self._condition.notify_all()
        # Callbacks run after the condition has been released.
        for termination_callback in termination_callbacks:
            termination_callback()

    def unary_response_termination(self):
        """Block until termination; return the response and termination values.

        Returns:
            A ``(response, trailing_metadata, code, details)`` tuple. The
            response is the oldest queued response, cached on first call, or
            ``None`` if none was ever queued.

        Raises:
            ValueError: if the RPC was cancelled via ``cancel()``.
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError("Huh? Cancelled but wanting status?")
                elif self._code is None:
                    self._condition.wait()
                else:
                    if self._unary_response is None:
                        if self._responses:
                            self._unary_response = self._responses.pop(0)
                    return (
                        self._unary_response,
                        self._trailing_metadata,
                        self._code,
                        self._details,
                    )

    def stream_response_termination(self):
        """Block until termination; return ``(trailing_metadata, code, details)``.

        Raises:
            ValueError: if the RPC was cancelled via ``cancel()``.
        """
        with self._condition:
            while True:
                if self._code is _CLIENT_INACTIVE:
                    raise ValueError("Huh? Cancelled but wanting status?")
                elif self._code is None:
                    self._condition.wait()
                else:
                    return self._trailing_metadata, self._code, self._details

    def expire(self):
        """Terminate the RPC with DEADLINE_EXCEEDED if it is still active.

        On an active RPC this fills in empty initial metadata when none was
        sent, records the deadline status, wakes waiters and then invokes
        the registered termination callbacks. Otherwise it does nothing.
        """
        # Bound up front so that an expiration firing after the RPC already
        # has a code is a no-op instead of raising UnboundLocalError.
        termination_callbacks = ()
        with self._condition:
            if self._code is None:
                if self._initial_metadata is None:
                    self._initial_metadata = _common.FUSSED_EMPTY_METADATA
                self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
                self._code = grpc.StatusCode.DEADLINE_EXCEEDED
                self._details = "Took too much time!"
                termination_callbacks = self._termination_callbacks
                self._termination_callbacks = None
                self._condition.notify_all()
        # Callbacks run after the condition has been released.
        for termination_callback in termination_callbacks:
            termination_callback()

    def set_expiration_future(self, expiration_future):
        """Store the future that ``send_termination``/``cancel`` will cancel."""
        with self._condition:
            self._expiration_future = expiration_future


def handler_without_deadline(requests_closed):
    """Return a new handler that has no expiration scheduled."""
    return _Handler(requests_closed)


def handler_with_deadline(requests_closed, time, deadline):
    """Return a new handler whose ``expire`` is scheduled at ``deadline``.

    Args:
        requests_closed: initial value of the "no more requests" flag.
        time: object providing ``call_at(behavior, time)``; its return value
            is stored as the handler's expiration future.
        deadline: the time passed to ``time.call_at``.
    """
    handler = _Handler(requests_closed)
    expiration_future = time.call_at(handler.expire, deadline)
    handler.set_expiration_future(expiration_future)
    return handler