xref: /aosp_15_r20/external/grpc-grpc/examples/python/cancellation/search.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 the gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A search algorithm over the space of all bytestrings."""
15
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import base64
21import hashlib
22import itertools
23import logging
24import struct
25
26from examples.python.cancellation import hash_name_pb2
27
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Largest value a single unsigned byte can hold.
_BYTE_MAX = 255
30
31
def _get_hamming_distance(a, b):
    """Counts the positions at which two equal-length strings differ.

    Elements are compared pairwise; if the inputs differ in length, the
    surplus tail of the longer one is not compared.
    """
    return sum(1 for left, right in zip(a, b) if left != right)
39
40
def _get_substring_hamming_distance(candidate, target):
    """Finds how closely any window of the candidate matches the target.

    Every substring of `candidate` that is exactly as long as `target` is
    compared against `target`, ignoring case.

    Args:
      candidate: The string whose substrings will be tested.
      target: The target string.

    Returns:
      The smallest Hamming distance between the target and any
      target-length substring of the candidate.

    Raises:
      ValueError: If the target is longer than the candidate.
    """
    window = len(target)
    if window > len(candidate):
        raise ValueError("Candidate must be at least as long as target.")
    lowered_target = target.lower()
    # At least one window always exists here, so min() never sees an empty
    # sequence.
    return min(
        _get_hamming_distance(
            candidate[start : start + window].lower(), lowered_target
        )
        for start in range(len(candidate) - window + 1)
    )
62
63
def _get_hash(secret):
    """Returns the base64-encoded SHA-1 digest of `secret` as ASCII text."""
    digest = hashlib.sha1(secret).digest()
    encoded = base64.b64encode(digest)
    return encoded.decode("ascii")
68
69
class ResourceLimitExceededError(Exception):
    """Raised when a request goes past its configured resource limits."""
72
73
def _bytestrings_of_length(length):
    """Generates a stream containing all bytestrings of a given length.

    Args:
      length: A non-negative integer length. A length of zero yields only
        the empty bytestring.

    Yields:
      All bytestrings of length `length`, with every byte value from 0 to
        `_BYTE_MAX` inclusive appearing in each position.
    """
    # range() excludes its upper bound, so add one to make sure the byte
    # value _BYTE_MAX itself is part of the search space.
    for digits in itertools.product(range(_BYTE_MAX + 1), repeat=length):
        yield b"".join(struct.pack("B", i) for i in digits)
85
86
def _all_bytestrings():
    """Generates an endless stream of bytestrings.

    Lengths are visited in increasing order starting from zero, and for each
    length every bytestring produced by `_bytestrings_of_length` is emitted
    before moving on to the next length.

    This generator does not terminate.

    Yields:
      Bytestrings in ascending order of length.
    """
    for size in itertools.count():
        for candidate in _bytestrings_of_length(size):
            yield candidate
99
100
def search(
    target,
    ideal_distance,
    stop_event,
    maximum_hashes,
    interesting_hamming_distance=None,
):
    """Find candidate strings.

    Search through the space of all bytestrings, in order of increasing
    length, until a hash with a Hamming distance of `ideal_distance` or less
    from the target has been found, the stop event is set, or the hash budget
    is used up.

    Args:
      target: The search string.
      ideal_distance: The desired Hamming distance. The first candidate at or
        below this distance is yielded and ends the search.
      stop_event: An event indicating whether the RPC should terminate. It is
        polled before each hash is computed.
      maximum_hashes: The maximum number of hashes to check before stopping.
      interesting_hamming_distance: If specified, strings with a Hamming
        distance from the target at or below this value will be yielded
        without ending the search.

    Yields:
      Instances of HashNameResponse. An entry of `ideal_distance` Hamming
        distance or less from the target string is always the final entry in
        the stream, while all others are of `interesting_hamming_distance`
        or less.

    Raises:
      ResourceLimitExceededError: If the computation reaches `maximum_hashes`
        iterations without finding an ideal candidate.
      ValueError: If `target` is longer than a candidate hash.
    """
    hashes_computed = 0
    for secret in _all_bytestrings():
        if stop_event.is_set():
            return
        candidate_hash = _get_hash(secret)
        distance = _get_substring_hamming_distance(candidate_hash, target)
        is_ideal = distance <= ideal_distance
        is_interesting = (
            interesting_hamming_distance is not None
            and distance <= interesting_hamming_distance
        )
        if is_ideal or is_interesting:
            yield hash_name_pb2.HashNameResponse(
                secret=base64.b64encode(secret),
                hashed_name=candidate_hash,
                hamming_distance=distance,
            )
        if is_ideal:
            # The ideal check takes precedence over the interesting one: an
            # ideal candidate usually also satisfies the looser interesting
            # threshold, and it must still end the stream.
            return
        hashes_computed += 1
        # NOTE: this is an equality check, so a non-positive `maximum_hashes`
        # never triggers the limit.
        if hashes_computed == maximum_hashes:
            raise ResourceLimitExceededError()
158