# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper over the GCP Compute Engine v1 REST API.

Creates, patches, and deletes the Traffic Director resources (health checks,
backend services, URL maps, target proxies, global forwarding rules, NEGs)
used by the xDS interop test framework, and blocks on the resulting
long-running compute operations until they complete.
"""
import dataclasses
import datetime
import enum
import logging
from typing import Any, Dict, List, Optional, Set

from googleapiclient import discovery
import googleapiclient.errors

from framework.helpers import retryers
from framework.infrastructure import gcp

logger = logging.getLogger(__name__)


class ComputeV1(gcp.api.GcpProjectApiResource):  # pylint: disable=too-many-public-methods
    """Compute Engine v1 API helper scoped to a single GCP project."""

    # TODO(sergiitk): move someplace better
    _WAIT_FOR_BACKEND_SEC = 60 * 10
    _WAIT_FOR_BACKEND_SLEEP_SEC = 4
    _WAIT_FOR_OPERATION_SEC = 60 * 10

    @dataclasses.dataclass(frozen=True)
    class GcpResource:
        # Short resource name and its full selfLink URL.
        name: str
        url: str

    @dataclasses.dataclass(frozen=True)
    class ZonalGcpResource(GcpResource):
        # A GcpResource pinned to a specific compute zone (e.g. a NEG).
        zone: str

    def __init__(self,
                 api_manager: gcp.api.GcpApiManager,
                 project: str,
                 version: str = 'v1'):
        super().__init__(api_manager.compute(version), project)

    class HealthCheckProtocol(enum.Enum):
        TCP = enum.auto()
        GRPC = enum.auto()

    class BackendServiceProtocol(enum.Enum):
        HTTP2 = enum.auto()
        GRPC = enum.auto()

    def create_health_check(self,
                            name: str,
                            protocol: HealthCheckProtocol,
                            *,
                            port: Optional[int] = None) -> 'GcpResource':
        """Create a global health check.

        When port is None, the check probes whatever port the backend is
        serving on ('USE_SERVING_PORT'); otherwise the given fixed port.

        Raises:
            TypeError: on an unsupported HealthCheckProtocol value.
        """
        if protocol is self.HealthCheckProtocol.TCP:
            health_check_field = 'tcpHealthCheck'
        elif protocol is self.HealthCheckProtocol.GRPC:
            health_check_field = 'grpcHealthCheck'
        else:
            raise TypeError(f'Unexpected Health Check protocol: {protocol}')

        health_check_settings = {}
        if port is None:
            health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
        else:
            health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
            health_check_settings['port'] = port

        return self._insert_resource(
            self.api.healthChecks(), {
                'name': name,
                'type': protocol.name,
                health_check_field: health_check_settings,
            })

    def list_health_check(self):
        """Return the raw list response for the project's health checks."""
        return self._list_resource(self.api.healthChecks())

    def delete_health_check(self, name):
        """Delete a health check by name; missing resources are logged only."""
        self._delete_resource(self.api.healthChecks(), 'healthCheck', name)

    def create_firewall_rule(self, name: str, network_url: str,
                             source_ranges: List[str],
                             ports: List[str]) -> Optional['GcpResource']:
        """Create an INGRESS firewall rule allowing TCP health checks.

        Returns None (instead of raising) when a rule with the same name
        already exists, so repeated test runs are idempotent.
        """
        try:
            return self._insert_resource(
                self.api.firewalls(), {
                    "allowed": [{
                        "IPProtocol": "tcp",
                        "ports": ports
                    }],
                    "direction": "INGRESS",
                    "name": name,
                    "network": network_url,
                    "priority": 1000,
                    "sourceRanges": source_ranges,
                    "targetTags": ["allow-health-checks"]
                })
        except googleapiclient.errors.HttpError as http_error:
            # TODO(lidiz) use status_code() when we upgrade googleapiclient
            # 409 Conflict: the rule already exists — treat as success.
            if http_error.resp.status == 409:
                logger.debug('Firewall rule %s already existed', name)
                return None
            else:
                raise

    def delete_firewall_rule(self, name):
        """Delete a firewall rule by name."""
        self._delete_resource(self.api.firewalls(), 'firewall', name)

    def create_backend_service_traffic_director(
            self,
            name: str,
            health_check: 'GcpResource',
            affinity_header: Optional[str] = None,
            protocol: Optional[BackendServiceProtocol] = None,
            subset_size: Optional[int] = None,
            locality_lb_policies: Optional[List[dict]] = None,
            outlier_detection: Optional[dict] = None) -> 'GcpResource':
        """Create an INTERNAL_SELF_MANAGED (Traffic Director) backend service.

        Raises:
            TypeError: if protocol is not a BackendServiceProtocol member
                (note: None is rejected too, despite the Optional annotation).
        """
        if not isinstance(protocol, self.BackendServiceProtocol):
            raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
        body = {
            'name': name,
            'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',  # Traffic Director
            'healthChecks': [health_check.url],
            'protocol': protocol.name,
        }
        # If affinity header is specified, config the backend service to support
        # affinity, and set affinity header to the one given.
        if affinity_header:
            body['sessionAffinity'] = 'HEADER_FIELD'
            body['localityLbPolicy'] = 'RING_HASH'
            body['consistentHash'] = {
                'httpHeaderName': affinity_header,
            }
        if subset_size:
            body['subsetting'] = {
                'policy': 'CONSISTENT_HASH_SUBSETTING',
                'subsetSize': subset_size
            }
        if locality_lb_policies:
            body['localityLbPolicies'] = locality_lb_policies
        if outlier_detection:
            body['outlierDetection'] = outlier_detection
        return self._insert_resource(self.api.backendServices(), body)

    def get_backend_service_traffic_director(self, name: str) -> 'GcpResource':
        """Fetch an existing backend service by name."""
        return self._get_resource(self.api.backendServices(),
                                  backendService=name)

    def patch_backend_service(self, backend_service, body, **kwargs):
        """Patch a backend service with an arbitrary request body."""
        self._patch_resource(collection=self.api.backendServices(),
                             backendService=backend_service.name,
                             body=body,
                             **kwargs)

    def backend_service_patch_backends(
            self,
            backend_service,
            backends,
            max_rate_per_endpoint: Optional[int] = None):
        """Replace the backend service's backends with the given NEGs.

        Each backend uses RATE balancing; max_rate_per_endpoint defaults to 5.
        """
        if max_rate_per_endpoint is None:
            max_rate_per_endpoint = 5
        backend_list = [{
            'group': backend.url,
            'balancingMode': 'RATE',
            'maxRatePerEndpoint': max_rate_per_endpoint
        } for backend in backends]

        self._patch_resource(collection=self.api.backendServices(),
                             body={'backends': backend_list},
                             backendService=backend_service.name)

    def backend_service_remove_all_backends(self, backend_service):
        """Detach every backend from the backend service."""
        self._patch_resource(collection=self.api.backendServices(),
                             body={'backends': []},
                             backendService=backend_service.name)

    def delete_backend_service(self, name):
        """Delete a backend service by name."""
        self._delete_resource(self.api.backendServices(), 'backendService',
                              name)

    def create_url_map(
        self,
        name: str,
        matcher_name: str,
        src_hosts,
        dst_default_backend_service: 'GcpResource',
        dst_host_rule_match_backend_service: Optional['GcpResource'] = None,
    ) -> 'GcpResource':
        """Create a URL map routing src_hosts to the given backend service.

        When no host-rule-specific service is given, the default backend
        service handles matched hosts too.
        """
        if dst_host_rule_match_backend_service is None:
            dst_host_rule_match_backend_service = dst_default_backend_service
        return self._insert_resource(
            self.api.urlMaps(), {
                'name':
                    name,
                'defaultService':
                    dst_default_backend_service.url,
                'hostRules': [{
                    'hosts': src_hosts,
                    'pathMatcher': matcher_name,
                }],
                'pathMatchers': [{
                    'name': matcher_name,
                    'defaultService': dst_host_rule_match_backend_service.url,
                }],
            })

    def create_url_map_with_content(self, url_map_body: Any) -> 'GcpResource':
        """Create a URL map from a caller-provided request body."""
        return self._insert_resource(self.api.urlMaps(), url_map_body)

    def patch_url_map(self, url_map: 'GcpResource', body, **kwargs):
        """Patch an existing URL map with an arbitrary request body."""
        self._patch_resource(collection=self.api.urlMaps(),
                             urlMap=url_map.name,
                             body=body,
                             **kwargs)

    def delete_url_map(self, name):
        """Delete a URL map by name."""
        self._delete_resource(self.api.urlMaps(), 'urlMap', name)

    def create_target_grpc_proxy(
        self,
        name: str,
        url_map: 'GcpResource',
        validate_for_proxyless: bool = True,
    ) -> 'GcpResource':
        """Create a target gRPC proxy pointing at the given URL map."""
        return self._insert_resource(
            self.api.targetGrpcProxies(), {
                'name': name,
                'url_map': url_map.url,
                'validate_for_proxyless': validate_for_proxyless,
            })

    def delete_target_grpc_proxy(self, name):
        """Delete a target gRPC proxy by name."""
        self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy',
                              name)

    def create_target_http_proxy(
        self,
        name: str,
        url_map: 'GcpResource',
    ) -> 'GcpResource':
        """Create a target HTTP proxy pointing at the given URL map."""
        return self._insert_resource(self.api.targetHttpProxies(), {
            'name': name,
            'url_map': url_map.url,
        })

    def delete_target_http_proxy(self, name):
        """Delete a target HTTP proxy by name."""
        self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy',
                              name)

    def create_forwarding_rule(self,
                               name: str,
                               src_port: int,
                               target_proxy: 'GcpResource',
                               network_url: str,
                               *,
                               ip_address: str = '0.0.0.0') -> 'GcpResource':
        """Create a global forwarding rule for Traffic Director on src_port."""
        return self._insert_resource(
            self.api.globalForwardingRules(),
            {
                'name': name,
                'loadBalancingScheme':
                    'INTERNAL_SELF_MANAGED',  # Traffic Director
                'portRange': src_port,
                'IPAddress': ip_address,
                'network': network_url,
                'target': target_proxy.url,
            })

    def exists_forwarding_rule(self, src_port) -> bool:
        """Check whether a Traffic Director forwarding rule uses src_port."""
        # TODO(sergiitk): Better approach for confirming the port is available.
        #   It's possible a rule allocates actual port range, e.g 8000-9000,
        #   and this wouldn't catch it. For now, we assume there's no
        #   port ranges used in the project.
        filter_str = (f'(portRange eq "{src_port}-{src_port}") '
                      f'(IPAddress eq "0.0.0.0")'
                      f'(loadBalancingScheme eq "INTERNAL_SELF_MANAGED")')
        return self._exists_resource(self.api.globalForwardingRules(),
                                     filter=filter_str)

    def delete_forwarding_rule(self, name):
        """Delete a global forwarding rule by name."""
        self._delete_resource(self.api.globalForwardingRules(),
                              'forwardingRule', name)

    def wait_for_network_endpoint_group(self,
                                        name: str,
                                        zone: str,
                                        *,
                                        timeout_sec=_WAIT_FOR_BACKEND_SEC,
                                        wait_sec=_WAIT_FOR_BACKEND_SLEEP_SEC):
        """Poll a zonal NEG until it reports at least one endpoint.

        Returns:
            ZonalGcpResource describing the ready NEG.

        Raises:
            retryers framework error when the timeout elapses first.
        """
        retryer = retryers.constant_retryer(
            wait_fixed=datetime.timedelta(seconds=wait_sec),
            timeout=datetime.timedelta(seconds=timeout_sec),
            check_result=lambda neg: neg and neg.get('size', 0) > 0)
        network_endpoint_group = retryer(
            self._retry_network_endpoint_group_ready, name, zone)
        # TODO(sergiitk): dataclass
        return self.ZonalGcpResource(network_endpoint_group['name'],
                                     network_endpoint_group['selfLink'], zone)

    def _retry_network_endpoint_group_ready(self, name: str, zone: str):
        """Single retry attempt: load the NEG, logging transient HTTP errors."""
        try:
            neg = self.get_network_endpoint_group(name, zone)
            logger.debug(
                'Waiting for endpoints: NEG %s in zone %s, '
                'current count %s', neg['name'], zone, neg.get('size'))
        except googleapiclient.errors.HttpError as error:
            # noinspection PyProtectedMember
            reason = error._get_reason()
            logger.debug('Retrying NEG load, got %s, details %s',
                         error.resp.status, reason)
            raise
        return neg

    def get_network_endpoint_group(self, name, zone):
        """Fetch a zonal network endpoint group as a raw response dict."""
        neg = self.api.networkEndpointGroups().get(project=self.project,
                                                   networkEndpointGroup=name,
                                                   zone=zone).execute()
        # TODO(sergiitk): dataclass
        return neg

    def wait_for_backends_healthy_status(
            self,
            backend_service: GcpResource,
            backends: Set[ZonalGcpResource],
            *,
            timeout_sec: int = _WAIT_FOR_BACKEND_SEC,
            wait_sec: int = _WAIT_FOR_BACKEND_SLEEP_SEC):
        """Block until every backend reports all instances HEALTHY.

        Raises:
            retryers framework error when the timeout elapses first.
        """
        retryer = retryers.constant_retryer(
            wait_fixed=datetime.timedelta(seconds=wait_sec),
            timeout=datetime.timedelta(seconds=timeout_sec),
            check_result=lambda result: result)
        pending = set(backends)
        retryer(self._retry_backends_health, backend_service, pending)

    def _retry_backends_health(self, backend_service: GcpResource,
                               pending: Set[ZonalGcpResource]):
        """Single retry attempt: drop healthy backends from `pending`.

        Returns True when no backends remain pending.
        """
        # Iterate a snapshot: healthy backends are removed from `pending`
        # inside the loop, and mutating a set while iterating it directly
        # raises "RuntimeError: Set changed size during iteration".
        for backend in list(pending):
            result = self.get_backend_service_backend_health(
                backend_service, backend)
            if 'healthStatus' not in result:
                logger.debug('Waiting for instances: backend %s, zone %s',
                             backend.name, backend.zone)
                continue

            backend_healthy = True
            for instance in result['healthStatus']:
                logger.debug('Backend %s in zone %s: instance %s:%s health: %s',
                             backend.name, backend.zone, instance['ipAddress'],
                             instance['port'], instance['healthState'])
                if instance['healthState'] != 'HEALTHY':
                    backend_healthy = False

            if backend_healthy:
                logger.info('Backend %s in zone %s reported healthy',
                            backend.name, backend.zone)
                pending.remove(backend)

        return not pending

    def get_backend_service_backend_health(self, backend_service, backend):
        """Call backendServices.getHealth for one backend group."""
        return self.api.backendServices().getHealth(
            project=self.project,
            backendService=backend_service.name,
            body={
                "group": backend.url
            }).execute()

    def _get_resource(self, collection: discovery.Resource,
                      **kwargs) -> 'GcpResource':
        """GET a single resource and wrap its name/selfLink."""
        resp = collection.get(project=self.project, **kwargs).execute()
        logger.info('Loaded compute resource:\n%s',
                    self.resource_pretty_format(resp))
        return self.GcpResource(resp['name'], resp['selfLink'])

    def _exists_resource(
            self, collection: discovery.Resource, filter: str) -> bool:  # pylint: disable=redefined-builtin
        """Return True when at least one resource matches the filter."""
        resp = collection.list(
            project=self.project, filter=filter,
            maxResults=1).execute(num_retries=self._GCP_API_RETRIES)
        if 'kind' not in resp:
            # TODO(sergiitk): better error
            raise ValueError('List response "kind" is missing')
        # bool() so the declared return type holds: the raw expression would
        # otherwise leak the 'items' list to callers.
        return bool('items' in resp and resp['items'])

    def _insert_resource(self, collection: discovery.Resource,
                         body: Dict[str, Any]) -> 'GcpResource':
        """INSERT a resource, wait for the operation, return its targetLink."""
        logger.info('Creating compute resource:\n%s',
                    self.resource_pretty_format(body))
        resp = self._execute(collection.insert(project=self.project, body=body))
        return self.GcpResource(body['name'], resp['targetLink'])

    def _patch_resource(self, collection, body, **kwargs):
        """PATCH a resource and wait for the operation to finish."""
        logger.info('Patching compute resource:\n%s',
                    self.resource_pretty_format(body))
        self._execute(
            collection.patch(project=self.project, body=body, **kwargs))

    def _list_resource(self, collection: discovery.Resource):
        """LIST a collection, returning the raw response dict."""
        return collection.list(project=self.project).execute(
            num_retries=self._GCP_API_RETRIES)

    def _delete_resource(self, collection: discovery.Resource,
                         resource_type: str, resource_name: str) -> bool:
        """DELETE a resource; return True on success, False otherwise.

        A 404 is treated as already-deleted (logged at info); any other HTTP
        error is logged as a warning. Neither is re-raised.
        """
        try:
            params = {"project": self.project, resource_type: resource_name}
            self._execute(collection.delete(**params))
            return True
        except googleapiclient.errors.HttpError as error:
            if error.resp and error.resp.status == 404:
                logger.info(
                    'Resource %s "%s" not deleted since it does not exist',
                    resource_type, resource_name)
            else:
                logger.warning('Failed to delete %s "%s", %r', resource_type,
                               resource_name, error)
        return False

    @staticmethod
    def _operation_status_done(operation):
        """True when the compute operation dict reports status DONE."""
        return operation.get('status') == 'DONE'

    def _execute(  # pylint: disable=arguments-differ
            self,
            request,
            *,
            timeout_sec=_WAIT_FOR_OPERATION_SEC):
        """Execute a compute request and block until its operation completes."""
        operation = request.execute(num_retries=self._GCP_API_RETRIES)
        logger.debug('Operation %s', operation)
        return self._wait(operation['name'], timeout_sec)

    def _wait(self,
              operation_id: str,
              timeout_sec: int = _WAIT_FOR_OPERATION_SEC) -> dict:
        """Poll a global operation until DONE; raise if it reports an error."""
        logger.info('Waiting %s sec for compute operation id: %s', timeout_sec,
                    operation_id)

        # TODO(sergiitk) try using wait() here
        # https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
        op_request = self.api.globalOperations().get(project=self.project,
                                                     operation=operation_id)
        operation = self.wait_for_operation(
            operation_request=op_request,
            test_success_fn=self._operation_status_done,
            timeout_sec=timeout_sec)

        logger.debug('Completed operation: %s', operation)
        if 'error' in operation:
            # This shouldn't normally happen: gcp library raises on errors.
            raise Exception(f'Compute operation {operation_id} '
                            f'failed: {operation}')
        return operation