xref: /aosp_15_r20/external/grpc-grpc/examples/python/multiprocessing/server.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2019 gRPC authors.
2*cc02d7e2SAndroid Build Coastguard Worker#
3*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License");
4*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License.
5*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at
6*cc02d7e2SAndroid Build Coastguard Worker#
7*cc02d7e2SAndroid Build Coastguard Worker#     http://www.apache.org/licenses/LICENSE-2.0
8*cc02d7e2SAndroid Build Coastguard Worker#
9*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
10*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS,
11*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and
13*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License.
14*cc02d7e2SAndroid Build Coastguard Worker"""An example of multiprocess concurrency with gRPC."""
15*cc02d7e2SAndroid Build Coastguard Worker
16*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import absolute_import
17*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import division
18*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import print_function
19*cc02d7e2SAndroid Build Coastguard Worker
20*cc02d7e2SAndroid Build Coastguard Workerfrom concurrent import futures
21*cc02d7e2SAndroid Build Coastguard Workerimport contextlib
22*cc02d7e2SAndroid Build Coastguard Workerimport datetime
23*cc02d7e2SAndroid Build Coastguard Workerimport logging
24*cc02d7e2SAndroid Build Coastguard Workerimport math
25*cc02d7e2SAndroid Build Coastguard Workerimport multiprocessing
26*cc02d7e2SAndroid Build Coastguard Workerimport socket
27*cc02d7e2SAndroid Build Coastguard Workerimport sys
28*cc02d7e2SAndroid Build Coastguard Workerimport time
29*cc02d7e2SAndroid Build Coastguard Worker
30*cc02d7e2SAndroid Build Coastguard Workerimport grpc
31*cc02d7e2SAndroid Build Coastguard Workerimport prime_pb2
32*cc02d7e2SAndroid Build Coastguard Workerimport prime_pb2_grpc
33*cc02d7e2SAndroid Build Coastguard Worker
34*cc02d7e2SAndroid Build Coastguard Worker_LOGGER = logging.getLogger(__name__)
35*cc02d7e2SAndroid Build Coastguard Worker
36*cc02d7e2SAndroid Build Coastguard Worker_ONE_DAY = datetime.timedelta(days=1)
37*cc02d7e2SAndroid Build Coastguard Worker_PROCESS_COUNT = multiprocessing.cpu_count()
38*cc02d7e2SAndroid Build Coastguard Worker_THREAD_CONCURRENCY = _PROCESS_COUNT
39*cc02d7e2SAndroid Build Coastguard Worker
40*cc02d7e2SAndroid Build Coastguard Worker
def is_prime(n):
    """Return whether ``n`` is a prime number.

    Uses trial division by every integer from 2 up to and including the
    integer square root of ``n``.

    Args:
        n: The integer candidate to test.

    Returns:
        True if ``n`` is prime, False otherwise. Every value below 2
        (including zero and negative numbers) is reported as not prime.
    """
    if n < 2:
        return False
    # math.sqrt works in floating point, so nudge the estimate until it is
    # exactly the integer square root; this matters for very large n.
    limit = int(math.sqrt(n))
    while limit * limit > n:
        limit -= 1
    while (limit + 1) * (limit + 1) <= n:
        limit += 1
    # The bound is inclusive: a perfect square such as 9 is only caught by
    # testing its root (3) as a divisor.
    return all(n % divisor != 0 for divisor in range(2, limit + 1))
47*cc02d7e2SAndroid Build Coastguard Worker
48*cc02d7e2SAndroid Build Coastguard Worker
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
    """Servicer whose ``check`` RPC answers primality queries."""

    def check(self, request, context):
        """Handle one ``check`` call.

        Args:
            request: Incoming message; its ``candidate`` field is the number
                to test.
            context: The RPC context supplied by gRPC (unused here).

        Returns:
            A ``prime_pb2.Primality`` message whose ``isPrime`` field holds
            the result of ``is_prime`` for the candidate.
        """
        candidate = request.candidate
        _LOGGER.info("Determining primality of %s", candidate)
        verdict = is_prime(candidate)
        return prime_pb2.Primality(isPrime=verdict)
