xref: /aosp_15_r20/external/grpc-grpc/examples/python/cancellation/server.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2019 the gRPC authors.
2*cc02d7e2SAndroid Build Coastguard Worker#
3*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License");
4*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License.
5*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at
6*cc02d7e2SAndroid Build Coastguard Worker#
7*cc02d7e2SAndroid Build Coastguard Worker#     http://www.apache.org/licenses/LICENSE-2.0
8*cc02d7e2SAndroid Build Coastguard Worker#
9*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
10*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS,
11*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and
13*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License.
14*cc02d7e2SAndroid Build Coastguard Worker"""An example of cancelling requests in gRPC."""
15*cc02d7e2SAndroid Build Coastguard Worker
16*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import absolute_import
17*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import division
18*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import print_function
19*cc02d7e2SAndroid Build Coastguard Worker
20*cc02d7e2SAndroid Build Coastguard Workerimport argparse
21*cc02d7e2SAndroid Build Coastguard Workerfrom concurrent import futures
22*cc02d7e2SAndroid Build Coastguard Workerimport logging
23*cc02d7e2SAndroid Build Coastguard Workerimport threading
24*cc02d7e2SAndroid Build Coastguard Worker
25*cc02d7e2SAndroid Build Coastguard Workerimport grpc
26*cc02d7e2SAndroid Build Coastguard Workerimport search
27*cc02d7e2SAndroid Build Coastguard Worker
28*cc02d7e2SAndroid Build Coastguard Workerfrom examples.python.cancellation import hash_name_pb2
29*cc02d7e2SAndroid Build Coastguard Workerfrom examples.python.cancellation import hash_name_pb2_grpc
30*cc02d7e2SAndroid Build Coastguard Worker
31*cc02d7e2SAndroid Build Coastguard Worker_LOGGER = logging.getLogger(__name__)
32*cc02d7e2SAndroid Build Coastguard Worker_SERVER_HOST = "localhost"
33*cc02d7e2SAndroid Build Coastguard Worker
34*cc02d7e2SAndroid Build Coastguard Worker_DESCRIPTION = "A server for finding hashes similar to names."
35*cc02d7e2SAndroid Build Coastguard Worker
36*cc02d7e2SAndroid Build Coastguard Worker
37*cc02d7e2SAndroid Build Coastguard Workerclass HashFinder(hash_name_pb2_grpc.HashFinderServicer):
38*cc02d7e2SAndroid Build Coastguard Worker    def __init__(self, maximum_hashes):
39*cc02d7e2SAndroid Build Coastguard Worker        super(HashFinder, self).__init__()
40*cc02d7e2SAndroid Build Coastguard Worker        self._maximum_hashes = maximum_hashes
41*cc02d7e2SAndroid Build Coastguard Worker
42*cc02d7e2SAndroid Build Coastguard Worker    def Find(self, request, context):
43*cc02d7e2SAndroid Build Coastguard Worker        stop_event = threading.Event()
44*cc02d7e2SAndroid Build Coastguard Worker
45*cc02d7e2SAndroid Build Coastguard Worker        def on_rpc_done():
46*cc02d7e2SAndroid Build Coastguard Worker            _LOGGER.debug("Attempting to regain servicer thread.")
47*cc02d7e2SAndroid Build Coastguard Worker            stop_event.set()
48*cc02d7e2SAndroid Build Coastguard Worker
49*cc02d7e2SAndroid Build Coastguard Worker        context.add_callback(on_rpc_done)
50*cc02d7e2SAndroid Build Coastguard Worker        candidates = []
51*cc02d7e2SAndroid Build Coastguard Worker        try:
52*cc02d7e2SAndroid Build Coastguard Worker            candidates = list(
53*cc02d7e2SAndroid Build Coastguard Worker                search.search(
54*cc02d7e2SAndroid Build Coastguard Worker                    request.desired_name,
55*cc02d7e2SAndroid Build Coastguard Worker                    request.ideal_hamming_distance,
56*cc02d7e2SAndroid Build Coastguard Worker                    stop_event,
57*cc02d7e2SAndroid Build Coastguard Worker                    self._maximum_hashes,
58*cc02d7e2SAndroid Build Coastguard Worker                )
59*cc02d7e2SAndroid Build Coastguard Worker            )
60*cc02d7e2SAndroid Build Coastguard Worker        except search.ResourceLimitExceededError:
61*cc02d7e2SAndroid Build Coastguard Worker            _LOGGER.info("Cancelling RPC due to exhausted resources.")
62*cc02d7e2SAndroid Build Coastguard Worker            context.cancel()
63*cc02d7e2SAndroid Build Coastguard Worker        _LOGGER.debug("Servicer thread returning.")
64*cc02d7e2SAndroid Build Coastguard Worker        if not candidates:
65*cc02d7e2SAndroid Build Coastguard Worker            return hash_name_pb2.HashNameResponse()
66*cc02d7e2SAndroid Build Coastguard Worker        return candidates[-1]
67*cc02d7e2SAndroid Build Coastguard Worker
68*cc02d7e2SAndroid Build Coastguard Worker    def FindRange(self, request, context):
69*cc02d7e2SAndroid Build Coastguard Worker        stop_event = threading.Event()
70*cc02d7e2SAndroid Build Coastguard Worker
71*cc02d7e2SAndroid Build Coastguard Worker        def on_rpc_done():
72*cc02d7e2SAndroid Build Coastguard Worker            _LOGGER.debug("Attempting to regain servicer thread.")
73*cc02d7e2SAndroid Build Coastguard Worker            stop_event.set()
74*cc02d7e2SAndroid Build Coastguard Worker
75*cc02d7e2SAndroid Build Coastguard Worker        context.add_callback(on_rpc_done)
76*cc02d7e2SAndroid Build Coastguard Worker        secret_generator = search.search(
77*cc02d7e2SAndroid Build Coastguard Worker            request.desired_name,
78*cc02d7e2SAndroid Build Coastguard Worker            request.ideal_hamming_distance,
79*cc02d7e2SAndroid Build Coastguard Worker            stop_event,
80*cc02d7e2SAndroid Build Coastguard Worker            self._maximum_hashes,
81*cc02d7e2SAndroid Build Coastguard Worker            interesting_hamming_distance=request.interesting_hamming_distance,
82*cc02d7e2SAndroid Build Coastguard Worker        )
83*cc02d7e2SAndroid Build Coastguard Worker        try:
84*cc02d7e2SAndroid Build Coastguard Worker            for candidate in secret_generator:
85*cc02d7e2SAndroid Build Coastguard Worker                yield candidate
86*cc02d7e2SAndroid Build Coastguard Worker        except search.ResourceLimitExceededError:
87*cc02d7e2SAndroid Build Coastguard Worker            _LOGGER.info("Cancelling RPC due to exhausted resources.")
88*cc02d7e2SAndroid Build Coastguard Worker            context.cancel()
89*cc02d7e2SAndroid Build Coastguard Worker        _LOGGER.debug("Regained servicer thread.")
90*cc02d7e2SAndroid Build Coastguard Worker
91*cc02d7e2SAndroid Build Coastguard Worker
92*cc02d7e2SAndroid Build Coastguard Workerdef _running_server(port, maximum_hashes):
93*cc02d7e2SAndroid Build Coastguard Worker    # We use only a single servicer thread here to demonstrate that, if managed
94*cc02d7e2SAndroid Build Coastguard Worker    # carefully, cancelled RPCs can need not continue occupying servicers
95*cc02d7e2SAndroid Build Coastguard Worker    # threads.
96*cc02d7e2SAndroid Build Coastguard Worker    server = grpc.server(
97*cc02d7e2SAndroid Build Coastguard Worker        futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1
98*cc02d7e2SAndroid Build Coastguard Worker    )
99*cc02d7e2SAndroid Build Coastguard Worker    hash_name_pb2_grpc.add_HashFinderServicer_to_server(
100*cc02d7e2SAndroid Build Coastguard Worker        HashFinder(maximum_hashes), server
101*cc02d7e2SAndroid Build Coastguard Worker    )
102*cc02d7e2SAndroid Build Coastguard Worker    address = "{}:{}".format(_SERVER_HOST, port)
103*cc02d7e2SAndroid Build Coastguard Worker    actual_port = server.add_insecure_port(address)
104*cc02d7e2SAndroid Build Coastguard Worker    server.start()
105*cc02d7e2SAndroid Build Coastguard Worker    print("Server listening at '{}'".format(address))
106*cc02d7e2SAndroid Build Coastguard Worker    return server
107*cc02d7e2SAndroid Build Coastguard Worker
108*cc02d7e2SAndroid Build Coastguard Worker
109*cc02d7e2SAndroid Build Coastguard Workerdef main():
110*cc02d7e2SAndroid Build Coastguard Worker    parser = argparse.ArgumentParser(description=_DESCRIPTION)
111*cc02d7e2SAndroid Build Coastguard Worker    parser.add_argument(
112*cc02d7e2SAndroid Build Coastguard Worker        "--port",
113*cc02d7e2SAndroid Build Coastguard Worker        type=int,
114*cc02d7e2SAndroid Build Coastguard Worker        default=50051,
115*cc02d7e2SAndroid Build Coastguard Worker        nargs="?",
116*cc02d7e2SAndroid Build Coastguard Worker        help="The port on which the server will listen.",
117*cc02d7e2SAndroid Build Coastguard Worker    )
118*cc02d7e2SAndroid Build Coastguard Worker    parser.add_argument(
119*cc02d7e2SAndroid Build Coastguard Worker        "--maximum-hashes",
120*cc02d7e2SAndroid Build Coastguard Worker        type=int,
121*cc02d7e2SAndroid Build Coastguard Worker        default=1000000,
122*cc02d7e2SAndroid Build Coastguard Worker        nargs="?",
123*cc02d7e2SAndroid Build Coastguard Worker        help="The maximum number of hashes to search before cancelling.",
124*cc02d7e2SAndroid Build Coastguard Worker    )
125*cc02d7e2SAndroid Build Coastguard Worker    args = parser.parse_args()
126*cc02d7e2SAndroid Build Coastguard Worker    server = _running_server(args.port, args.maximum_hashes)
127*cc02d7e2SAndroid Build Coastguard Worker    server.wait_for_termination()
128*cc02d7e2SAndroid Build Coastguard Worker
129*cc02d7e2SAndroid Build Coastguard Worker
130*cc02d7e2SAndroid Build Coastguard Workerif __name__ == "__main__":
131*cc02d7e2SAndroid Build Coastguard Worker    logging.basicConfig()
132*cc02d7e2SAndroid Build Coastguard Worker    main()
133