1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import dataclasses
15import datetime
16import enum
17import logging
18from typing import Any, Dict, List, Optional, Set
19
20from googleapiclient import discovery
21import googleapiclient.errors
22
23from framework.helpers import retryers
24from framework.infrastructure import gcp
25
26logger = logging.getLogger(__name__)
27
28
class ComputeV1(gcp.api.GcpProjectApiResource):  # pylint: disable=too-many-public-methods
    """Wrapper over the GCP Compute Engine API used by the test framework.

    Creates, fetches, and deletes global load-balancing resources (health
    checks, backend services, URL maps, target proxies, forwarding rules,
    firewall rules) and waits on the resulting compute operations.
    """
    # TODO(sergiitk): move someplace better
    # Max time to wait for backends/NEGs to become ready, and the poll interval.
    _WAIT_FOR_BACKEND_SEC = 60 * 10
    _WAIT_FOR_BACKEND_SLEEP_SEC = 4
    # Max time to wait for a compute operation to reach DONE status.
    _WAIT_FOR_OPERATION_SEC = 60 * 10
34
    @dataclasses.dataclass(frozen=True)
    class GcpResource:
        """Identifies a created GCP resource by name and URL."""
        # Resource name as passed at creation time.
        name: str
        # Full resource URL (selfLink / targetLink from the API response).
        url: str
39
    @dataclasses.dataclass(frozen=True)
    class ZonalGcpResource(GcpResource):
        """A GcpResource that lives in a specific compute zone (e.g. a NEG)."""
        # Compute zone the resource belongs to.
        zone: str
43
44    def __init__(self,
45                 api_manager: gcp.api.GcpApiManager,
46                 project: str,
47                 version: str = 'v1'):
48        super().__init__(api_manager.compute(version), project)
49
    class HealthCheckProtocol(enum.Enum):
        """Supported health check protocols; the member name is sent as the check 'type'."""
        TCP = enum.auto()
        GRPC = enum.auto()
53
    class BackendServiceProtocol(enum.Enum):
        """Supported backend service protocols; the member name is sent as 'protocol'."""
        HTTP2 = enum.auto()
        GRPC = enum.auto()
