# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example of multiprocess concurrency with gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from concurrent import futures
import contextlib
import datetime
import logging
import math
import multiprocessing
import socket
import sys
import time

import grpc
import prime_pb2
import prime_pb2_grpc

_LOGGER = logging.getLogger(__name__)

# Sleep interval used by _wait_forever() to keep a worker process alive.
_ONE_DAY = datetime.timedelta(days=1)
# One worker process per CPU, each with a thread pool of the same size.
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT


def is_prime(n):
    """Return True if the integer ``n`` is a prime number, else False.

    Values below 2 (including zero and negatives) are never prime. Trial
    division runs over every ``i`` with ``i * i <= n`` so that the square
    root itself is tested; this keeps perfect squares such as 4, 9 and 25
    from being reported as prime. Only integer arithmetic is used, so the
    result does not depend on floating-point rounding of a square root.
    """
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True


class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
    """Servicer that answers primality queries using is_prime()."""

    def check(self, request, context):
        """Handle one request.

        Logs ``request.candidate`` and returns a ``prime_pb2.Primality``
        message whose ``isPrime`` field holds the result of is_prime().
        """
        _LOGGER.info("Determining primality of %s", request.candidate)
        return prime_pb2.Primality(isPrime=is_prime(request.candidate))


def _wait_forever(server):
    """Block the calling thread until interrupted, then stop ``server``.

    Sleeps in one-day increments; on KeyboardInterrupt the server is
    stopped with ``server.stop(None)``.
    """
    try:
        while True:
            time.sleep(_ONE_DAY.total_seconds())
    except KeyboardInterrupt:
        server.stop(None)


def _run_server(bind_address):
    """Start a server in a subprocess.

    Builds a gRPC server backed by a thread pool of _THREAD_CONCURRENCY
    workers, registers PrimeChecker on it, binds it (insecurely) to
    ``bind_address`` and then blocks in _wait_forever().
    """
    _LOGGER.info("Starting new server.")
    # Passed so that every worker process can listen on the same address
    # (pairs with the SO_REUSEPORT socket reserved by _reserve_port()).
    options = (("grpc.so_reuseport", 1),)

    server = grpc.server(
        futures.ThreadPoolExecutor(
            max_workers=_THREAD_CONCURRENCY,
        ),
        options=options,
    )
    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
    server.add_insecure_port(bind_address)
    server.start()
    _wait_forever(server)


@contextlib.contextmanager
def _reserve_port():
    """Find and reserve a port for all subprocesses to use.

    Opens an IPv6 TCP socket with SO_REUSEPORT set, binds it to an
    OS-chosen port and yields that port number. The socket stays open for
    the duration of the ``with`` body and is always closed afterwards,
    including when configuring or binding the socket fails.

    Raises:
        RuntimeError: if SO_REUSEPORT reads back as 0 after being set.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
            raise RuntimeError("Failed to set SO_REUSEPORT.")
        # Port 0 asks the OS to pick any free port.
        sock.bind(("", 0))
        yield sock.getsockname()[1]
    finally:
        sock.close()


def main():
    """Reserve a port and run one server subprocess per CPU on it.

    Starts _PROCESS_COUNT worker processes, each running _run_server() on
    the same ``localhost:<port>`` address, then waits for all of them.
    """
    with _reserve_port() as port:
        bind_address = "localhost:{}".format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()
        workers = []
        for _ in range(_PROCESS_COUNT):
            # NOTE: It is imperative that the worker subprocesses be forked before
            # any gRPC servers start up. See
            # https://github.com/grpc/grpc/issues/16001 for more details.
            worker = multiprocessing.Process(
                target=_run_server, args=(bind_address,)
            )
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()


if __name__ == "__main__":
    # Log to stdout, tagging each line with the emitting process id.
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter("[PID %(process)d] %(message)s")
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()