1# Copyright 2021 The gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests xDS server and channel credentials.""" 15 16from concurrent import futures 17import contextlib 18import logging 19import unittest 20 21import grpc 22import grpc.experimental 23 24from tests.unit import resources 25from tests.unit import test_common 26 27 28class _GenericHandler(grpc.GenericRpcHandler): 29 def service(self, handler_call_details): 30 return grpc.unary_unary_rpc_method_handler( 31 lambda request, unused_context: request 32 ) 33 34 35@contextlib.contextmanager 36def xds_channel_server_without_xds(server_fallback_creds): 37 server = grpc.server(futures.ThreadPoolExecutor()) 38 server.add_generic_rpc_handlers((_GenericHandler(),)) 39 server_server_fallback_creds = grpc.ssl_server_credentials( 40 ((resources.private_key(), resources.certificate_chain()),) 41 ) 42 server_creds = grpc.xds_server_credentials(server_fallback_creds) 43 port = server.add_secure_port("localhost:0", server_creds) 44 server.start() 45 try: 46 yield "localhost:{}".format(port) 47 finally: 48 server.stop(None) 49 50 51class XdsCredentialsTest(unittest.TestCase): 52 def test_xds_creds_fallback_ssl(self): 53 # Since there is no xDS server, the fallback credentials will be used. 54 # In this case, SSL credentials. 55 server_fallback_creds = grpc.ssl_server_credentials( 56 ((resources.private_key(), resources.certificate_chain()),) 57 ) 58 with xds_channel_server_without_xds( 59 server_fallback_creds 60 ) as server_address: 61 override_options = ( 62 ("grpc.ssl_target_name_override", "foo.test.google.fr"), 63 ) 64 channel_fallback_creds = grpc.ssl_channel_credentials( 65 root_certificates=resources.test_root_certificates(), 66 private_key=resources.private_key(), 67 certificate_chain=resources.certificate_chain(), 68 ) 69 channel_creds = grpc.xds_channel_credentials(channel_fallback_creds) 70 with grpc.secure_channel( 71 server_address, channel_creds, options=override_options 72 ) as channel: 73 request = b"abc" 74 response = channel.unary_unary( 75 "/test/method", 76 _registered_method=True, 77 )(request, wait_for_ready=True) 78 self.assertEqual(response, request) 79 80 def test_xds_creds_fallback_insecure(self): 81 # Since there is no xDS server, the fallback credentials will be used. 82 # In this case, insecure. 83 server_fallback_creds = grpc.insecure_server_credentials() 84 with xds_channel_server_without_xds( 85 server_fallback_creds 86 ) as server_address: 87 channel_fallback_creds = ( 88 grpc.experimental.insecure_channel_credentials() 89 ) 90 channel_creds = grpc.xds_channel_credentials(channel_fallback_creds) 91 with grpc.secure_channel(server_address, channel_creds) as channel: 92 request = b"abc" 93 response = channel.unary_unary( 94 "/test/method", 95 _registered_method=True, 96 )(request, wait_for_ready=True) 97 self.assertEqual(response, request) 98 99 def test_start_xds_server(self): 100 server = grpc.server(futures.ThreadPoolExecutor(), xds=True) 101 server.add_generic_rpc_handlers((_GenericHandler(),)) 102 server_fallback_creds = grpc.insecure_server_credentials() 103 server_creds = grpc.xds_server_credentials(server_fallback_creds) 104 port = server.add_secure_port("localhost:0", server_creds) 105 server.start() 106 server.stop(None) 107 # No exceptions thrown. A more comprehensive suite of tests will be 108 # provided by the interop tests. 109 110 111if __name__ == "__main__": 112 logging.basicConfig() 113 unittest.main() 114