57
58    def create_health_check(self,
59                            name: str,
60                            protocol: HealthCheckProtocol,
61                            *,
62                            port: Optional[int] = None) -> 'GcpResource':
63        if protocol is self.HealthCheckProtocol.TCP:
64            health_check_field = 'tcpHealthCheck'
65        elif protocol is self.HealthCheckProtocol.GRPC:
66            health_check_field = 'grpcHealthCheck'
67        else:
68            raise TypeError(f'Unexpected Health Check protocol: {protocol}')
69
70        health_check_settings = {}
71        if port is None:
72            health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
73        else:
74            health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
75            health_check_settings['port'] = port
76
77        return self._insert_resource(
78            self.api.healthChecks(), {
79                'name': name,
80                'type': protocol.name,
81                health_check_field: health_check_settings,
82            })
83
84    def list_health_check(self):
85        return self._list_resource(self.api.healthChecks())
86
87    def delete_health_check(self, name):
88        self._delete_resource(self.api.healthChecks(), 'healthCheck', name)
89
90    def create_firewall_rule(self, name: str, network_url: str,
91                             source_ranges: List[str],
92                             ports: List[str]) -> Optional['GcpResource']:
93        try:
94            return self._insert_resource(
95                self.api.firewalls(), {
96                    "allowed": [{
97                        "IPProtocol": "tcp",
98                        "ports": ports
99                    }],
100                    "direction": "INGRESS",
101                    "name": name,
102                    "network": network_url,
103                    "priority": 1000,
104                    "sourceRanges": source_ranges,
105                    "targetTags": ["allow-health-checks"]
106                })
107        except googleapiclient.errors.HttpError as http_error:
108            # TODO(lidiz) use status_code() when we upgrade googleapiclient
109            if http_error.resp.status == 409:
110                logger.debug('Firewall rule %s already existed', name)
111                return None
112            else:
113                raise
114
115    def delete_firewall_rule(self, name):
116        self._delete_resource(self.api.firewalls(), 'firewall', name)
117
118    def create_backend_service_traffic_director(
119            self,
120            name: str,
121            health_check: 'GcpResource',
122            affinity_header: Optional[str] = None,
123            protocol: Optional[BackendServiceProtocol] = None,
124            subset_size: Optional[int] = None,
125            locality_lb_policies: Optional[List[dict]] = None,
126            outlier_detection: Optional[dict] = None) -> 'GcpResource':
127        if not isinstance(protocol, self.BackendServiceProtocol):
128            raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
129        body = {
130            'name': name,
131            'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',  # Traffic Director
132            'healthChecks': [health_check.url],
133            'protocol': protocol.name,
134        }
135        # If affinity header is specified, config the backend service to support
136        # affinity, and set affinity header to the one given.
137        if affinity_header:
138            body['sessionAffinity'] = 'HEADER_FIELD'
139            body['localityLbPolicy'] = 'RING_HASH'
140            body['consistentHash'] = {
141                'httpHeaderName': affinity_header,
142            }
143        if subset_size:
144            body['subsetting'] = {
145                'policy': 'CONSISTENT_HASH_SUBSETTING',
146                'subsetSize': subset_size
147            }
148        if locality_lb_policies:
149            body['localityLbPolicies'] = locality_lb_policies
150        if outlier_detection:
151            body['outlierDetection'] = outlier_detection
152        return self._insert_resource(self.api.backendServices(), body)
153
154    def get_backend_service_traffic_director(self, name: str) -> 'GcpResource':
155        return self._get_resource(self.api.backendServices(),
156                                  backendService=name)
157
158    def patch_backend_service(self, backend_service, body, **kwargs):
159        self._patch_resource(collection=self.api.backendServices(),
160                             backendService=backend_service.name,
161                             body=body,
162                             **kwargs)
163
164    def backend_service_patch_backends(
165            self,
166            backend_service,
167            backends,
168            max_rate_per_endpoint: Optional[int] = None):
169        if max_rate_per_endpoint is None:
170            max_rate_per_endpoint = 5
171        backend_list = [{
172            'group': backend.url,
173            'balancingMode': 'RATE',
174            'maxRatePerEndpoint': max_rate_per_endpoint
175        } for backend in backends]
176
177        self._patch_resource(collection=self.api.backendServices(),
178                             body={'backends': backend_list},
179                             backendService=backend_service.name)
180
181    def backend_service_remove_all_backends(self, backend_service):
182        self._patch_resource(collection=self.api.backendServices(),
183                             body={'backends': []},
184                             backendService=backend_service.name)
185
186    def delete_backend_service(self, name):
187        self._delete_resource(self.api.backendServices(), 'backendService',
188                              name)
189
190    def create_url_map(
191        self,
192        name: str,
193        matcher_name: str,
194        src_hosts,
195        dst_default_backend_service: 'GcpResource',
196        dst_host_rule_match_backend_service: Optional['GcpResource'] = None,
197    ) -> 'GcpResource':
198        if dst_host_rule_match_backend_service is None:
199            dst_host_rule_match_backend_service = dst_default_backend_service
200        return self._insert_resource(
201            self.api.urlMaps(), {
202                'name':
203                    name,
204                'defaultService':
205                    dst_default_backend_service.url,
206                'hostRules': [{
207                    'hosts': src_hosts,
208                    'pathMatcher': matcher_name,
209                }],
210                'pathMatchers': [{
211                    'name': matcher_name,
212                    'defaultService': dst_host_rule_match_backend_service.url,
213                }],
214            })
215
216    def create_url_map_with_content(self, url_map_body: Any) -> 'GcpResource':
217        return self._insert_resource(self.api.urlMaps(), url_map_body)
218
219    def patch_url_map(self, url_map: 'GcpResource', body, **kwargs):
220        self._patch_resource(collection=self.api.urlMaps(),
221                             urlMap=url_map.name,
222                             body=body,
223                             **kwargs)
224
225    def delete_url_map(self, name):
226        self._delete_resource(self.api.urlMaps(), 'urlMap', name)
227
228    def create_target_grpc_proxy(
229        self,
230        name: str,
231        url_map: 'GcpResource',
232        validate_for_proxyless: bool = True,
233    ) -> 'GcpResource':
234        return self._insert_resource(
235            self.api.targetGrpcProxies(), {
236                'name': name,
237                'url_map': url_map.url,
238                'validate_for_proxyless': validate_for_proxyless,
239            })
240
241    def delete_target_grpc_proxy(self, name):
242        self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy',
243                              name)
244
245    def create_target_http_proxy(
246        self,
247        name: str,
248        url_map: 'GcpResource',
249    ) -> 'GcpResource':
250        return self._insert_resource(self.api.targetHttpProxies(), {
251            'name': name,
252            'url_map': url_map.url,
253        })
254
255    def delete_target_http_proxy(self, name):
256        self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy',
257                              name)
258
259    def create_forwarding_rule(self,
260                               name: str,
261                               src_port: int,
262                               target_proxy: 'GcpResource',
263                               network_url: str,
264                               *,
265                               ip_address: str = '0.0.0.0') -> 'GcpResource':
266        return self._insert_resource(
267            self.api.globalForwardingRules(),
268            {
269                'name': name,
270                'loadBalancingScheme':
271                    'INTERNAL_SELF_MANAGED',  # Traffic Director
272                'portRange': src_port,
273                'IPAddress': ip_address,
274                'network': network_url,
275                'target': target_proxy.url,
276            })
277
278    def exists_forwarding_rule(self, src_port) -> bool:
279        # TODO(sergiitk): Better approach for confirming the port is available.
280        #   It's possible a rule allocates actual port range, e.g 8000-9000,
281        #   and this wouldn't catch it. For now, we assume there's no
282        #   port ranges used in the project.
283        filter_str = (f'(portRange eq "{src_port}-{src_port}") '
284                      f'(IPAddress eq "0.0.0.0")'
285                      f'(loadBalancingScheme eq "INTERNAL_SELF_MANAGED")')
286        return self._exists_resource(self.api.globalForwardingRules(),
287                                     filter=filter_str)
288
289    def delete_forwarding_rule(self, name):
290        self._delete_resource(self.api.globalForwardingRules(),
291                              'forwardingRule', name)
292
293    def wait_for_network_endpoint_group(self,
294                                        name: str,
295                                        zone: str,
296                                        *,
297                                        timeout_sec=_WAIT_FOR_BACKEND_SEC,
298                                        wait_sec=_WAIT_FOR_BACKEND_SLEEP_SEC):
299        retryer = retryers.constant_retryer(
300            wait_fixed=datetime.timedelta(seconds=wait_sec),
301            timeout=datetime.timedelta(seconds=timeout_sec),
302            check_result=lambda neg: neg and neg.get('size', 0) > 0)
303        network_endpoint_group = retryer(
304            self._retry_network_endpoint_group_ready, name, zone)
305        # TODO(sergiitk): dataclass
306        return self.ZonalGcpResource(network_endpoint_group['name'],
307                                     network_endpoint_group['selfLink'], zone)
308
309    def _retry_network_endpoint_group_ready(self, name: str, zone: str):
310        try:
311            neg = self.get_network_endpoint_group(name, zone)
312            logger.debug(
313                'Waiting for endpoints: NEG %s in zone %s, '
314                'current count %s', neg['name'], zone, neg.get('size'))
315        except googleapiclient.errors.HttpError as error:
316            # noinspection PyProtectedMember
317            reason = error._get_reason()
318            logger.debug('Retrying NEG load, got %s, details %s',
319                         error.resp.status, reason)
320            raise
321        return neg
322
323    def get_network_endpoint_group(self, name, zone):
324        neg = self.api.networkEndpointGroups().get(project=self.project,
325                                                   networkEndpointGroup=name,
326                                                   zone=zone).execute()
327        # TODO(sergiitk): dataclass
328        return neg
329
330    def wait_for_backends_healthy_status(
331            self,
332            backend_service: GcpResource,
333            backends: Set[ZonalGcpResource],
334            *,
335            timeout_sec: int = _WAIT_FOR_BACKEND_SEC,
336            wait_sec: int = _WAIT_FOR_BACKEND_SLEEP_SEC):
337        retryer = retryers.constant_retryer(
338            wait_fixed=datetime.timedelta(seconds=wait_sec),
339            timeout=datetime.timedelta(seconds=timeout_sec),
340            check_result=lambda result: result)
341        pending = set(backends)
342        retryer(self._retry_backends_health, backend_service, pending)
343
344    def _retry_backends_health(self, backend_service: GcpResource,
345                               pending: Set[ZonalGcpResource]):
346        for backend in pending:
347            result = self.get_backend_service_backend_health(
348                backend_service, backend)
349            if 'healthStatus' not in result:
350                logger.debug('Waiting for instances: backend %s, zone %s',
351                             backend.name, backend.zone)
352                continue
353
354            backend_healthy = True
355            for instance in result['healthStatus']:
356                logger.debug('Backend %s in zone %s: instance %s:%s health: %s',
357                             backend.name, backend.zone, instance['ipAddress'],
358                             instance['port'], instance['healthState'])
359                if instance['healthState'] != 'HEALTHY':
360                    backend_healthy = False
361
362            if backend_healthy:
363                logger.info('Backend %s in zone %s reported healthy',
364                            backend.name, backend.zone)
365                pending.remove(backend)
366
367        return not pending
368
369    def get_backend_service_backend_health(self, backend_service, backend):
370        return self.api.backendServices().getHealth(
371            project=self.project,
372            backendService=backend_service.name,
373            body={
374                "group": backend.url
375            }).execute()
376
377    def _get_resource(self, collection: discovery.Resource,
378                      **kwargs) -> 'GcpResource':
379        resp = collection.get(project=self.project, **kwargs).execute()
380        logger.info('Loaded compute resource:\n%s',
381                    self.resource_pretty_format(resp))
382        return self.GcpResource(resp['name'], resp['selfLink'])
383
384    def _exists_resource(
385            self, collection: discovery.Resource, filter: str) -> bool:  # pylint: disable=redefined-builtin
386        resp = collection.list(
387            project=self.project, filter=filter,
388            maxResults=1).execute(num_retries=self._GCP_API_RETRIES)
389        if 'kind' not in resp:
390            # TODO(sergiitk): better error
391            raise ValueError('List response "kind" is missing')
392        return 'items' in resp and resp['items']
393
394    def _insert_resource(self, collection: discovery.Resource,
395                         body: Dict[str, Any]) -> 'GcpResource':
396        logger.info('Creating compute resource:\n%s',
397                    self.resource_pretty_format(body))
398        resp = self._execute(collection.insert(project=self.project, body=body))
399        return self.GcpResource(body['name'], resp['targetLink'])
400
401    def _patch_resource(self, collection, body, **kwargs):
402        logger.info('Patching compute resource:\n%s',
403                    self.resource_pretty_format(body))
404        self._execute(
405            collection.patch(project=self.project, body=body, **kwargs))
406
407    def _list_resource(self, collection: discovery.Resource):
408        return collection.list(project=self.project).execute(
409            num_retries=self._GCP_API_RETRIES)
410
411    def _delete_resource(self, collection: discovery.Resource,
412                         resource_type: str, resource_name: str) -> bool:
413        try:
414            params = {"project": self.project, resource_type: resource_name}
415            self._execute(collection.delete(**params))
416            return True
417        except googleapiclient.errors.HttpError as error:
418            if error.resp and error.resp.status == 404:
419                logger.info(
420                    'Resource %s "%s" not deleted since it does not exist',
421                    resource_type, resource_name)
422            else:
423                logger.warning('Failed to delete %s "%s", %r', resource_type,
424                               resource_name, error)
425        return False
426
427    @staticmethod
428    def _operation_status_done(operation):
429        return 'status' in operation and operation['status'] == 'DONE'
430
431    def _execute(  # pylint: disable=arguments-differ
432            self,
433            request,
434            *,
435            timeout_sec=_WAIT_FOR_OPERATION_SEC):
436        operation = request.execute(num_retries=self._GCP_API_RETRIES)
437        logger.debug('Operation %s', operation)
438        return self._wait(operation['name'], timeout_sec)
439
440    def _wait(self,
441              operation_id: str,
442              timeout_sec: int = _WAIT_FOR_OPERATION_SEC) -> dict:
443        logger.info('Waiting %s sec for compute operation id: %s', timeout_sec,
444                    operation_id)
445
446        # TODO(sergiitk) try using wait() here
447        # https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
448        op_request = self.api.globalOperations().get(project=self.project,
449                                                     operation=operation_id)
450        operation = self.wait_for_operation(
451            operation_request=op_request,
452            test_success_fn=self._operation_status_done,
453            timeout_sec=timeout_sec)
454
455        logger.debug('Completed operation: %s', operation)
456        if 'error' in operation:
457            # This shouldn't normally happen: gcp library raises on errors.
458            raise Exception(f'Compute operation {operation_id} '
459                            f'failed: {operation}')
460        return operation
461