53*cc02d7e2SAndroid Build Coastguard Worker
54*cc02d7e2SAndroid Build Coastguard Worker
def _wait_forever(server):
    """Block the calling thread until it is interrupted from the keyboard.

    Sleeps in ``_ONE_DAY``-long intervals indefinitely. When a
    ``KeyboardInterrupt`` arrives, ``server.stop(None)`` is called and the
    function returns.

    Args:
        server: Object exposing a ``stop`` method; called with ``None`` on
            interrupt.
    """
    try:
        nap_seconds = _ONE_DAY.total_seconds()
        while True:
            time.sleep(nap_seconds)
    except KeyboardInterrupt:
        server.stop(None)
61*cc02d7e2SAndroid Build Coastguard Worker
62*cc02d7e2SAndroid Build Coastguard Worker
def _run_server(bind_address):
    """Start a server in a subprocess.

    Builds a gRPC server backed by a thread pool of ``_THREAD_CONCURRENCY``
    workers, registers a ``PrimeChecker`` servicer on it, opens an insecure
    port at ``bind_address``, starts it, and then blocks in ``_wait_forever``.

    Args:
        bind_address: Address string passed to ``add_insecure_port``.
    """
    _LOGGER.info("Starting new server.")

    thread_pool = futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY)
    # Every subprocess binds the same address, so the server is created with
    # the "grpc.so_reuseport" option switched on.
    server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 1),))

    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
    server.add_insecure_port(bind_address)
    server.start()
    _wait_forever(server)
78*cc02d7e2SAndroid Build Coastguard Worker
79*cc02d7e2SAndroid Build Coastguard Worker
80*cc02d7e2SAndroid Build Coastguard Worker@contextlib.contextmanager
81*cc02d7e2SAndroid Build Coastguard Workerdef _reserve_port():
82*cc02d7e2SAndroid Build Coastguard Worker    """Find and reserve a port for all subprocesses to use."""
83*cc02d7e2SAndroid Build Coastguard Worker    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
84*cc02d7e2SAndroid Build Coastguard Worker    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
85*cc02d7e2SAndroid Build Coastguard Worker    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
86*cc02d7e2SAndroid Build Coastguard Worker        raise RuntimeError("Failed to set SO_REUSEPORT.")
87*cc02d7e2SAndroid Build Coastguard Worker    sock.bind(("", 0))
88*cc02d7e2SAndroid Build Coastguard Worker    try:
89*cc02d7e2SAndroid Build Coastguard Worker        yield sock.getsockname()[1]
90*cc02d7e2SAndroid Build Coastguard Worker    finally:
91*cc02d7e2SAndroid Build Coastguard Worker        sock.close()
92*cc02d7e2SAndroid Build Coastguard Worker
93*cc02d7e2SAndroid Build Coastguard Worker
def main():
    """Reserve a port, launch one server subprocess per CPU, and wait.

    Starts ``_PROCESS_COUNT`` processes, each running ``_run_server`` against
    the same ``localhost`` address, then joins them all while the reserved
    port is still held.
    """
    with _reserve_port() as port:
        bind_address = "localhost:{}".format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()

        def _spawn_worker():
            # NOTE: It is imperative that the worker subprocesses be forked
            # before any gRPC servers start up. See
            # https://github.com/grpc/grpc/issues/16001 for more details.
            process = multiprocessing.Process(
                target=_run_server, args=(bind_address,)
            )
            process.start()
            return process

        workers = [_spawn_worker() for _ in range(_PROCESS_COUNT)]
        for running in workers:
            running.join()
111*cc02d7e2SAndroid Build Coastguard Worker
112*cc02d7e2SAndroid Build Coastguard Worker
if __name__ == "__main__":
    # Send INFO-level log records to stdout, each tagged with the PID of the
    # process that emitted it, then hand control to main().
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter("[PID %(process)d] %(message)s")
    )
    _LOGGER.setLevel(logging.INFO)
    _LOGGER.addHandler(stdout_handler)
    main()
120