# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A test framework built for urlMap related xDS test cases."""

import functools
import inspect
from typing import Any, Iterable, Mapping, Optional, Tuple

from absl import flags
from absl import logging

from framework import xds_flags
from framework import xds_k8s_flags
import framework.helpers.rand
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.infrastructure import traffic_director
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner

flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)

STRATEGY = flags.DEFINE_enum('strategy',
                             default='reuse',
                             enum_values=['create', 'keep', 'reuse'],
                             help='Strategy of GCP resources management')

# Type alias
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
UrlMapType = Any
HostRule = Any
PathMatcher = Any


class _UrlMapChangeAggregator:
    """Where all the urlMap change happens.

    Builds one urlMap dictionary by collecting a host rule and a path matcher
    from every test case passed to apply_change().
    """

    def __init__(self, url_map_name: str):
        """Starts an empty urlMap pointing at the default backend service.

        Args:
          url_map_name: Value stored under the "name" key of the urlMap.
        """
        self._map = {
            "name": url_map_name,
            "defaultService": GcpResourceManager().default_backend_service(),
            "hostRules": [],
            "pathMatchers": [],
        }

    def get_map(self) -> UrlMapType:
        """Returns the aggregated urlMap (the live dict, not a copy)."""
        return self._map

    def apply_change(self, test_case: 'XdsUrlMapTestCase') -> None:
        """Lets a test case customize its urlMap parts, then records them.

        Args:
          test_case: The test case class; only class-level attributes and
            callables (short_module_name, __name__, url_map_change, hostname,
            path_matcher_name) are used here.
        """
        logging.info('Apply urlMap change for test case: %s.%s',
                     test_case.short_module_name, test_case.__name__)
        # Hand the default (host_rule, path_matcher) pair to the test case and
        # store whatever pair it returns.
        url_map_parts = test_case.url_map_change(
            *self._get_test_case_url_map(test_case))
        self._set_test_case_url_map(*url_map_parts)

    @staticmethod
    def _get_test_case_url_map(
            test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
        """Builds the default host rule and path matcher for a test case."""
        host_rule = {
            "hosts": [test_case.hostname()],
            "pathMatcher": test_case.path_matcher_name(),
        }
        path_matcher = {
            "name": test_case.path_matcher_name(),
            "defaultService": GcpResourceManager().default_backend_service(),
        }
        return host_rule, path_matcher

    def _set_test_case_url_map(self, host_rule: HostRule,
                               path_matcher: PathMatcher) -> None:
        """Appends one host rule and one path matcher to the urlMap."""
        self._map["hostRules"].append(host_rule)
        self._map["pathMatchers"].append(path_matcher)


def _package_flags() -> Mapping[str, Any]:
    """Automatically parse Abseil flags into a dictionary.

    Abseil flag is only available after the Abseil app initialization. If we use
    __new__ in our metaclass, the flag value parse will happen during the
    initialization of modules, hence will fail. That's why we are using __call__
    to inject metaclass magics, and the flag parsing will be delayed until the
    class is about to be instantiated.

    Returns:
      A dict mapping the lower-cased name of every flags.FlagHolder member of
      xds_flags and xds_k8s_flags to its current value, plus a 'strategy'
      entry holding the value of the STRATEGY flag.
    """
    res = {}
    for flag_module in [xds_flags, xds_k8s_flags]:
        for key, value in inspect.getmembers(flag_module):
            if isinstance(value, flags.FlagHolder):
                res[key.lower()] = value.value
    res['strategy'] = STRATEGY.value
    return res


class _MetaSingletonAndAbslFlags(type):
    """Ensures singleton and injects flag values."""

    # Allow different subclasses to create different singletons.
    _instances = {}
    # But we only parse Abseil flags once.
    _flags = None

    def __call__(cls, *args, **kwargs):
        """Returns the singleton of cls, creating it on first use.

        On creation, the packaged flag dictionary is passed as the first
        positional argument to the class constructor, ahead of any
        caller-supplied arguments. Later calls return the cached instance and
        ignore their arguments.
        """
        if cls not in cls._instances:
            if cls._flags is None:
                cls._flags = _package_flags()
            obj = super().__call__(cls._flags, *args, **kwargs)
            cls._instances[cls] = obj
            return obj
        return cls._instances[cls]


class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
    """Manages the lifecycle of GCP resources.

    The GCP resources including:
        - 3 K8s deployment (client, default backends, alternative backends)
        - Full set of the Traffic Director stuff
        - Merged gigantic urlMap from all imported test cases

    All resources are intended to be used across test cases and multiple runs
    (except the client K8s deployment).
    """

    # This class dynamically set, so disable "no-member" check.
    # pylint: disable=no-member

    def __init__(self, absl_flags: Optional[Mapping[str, Any]] = None):
        """Copies flag values onto the instance and builds managers/runners.

        Args:
          absl_flags: Mapping of attribute name to value; every entry is set
            as an attribute on this instance. The metaclass supplies this
            automatically when the class is called.

        Raises:
          NotImplementedError: If a resource_suffix other than None was
            provided.
        """
        if absl_flags is not None:
            for key, value in absl_flags.items():
                setattr(self, key, value)
        # A predefined resource_suffix is rejected for UrlMap tests; an unset
        # one is normalized to an empty string. The client namespace suffix is
        # picked later, in create_test_client_runner().
        if getattr(self, 'resource_suffix', None) is None:
            self.resource_suffix = ""
        else:
            raise NotImplementedError(
                'Predefined resource_suffix is not supported for UrlMap tests')
        logging.info('GcpResourceManager: resource prefix=%s, suffix=%s',
                     self.resource_prefix, self.resource_suffix)

        # Must be called before KubernetesApiManager or GcpApiManager init.
        xds_flags.set_socket_default_timeout_from_flag()

        # API managers
        self.k8s_api_manager = k8s.KubernetesApiManager(self.kube_context)
        self.gcp_api_manager = gcp.api.GcpApiManager()
        self.td = traffic_director.TrafficDirectorManager(
            self.gcp_api_manager,
            self.project,
            resource_prefix=self.resource_prefix,
            resource_suffix=(self.resource_suffix or ""),
            network=self.network,
            compute_api_version=self.compute_api_version,
        )
        # Kubernetes namespace
        self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
                                                     self.resource_prefix)
        # Kubernetes Test Servers. All three share self.k8s_namespace; only
        # the default runner is created without reuse_namespace.
        self.test_server_runner = self._make_server_runner()
        self.test_server_alternative_runner = self._make_server_runner(
            '-alternative', reuse_namespace=True)
        self.test_server_affinity_runner = self._make_server_runner(
            '-affinity', reuse_namespace=True)
        logging.info('Strategy of GCP resources management: %s', self.strategy)

    def _make_server_runner(self, name_suffix: str = '',
                            **extra_kwargs: Any) -> _KubernetesServerRunner:
        """Builds a server runner in the shared namespace.

        Args:
          name_suffix: Appended to self.server_name to form the deployment
            name.
          **extra_kwargs: Extra keyword arguments forwarded verbatim to the
            runner constructor (e.g. reuse_namespace).
        """
        return _KubernetesServerRunner(
            self.k8s_namespace,
            deployment_name=self.server_name + name_suffix,
            image_name=self.server_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            enable_workload_identity=self.enable_workload_identity,
            **extra_kwargs)

    def create_test_client_runner(self):
        """Returns a new client runner in its own Kubernetes namespace.

        The namespace suffix is self.resource_suffix when it is non-empty,
        otherwise a freshly generated random resource suffix.
        """
        if self.resource_suffix:
            client_namespace_suffix = self.resource_suffix
        else:
            client_namespace_suffix = (
                framework.helpers.rand.random_resource_suffix())
        logging.info('GcpResourceManager: client_namespace_suffix=%s',
                     client_namespace_suffix)
        # Kubernetes Test Client
        namespace_name = _KubernetesClientRunner.make_namespace_name(
            self.resource_prefix, client_namespace_suffix)
        return _KubernetesClientRunner(
            k8s.KubernetesNamespace(self.k8s_api_manager, namespace_name),
            deployment_name=self.client_name,
            image_name=self.client_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            debug_use_port_forwarding=self.debug_use_port_forwarding,
            enable_workload_identity=self.enable_workload_identity,
            stats_port=self.client_port)

    def _pre_cleanup(self):
        """Removes leftovers before setup: TD resources and the namespace."""
        # Cleanup existing debris
        logging.info('GcpResourceManager: pre clean-up')
        self.td.cleanup(force=True)
        self.test_server_runner.delete_namespace()

    def setup(self, test_case_classes: Iterable['XdsUrlMapTestCase']) -> None:
        """Creates the GCP and Kubernetes resources for the given test cases.

        Does nothing unless the strategy is 'create' or 'keep'.

        Args:
          test_case_classes: Test cases whose urlMap changes are merged into
            the single urlMap that gets created.
        """
        if self.strategy not in ['create', 'keep']:
            logging.info('GcpResourceManager: skipping setup for strategy [%s]',
                         self.strategy)
            return
        # Clean up debris from previous runs
        self._pre_cleanup()
        # Start creating GCP resources
        logging.info('GcpResourceManager: start setup')
        # Firewall
        if self.ensure_firewall:
            self.td.create_firewall_rule(
                allowed_ports=self.firewall_allowed_ports)
        # Health Checks
        self.td.create_health_check()
        # Backend Services
        self.td.create_backend_service()
        self.td.create_alternative_backend_service()
        self.td.create_affinity_backend_service()
        # Construct UrlMap from test classes
        aggregator = _UrlMapChangeAggregator(
            url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
        for test_case_class in test_case_classes:
            aggregator.apply_change(test_case_class)
        final_url_map = aggregator.get_map()
        # UrlMap
        self.td.create_url_map_with_content(final_url_map)
        # Target Proxy
        self.td.create_target_proxy()
        # Forwarding Rule
        self.td.create_forwarding_rule(self.server_xds_port)
        # Kubernetes Test Server
        self.test_server_runner.run(
            test_port=self.server_port,
            maintenance_port=self.server_maintenance_port)
        # Kubernetes Test Server Alternative
        self.test_server_alternative_runner.run(
            test_port=self.server_port,
            maintenance_port=self.server_maintenance_port)
        # Kubernetes Test Server Affinity. 3 endpoints to test that only the
        # picked sub-channel is connected.
        self.test_server_affinity_runner.run(
            test_port=self.server_port,
            maintenance_port=self.server_maintenance_port,
            replica_count=3)
        # Add backend to default backend service
        neg_name, neg_zones = self.k8s_namespace.get_service_neg(
            self.test_server_runner.service_name, self.server_port)
        self.td.backend_service_add_neg_backends(neg_name, neg_zones)
        # Add backend to alternative backend service
        neg_name_alt, neg_zones_alt = self.k8s_namespace.get_service_neg(
            self.test_server_alternative_runner.service_name, self.server_port)
        self.td.alternative_backend_service_add_neg_backends(
            neg_name_alt, neg_zones_alt)
        # Add backend to affinity backend service
        neg_name_affinity, neg_zones_affinity = (
            self.k8s_namespace.get_service_neg(
                self.test_server_affinity_runner.service_name,
                self.server_port))
        self.td.affinity_backend_service_add_neg_backends(
            neg_name_affinity, neg_zones_affinity)
        # Wait for healthy backends
        self.td.wait_for_backends_healthy_status()
        self.td.wait_for_alternative_backends_healthy_status()
        self.td.wait_for_affinity_backends_healthy_status()

    def cleanup(self) -> None:
        """Tears down TD resources and server runners.

        Does nothing unless the strategy is 'create'.
        """
        if self.strategy not in ['create']:
            logging.info(
                'GcpResourceManager: skipping tear down for strategy [%s]',
                self.strategy)
            return
        logging.info('GcpResourceManager: start tear down')
        # hasattr guards: an attribute may be missing if __init__ raised
        # before assigning it.
        if hasattr(self, 'td'):
            self.td.cleanup(force=True)
        if hasattr(self, 'test_server_runner'):
            self.test_server_runner.cleanup(force=True)
        if hasattr(self, 'test_server_alternative_runner'):
            self.test_server_alternative_runner.cleanup(force=True,
                                                        force_namespace=True)
        if hasattr(self, 'test_server_affinity_runner'):
            self.test_server_affinity_runner.cleanup(force=True,
                                                     force_namespace=True)

    # NOTE: lru_cache on a method keeps `self` alive for the cache's lifetime.
    # The metaclass already keeps the singleton instance alive, so the cache
    # adds no extra retention; the decorators are kept as-is so the
    # cache_clear()/cache_info() attributes stay available.
    @functools.lru_cache(None)
    def default_backend_service(self) -> str:
        """Returns default backend service URL."""
        self.td.load_backend_service()
        return self.td.backend_service.url

    @functools.lru_cache(None)
    def alternative_backend_service(self) -> str:
        """Returns alternative backend service URL."""
        self.td.load_alternative_backend_service()
        return self.td.alternative_backend_service.url

    @functools.lru_cache(None)
    def affinity_backend_service(self) -> str:
        """Returns affinity backend service URL."""
        self.td.load_affinity_backend_service()
        return self.td.affinity_backend_service.url