1*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2019 the gRPC authors. 2*cc02d7e2SAndroid Build Coastguard Worker# 3*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); 4*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License. 5*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at 6*cc02d7e2SAndroid Build Coastguard Worker# 7*cc02d7e2SAndroid Build Coastguard Worker# http://www.apache.org/licenses/LICENSE-2.0 8*cc02d7e2SAndroid Build Coastguard Worker# 9*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software 10*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, 11*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and 13*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License. 14*cc02d7e2SAndroid Build Coastguard Worker"""An example of cancelling requests in gRPC.""" 15*cc02d7e2SAndroid Build Coastguard Worker 16*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import absolute_import 17*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import division 18*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import print_function 19*cc02d7e2SAndroid Build Coastguard Worker 20*cc02d7e2SAndroid Build Coastguard Workerimport argparse 21*cc02d7e2SAndroid Build Coastguard Workerfrom concurrent import futures 22*cc02d7e2SAndroid Build Coastguard Workerimport logging 23*cc02d7e2SAndroid Build Coastguard Workerimport threading 24*cc02d7e2SAndroid Build Coastguard Worker 25*cc02d7e2SAndroid Build Coastguard Workerimport grpc 26*cc02d7e2SAndroid Build Coastguard Workerimport search 27*cc02d7e2SAndroid Build Coastguard Worker 28*cc02d7e2SAndroid Build Coastguard Workerfrom examples.python.cancellation import hash_name_pb2 29*cc02d7e2SAndroid Build Coastguard Workerfrom examples.python.cancellation import hash_name_pb2_grpc 30*cc02d7e2SAndroid Build Coastguard Worker 31*cc02d7e2SAndroid Build Coastguard Worker_LOGGER = logging.getLogger(__name__) 32*cc02d7e2SAndroid Build Coastguard Worker_SERVER_HOST = "localhost" 33*cc02d7e2SAndroid Build Coastguard Worker 34*cc02d7e2SAndroid Build Coastguard Worker_DESCRIPTION = "A server for finding hashes similar to names." 35*cc02d7e2SAndroid Build Coastguard Worker 36*cc02d7e2SAndroid Build Coastguard Worker 37*cc02d7e2SAndroid Build Coastguard Workerclass HashFinder(hash_name_pb2_grpc.HashFinderServicer): 38*cc02d7e2SAndroid Build Coastguard Worker def __init__(self, maximum_hashes): 39*cc02d7e2SAndroid Build Coastguard Worker super(HashFinder, self).__init__() 40*cc02d7e2SAndroid Build Coastguard Worker self._maximum_hashes = maximum_hashes 41*cc02d7e2SAndroid Build Coastguard Worker 42*cc02d7e2SAndroid Build Coastguard Worker def Find(self, request, context): 43*cc02d7e2SAndroid Build Coastguard Worker stop_event = threading.Event() 44*cc02d7e2SAndroid Build Coastguard Worker 45*cc02d7e2SAndroid Build Coastguard Worker def on_rpc_done(): 46*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.debug("Attempting to regain servicer thread.") 47*cc02d7e2SAndroid Build Coastguard Worker stop_event.set() 48*cc02d7e2SAndroid Build Coastguard Worker 49*cc02d7e2SAndroid Build Coastguard Worker context.add_callback(on_rpc_done) 50*cc02d7e2SAndroid Build Coastguard Worker candidates = [] 51*cc02d7e2SAndroid Build Coastguard Worker try: 52*cc02d7e2SAndroid Build Coastguard Worker candidates = list( 53*cc02d7e2SAndroid Build Coastguard Worker search.search( 54*cc02d7e2SAndroid Build Coastguard Worker request.desired_name, 55*cc02d7e2SAndroid Build Coastguard Worker request.ideal_hamming_distance, 56*cc02d7e2SAndroid Build Coastguard Worker stop_event, 57*cc02d7e2SAndroid Build Coastguard Worker self._maximum_hashes, 58*cc02d7e2SAndroid Build Coastguard Worker ) 59*cc02d7e2SAndroid Build Coastguard Worker ) 60*cc02d7e2SAndroid Build Coastguard Worker except search.ResourceLimitExceededError: 61*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.info("Cancelling RPC due to exhausted resources.") 62*cc02d7e2SAndroid Build Coastguard Worker context.cancel() 63*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.debug("Servicer thread returning.") 64*cc02d7e2SAndroid Build Coastguard Worker if not candidates: 65*cc02d7e2SAndroid Build Coastguard Worker return hash_name_pb2.HashNameResponse() 66*cc02d7e2SAndroid Build Coastguard Worker return candidates[-1] 67*cc02d7e2SAndroid Build Coastguard Worker 68*cc02d7e2SAndroid Build Coastguard Worker def FindRange(self, request, context): 69*cc02d7e2SAndroid Build Coastguard Worker stop_event = threading.Event() 70*cc02d7e2SAndroid Build Coastguard Worker 71*cc02d7e2SAndroid Build Coastguard Worker def on_rpc_done(): 72*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.debug("Attempting to regain servicer thread.") 73*cc02d7e2SAndroid Build Coastguard Worker stop_event.set() 74*cc02d7e2SAndroid Build Coastguard Worker 75*cc02d7e2SAndroid Build Coastguard Worker context.add_callback(on_rpc_done) 76*cc02d7e2SAndroid Build Coastguard Worker secret_generator = search.search( 77*cc02d7e2SAndroid Build Coastguard Worker request.desired_name, 78*cc02d7e2SAndroid Build Coastguard Worker request.ideal_hamming_distance, 79*cc02d7e2SAndroid Build Coastguard Worker stop_event, 80*cc02d7e2SAndroid Build Coastguard Worker self._maximum_hashes, 81*cc02d7e2SAndroid Build Coastguard Worker interesting_hamming_distance=request.interesting_hamming_distance, 82*cc02d7e2SAndroid Build Coastguard Worker ) 83*cc02d7e2SAndroid Build Coastguard Worker try: 84*cc02d7e2SAndroid Build Coastguard Worker for candidate in secret_generator: 85*cc02d7e2SAndroid Build Coastguard Worker yield candidate 86*cc02d7e2SAndroid Build Coastguard Worker except search.ResourceLimitExceededError: 87*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.info("Cancelling RPC due to exhausted resources.") 88*cc02d7e2SAndroid Build Coastguard Worker context.cancel() 89*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.debug("Regained servicer thread.") 90*cc02d7e2SAndroid Build Coastguard Worker 91*cc02d7e2SAndroid Build Coastguard Worker 92*cc02d7e2SAndroid Build Coastguard Workerdef _running_server(port, maximum_hashes): 93*cc02d7e2SAndroid Build Coastguard Worker # We use only a single servicer thread here to demonstrate that, if managed 94*cc02d7e2SAndroid Build Coastguard Worker # carefully, cancelled RPCs can need not continue occupying servicers 95*cc02d7e2SAndroid Build Coastguard Worker # threads. 96*cc02d7e2SAndroid Build Coastguard Worker server = grpc.server( 97*cc02d7e2SAndroid Build Coastguard Worker futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1 98*cc02d7e2SAndroid Build Coastguard Worker ) 99*cc02d7e2SAndroid Build Coastguard Worker hash_name_pb2_grpc.add_HashFinderServicer_to_server( 100*cc02d7e2SAndroid Build Coastguard Worker HashFinder(maximum_hashes), server 101*cc02d7e2SAndroid Build Coastguard Worker ) 102*cc02d7e2SAndroid Build Coastguard Worker address = "{}:{}".format(_SERVER_HOST, port) 103*cc02d7e2SAndroid Build Coastguard Worker actual_port = server.add_insecure_port(address) 104*cc02d7e2SAndroid Build Coastguard Worker server.start() 105*cc02d7e2SAndroid Build Coastguard Worker print("Server listening at '{}'".format(address)) 106*cc02d7e2SAndroid Build Coastguard Worker return server 107*cc02d7e2SAndroid Build Coastguard Worker 108*cc02d7e2SAndroid Build Coastguard Worker 109*cc02d7e2SAndroid Build Coastguard Workerdef main(): 110*cc02d7e2SAndroid Build Coastguard Worker parser = argparse.ArgumentParser(description=_DESCRIPTION) 111*cc02d7e2SAndroid Build Coastguard Worker parser.add_argument( 112*cc02d7e2SAndroid Build Coastguard Worker "--port", 113*cc02d7e2SAndroid Build Coastguard Worker type=int, 114*cc02d7e2SAndroid Build Coastguard Worker default=50051, 115*cc02d7e2SAndroid Build Coastguard Worker nargs="?", 116*cc02d7e2SAndroid Build Coastguard Worker help="The port on which the server will listen.", 117*cc02d7e2SAndroid Build Coastguard Worker ) 118*cc02d7e2SAndroid Build Coastguard Worker parser.add_argument( 119*cc02d7e2SAndroid Build Coastguard Worker "--maximum-hashes", 120*cc02d7e2SAndroid Build Coastguard Worker type=int, 121*cc02d7e2SAndroid Build Coastguard Worker default=1000000, 122*cc02d7e2SAndroid Build Coastguard Worker nargs="?", 123*cc02d7e2SAndroid Build Coastguard Worker help="The maximum number of hashes to search before cancelling.", 124*cc02d7e2SAndroid Build Coastguard Worker ) 125*cc02d7e2SAndroid Build Coastguard Worker args = parser.parse_args() 126*cc02d7e2SAndroid Build Coastguard Worker server = _running_server(args.port, args.maximum_hashes) 127*cc02d7e2SAndroid Build Coastguard Worker server.wait_for_termination() 128*cc02d7e2SAndroid Build Coastguard Worker 129*cc02d7e2SAndroid Build Coastguard Worker 130*cc02d7e2SAndroid Build Coastguard Workerif __name__ == "__main__": 131*cc02d7e2SAndroid Build Coastguard Worker logging.basicConfig() 132*cc02d7e2SAndroid Build Coastguard Worker main() 133