1# Copyright 2021 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A test framework built for urlMap related xDS test cases."""
15
16import functools
17import inspect
18from typing import Any, Iterable, Mapping, Tuple
19
20from absl import flags
21from absl import logging
22
23from framework import xds_flags
24from framework import xds_k8s_flags
25import framework.helpers.rand
26from framework.infrastructure import gcp
27from framework.infrastructure import k8s
28from framework.infrastructure import traffic_director
29from framework.test_app.runners.k8s import k8s_xds_client_runner
30from framework.test_app.runners.k8s import k8s_xds_server_runner
31
# Declare the key flags of the shared xDS flag modules as key flags of this
# module as well.
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)

# Controls how GcpResourceManager (defined in this module) handles resources:
# its setup() only runs for 'create' and 'keep', and its cleanup() only runs
# for 'create'; with the default 'reuse' both are skipped.
STRATEGY = flags.DEFINE_enum('strategy',
                             default='reuse',
                             enum_values=['create', 'keep', 'reuse'],
                             help='Strategy of GCP resources management')

# Type aliases
# Short names for the Kubernetes runner classes used throughout this module.
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
# The urlMap and its parts are handled as plain dicts in this module, so
# they are left untyped.
UrlMapType = Any
HostRule = Any
PathMatcher = Any
46
47
class _UrlMapChangeAggregator:
    """Accumulates the urlMap pieces contributed by each test case.

    The aggregated map starts with a name, the default backend service as
    its "defaultService", and empty "hostRules" / "pathMatchers" lists. Every
    applied test case appends exactly one host rule and one path matcher.
    """

    def __init__(self, url_map_name: str):
        """Creates an empty urlMap skeleton named `url_map_name`."""
        default_service_url = GcpResourceManager().default_backend_service()
        self._map = {
            "name": url_map_name,
            "defaultService": default_service_url,
            "hostRules": [],
            "pathMatchers": [],
        }

    def get_map(self) -> UrlMapType:
        """Returns the aggregated urlMap (the live dict, not a copy)."""
        return self._map

    def apply_change(self, test_case: 'XdsUrlMapTestCase') -> None:
        """Lets `test_case` customize its own host rule and path matcher.

        The test case receives a default host rule / path matcher pair via
        its `url_map_change` hook; whatever the hook returns is appended to
        the aggregated map.
        """
        logging.info('Apply urlMap change for test case: %s.%s',
                     test_case.short_module_name, test_case.__name__)
        default_host_rule, default_path_matcher = (
            self._get_test_case_url_map(test_case))
        changed_parts = test_case.url_map_change(default_host_rule,
                                                 default_path_matcher)
        self._set_test_case_url_map(*changed_parts)

    @staticmethod
    def _get_test_case_url_map(
            test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
        """Builds the default (host rule, path matcher) pair for a test case.

        The host rule maps the test case's hostname to its path matcher
        name; the path matcher routes to the default backend service.
        """
        host_rule = {
            "hosts": [test_case.hostname()],
            "pathMatcher": test_case.path_matcher_name(),
        }
        path_matcher = {
            "name": test_case.path_matcher_name(),
            "defaultService": GcpResourceManager().default_backend_service(),
        }
        return host_rule, path_matcher

    def _set_test_case_url_map(self, host_rule: HostRule,
                               path_matcher: PathMatcher) -> None:
        """Appends one host rule and one path matcher to the aggregated map."""
        additions = (
            ("hostRules", host_rule),
            ("pathMatchers", path_matcher),
        )
        for section, entry in additions:
            self._map[section].append(entry)
86
87
def _package_flags() -> Mapping[str, Any]:
    """Snapshots the current Abseil flag values into a plain dictionary.

    Every `flags.FlagHolder` member of `xds_flags` and `xds_k8s_flags` is
    stored under its lower-cased attribute name (a later module overrides an
    earlier one on name clashes), and this module's own flag is stored under
    'strategy'.

    Abseil flag values are only available after the Abseil app has been
    initialized. Reading them from a metaclass `__new__` would happen while
    modules are still being imported and therefore fail; that is why the
    metaclass calls this function from `__call__`, deferring the read until
    the class is about to be instantiated.
    """
    packaged = {
        member_name.lower(): member.value
        for flag_module in (xds_flags, xds_k8s_flags)
        for member_name, member in inspect.getmembers(flag_module)
        if isinstance(member, flags.FlagHolder)
    }
    packaged['strategy'] = STRATEGY.value
    return packaged
104
105
106class _MetaSingletonAndAbslFlags(type):
107    """Ensures singleton and injects flag values."""
108
109    # Allow different subclasses to create different singletons.
110    _instances = {}
111    # But we only parse Abseil flags once.
112    _flags = None
113
114    def __call__(cls, *args, **kwargs):
115        if cls not in cls._instances:
116            if cls._flags is None:
117                cls._flags = _package_flags()
118            obj = super().__call__(cls._flags, *args, **kwargs)
119            cls._instances[cls] = obj
120            return obj
121        return cls._instances[cls]
122
123
class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
    """Singleton owner of the GCP / Kubernetes resources for urlMap tests.

    The managed resources are:
        - Kubernetes server deployments: default, "-alternative" and
          "-affinity" (plus a client runner created on demand)
        - The Traffic Director resources (firewall rule, health check,
          backend services, urlMap, target proxy, forwarding rule)
        - One merged urlMap built from all supplied test case classes

    All resources are intended to be shared across test cases and multiple
    runs (except the client K8s deployment).
    """

    # Instance attributes are injected dynamically from Abseil flags, so the
    # "no-member" check cannot see them.
    # pylint: disable=no-member

    def __init__(self, absl_flags: Mapping[str, Any] = None):
        """Copies flag values onto the instance and builds managers/runners.

        Args:
            absl_flags: Mapping of attribute name to flag value. Each entry
                becomes an instance attribute. Supplied by the metaclass.

        Raises:
            NotImplementedError: If a `resource_suffix` value other than
                None is present after the flags have been applied.
        """
        if absl_flags is not None:
            for flag_name, flag_value in absl_flags.items():
                setattr(self, flag_name, flag_value)
        # Only an unset suffix is accepted; it is normalized to "".
        if getattr(self, 'resource_suffix', None) is not None:
            raise NotImplementedError(
                'Predefined resource_suffix is not supported for UrlMap tests')
        self.resource_suffix = ""
        logging.info('GcpResourceManager: resource prefix=%s, suffix=%s',
                     self.resource_prefix, self.resource_suffix)

        # Must be called before KubernetesApiManager or GcpApiManager init.
        xds_flags.set_socket_default_timeout_from_flag()

        # API managers
        self.k8s_api_manager = k8s.KubernetesApiManager(self.kube_context)
        self.gcp_api_manager = gcp.api.GcpApiManager()
        self.td = traffic_director.TrafficDirectorManager(
            self.gcp_api_manager,
            self.project,
            resource_prefix=self.resource_prefix,
            resource_suffix=(self.resource_suffix or ""),
            network=self.network,
            compute_api_version=self.compute_api_version,
        )

        # Kubernetes namespace shared by all test servers.
        self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
                                                     self.resource_prefix)

        # Keyword arguments common to every test server runner.
        shared_server_kwargs = {
            'image_name': self.server_image,
            'gcp_project': self.project,
            'gcp_api_manager': self.gcp_api_manager,
            'gcp_service_account': self.gcp_service_account,
            'td_bootstrap_image': self.td_bootstrap_image,
            'xds_server_uri': self.xds_server_uri,
            'network': self.network,
            'enable_workload_identity': self.enable_workload_identity,
        }
        # Default test server.
        self.test_server_runner = _KubernetesServerRunner(
            self.k8s_namespace,
            deployment_name=self.server_name,
            **shared_server_kwargs)
        # The alternative and affinity servers reuse the same namespace.
        self.test_server_alternative_runner = _KubernetesServerRunner(
            self.k8s_namespace,
            deployment_name=self.server_name + '-alternative',
            reuse_namespace=True,
            **shared_server_kwargs)
        self.test_server_affinity_runner = _KubernetesServerRunner(
            self.k8s_namespace,
            deployment_name=self.server_name + '-affinity',
            reuse_namespace=True,
            **shared_server_kwargs)
        logging.info('Strategy of GCP resources management: %s', self.strategy)

    def create_test_client_runner(self):
        """Returns a new client runner in its own Kubernetes namespace.

        The namespace suffix is `resource_suffix` when it is non-empty,
        otherwise a freshly generated random suffix.
        """
        client_namespace_suffix = (
            self.resource_suffix or
            framework.helpers.rand.random_resource_suffix())
        logging.info('GcpResourceManager: client_namespace_suffix=%s',
                     client_namespace_suffix)
        # Kubernetes Test Client
        client_namespace = k8s.KubernetesNamespace(
            self.k8s_api_manager,
            _KubernetesClientRunner.make_namespace_name(
                self.resource_prefix, client_namespace_suffix))
        return _KubernetesClientRunner(
            client_namespace,
            deployment_name=self.client_name,
            image_name=self.client_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            debug_use_port_forwarding=self.debug_use_port_forwarding,
            enable_workload_identity=self.enable_workload_identity,
            stats_port=self.client_port)

    def _pre_cleanup(self):
        """Removes leftovers of earlier runs before new resources are made."""
        logging.info('GcpResourceManager: pre clean-up')
        self.td.cleanup(force=True)
        self.test_server_runner.delete_namespace()

    def setup(self, test_case_classes: Iterable['XdsUrlMapTestCase']) -> None:
        """Creates all shared resources; a no-op unless strategy allows it.

        Runs only for the 'create' and 'keep' strategies. The urlMap is
        assembled by applying every class in `test_case_classes` to a single
        aggregator.

        Args:
            test_case_classes: Test case classes contributing urlMap changes.
        """
        if self.strategy not in ('create', 'keep'):
            logging.info('GcpResourceManager: skipping setup for strategy [%s]',
                         self.strategy)
            return

        # Clean up debris from previous runs
        self._pre_cleanup()

        # Start creating GCP resources
        logging.info('GcpResourceManager: start setup')
        traffic_director_manager = self.td
        # Firewall
        if self.ensure_firewall:
            traffic_director_manager.create_firewall_rule(
                allowed_ports=self.firewall_allowed_ports)
        # Health Checks
        traffic_director_manager.create_health_check()
        # Backend Services
        traffic_director_manager.create_backend_service()
        traffic_director_manager.create_alternative_backend_service()
        traffic_director_manager.create_affinity_backend_service()

        # Construct UrlMap from test classes
        url_map_name = traffic_director_manager.make_resource_name(
            traffic_director_manager.URL_MAP_NAME)
        aggregator = _UrlMapChangeAggregator(url_map_name=url_map_name)
        for test_case_class in test_case_classes:
            aggregator.apply_change(test_case_class)
        # UrlMap
        traffic_director_manager.create_url_map_with_content(
            aggregator.get_map())
        # Target Proxy
        traffic_director_manager.create_target_proxy()
        # Forwarding Rule
        traffic_director_manager.create_forwarding_rule(self.server_xds_port)

        # Start the Kubernetes test servers: default, alternative, affinity.
        server_ports = {
            'test_port': self.server_port,
            'maintenance_port': self.server_maintenance_port,
        }
        self.test_server_runner.run(**server_ports)
        self.test_server_alternative_runner.run(**server_ports)
        # Affinity gets 3 endpoints to test that only the picked sub-channel
        # is connected.
        self.test_server_affinity_runner.run(replica_count=3, **server_ports)

        # Attach each server's NEG to its matching backend service, in the
        # order default, alternative, affinity.
        neg_wiring = (
            (self.test_server_runner,
             traffic_director_manager.backend_service_add_neg_backends),
            (self.test_server_alternative_runner,
             traffic_director_manager.
             alternative_backend_service_add_neg_backends),
            (self.test_server_affinity_runner,
             traffic_director_manager.affinity_backend_service_add_neg_backends
            ),
        )
        for server_runner, add_neg_backends in neg_wiring:
            neg_name, neg_zones = self.k8s_namespace.get_service_neg(
                server_runner.service_name, self.server_port)
            add_neg_backends(neg_name, neg_zones)

        # Wait for healthy backends
        traffic_director_manager.wait_for_backends_healthy_status()
        traffic_director_manager.wait_for_alternative_backends_healthy_status()
        traffic_director_manager.wait_for_affinity_backends_healthy_status()

    def cleanup(self) -> None:
        """Tears down the shared resources; only for the 'create' strategy.

        Each component is cleaned up only if the corresponding attribute
        exists on the instance.
        """
        if self.strategy != 'create':
            logging.info(
                'GcpResourceManager: skipping tear down for strategy [%s]',
                self.strategy)
            return
        logging.info('GcpResourceManager: start tear down')
        # (attribute name, keyword arguments for its cleanup()), in order.
        teardown_plan = (
            ('td', {
                'force': True
            }),
            ('test_server_runner', {
                'force': True
            }),
            ('test_server_alternative_runner', {
                'force': True,
                'force_namespace': True
            }),
            ('test_server_affinity_runner', {
                'force': True,
                'force_namespace': True
            }),
        )
        for attribute_name, cleanup_kwargs in teardown_plan:
            if hasattr(self, attribute_name):
                getattr(self, attribute_name).cleanup(**cleanup_kwargs)

    # The three accessors below are memoized with lru_cache (keyed on the
    # instance), so each backend service is loaded at most once per instance.

    @functools.lru_cache(maxsize=None)
    def default_backend_service(self) -> str:
        """Loads the default backend service and returns its URL."""
        traffic_director_manager = self.td
        traffic_director_manager.load_backend_service()
        return traffic_director_manager.backend_service.url

    @functools.lru_cache(maxsize=None)
    def alternative_backend_service(self) -> str:
        """Loads the alternative backend service and returns its URL."""
        traffic_director_manager = self.td
        traffic_director_manager.load_alternative_backend_service()
        return traffic_director_manager.alternative_backend_service.url

    @functools.lru_cache(maxsize=None)
    def affinity_backend_service(self) -> str:
        """Loads the affinity backend service and returns its URL."""
        traffic_director_manager = self.td
        traffic_director_manager.load_affinity_backend_service()
        return traffic_director_manager.affinity_backend_service.url
337