1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests experimental TLS Session Resumption API""" 15 16import logging 17import pickle 18import unittest 19 20import grpc 21from grpc import _channel 22from grpc.experimental import session_cache 23 24from tests.unit import resources 25from tests.unit import test_common 26 27_REQUEST = b"\x00\x00\x00" 28_RESPONSE = b"\x00\x00\x00" 29 30_UNARY_UNARY = "/test/UnaryUnary" 31 32_SERVER_HOST_OVERRIDE = "foo.test.google.fr" 33_ID = "id" 34_ID_KEY = "id_key" 35_AUTH_CTX = "auth_ctx" 36 37_PRIVATE_KEY = resources.private_key() 38_CERTIFICATE_CHAIN = resources.certificate_chain() 39_TEST_ROOT_CERTIFICATES = resources.test_root_certificates() 40_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),) 41_PROPERTY_OPTIONS = ( 42 ( 43 "grpc.ssl_target_name_override", 44 _SERVER_HOST_OVERRIDE, 45 ), 46) 47 48 49def handle_unary_unary(request, servicer_context): 50 return pickle.dumps( 51 { 52 _ID: servicer_context.peer_identities(), 53 _ID_KEY: servicer_context.peer_identity_key(), 54 _AUTH_CTX: servicer_context.auth_context(), 55 } 56 ) 57 58 59def start_secure_server(): 60 handler = grpc.method_handlers_generic_handler( 61 "test", 62 {"UnaryUnary": grpc.unary_unary_rpc_method_handler(handle_unary_unary)}, 63 ) 64 server = test_common.test_server() 65 server.add_generic_rpc_handlers((handler,)) 66 server_cred = grpc.ssl_server_credentials(_SERVER_CERTS) 67 port = server.add_secure_port("[::]:0", server_cred) 68 server.start() 69 70 return server, port 71 72 73class SSLSessionCacheTest(unittest.TestCase): 74 def _do_one_shot_client_rpc( 75 self, channel_creds, channel_options, port, expect_ssl_session_reused 76 ): 77 channel = grpc.secure_channel( 78 "localhost:{}".format(port), channel_creds, options=channel_options 79 ) 80 response = channel.unary_unary( 81 _UNARY_UNARY, 82 _registered_method=True, 83 )(_REQUEST) 84 auth_data = pickle.loads(response) 85 self.assertEqual( 86 expect_ssl_session_reused, 87 auth_data[_AUTH_CTX]["ssl_session_reused"], 88 ) 89 channel.close() 90 91 def testSSLSessionCacheLRU(self): 92 server_1, port_1 = start_secure_server() 93 94 cache = session_cache.ssl_session_cache_lru(1) 95 channel_creds = grpc.ssl_channel_credentials( 96 root_certificates=_TEST_ROOT_CERTIFICATES 97 ) 98 channel_options = _PROPERTY_OPTIONS + ( 99 ("grpc.ssl_session_cache", cache), 100 ) 101 102 # Initial connection has no session to resume 103 self._do_one_shot_client_rpc( 104 channel_creds, 105 channel_options, 106 port_1, 107 expect_ssl_session_reused=[b"false"], 108 ) 109 110 # Connection to server_1 resumes from initial session 111 self._do_one_shot_client_rpc( 112 channel_creds, 113 channel_options, 114 port_1, 115 expect_ssl_session_reused=[b"true"], 116 ) 117 118 # Connection to a different server with the same name overwrites the cache entry 119 server_2, port_2 = start_secure_server() 120 self._do_one_shot_client_rpc( 121 channel_creds, 122 channel_options, 123 port_2, 124 expect_ssl_session_reused=[b"false"], 125 ) 126 self._do_one_shot_client_rpc( 127 channel_creds, 128 channel_options, 129 port_2, 130 expect_ssl_session_reused=[b"true"], 131 ) 132 server_2.stop(None) 133 134 # Connection to server_1 now falls back to full TLS handshake 135 self._do_one_shot_client_rpc( 136 channel_creds, 137 channel_options, 138 port_1, 139 expect_ssl_session_reused=[b"false"], 140 ) 141 142 # Re-creating server_1 causes old sessions to become invalid 143 server_1.stop(None) 144 server_1, port_1 = start_secure_server() 145 146 # Old sessions should no longer be valid 147 self._do_one_shot_client_rpc( 148 channel_creds, 149 channel_options, 150 port_1, 151 expect_ssl_session_reused=[b"false"], 152 ) 153 154 # Resumption should work for subsequent connections 155 self._do_one_shot_client_rpc( 156 channel_creds, 157 channel_options, 158 port_1, 159 expect_ssl_session_reused=[b"true"], 160 ) 161 server_1.stop(None) 162 163 164if __name__ == "__main__": 165 logging.basicConfig() 166 unittest.main(verbosity=2) 167