xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/unit/_session_cache_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests experimental TLS Session Resumption API"""
15
16import logging
17import pickle
18import unittest
19
20import grpc
21from grpc import _channel
22from grpc.experimental import session_cache
23
24from tests.unit import resources
25from tests.unit import test_common
26
# Raw byte payloads used for the test RPC.
_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

# Fully-qualified method path of the single unary-unary test method.
_UNARY_UNARY = "/test/UnaryUnary"

# Host name passed as the "grpc.ssl_target_name_override" channel option.
_SERVER_HOST_OVERRIDE = "foo.test.google.fr"

# Keys of the dict that the test handler pickles into its response.
_ID = "id"
_ID_KEY = "id_key"
_AUTH_CTX = "auth_ctx"

# TLS material loaded from the shared test resources.
_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)

# Channel options shared by every client channel created in this module.
_PROPERTY_OPTIONS = (("grpc.ssl_target_name_override", _SERVER_HOST_OVERRIDE),)
47
48
def handle_unary_unary(request, servicer_context):
    """Return a pickled snapshot of the caller's authentication details.

    The request payload is ignored. The response is a pickled dict holding
    the peer identities, the peer identity key and the auth context reported
    by ``servicer_context``, stored under ``_ID``, ``_ID_KEY`` and
    ``_AUTH_CTX`` respectively.
    """
    peer_info = {}
    peer_info[_ID] = servicer_context.peer_identities()
    peer_info[_ID_KEY] = servicer_context.peer_identity_key()
    peer_info[_AUTH_CTX] = servicer_context.auth_context()
    return pickle.dumps(peer_info)
57
58
def start_secure_server():
    """Create and start a TLS-secured test server.

    The server exposes the single ``test/UnaryUnary`` method backed by
    ``handle_unary_unary`` and is bound to ``"[::]:0"`` using
    ``_SERVER_CERTS``.

    Returns:
        A ``(server, port)`` tuple, where ``port`` is the value returned by
        ``add_secure_port``.
    """
    method_handlers = {
        "UnaryUnary": grpc.unary_unary_rpc_method_handler(handle_unary_unary),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        "test", method_handlers
    )

    secure_server = test_common.test_server()
    secure_server.add_generic_rpc_handlers((generic_handler,))
    bound_port = secure_server.add_secure_port(
        "[::]:0", grpc.ssl_server_credentials(_SERVER_CERTS)
    )
    secure_server.start()

    return secure_server, bound_port
71
72
class SSLSessionCacheTest(unittest.TestCase):
    """Tests TLS session resumption through the experimental LRU session cache."""

    def _start_server(self):
        """Start a secure server and make sure it is stopped when the test ends.

        Registering the stop as a cleanup guarantees the server is shut down
        even when an assertion or RPC fails part-way through the test.

        Returns:
            The ``(server, port)`` tuple produced by ``start_secure_server``.
        """
        server, port = start_secure_server()
        self.addCleanup(server.stop, None)
        return server, port

    def _do_one_shot_client_rpc(
        self, channel_creds, channel_options, port, expect_ssl_session_reused
    ):
        """Open a fresh channel, issue one RPC and check session reuse.

        Args:
            channel_creds: Channel credentials used for the secure channel.
            channel_options: Channel options passed to the secure channel.
            port: Port on localhost that the target server listens on.
            expect_ssl_session_reused: Expected value of the
                ``"ssl_session_reused"`` entry of the auth context reported
                by the server (e.g. ``[b"true"]`` or ``[b"false"]``).
        """
        # The context manager closes the channel even if the RPC or the
        # assertion below raises, so a failing check does not leak it.
        with grpc.secure_channel(
            "localhost:{}".format(port), channel_creds, options=channel_options
        ) as channel:
            response = channel.unary_unary(
                _UNARY_UNARY,
                _registered_method=True,
            )(_REQUEST)
            # The payload was pickled by this module's own test handler.
            auth_data = pickle.loads(response)
            self.assertEqual(
                expect_ssl_session_reused,
                auth_data[_AUTH_CTX]["ssl_session_reused"],
            )

    def testSSLSessionCacheLRU(self):
        """Walks a session cache created with ``ssl_session_cache_lru(1)``
        through initial handshake, resumption, overwrite and invalidation."""
        server_1, port_1 = self._start_server()

        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES
        )
        channel_options = _PROPERTY_OPTIONS + (
            ("grpc.ssl_session_cache", cache),
        )

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"false"],
        )

        # Connection to server_1 resumes from initial session
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"true"],
        )

        # Connection to a different server with the same name overwrites the cache entry
        server_2, port_2 = self._start_server()
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_2,
            expect_ssl_session_reused=[b"false"],
        )
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_2,
            expect_ssl_session_reused=[b"true"],
        )
        server_2.stop(None)

        # Connection to server_1 now falls back to full TLS handshake
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"false"],
        )

        # Re-creating server_1 causes old sessions to become invalid
        server_1.stop(None)
        server_1, port_1 = self._start_server()

        # Old sessions should no longer be valid
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"false"],
        )

        # Resumption should work for subsequent connections
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port_1,
            expect_ssl_session_reused=[b"true"],
        )
        # The final server_1 instance is stopped by the registered cleanup.
162
163
if __name__ == "__main__":
    # Set up default logging before handing control to the unittest runner,
    # which is run with verbose per-test output.
    logging.basicConfig()
    unittest.main(verbosity=2)
166    unittest.main(verbosity=2)
167