1#!/usr/bin/env python3
2# Copyright 2020 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Run xDS integration tests on GCP using Traffic Director."""
16
17import argparse
18import datetime
19import json
20import logging
21import os
22import random
23import re
24import shlex
25import socket
26import subprocess
27import sys
28import tempfile
29import time
30import uuid
31
32from google.protobuf import json_format
33import googleapiclient.discovery
34import grpc
35from oauth2client.client import GoogleCredentials
36
37import python_utils.jobset as jobset
38import python_utils.report_utils as report_utils
39from src.proto.grpc.health.v1 import health_pb2
40from src.proto.grpc.health.v1 import health_pb2_grpc
41from src.proto.grpc.testing import empty_pb2
42from src.proto.grpc.testing import messages_pb2
43from src.proto.grpc.testing import test_pb2_grpc
44
45# Envoy protos provided by PyPI package xds-protos
46# Needs to import the generated Python file to load descriptors
47try:
48    from envoy.extensions.filters.common.fault.v3 import fault_pb2
49    from envoy.extensions.filters.http.fault.v3 import fault_pb2
50    from envoy.extensions.filters.http.router.v3 import router_pb2
51    from envoy.extensions.filters.network.http_connection_manager.v3 import \
52        http_connection_manager_pb2
53    from envoy.service.status.v3 import csds_pb2
54    from envoy.service.status.v3 import csds_pb2_grpc
55except ImportError:
56    # These protos are required by CSDS test. We should not fail the entire
57    # script for one test case.
58    pass
59
# Root logger: emit timestamped, level-tagged messages to the console.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
# Replace any handlers installed earlier so messages are not duplicated.
logger.handlers = []
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)

# Suppress excessive logs for gRPC Python. The previous values are kept;
# NOTE(review): presumably they are re-applied when launching client
# processes — the consuming code is not visible in this chunk, confirm.
original_grpc_trace = os.environ.pop('GRPC_TRACE', None)
original_grpc_verbosity = os.environ.pop('GRPC_VERBOSITY', None)
# Suppress not-essential logs for GCP clients
logging.getLogger('google_auth_httplib2').setLevel(logging.WARNING)
logging.getLogger('googleapiclient.discovery').setLevel(logging.WARNING)
74
# Test cases executed by default (i.e. included when --test_case=all).
_TEST_CASES = [
    'backends_restart',
    'change_backend_service',
    'gentle_failover',
    'load_report_based_failover',
    'ping_pong',
    'remove_instance_group',
    'round_robin',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',
    'traffic_splitting',
    'path_matching',
    'header_matching',
    'api_listener',
    'forwarding_rule_port_match',
    'forwarding_rule_default_port',
    'metadata_filter',
]

# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
#
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
    'circuit_breaking',
    'timeout',
    'fault_injection',
    'csds',
]

# Test cases that require the V3 API.  Skipped in older runs.
_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])

# Test cases that require the alpha API.  Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])
110
111
def parse_test_cases(arg):
    """Parse a comma-separated list of test case names.

    Args:
      arg: Comma-separated test case names; the token 'all' expands to
        every test case in _TEST_CASES.

    Returns:
      List of requested test case names, in the canonical order of
      _TEST_CASES + _ADDITIONAL_TEST_CASES.

    Raises:
      Exception: if any requested test case name is unknown.
    """
    if arg == '':
        return []
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    test_cases = set()
    # Note: the loop variable no longer shadows the `arg` parameter, so the
    # error below can report every invalid name instead of the last token.
    for token in arg.split(','):
        if token == "all":
            test_cases.update(_TEST_CASES)
        else:
            test_cases.add(token)
    unknown = test_cases.difference(all_test_cases)
    if unknown:
        raise Exception('Failed to parse test cases %s' %
                        ','.join(sorted(unknown)))
    # Preserve order.
    return [x for x in all_test_cases if x in test_cases]
127
128
def parse_port_range(port_arg):
    """Parse a service port argument into a list of candidate ports.

    Args:
      port_arg: Either a single port ('8080') or an inclusive range in the
        form 'min:max' ('8080:8280').

    Returns:
      List of ints: [port] for a single port, or [min..max] for a range.

    Raises:
      ValueError: if the argument is neither a port nor a min:max range.
    """
    try:
        port = int(port_arg)
        return [port]
    except ValueError:
        # Not a bare int — fall back to the min:max range form. The original
        # bare `except:` also swallowed KeyboardInterrupt/SystemExit.
        port_min, port_max = port_arg.split(':')
        return list(range(int(port_min), int(port_max) + 1))
136
137
# Command-line interface for the xDS interop test driver. Parsed once at
# module import time; the resulting `args` namespace is read throughout
# this file.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
argp.add_argument(
    '--gcp_suffix',
    default='',
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
argp.add_argument(
    '--test_case',
    default='ping_pong',
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
argp.add_argument(
    '--bootstrap_file',
    default='',
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
argp.add_argument(
    '--xds_v3_support',
    default=False,
    action='store_true',
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
argp.add_argument(
    '--client_cmd',
    default=None,
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
argp.add_argument(
    '--client_hosts',
    default=None,
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
argp.add_argument(
    '--wait_for_backend_sec',
    default=1200,
    type=int,
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
argp.add_argument(
    '--use_existing_gcp_resources',
    default=False,
    action='store_true',
    help=
    'If set, find and use already created GCP resources instead of creating new'
    ' ones.')
argp.add_argument(
    '--keep_gcp_resources',
    default=False,
    action='store_true',
    help=
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
argp.add_argument('--halt_after_fail',
                  action='store_true',
                  help='Halt and save the resources when test failed.')
argp.add_argument(
    '--compute_discovery_document',
    default=None,
    type=str,
    help=
    'If provided, uses this file instead of retrieving via the GCP discovery '
    'API')
argp.add_argument(
    '--alpha_compute_discovery_document',
    default=None,
    type=str,
    help='If provided, uses this file instead of retrieving via the alpha GCP '
    'discovery API')
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
_DEFAULT_PORT_RANGE = '8080:8280'
argp.add_argument('--service_port_range',
                  default=_DEFAULT_PORT_RANGE,
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
                  '<= max')
argp.add_argument(
    '--stats_port',
    default=8079,
    type=int,
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  help='xDS server')
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  default=None,
                  type=str,
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
argp.add_argument(
    '--instance_group_size',
    default=2,
    type=int,
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
    'less than 2.')
argp.add_argument('--verbose',
                  help='verbose log output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  default=False,
                  action='store_true')
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  default=False,
                  action='store_true')
args = argp.parse_args()
278
if args.verbose:
    logger.setLevel(logging.DEBUG)

# Hosts already running test clients (see --client_hosts); empty means a
# single local client is used.
CLIENT_HOSTS = []
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Each of the config propagation in the control plane should finish within 600s.
# Otherwise, it indicates a bug in the control plane. The config propagation
# includes all kinds of traffic config update, like updating urlMap, creating
# the resources for the first time, updating BackendService, and changing the
# status of endpoints in BackendService.
_WAIT_FOR_URL_MAP_PATCH_SEC = 600
# In general, fetching load balancing stats only takes ~10s. However, slow
# config update could lead to empty EDS or similar symptoms causing the
# connection to hang for a long period of time. So, we want to extend the stats
# wait time to be the same as urlMap patch time.
_WAIT_FOR_STATS_SEC = _WAIT_FOR_URL_MAP_PATCH_SEC

_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
_NUM_TEST_RPCS = 10 * args.qps
_CONNECTION_TIMEOUT_SEC = 60
_GCP_API_RETRIES = 5
# xDS bootstrap JSON template. Network name, zone and the xDS server address
# are baked in here via %-formatting; {node_id} and {server_features} remain
# as str.format() placeholders to be filled in per generated bootstrap file.
_BOOTSTRAP_TEMPLATE = """
{{
  "node": {{
    "id": "{node_id}",
    "metadata": {{
      "TRAFFICDIRECTOR_NETWORK_NAME": "%s",
      "com.googleapis.trafficdirector.config_time_trace": "TRUE"
    }},
    "locality": {{
      "zone": "%s"
    }}
  }},
  "xds_servers": [{{
    "server_uri": "%s",
    "channel_creds": [
      {{
        "type": "google_default",
        "config": {{}}
      }}
    ],
    "server_features": {server_features}
  }}]
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
# Base names for GCP resources created by the test; presumably combined with
# --gcp_suffix elsewhere — the combining code is not visible in this chunk.
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  '../../reports')
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
357
358
def get_client_stats(num_rpcs, timeout_sec):
    """Ask the first test client to run num_rpcs RPCs and report stats.

    Returns the LoadBalancerStatsResponse from the first configured client
    host (or localhost when no client hosts are configured).
    """
    target_hosts = CLIENT_HOSTS if CLIENT_HOSTS else ['localhost']
    for host in target_hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            request = messages_pb2.LoadBalancerStatsRequest()
            request.num_rpcs = num_rpcs
            request.timeout_sec = timeout_sec
            logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
                         args.stats_port)
            # Allow extra time on top of the stats collection window for the
            # RPC itself to connect and complete.
            response = stub.GetClientStats(
                request,
                wait_for_ready=True,
                timeout=timeout_sec + _CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientStats RPC to %s: %s', host,
                         json_format.MessageToJson(response))
            return response
380
381
def get_client_accumulated_stats():
    """Return accumulated RPC stats from the first configured test client."""
    target_hosts = CLIENT_HOSTS if CLIENT_HOSTS else ['localhost']
    for host in target_hosts:
        target = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(target) as channel:
            stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
            logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                         host, args.stats_port)
            response = stub.GetClientAccumulatedStats(
                messages_pb2.LoadBalancerAccumulatedStatsRequest(),
                wait_for_ready=True,
                timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
                         host, response)
            return response
399
400
def get_client_xds_config_dump():
    """Fetch the test client's xDS config via the CSDS service.

    Returns the single ClientConfig as a JSON-style dict, or None when the
    response does not contain exactly one ClientConfig.
    """
    target_hosts = CLIENT_HOSTS if CLIENT_HOSTS else ['localhost']
    for host in target_hosts:
        server_address = '%s:%d' % (host, args.stats_port)
        with grpc.insecure_channel(server_address) as channel:
            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
            logger.debug('Fetching xDS config dump from %s', server_address)
            response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
                                              wait_for_ready=True,
                                              timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Fetched xDS config dump from %s', server_address)
            if len(response.config) != 1:
                logger.error('Unexpected number of ClientConfigs %d: %s',
                             len(response.config), response)
                return None
            # Convert to a JSON-style dict: many fields arrive packed in
            # google.protobuf.Any, and inspecting them as proto messages
            # would require a lot of duplicated unpacking code.
            return json_format.MessageToDict(response.config[0],
                                             preserving_proto_field_name=True)
425
426
def configure_client(rpc_types, metadata=(), timeout_sec=None):
    """Reconfigure the running test clients' RPC behavior.

    Args:
      rpc_types: RPC types the client should start sending.
      metadata: iterable of (rpc_type, key, value) tuples to attach to RPCs.
        Default changed from a mutable `[]` to `()` — the mutable default
        was a latent shared-state bug; the value is only iterated, so this
        is behavior-compatible.
      timeout_sec: optional per-RPC timeout to configure on the client.
    """
    if CLIENT_HOSTS:
        hosts = CLIENT_HOSTS
    else:
        hosts = ['localhost']
    for host in hosts:
        with grpc.insecure_channel('%s:%d' %
                                   (host, args.stats_port)) as channel:
            stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
            request = messages_pb2.ClientConfigureRequest()
            request.types.extend(rpc_types)
            for rpc_type, md_key, md_value in metadata:
                md = request.metadata.add()
                md.type = rpc_type
                md.key = md_key
                md.value = md_value
            if timeout_sec:
                request.timeout_sec = timeout_sec
            logger.debug(
                'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
                host, args.stats_port, request)
            stub.Configure(request,
                           wait_for_ready=True,
                           timeout=_CONNECTION_TIMEOUT_SEC)
            logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
                         host)
453
454
class RpcDistributionError(Exception):
    """Raised when RPCs are not distributed across backends as expected."""
457
458
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   allow_failures):
    """Poll client stats until exactly the given backends receive load.

    Raises RpcDistributionError with the last observed problem if the
    desired distribution is not reached within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    error_msg = None
    logger.debug('Waiting for %d sec until backends %s receive load' %
                 (timeout_sec, backends))
    while time.time() <= deadline:
        error_msg = None
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        unloaded = [b for b in backends if b not in rpcs_by_peer]
        if unloaded:
            error_msg = 'Backend %s did not receive load' % unloaded[0]
        elif len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        # Failure check overrides any distribution error, matching the
        # original precedence.
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        if not error_msg:
            return
    raise RpcDistributionError(error_msg)
480
481
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     timeout_sec,
                                                     num_rpcs=_NUM_TEST_RPCS):
    """Wait until only the given backends receive load, tolerating RPC
    failures along the way."""
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=True)
489
490
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             timeout_sec,
                                             num_rpcs=_NUM_TEST_RPCS):
    """Wait until only the given backends receive load; any RPC failure is
    treated as an error."""
    _verify_rpcs_to_given_backends(backends,
                                   timeout_sec,
                                   num_rpcs,
                                   allow_failures=False)
498
499
def wait_until_no_rpcs_go_to_given_backends(backends, timeout_sec):
    """Poll until none of the given backends receive any load.

    Raises Exception if some listed backend still receives traffic when
    timeout_sec elapses.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        stats = get_client_stats(_NUM_TEST_RPCS, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        offending = [b for b in backends if b in rpcs_by_peer]
        if not offending:
            return
    raise Exception('Unexpected RPCs going to given backends')
513
514
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    """Block until the client stably keeps ~num_rpcs RPCs outstanding.

    Args:
      rpc_type: A string indicating the RPC method to check for. Either
        'UnaryCall' or 'EmptyCall'.
      timeout_sec: Maximum number of seconds to wait until the desired state
        is reached.
      num_rpcs: Expected number of RPCs to be in-flight.
      threshold: Number within [0,100], the tolerable percentage by which
        the actual number of RPCs in-flight can differ from the expected
        number.

    Raises:
      ValueError: if threshold is outside [0, 100].
      Exception: if the in-flight count never stabilizes within tolerance.
    """
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    deadline = time.time() + timeout_sec
    logger.debug(
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' %
        (timeout_sec, num_rpcs, rpc_type, threshold))
    error_msg = None
    while time.time() <= deadline:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
        if not error_msg:
            break
        logger.debug('Progress: %s', error_msg)
        time.sleep(2)
    if not error_msg:
        # Re-check after a pause to ensure the outstanding count is stable,
        # not just momentarily within tolerance.
        time.sleep(5)
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                          threshold_fraction)
    if error_msg:
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
552
553
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Return an error string when the in-flight RPC count is outside the
    tolerance band around num_rpcs, or None when it is within bounds."""
    stats = get_client_accumulated_stats()
    started = stats.num_rpcs_started_by_method[rpc_type]
    succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    failed = stats.num_rpcs_failed_by_method[rpc_type]
    in_flight = started - succeeded - failed
    if in_flight < num_rpcs * (1 - threshold_fraction):
        return ('actual(%d) < expected(%d - %d%%)' %
                (in_flight, num_rpcs, threshold))
    if in_flight > num_rpcs * (1 + threshold_fraction):
        return ('actual(%d) > expected(%d + %d%%)' %
                (in_flight, num_rpcs, threshold))
    return None
568
569
def compare_distributions(actual_distribution, expected_distribution,
                          threshold):
    """Compare if two distributions are similar.

    Args:
      actual_distribution: A list of floats, contains the actual distribution.
      expected_distribution: A list of floats, contains the expected
        distribution.
      threshold: Number within [0,100], the threshold percentage by which the
        actual distribution can differ from the expected distribution.

    Returns:
      True when every actual value lies within threshold percent of the
      corresponding expected value.

    Raises:
      ValueError: if threshold is not within [0,100].
      Exception: on size mismatch or when any value is out of bounds, with a
        detailed message.
    """
    expected_len = len(expected_distribution)
    actual_len = len(actual_distribution)
    if expected_len != actual_len:
        raise Exception(
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (expected_len, actual_len))
    if not 0 <= threshold <= 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    tolerance = threshold / 100.0
    for expected, actual in zip(expected_distribution, actual_distribution):
        if actual < expected * (1 - tolerance):
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > expected * (1 + tolerance):
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
    return True
604
605
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.

    Args:
      stats: LoadBalancerStatsResponse reported by interop client.
      expected_instances: a dict with key as the RPC type (string), value as
        the expected backend instances (list of strings).

    Returns:
      Returns true if the instances are expected. False if not.
    """
    for rpc_type, expected_peers in list(expected_instances.items()):
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        # Fall back to an empty mapping when there is no entry for this RPC
        # type: the previous code kept None and then crashed with
        # AttributeError on `.keys()` instead of failing the comparison.
        rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else {}
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            return False
    return True
627
628
def test_backends_restart(gcp, backend_service, instance_group):
    """Verify traffic resumes after backends are scaled to zero and restored.

    Scales the instance group down to zero (tolerating RPC failures while no
    backends exist), restores the original size, and checks that the
    restarted instances receive traffic again. The previously assigned but
    never used local `start_time` has been removed.
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    try:
        resize_instance_group(gcp, instance_group, 0)
        # With no backends available, RPC failures are expected and allowed.
        wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                         _WAIT_FOR_BACKEND_SEC)
    finally:
        # Always restore the group so subsequent tests start from a clean
        # state, even if the zero-backend check failed.
        resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
646
647
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Point the URL map at an alternate backend service, verify that all
    traffic moves to it, then restore the original mapping."""
    logger.info('Running test_change_backend_service')
    primary_instances = get_instance_names(gcp, instance_group)
    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(primary_instances,
                                             _WAIT_FOR_STATS_SEC)
    succeeded = True
    try:
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_all_rpcs_go_to_given_backends(alternate_instances,
                                                 _WAIT_FOR_URL_MAP_PATCH_SEC)
    except Exception:
        succeeded = False
        raise
    finally:
        # When the test failed and --halt_after_fail was given, leave the
        # configuration in place for debugging; otherwise restore it.
        if succeeded or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
674
675
def test_gentle_failover(gcp,
                         backend_service,
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Verify failover to the secondary locality when most (but not all)
    primary backends stop serving.

    All but one primary instance are marked not-serving; traffic is then
    expected on the remaining primary instance plus the secondary locality.
    If the distribution check fails and the groups' primary/secondary roles
    appear to be reversed, the test retries itself once with the groups
    swapped (swapped_primary_and_secondary=True guards against recursing
    more than once).
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    passed = True
    try:
        # Ensure enough primary instances so that stopping all-but-one
        # exceeds the failure fraction needed to trigger failover.
        if num_primary_instances < min_instances_for_gentle_failover:
            resize_instance_group(gcp, primary_instance_group,
                                  min_instances_for_gentle_failover)
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop serving on all primary instances except the last one.
        instances_to_stop = primary_instance_names[:-1]
        remaining_instances = primary_instance_names[-1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(
                remaining_instances + secondary_instance_names,
                _WAIT_FOR_BACKEND_SEC)
        finally:
            # Re-enable serving on every primary instance regardless of the
            # outcome of the distribution check.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 backend_service,
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original backend configuration and group size unless
        # the test failed and --halt_after_fail asked to keep state around.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
            resize_instance_group(gcp, primary_instance_group,
                                  num_primary_instances)
            instance_names = get_instance_names(gcp, primary_instance_group)
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
737
738
def test_load_report_based_failover(gcp, backend_service,
                                    primary_instance_group,
                                    secondary_instance_group):
    """Verify RATE-based load reporting shifts traffic between localities.

    Caps the primary locality at 20% of the client's QPS and expects traffic
    to spill into the secondary locality; then raises the cap to 120% of the
    client's QPS and expects all traffic back in the primary locality alone.
    """
    logger.info('Running test_load_report_based_failover')
    passed = True
    try:
        both_groups = [primary_instance_group, secondary_instance_group]
        patch_backend_service(gcp, backend_service, both_groups)
        primary_names = get_instance_names(gcp, primary_instance_group)
        secondary_names = get_instance_names(gcp, secondary_instance_group)
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Cap the primary locality's balance mode at RATE with RPS set to 20%
        # of the client's QPS; the secondary locality must absorb the rest.
        max_rate = int(args.qps * 1 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(gcp,
                              backend_service,
                              both_groups,
                              balancing_mode='RATE',
                              max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(
            primary_names + secondary_names, _WAIT_FOR_BACKEND_SEC)

        # Raise the cap to 120% of the client's QPS; only the primary
        # locality should be used again.
        max_rate = int(args.qps * 6 / 5)
        logger.info('Patching backend service to RATE with %d max_rate',
                    max_rate)
        patch_backend_service(gcp,
                              backend_service,
                              both_groups,
                              balancing_mode='RATE',
                              max_rate=max_rate)
        wait_until_all_rpcs_go_to_given_backends(primary_names,
                                                 _WAIT_FOR_BACKEND_SEC)
        logger.info("success")
    except Exception:
        passed = False
        raise
    finally:
        # Restore the default (primary-only) configuration unless the run is
        # configured to halt on failure for debugging.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
            wait_until_all_rpcs_go_to_given_backends(
                get_instance_names(gcp, primary_instance_group),
                _WAIT_FOR_BACKEND_SEC)
793
794
def test_ping_pong(gcp, backend_service, instance_group):
    """Sanity test: after backends are healthy, all of them receive RPCs."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    backend_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(backend_names,
                                             _WAIT_FOR_STATS_SEC)
801
802
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Verify traffic fails over when an instance group is removed.

    Attaches both instance groups to the backend service, determines which
    group(s) actually receive traffic, removes one group from the backend
    service, and verifies all RPCs move to the remaining group.
    """
    logger.info('Running test_remove_instance_group')
    passed = True
    try:
        patch_backend_service(gcp,
                              backend_service,
                              [instance_group, same_zone_instance_group],
                              balancing_mode='RATE')
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  same_zone_instance_group)
        instance_names = get_instance_names(gcp, instance_group)
        same_zone_instance_names = get_instance_names(gcp,
                                                      same_zone_instance_group)
        try:
            # Expected case: traffic is split across both groups; remove
            # instance_group and expect same_zone_instance_group to remain.
            wait_until_all_rpcs_go_to_given_backends(
                instance_names + same_zone_instance_names,
                _WAIT_FOR_OPERATION_SEC)
            remaining_instance_group = same_zone_instance_group
            remaining_instance_names = same_zone_instance_names
        except RpcDistributionError as e:
            # If connected to TD in a different zone, we may route traffic to
            # only one instance group. Determine which group that is to continue
            # with the remainder of the test case.
            try:
                # Traffic goes only to instance_group: remove it and expect
                # failover onto same_zone_instance_group.
                wait_until_all_rpcs_go_to_given_backends(
                    instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = same_zone_instance_group
                remaining_instance_names = same_zone_instance_names
            except RpcDistributionError as e:
                # Traffic goes only to same_zone_instance_group: remove it and
                # expect failover onto instance_group instead.
                wait_until_all_rpcs_go_to_given_backends(
                    same_zone_instance_names, _WAIT_FOR_STATS_SEC)
                remaining_instance_group = instance_group
                remaining_instance_names = instance_names
        # Drop the other group from the backend service and verify all RPCs
        # land on the remaining group.
        patch_backend_service(gcp,
                              backend_service, [remaining_instance_group],
                              balancing_mode='RATE')
        wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                                 _WAIT_FOR_BACKEND_SEC)
    except Exception:
        passed = False
        raise
    finally:
        # NOTE(review): if an exception is raised before instance_names is
        # assigned above, this cleanup would hit a NameError — presumably
        # acceptable here since the original failure still propagates; verify.
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service, [instance_group])
            wait_until_all_rpcs_go_to_given_backends(instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
851
852
def test_round_robin(gcp, backend_service, instance_group):
    """Verify RPCs are distributed evenly (within threshold) across backends."""
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    threshold = 1
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_STATS_SEC)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # minutes.
    max_attempts = 40
    for attempt in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        total_received = sum(
            stats.rpcs_by_peer[peer] for peer in stats.rpcs_by_peer)
        if total_received != _NUM_TEST_RPCS:
            # Some RPCs failed; retry the whole batch.
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            continue
        expected_per_backend = total_received / len(instance_names)
        for instance in instance_names:
            deviation = abs(stats.rpcs_by_peer[instance] -
                            expected_per_backend)
            if deviation > threshold:
                raise Exception(
                    'RPC peer distribution differs from expected by more than %d '
                    'for instance %s (%s)' % (threshold, instance, stats))
        return
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
883
884
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify a partial primary failure keeps all traffic in the primary.

    Stops serving on one primary instance and asserts that RPCs continue to
    go only to the remaining primary instances (no spillover to the
    secondary locality). If TD actually treats the "secondary" group as
    primary, retries once with the group roles swapped.
    """
    logger.info(
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    )
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        # Stop serving on a single primary instance; the rest of the primary
        # locality should absorb all traffic.
        instances_to_stop = primary_instance_names[:1]
        remaining_instances = primary_instance_names[1:]
        try:
            set_serving_status(instances_to_stop,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the check above failed.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        # Fix: mirror test_gentle_failover so unexpected errors also mark the
        # test failed and --halt_after_fail skips the cleanup for debugging.
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
934
935
def test_secondary_locality_gets_requests_on_primary_failure(
        gcp,
        backend_service,
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Verify traffic fails over to the secondary locality when the whole
    primary locality stops serving.

    Stops serving on every primary instance and asserts that all RPCs move
    to the secondary instances. If TD actually treats the "secondary" group
    as primary, retries once with the group roles swapped.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    passed = True
    try:
        patch_backend_service(
            gcp, backend_service,
            [primary_instance_group, secondary_instance_group])
        wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
        wait_for_healthy_backends(gcp, backend_service,
                                  secondary_instance_group)
        primary_instance_names = get_instance_names(gcp, primary_instance_group)
        secondary_instance_names = get_instance_names(gcp,
                                                      secondary_instance_group)
        wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                                 _WAIT_FOR_STATS_SEC)
        try:
            # Take down the entire primary locality; the secondary locality
            # should receive all traffic.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=False)
            wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                                     _WAIT_FOR_BACKEND_SEC)
        finally:
            # Always restore serving status, even if the check above failed.
            set_serving_status(primary_instance_names,
                               gcp.service_port,
                               serving=True)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                gcp,
                backend_service,
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
        else:
            passed = False
            raise e
    except Exception:
        # Fix: mirror test_gentle_failover so unexpected errors also mark the
        # test failed and --halt_after_fail skips the cleanup for debugging.
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            patch_backend_service(gcp, backend_service,
                                  [primary_instance_group])
983
984
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    """Bring both backend services to a known-good state for url-map tests.

    Waits for both backend services to become healthy and for all client
    traffic to initially flow to the original backend service.

    Returns:
      Returns original and alternate backend names as lists of strings.
    """
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)

    original_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_instances)

    alternate_instances = get_instance_names(gcp, same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_instances)

    # Every url-map test starts from all traffic on original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_instances,
                                             _WAIT_FOR_STATS_SEC)
    return original_instances, alternate_instances
1016
1017
def test_metadata_filter(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify url-map metadataFilters route RPCs based on xDS node metadata.

    Builds several route-rule sets from the client's bootstrap node metadata
    (MATCH_ALL / MATCH_ANY, matching and non-matching labels) and, for each
    set, checks that traffic moves from the original backend service to the
    alternate one exactly when the filter should match.
    """
    logger.info("Running test_metadata_filter")
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    passed = True
    try:
        # Read the client's node metadata from the xDS bootstrap file.
        # NOTE(review): bootstrap_path is a module-level name set elsewhere in
        # this file — presumably the generated bootstrap JSON; verify.
        with open(bootstrap_path) as f:
            md = json.load(f)['node']['metadata']
            match_labels = []
            for k, v in list(md.items()):
                match_labels.append({'name': k, 'value': v})

        # Labels guaranteed not to be present in the node metadata.
        not_match_labels = [{'name': 'fake', 'value': 'fail'}]
        # Each inner list is one complete routeRules configuration; in every
        # case rule priority 0 (non-matching) targets the original service and
        # a matching rule targets the alternate service, so success means
        # traffic ends up on the alternate backends.
        test_route_rules = [
            # test MATCH_ALL
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test mixing MATCH_ALL and MATCH_ANY
            # test MATCH_ALL: super set labels won't match
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test MATCH_ANY
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': not_match_labels + match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
            ],
            # test match multiple route rules
            [
                {
                    'priority': 0,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ANY',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': alternate_backend_service.url
                },
                {
                    'priority': 1,
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'metadataFilters': [{
                            'filterMatchCriteria': 'MATCH_ALL',
                            'filterLabels': match_labels
                        }]
                    }],
                    'service': original_backend_service.url
                },
            ]
        ]

        for route_rules in test_route_rules:
            # Start each iteration from the baseline (all traffic on the
            # original backends), apply the rules, then expect traffic to move
            # entirely to the alternate backends before resetting the url map.
            wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)
            wait_until_no_rpcs_go_to_given_backends(original_backend_instances,
                                                    _WAIT_FOR_STATS_SEC)
            wait_until_all_rpcs_go_to_given_backends(
                alternate_backend_instances, _WAIT_FOR_STATS_SEC)
            patch_url_map_backend_service(gcp, original_backend_service)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Detach the alternate backends so later tests start clean.
            patch_backend_service(gcp, alternate_backend_service, [])
1167
1168
def test_api_listener(gcp, backend_service, instance_group,
                      alternate_backend_service):
    """Verify traffic survives replacing the API listener configuration.

    Creates a second url-map/target-proxy/forwarding-rule suite with the same
    host name, deletes the original suite, and checks RPCs keep reaching the
    same backends. Then removes the host rule and checks RPCs stop. Cleanup
    recreates the original suite and returns the server URI clients should
    use afterwards.
    """
    logger.info("Running api_listener")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        # create a second suite of map+tp+fr with the same host name in host rule
        # and we have to disable proxyless validation because it needs `0.0.0.0`
        # ip address in fr for proxyless and also we violate ip:port uniqueness
        # for test purpose. See https://github.com/grpc/grpc-java/issues/8009
        new_config_suffix = '2'
        url_map_2 = create_url_map(gcp, url_map_name + new_config_suffix,
                                   backend_service, service_host_name)
        target_proxy_2 = create_target_proxy(
            gcp, target_proxy_name + new_config_suffix, False, url_map_2)
        if not gcp.service_port:
            # Fix: corrected typo 'Faied' -> 'Failed' in the error message.
            raise Exception(
                'Failed to find a valid port for the forwarding rule')
        # Draw several random candidate IPs so forwarding-rule creation can
        # retry on address collisions.
        potential_ip_addresses = []
        max_attempts = 10
        for _ in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp,
                                      forwarding_rule_name + new_config_suffix,
                                      [gcp.service_port],
                                      potential_ip_addresses, target_proxy_2)
        if gcp.service_port != _DEFAULT_SERVICE_PORT:
            patch_url_map_host_rule_with_port(gcp,
                                              url_map_name + new_config_suffix,
                                              backend_service,
                                              service_host_name)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # Delete the original suite; the duplicate listener should keep
        # serving the same backends.
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        # Re-verify enough times to cover the url-map patch propagation window.
        verify_attempts = int(_WAIT_FOR_URL_MAP_PATCH_SEC / _NUM_TEST_RPCS *
                              args.qps)
        for _ in range(verify_attempts):
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
        # delete host rule for the original host name
        patch_url_map_backend_service(gcp, alternate_backend_service)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)

    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Recreate the original url-map/target-proxy/forwarding-rule suite.
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            # NOTE(review): a 'return' inside 'finally' suppresses any
            # in-flight exception (including the re-raise above) whenever this
            # branch is taken — presumably intentional here so the caller
            # always gets a usable server URI; confirm before changing.
            return server_uri
1241
1242
def test_forwarding_rule_port_match(gcp, backend_service, instance_group):
    """Verify RPCs stop when the forwarding rule excludes the client's port."""
    logger.info("Running test_forwarding_rule_port_match")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)
        # Recreate the forwarding rule on every port in the default range
        # except the one the client targets; RPCs should then stop arriving.
        delete_global_forwarding_rules(gcp)
        non_matching_ports = [
            port for port in parse_port_range(_DEFAULT_PORT_RANGE)
            if port != gcp.service_port
        ]
        create_global_forwarding_rule(gcp, forwarding_rule_name,
                                      non_matching_ports)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        if passed or not args.halt_after_fail:
            # Restore the original forwarding rule and hand back the server
            # URI clients should use from here on.
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port == _DEFAULT_SERVICE_PORT:
                server_uri = service_host_name
            else:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            return server_uri
1274
1275
def test_forwarding_rule_default_port(gcp, backend_service, instance_group):
    """Verify port-less client URIs work only with a port-less url map.

    Checks three configurations: (1) forwarding rule moved off the default
    port with a port in the url map -> RPCs fail; (2) port-less url map with
    a port-80 forwarding rule -> RPCs succeed; (3) port added back to the
    url map while the client URI has no port -> RPCs fail. Cleanup restores
    the original suite and returns the server URI clients should use.
    """
    logger.info("Running test_forwarding_rule_default_port")
    passed = True
    try:
        wait_for_healthy_backends(gcp, backend_service, instance_group)
        backend_instances = get_instance_names(gcp, instance_group)
        # Only meaningful when the client targets the default service port;
        # move the forwarding rule onto the whole port range and pin the url
        # map to an explicit port so the port-less client URI stops matching.
        if gcp.service_port == _DEFAULT_SERVICE_PORT:
            wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                     _WAIT_FOR_STATS_SEC)
            delete_global_forwarding_rules(gcp)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          parse_port_range(_DEFAULT_PORT_RANGE))
            patch_url_map_host_rule_with_port(gcp, url_map_name,
                                              backend_service,
                                              service_host_name)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
        # expect success when no port in client request service uri, and no port in url-map
        delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
        delete_target_proxy(gcp, gcp.target_proxies[0])
        delete_url_map(gcp, gcp.url_maps[0])
        create_url_map(gcp, url_map_name, backend_service, service_host_name)
        create_target_proxy(gcp, target_proxy_name, False)
        # Draw several random candidate IPs so forwarding-rule creation can
        # retry on address collisions.
        potential_ip_addresses = []
        max_attempts = 10
        for i in range(max_attempts):
            potential_ip_addresses.append('10.10.10.%d' %
                                          (random.randint(0, 255)))
        create_global_forwarding_rule(gcp, forwarding_rule_name, [80],
                                      potential_ip_addresses)
        wait_until_all_rpcs_go_to_given_backends(backend_instances,
                                                 _WAIT_FOR_STATS_SEC)

        # expect failure when no port in client request uri, but specify port in url-map
        patch_url_map_host_rule_with_port(gcp, url_map_name, backend_service,
                                          service_host_name)
        wait_until_no_rpcs_go_to_given_backends(backend_instances,
                                                _WAIT_FOR_STATS_SEC)
    except Exception:
        passed = False
        raise
    finally:
        # Recreate the original suite and report the server URI for later
        # tests. NOTE(review): 'return' in 'finally' suppresses the re-raised
        # exception whenever this branch runs — presumably intentional so the
        # caller always gets a usable URI; confirm before changing.
        if passed or not args.halt_after_fail:
            delete_global_forwarding_rules(gcp)
            delete_target_proxies(gcp)
            delete_url_maps(gcp)
            create_url_map(gcp, url_map_name, backend_service,
                           service_host_name)
            create_target_proxy(gcp, target_proxy_name)
            create_global_forwarding_rule(gcp, forwarding_rule_name,
                                          potential_service_ports)
            if gcp.service_port != _DEFAULT_SERVICE_PORT:
                patch_url_map_host_rule_with_port(gcp, url_map_name,
                                                  backend_service,
                                                  service_host_name)
                server_uri = service_host_name + ':' + str(gcp.service_port)
            else:
                server_uri = service_host_name
            return server_uri
1335
1336
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    """Verify weighted traffic splitting between two backend services.

    This test starts with all traffic going to original_backend_service. Then
    it updates the URL map to set the default action to traffic splitting
    between original and alternate. It waits for all backends in both
    services to receive traffic, then verifies that the observed per-instance
    RPC distribution matches the configured weights.
    """
    logger.info('Running test_traffic_splitting')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # Patch urlmap, change route action to traffic splitting between
        # original and alternate.
        logger.info('patching url map with traffic splitting')
        original_service_percentage, alternate_service_percentage = 20, 80
        patch_url_map_backend_service(
            gcp,
            services_with_weights={
                original_backend_service: original_service_percentage,
                alternate_backend_service: alternate_service_percentage,
            })
        # Split percentage between instances: [20,80] -> [10,10,40,40].
        # Each instance is expected to receive an equal share of its
        # service's weight.
        expected_instance_percentage = [
            original_service_percentage * 1.0 / len(original_backend_instances)
        ] * len(original_backend_instances) + [
            alternate_service_percentage * 1.0 /
            len(alternate_backend_instances)
        ] * len(alternate_backend_instances)

        # Wait for traffic to go to both services.
        logger.info(
            'waiting for traffic to go to all backends (including alternate)')
        wait_until_all_rpcs_go_to_given_backends(
            original_backend_instances + alternate_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Verify that weights between two services are expected.
        retry_count = 10
        # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
        # seconds timeout.
        for i in range(retry_count):
            stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
            # Per-instance RPC counts, ordered to line up with
            # expected_instance_percentage above (original first, then
            # alternate).
            got_instance_count = [
                stats.rpcs_by_peer[i] for i in original_backend_instances
            ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances]
            total_count = sum(got_instance_count)
            got_instance_percentage = [
                x * 100.0 / total_count for x in got_instance_count
            ]

            try:
                # Allow a 5 percentage-point deviation per instance.
                compare_distributions(got_instance_percentage,
                                      expected_instance_percentage, 5)
            except Exception as e:
                logger.info('attempt %d', i)
                logger.info('got percentage: %s', got_instance_percentage)
                logger.info('expected percentage: %s',
                            expected_instance_percentage)
                logger.info(e)
                if i == retry_count - 1:
                    raise Exception(
                        'RPC distribution (%s) differs from expected (%s)' %
                        (got_instance_percentage, expected_instance_percentage))
            else:
                logger.info("success")
                break
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map and drain the alternate service,
        # unless the operator asked to keep failed state for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1413
1414
def test_path_matching(gcp, original_backend_service, instance_group,
                       alternate_backend_service, same_zone_instance_group):
    """Verify URL-map path matching: full-path, prefix, regex, ignore-case.

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service.

    Then it updates the URL map to add routes, to make UnaryCall and
    EmptyCall go to different backends. It waits for all backends in both
    services to receive traffic, then verifies that traffic goes to the
    expected backends.
    """
    logger.info('Running test_path_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # FullPath EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Prefix UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        'prefixMatch': '/grpc.testing.TestService/Unary'
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                # This test case is similar to the one above (but with route
                # services swapped). This test has two routes (full_path and
                # the default) to match EmptyCall, and both routes set
                # alternative_backend_service as the action. This forces the
                # client to handle duplicate Clusters in the RDS response.
                [
                    {
                        'priority': 0,
                        # Prefix UnaryCall -> original_backend_service.
                        'matchRules': [{
                            'prefixMatch': '/grpc.testing.TestService/Unary'
                        }],
                        'service': original_backend_service.url
                    },
                    {
                        'priority': 1,
                        # FullPath EmptyCall -> alternate_backend_service.
                        'matchRules': [{
                            'fullPathMatch':
                                '/grpc.testing.TestService/EmptyCall'
                        }],
                        'service': alternate_backend_service.url
                    }
                ],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Regex UnaryCall -> alternate_backend_service.
                    'matchRules': [{
                        # NOTE: raw string — '\/' is not a valid Python escape
                        # sequence (SyntaxWarning on Python >= 3.12); the raw
                        # literal yields the same regex text as before.
                        'regexMatch':
                            r'^\/.*\/UnaryCall$'  # Unary methods with any services.
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": alternate_backend_instances,
                    "EmptyCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # ignoreCase EmptyCall -> alternate_backend_service.
                    'matchRules': [{
                        # Case insensitive matching.
                        'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
                        'ignoreCase': True,
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "UnaryCall": original_backend_instances,
                    "EmptyCall": alternate_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s', route_rules)
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map and drain the alternate service,
        # unless the operator asked to keep failed state for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1558
1559
def test_header_matching(gcp, original_backend_service, instance_group,
                         alternate_backend_service, same_zone_instance_group):
    """Verify URL-map header matching (exact, prefix, suffix, present, invert,
    range, regex).

    This test starts with all traffic (UnaryCall and EmptyCall) going to
    original_backend_service.

    Then it updates the URL map to add routes, to make RPCs with test headers
    go to different backends. It waits for all backends in both services to
    receive traffic, then verifies that traffic goes to the expected
    backends.
    """
    logger.info('Running test_header_matching')

    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)

    passed = True
    try:
        # A list of tuples (route_rules, expected_instances).
        test_cases = [
            (
                [{
                    'priority': 0,
                    # Header ExactMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_EMPTY
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header PrefixMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header SuffixMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' present -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'presentMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header invert ExactMatch -> alternate_backend_service.
                    # UnaryCall is sent with the metadata, so will be sent to
                    # original. EmptyCall will be sent to alternative.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_KEY,
                            'exactMatch': _TEST_METADATA_VALUE_UNARY,
                            'invertMatch': True
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
                    # UnaryCall is sent with the metadata in range.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName': _TEST_METADATA_NUMERIC_KEY,
                            'rangeMatch': {
                                'rangeStart': '100',
                                'rangeEnd': '200'
                            }
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": original_backend_instances,
                    "UnaryCall": alternate_backend_instances
                }),
            (
                [{
                    'priority': 0,
                    # Header RegexMatch -> alternate_backend_service.
                    # EmptyCall is sent with the metadata.
                    'matchRules': [{
                        'prefixMatch':
                            '/',
                        'headerMatches': [{
                            'headerName':
                                _TEST_METADATA_KEY,
                            'regexMatch':
                                "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
                                              _TEST_METADATA_VALUE_EMPTY[-2:])
                        }]
                    }],
                    'service': alternate_backend_service.url
                }],
                {
                    "EmptyCall": alternate_backend_instances,
                    "UnaryCall": original_backend_instances
                }),
        ]

        for (route_rules, expected_instances) in test_cases:
            logger.info('patching url map with %s -> alternative',
                        route_rules[0]['matchRules'])
            patch_url_map_backend_service(gcp,
                                          original_backend_service,
                                          route_rules=route_rules)

            # Wait for traffic to go to both services.
            logger.info(
                'waiting for traffic to go to all backends (including alternate)'
            )
            wait_until_all_rpcs_go_to_given_backends(
                original_backend_instances + alternate_backend_instances,
                _WAIT_FOR_STATS_SEC)

            retry_count = 80
            # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
            # seconds timeout.
            for i in range(retry_count):
                stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
                if not stats.rpcs_by_method:
                    raise ValueError(
                        'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
                    )
                logger.info('attempt %d', i)
                if compare_expected_instances(stats, expected_instances):
                    logger.info("success")
                    break
                elif i == retry_count - 1:
                    raise Exception(
                        'timeout waiting for RPCs to the expected instances: %s'
                        % expected_instances)
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map and drain the alternate service,
        # unless the operator asked to keep failed state for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, alternate_backend_service, [])
1761
1762
def test_circuit_breaking(gcp, original_backend_service, instance_group,
                          same_zone_instance_group):
    '''Verify circuit breaking: in-flight RPCs are capped at maxRequests.

    Since backend service circuit_breakers configuration cannot be unset,
    which causes trouble for restoring the validate_for_proxy flag in the
    target proxy/global forwarding rule, this test uses dedicated backend
    services. The url_map and backend services undergo the following state
    changes:

    Before test:
       original_backend_service -> [instance_group]
       extra_backend_service -> []
       more_extra_backend_service -> []

       url_map -> [original_backend_service]

    In test:
       extra_backend_service (with circuit_breakers) -> [instance_group]
       more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]

       url_map -> [extra_backend_service, more_extra_backend_service]

    After test:
       original_backend_service -> [instance_group]
       extra_backend_service (with circuit_breakers) -> []
       more_extra_backend_service (with circuit_breakers) -> []

       url_map -> [original_backend_service]
    '''
    logger.info('Running test_circuit_breaking')
    # Tracks services created here so the finally block can delete them.
    additional_backend_services = []
    passed = True
    try:
        # TODO(chengyuanzhang): Dedicated backend services created for circuit
        # breaking test. Once the issue for unsetting backend service circuit
        # breakers is resolved or configuring backend service circuit breakers is
        # enabled for config validation, these dedicated backend services can be
        # eliminated.
        extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
        more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
        extra_backend_service = add_backend_service(gcp,
                                                    extra_backend_service_name)
        additional_backend_services.append(extra_backend_service)
        more_extra_backend_service = add_backend_service(
            gcp, more_extra_backend_service_name)
        additional_backend_services.append(more_extra_backend_service)
        # The config validation for proxyless doesn't allow setting
        # circuit_breakers. Disable validate validate_for_proxyless
        # for this test. This can be removed when validation
        # accepts circuit_breakers.
        logger.info('disabling validate_for_proxyless in target proxy')
        set_validate_for_proxyless(gcp, False)
        extra_backend_service_max_requests = 500
        more_extra_backend_service_max_requests = 1000
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        logger.info('Waiting for extra backends to become healthy')
        wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
        patch_backend_service(gcp,
                              more_extra_backend_service,
                              [same_zone_instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      more_extra_backend_service_max_requests
                              })
        logger.info('Waiting for more extra backend to become healthy')
        wait_for_healthy_backends(gcp, more_extra_backend_service,
                                  same_zone_instance_group)
        extra_backend_instances = get_instance_names(gcp, instance_group)
        more_extra_backend_instances = get_instance_names(
            gcp, same_zone_instance_group)
        route_rules = [
            {
                'priority': 0,
                # UnaryCall -> extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
                }],
                'service': extra_backend_service.url
            },
            {
                'priority': 1,
                # EmptyCall -> more_extra_backend_service
                'matchRules': [{
                    'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
                }],
                'service': more_extra_backend_service.url
            },
        ]

        # Make client send UNARY_CALL and EMPTY_CALL.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ])
        logger.info('Patching url map with %s', route_rules)
        patch_url_map_backend_service(gcp,
                                      extra_backend_service,
                                      route_rules=route_rules)
        logger.info('Waiting for traffic to go to all backends')
        wait_until_all_rpcs_go_to_given_backends(
            extra_backend_instances + more_extra_backend_instances,
            _WAIT_FOR_STATS_SEC)

        # Make all calls keep-open.
        # The 'keep-open' rpc-behavior keeps RPCs outstanding so in-flight
        # counts can accumulate up to the circuit-breaker threshold.
        configure_client([
            messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
        ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
             'rpc-behavior', 'keep-open'),
            (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
             'rpc-behavior', 'keep-open')])
        # In-flight RPCs should plateau at each service's maxRequests.
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state (%d)',
                    extra_backend_service_max_requests)
        wait_until_rpcs_in_flight(
            'EMPTY_CALL',
            (_WAIT_FOR_BACKEND_SEC +
             int(more_extra_backend_service_max_requests / args.qps)),
            more_extra_backend_service_max_requests, 1)
        logger.info('EMPTY_CALL reached stable state (%d)',
                    more_extra_backend_service_max_requests)

        # Increment circuit breakers max_requests threshold.
        extra_backend_service_max_requests = 800
        patch_backend_service(gcp,
                              extra_backend_service, [instance_group],
                              circuit_breakers={
                                  'maxRequests':
                                      extra_backend_service_max_requests
                              })
        wait_until_rpcs_in_flight(
            'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
                           int(extra_backend_service_max_requests / args.qps)),
            extra_backend_service_max_requests, 1)
        logger.info('UNARY_CALL reached stable state after increase (%d)',
                    extra_backend_service_max_requests)
        logger.info('success')
        # Avoid new RPCs being outstanding (some test clients create threads
        # for sending RPCs) after restoring backend services.
        configure_client(
            [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
    except Exception:
        passed = False
        raise
    finally:
        # Restore original state and clean up the dedicated services, unless
        # the operator asked to keep failed state for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            patch_backend_service(gcp, original_backend_service,
                                  [instance_group])
            for backend_service in additional_backend_services:
                delete_backend_service(gcp, backend_service)
            set_validate_for_proxyless(gcp, True)
1922
1923
def test_timeout(gcp, original_backend_service, instance_group):
    """Verify route-level maxStreamDuration and application-level deadlines.

    Patches the URL map so UnaryCall gets a 3-second maxStreamDuration, then
    runs three client configurations and checks, via accumulated client
    stats, that RPCs end with the expected status codes (0 = OK,
    4 = DEADLINE_EXCEEDED).
    """
    logger.info('Running test_timeout')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # UnaryCall -> maxStreamDuration:3s
    route_rules = [{
        'priority': 0,
        'matchRules': [{
            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
        }],
        'service': original_backend_service.url,
        'routeAction': {
            'maxStreamDuration': {
                'seconds': 3,
            },
        },
    }]
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {expected_results})
    test_cases = [
        (
            'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
            # UnaryCall and EmptyCall both sleep-4.
            # UnaryCall timeouts, EmptyCall succeeds.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                    messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-4'),
                    (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
                     'rpc-behavior', 'sleep-4'),
                ],
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
                'EMPTY_CALL': 0,
            },
        ),
        (
            'app_timeout_exceeded',
            # UnaryCall only with sleep-2; timeout=1s; calls timeout.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
                'metadata': [
                    (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                     'rpc-behavior', 'sleep-2'),
                ],
                'timeout_sec': 1,
            },
            {
                'UNARY_CALL': 4,  # DEADLINE_EXCEEDED
            },
        ),
        (
            'timeout_not_exceeded',
            # UnaryCall only with no sleep; calls succeed.
            {
                'rpc_types': [
                    messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                ],
            },
            {
                'UNARY_CALL': 0,
            },
        )
    ]

    passed = True
    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
            # second timeout.
            attempt_count = 20
            if first_case:
                # The first case also absorbs config-propagation delay, so it
                # gets a longer budget.
                attempt_count = 120
                first_case = False
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for rpc, status in list(expected_results.items()):
                    # Delta of RPCs finishing with the expected status during
                    # this attempt's window.
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = test_runtime_secs * args.qps
                    # Allow 10% deviation from expectation to reduce flakiness
                    if qty < (want * .9) or qty > (want * 1.1):
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                before_stats = after_stats
            else:
                # for/else: only reached when every attempt failed.
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map unless the operator asked to keep
        # failed state for debugging.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
2055
2056
def test_fault_injection(gcp, original_backend_service, instance_group):
    """Verify Traffic Director fault-injection policies are honored by clients.

    Installs route rules that inject aborts (HTTP 401 -> UNAUTHENTICATED) and
    20s delays keyed off the 'fi_testcase' request header, then drives the
    interop client through each case and checks the observed per-status RPC
    distribution against the configured fault percentage (10% tolerance).
    """
    logger.info('Running test_fault_injection')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Requests are steered to a fault policy by this header's value.
    testcase_header = 'fi_testcase'

    def _route(pri, name, fi_policy):
        # Route rule matching header fi_testcase == name, applying fi_policy.
        return {
            'priority': pri,
            'matchRules': [{
                'prefixMatch':
                    '/',
                'headerMatches': [{
                    'headerName': testcase_header,
                    'exactMatch': name,
                }],
            }],
            'service': original_backend_service.url,
            'routeAction': {
                'faultInjectionPolicy': fi_policy
            },
        }

    def _abort(pct):
        # Abort pct% of requests with HTTP 401 (maps to gRPC UNAUTHENTICATED).
        return {
            'abort': {
                'httpStatus': 401,
                'percentage': pct,
            }
        }

    def _delay(pct):
        # Delay pct% of requests by a fixed 20s (exceeds the 2s RPC timeout
        # used below, so delayed RPCs end as DEADLINE_EXCEEDED).
        return {
            'delay': {
                'fixedDelay': {
                    'seconds': '20'
                },
                'percentage': pct,
            }
        }

    # A policy with both abort and delay present but at 0% each.
    zero_route = _abort(0)
    zero_route.update(_delay(0))
    route_rules = [
        _route(0, 'zero_percent_fault_injection', zero_route),
        _route(1, 'always_delay', _delay(100)),
        _route(2, 'always_abort', _abort(100)),
        _route(3, 'delay_half', _delay(50)),
        _route(4, 'abort_half', _abort(50)),
        {
            # Catch-all route with no fault policy.
            'priority': 5,
            'matchRules': [{
                'prefixMatch': '/'
            }],
            'service': original_backend_service.url,
        },
    ]
    set_validate_for_proxyless(gcp, False)
    patch_url_map_backend_service(gcp,
                                  original_backend_service,
                                  route_rules=route_rules)
    # A list of tuples (testcase_name, {client_config}, {code: percent}).  Each
    # test case will set the testcase_header with the testcase_name for routing
    # to the appropriate config for the case, defined above.
    test_cases = [
        (
            'always_delay',
            {
                'timeout_sec': 2
            },
            {
                4: 1
            },  # DEADLINE_EXCEEDED
        ),
        (
            'always_abort',
            {},
            {
                16: 1
            },  # UNAUTHENTICATED
        ),
        (
            'delay_half',
            {
                'timeout_sec': 2
            },
            {
                4: .5,
                0: .5
            },  # DEADLINE_EXCEEDED / OK: 50% / 50%
        ),
        (
            'abort_half',
            {},
            {
                16: .5,
                0: .5
            },  # UNAUTHENTICATED / OK: 50% / 50%
        ),
        (
            'zero_percent_fault_injection',
            {},
            {
                0: 1
            },  # OK
        ),
        (
            'non_matching_fault_injection',  # Not in route_rules, above.
            {},
            {
                0: 1
            },  # OK
        ),
    ]

    passed = True
    try:
        first_case = True
        for (testcase_name, client_config, expected_results) in test_cases:
            logger.info('starting case %s', testcase_name)

            # Route this case's RPCs via the testcase header; only UNARY_CALL
            # traffic is measured.
            client_config['metadata'] = [
                (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
                 testcase_header, testcase_name)
            ]
            client_config['rpc_types'] = [
                messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
            ]
            configure_client(**client_config)
            # wait a second to help ensure the client stops sending RPCs with
            # the old config.  We will make multiple attempts if it is failing,
            # but this improves confidence that the test is valid if the
            # previous client_config would lead to the same results.
            time.sleep(1)
            # Each attempt takes 10 seconds
            if first_case:
                # Give the first test case 600s for xDS config propagation.
                attempt_count = 60
                first_case = False
            else:
                # The accumulated stats might include previous sub-test, running
                # the test multiple times to deflake
                attempt_count = 10
            before_stats = get_client_accumulated_stats()
            if not before_stats.stats_per_method:
                raise ValueError(
                    'stats.stats_per_method is None, the interop client stats service does not support this test case'
                )
            # for/else: the else-branch raises only if no attempt succeeded.
            for i in range(attempt_count):
                logger.info('%s: attempt %d', testcase_name, i)

                test_runtime_secs = 10
                time.sleep(test_runtime_secs)
                after_stats = get_client_accumulated_stats()

                success = True
                for status, pct in list(expected_results.items()):
                    rpc = 'UNARY_CALL'
                    # Delta of RPCs with this status code over the window.
                    qty = (after_stats.stats_per_method[rpc].result[status] -
                           before_stats.stats_per_method[rpc].result[status])
                    want = pct * args.qps * test_runtime_secs
                    # Allow 10% deviation from expectation to reduce flakiness
                    VARIANCE_ALLOWED = 0.1
                    if abs(qty - want) > want * VARIANCE_ALLOWED:
                        logger.info('%s: failed due to %s[%s]: got %d want ~%d',
                                    testcase_name, rpc, status, qty, want)
                        success = False
                if success:
                    logger.info('success')
                    break
                logger.info('%s attempt %d failed', testcase_name, i)
                before_stats = after_stats
            else:
                raise Exception(
                    '%s: timeout waiting for expected results: %s; got %s' %
                    (testcase_name, expected_results,
                     after_stats.stats_per_method))
    except Exception:
        passed = False
        raise
    finally:
        # Restore the original URL map (and re-enable validation) unless we
        # were asked to halt for debugging after a failure.
        if passed or not args.halt_after_fail:
            patch_url_map_backend_service(gcp, original_backend_service)
            set_validate_for_proxyless(gcp, True)
2243
2244
def test_csds(gcp, original_backend_service, instance_group, server_uri):
    """Validate the client's CSDS (xDS config dump) service.

    Polls the client's CSDS endpoint until a config dump arrives containing
    valid LDS, RDS, CDS and EDS resources (checked against args.zone and
    server_uri), or raises RuntimeError after a 5-minute deadline.
    """
    test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
    sleep_interval_between_attempts_s = datetime.timedelta(
        seconds=2).total_seconds()
    logger.info('Running test_csds')

    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)

    # Test case timeout: 5 minutes
    deadline = time.time() + test_csds_timeout_s
    cnt = 0
    while time.time() <= deadline:
        client_config = get_client_xds_config_dump()
        logger.info('test_csds attempt %d: received xDS config %s', cnt,
                    json.dumps(client_config, indent=2))
        if client_config is not None:
            # Got the xDS config dump, now validate it
            ok = True
            try:
                if client_config['node']['locality']['zone'] != args.zone:
                    logger.info('Invalid zone %s != %s',
                                client_config['node']['locality']['zone'],
                                args.zone)
                    ok = False
                seen = set()
                # Older clients report resources under per-type fields.
                for xds_config in client_config.get('xds_config', []):
                    if 'listener_config' in xds_config:
                        listener_name = xds_config['listener_config'][
                            'dynamic_listeners'][0]['active_state']['listener'][
                                'name']
                        if listener_name != server_uri:
                            logger.info('Invalid Listener name %s != %s',
                                        listener_name, server_uri)
                            ok = False
                        else:
                            seen.add('lds')
                    elif 'route_config' in xds_config:
                        num_vh = len(
                            xds_config['route_config']['dynamic_route_configs']
                            [0]['route_config']['virtual_hosts'])
                        if num_vh <= 0:
                            logger.info('Invalid number of VirtualHosts %s',
                                        num_vh)
                            ok = False
                        else:
                            seen.add('rds')
                    elif 'cluster_config' in xds_config:
                        cluster_type = xds_config['cluster_config'][
                            'dynamic_active_clusters'][0]['cluster']['type']
                        if cluster_type != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster_type)
                            ok = False
                        else:
                            seen.add('cds')
                    elif 'endpoint_config' in xds_config:
                        sub_zone = xds_config["endpoint_config"][
                            "dynamic_endpoint_configs"][0]["endpoint_config"][
                                "endpoints"][0]["locality"]["sub_zone"]
                        if args.zone not in sub_zone:
                            logger.info('Invalid endpoint sub_zone %s',
                                        sub_zone)
                            ok = False
                        else:
                            seen.add('eds')
                # Newer clients report resources under generic_xds_configs.
                # NOTE: each branch below logs the value it actually checked;
                # previously these logged leftover variables from the loop
                # above (listener_name, num_vh, cluster_type, sub_zone),
                # which could be unbound or stale.
                for generic_xds_config in client_config.get(
                        'generic_xds_configs', []):
                    if re.search(r'\.Listener$',
                                 generic_xds_config['type_url']):
                        seen.add('lds')
                        listener = generic_xds_config["xds_config"]
                        if listener['name'] != server_uri:
                            logger.info('Invalid Listener name %s != %s',
                                        listener['name'], server_uri)
                            ok = False
                    elif re.search(r'\.RouteConfiguration$',
                                   generic_xds_config['type_url']):
                        seen.add('rds')
                        route_config = generic_xds_config["xds_config"]
                        if not len(route_config['virtual_hosts']):
                            logger.info('Invalid number of VirtualHosts %s',
                                        len(route_config['virtual_hosts']))
                            ok = False
                    elif re.search(r'\.Cluster$',
                                   generic_xds_config['type_url']):
                        seen.add('cds')
                        cluster = generic_xds_config["xds_config"]
                        if cluster['type'] != 'EDS':
                            logger.info('Invalid cluster type %s != EDS',
                                        cluster['type'])
                            ok = False
                    elif re.search(r'\.ClusterLoadAssignment$',
                                   generic_xds_config['type_url']):
                        seen.add('eds')
                        endpoint = generic_xds_config["xds_config"]
                        if args.zone not in endpoint["endpoints"][0][
                                "locality"]["sub_zone"]:
                            logger.info(
                                'Invalid endpoint sub_zone %s',
                                endpoint["endpoints"][0]["locality"]
                                ["sub_zone"])
                            ok = False
                want = {'lds', 'rds', 'cds', 'eds'}
                if seen != want:
                    logger.info('Incomplete xDS config dump, seen=%s', seen)
                    ok = False
            except Exception:
                # Narrowed from a bare except so KeyboardInterrupt/SystemExit
                # still propagate; malformed dumps land here via KeyError etc.
                logger.exception('Error in xDS config dump:')
                ok = False
            finally:
                if ok:
                    # Successfully fetched xDS config, and they looks good.
                    logger.info('success')
                    return
        logger.info('test_csds attempt %d failed', cnt)
        # Give the client some time to fetch xDS resources
        time.sleep(sleep_interval_between_attempts_s)
        cnt += 1

    raise RuntimeError('failed to receive a valid xDS config in %s seconds' %
                       test_csds_timeout_s)
2365
2366
def set_validate_for_proxyless(gcp, validate_for_proxyless):
    """Recreate the target proxy with the given validateForProxyless setting.

    No-op unless the alpha compute API is enabled and exactly one forwarding
    rule, target proxy and URL map exist.
    """
    if not gcp.alpha_compute:
        logger.debug(
            'Not setting validateForProxy because alpha is not enabled')
        return
    if len(gcp.global_forwarding_rules) != 1 or len(
            gcp.target_proxies) != 1 or len(gcp.url_maps) != 1:
        logger.debug(
            "Global forwarding rule, target proxy or url map not found.")
        return
    # This function deletes global_forwarding_rule and target_proxy, then
    # recreates target_proxy with the requested validate_for_proxyless value.
    # This is necessary because patching target_grpc_proxy isn't supported.
    # NOTE: target_proxy_name and forwarding_rule_name are module-level
    # globals defined elsewhere in this file.
    delete_global_forwarding_rule(gcp, gcp.global_forwarding_rules[0])
    delete_target_proxy(gcp, gcp.target_proxies[0])
    create_target_proxy(gcp, target_proxy_name, validate_for_proxyless)
    create_global_forwarding_rule(gcp, forwarding_rule_name, [gcp.service_port])
2384
2385
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service of a backend instance."""
    target = '%s:%d' % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        request = health_pb2.HealthCheckRequest()
        return stub.Check(request)
2390
2391
def set_serving_status(instances, service_port, serving):
    """Flip the serving state of each xDS test server instance.

    Calls SetServing/SetNotServing on every instance's
    XdsUpdateHealthService, then polls the standard health service until it
    reports the desired status, retrying up to retry_count times.

    Args:
        instances: iterable of instance hostnames/IPs.
        service_port: port the test server listens on.
        serving: True to mark SERVING, False to mark NOT_SERVING.

    Raises:
        Exception: if an instance does not reach the desired status within
            retry_count attempts.
    """
    logger.info('setting %s serving status to %s', instances, serving)
    for instance in instances:
        with grpc.insecure_channel('%s:%d' %
                                   (instance, service_port)) as channel:
            logger.info('setting %s serving status to %s', instance, serving)
            stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
            retry_count = 5
            # BUG FIX: loop bound previously hard-coded range(5), silently
            # diverging from retry_count if one was ever changed.
            for i in range(retry_count):
                if serving:
                    stub.SetServing(empty_pb2.Empty())
                else:
                    stub.SetNotServing(empty_pb2.Empty())
                serving_status = get_serving_status(instance, service_port)
                logger.info('got instance service status %s', serving_status)
                want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
                if serving_status.status == want_status:
                    break
                if i == retry_count - 1:
                    raise Exception(
                        'failed to set instance service status after %d retries'
                        % retry_count)
2414
2415
def is_primary_instance_group(gcp, instance_group):
    """Return True if all RPC peers reported by the client belong to the group.

    Clients may connect to a TD instance in a different region than the
    client, in which case primary/secondary assignments may not be based on
    the client's actual locality.
    """
    # Use a set for O(1) membership tests, and iterate the dict directly
    # instead of materializing list(keys()).
    instance_names = set(get_instance_names(gcp, instance_group))
    stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    return all(peer in instance_names for peer in stats.rpcs_by_peer)
2424
2425
def get_startup_script(path_to_server_binary, service_port):
    """Return the VM startup script that launches the xDS test server.

    If a prebuilt server binary path is given, the script just runs it in the
    background; otherwise the script builds and launches the Java interop
    server from source.
    """
    if not path_to_server_binary:
        # Build grpc-java's interop test server on the VM, then launch it.
        return """#!/bin/bash
sudo apt update
sudo apt install -y git default-jdk
mkdir java_server
pushd java_server
git clone https://github.com/grpc/grpc-java.git
pushd grpc-java
pushd interop-testing
../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true

nohup build/install/grpc-interop-testing/bin/xds-test-server \
    --port=%d 1>/dev/null &""" % service_port
    return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary,
                                                 service_port)
2443
2444
def create_instance_template(gcp, name, network, source_image, machine_type,
                             startup_script):
    """Create a GCE instance template and record it on gcp.instance_template.

    Args:
        gcp: state object holding the compute API client and project.
        name: name for the new instance template.
        network: network for the VM's network interface.
        source_image: boot disk source image.
        machine_type: GCE machine type for instances from this template.
        startup_script: shell script the VM runs on boot (starts the server).
    """
    config = {
        'name': name,
        'properties': {
            # Tag matched by the health-check firewall rule created elsewhere.
            'tags': {
                'items': ['allow-health-checks']
            },
            'machineType': machine_type,
            'serviceAccounts': [{
                'email': 'default',
                'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
            }],
            'networkInterfaces': [{
                # External NAT so the VM can fetch build dependencies.
                'accessConfigs': [{
                    'type': 'ONE_TO_ONE_NAT'
                }],
                'network': network
            }],
            'disks': [{
                'boot': True,
                'initializeParams': {
                    'sourceImage': source_image
                },
                'autoDelete': True
            }],
            'metadata': {
                'items': [{
                    'key': 'startup-script',
                    'value': startup_script
                }]
            }
        }
    }

    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceTemplates().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.instance_template = GcpResource(config['name'], result['targetLink'])
2485
2486
def add_instance_group(gcp, zone, name, size):
    """Create a managed instance group from gcp.instance_template.

    Waits for the group to reach the requested size, appends the resulting
    InstanceGroup to gcp.instance_groups and returns it.

    Args:
        gcp: state object holding compute client, project and template.
        zone: GCE zone to create the group in.
        name: name for the instance group manager.
        size: target number of instances.

    Returns:
        The newly created InstanceGroup.
    """
    config = {
        'name': name,
        'instanceTemplate': gcp.instance_template.url,
        'targetSize': size,
        # Named port 'grpc' is referenced by the backend service.
        'namedPorts': [{
            'name': 'grpc',
            'port': gcp.service_port
        }]
    }

    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.instanceGroupManagers().insert(
        project=gcp.project, zone=zone,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp, zone, result['name'])
    # Re-fetch the manager to obtain the instanceGroup URL.
    result = gcp.compute.instanceGroupManagers().get(
        project=gcp.project, zone=zone,
        instanceGroupManager=config['name']).execute(
            num_retries=_GCP_API_RETRIES)
    instance_group = InstanceGroup(config['name'], result['instanceGroup'],
                                   zone)
    gcp.instance_groups.append(instance_group)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
                                                   _WAIT_FOR_OPERATION_SEC)
    return instance_group
2513
2514
def create_health_check(gcp, name):
    """Create a health check and store it on gcp.health_check.

    Uses a native gRPC health check when the alpha compute API is enabled,
    otherwise falls back to a TCP check against the 'grpc' named port.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
        config = {
            'name': name,
            'type': 'GRPC',
            'grpcHealthCheck': {
                'portSpecification': 'USE_SERVING_PORT'
            }
        }
    else:
        compute_to_use = gcp.compute
        config = {
            'name': name,
            'type': 'TCP',
            'tcpHealthCheck': {
                'portName': 'grpc'
            }
        }
    logger.debug('Sending GCP request with body=%s', config)
    request = compute_to_use.healthChecks().insert(project=gcp.project,
                                                   body=config)
    result = request.execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check = GcpResource(config['name'], result['targetLink'])
2539
2540
def create_health_check_firewall_rule(gcp, name):
    """Create an ingress firewall rule admitting GCP health-check probes.

    The rule targets instances tagged 'allow-health-checks' (the tag set by
    create_instance_template) and is stored on
    gcp.health_check_firewall_rule.
    """
    config = {
        'name': name,
        'direction': 'INGRESS',
        'allowed': [{
            'IPProtocol': 'tcp'
        }],
        # Google Cloud health-checking source ranges.
        'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
        'targetTags': ['allow-health-checks'],
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.firewalls().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    gcp.health_check_firewall_rule = GcpResource(config['name'],
                                                 result['targetLink'])
2557
2558
def add_backend_service(gcp, name):
    """Create a backend service, append it to gcp.backend_services, return it.

    Alpha API deployments use the GRPC protocol; otherwise HTTP2 is used.
    """
    if gcp.alpha_compute:
        compute_to_use, protocol = gcp.alpha_compute, 'GRPC'
    else:
        compute_to_use, protocol = gcp.compute, 'HTTP2'
    config = {
        'name': name,
        'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
        'healthChecks': [gcp.health_check.url],
        'portName': 'grpc',
        'protocol': protocol
    }
    logger.debug('Sending GCP request with body=%s', config)
    request = compute_to_use.backendServices().insert(project=gcp.project,
                                                      body=config)
    result = request.execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    backend_service = GcpResource(config['name'], result['targetLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
2580
2581
def create_url_map(gcp, name, backend_service, host_name):
    """Create a URL map routing host_name to backend_service.

    Appends the new map to gcp.url_maps and returns it.
    """
    config = {
        'name': name,
        'defaultService': backend_service.url,
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': backend_service.url,
        }],
        'hostRules': [{
            'hosts': [host_name],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().insert(
        project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    url_map = GcpResource(config['name'], result['targetLink'])
    gcp.url_maps.append(url_map)
    return url_map
2602
2603
def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
    """Patch URL map 'name' so its host rule matches host_name:service_port.

    NOTE(review): backend_service is unused here — presumably kept for
    signature symmetry with create_url_map; confirm before removing.
    """
    config = {
        'hostRules': [{
            'hosts': ['%s:%d' % (host_name, gcp.service_port)],
            'pathMatcher': _PATH_MATCHER_NAME
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = gcp.compute.urlMaps().patch(
        project=gcp.project, urlMap=name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
2616
2617
def create_target_proxy(gcp, name, validate_for_proxyless=True, url_map=None):
    """Create a target proxy for the given (or first known) URL map.

    Uses targetGrpcProxies (with validate_for_proxyless) under the alpha
    API, otherwise targetHttpProxies. Appends the new proxy to
    gcp.target_proxies and returns it.
    """
    if url_map:
        arg_url_map_url = url_map.url
    else:
        arg_url_map_url = gcp.url_maps[0].url
    if gcp.alpha_compute:
        config = {
            'name': name,
            'url_map': arg_url_map_url,
            'validate_for_proxyless': validate_for_proxyless
        }
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.alpha_compute.targetGrpcProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    else:
        config = {
            'name': name,
            'url_map': arg_url_map_url,
        }
        logger.debug('Sending GCP request with body=%s', config)
        result = gcp.compute.targetHttpProxies().insert(
            project=gcp.project,
            body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
    target_proxy = GcpResource(config['name'], result['targetLink'])
    gcp.target_proxies.append(target_proxy)
    return target_proxy
2646
2647
def create_global_forwarding_rule(gcp,
                                  name,
                                  potential_ports,
                                  potential_ip_addresses=('0.0.0.0',),
                                  target_proxy=None):
    """Create a global forwarding rule, trying port/address pairs in order.

    Iterates the cross product of potential_ports and
    potential_ip_addresses; the first combination the API accepts is
    recorded (gcp.global_forwarding_rules, gcp.service_port) and the
    function returns. Returns None without raising if every combination
    fails.

    Args:
        gcp: state object holding compute clients, project and resources.
        name: name for the forwarding rule.
        potential_ports: candidate ports to try, in order.
        potential_ip_addresses: candidate IP addresses to try per port.
            BUG FIX: default is now a tuple rather than a shared mutable
            list default argument.
        target_proxy: optional proxy resource; defaults to
            gcp.target_proxies[0].
    """
    if target_proxy:
        arg_target_proxy_url = target_proxy.url
    else:
        arg_target_proxy_url = gcp.target_proxies[0].url
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    for port in potential_ports:
        for ip_address in potential_ip_addresses:
            try:
                config = {
                    'name': name,
                    'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
                    'portRange': str(port),
                    'IPAddress': ip_address,
                    'network': args.network,
                    'target': arg_target_proxy_url,
                }
                logger.debug('Sending GCP request with body=%s', config)
                result = compute_to_use.globalForwardingRules().insert(
                    project=gcp.project,
                    body=config).execute(num_retries=_GCP_API_RETRIES)
                wait_for_global_operation(gcp, result['name'])
                global_forwarding_rule = GcpResource(config['name'],
                                                     result['targetLink'])
                gcp.global_forwarding_rules.append(global_forwarding_rule)
                gcp.service_port = port
                return
            except googleapiclient.errors.HttpError as http_error:
                # Lazy %-style logging args instead of eager string
                # formatting, matching the rest of the file.
                logger.warning(
                    'Got error %s when attempting to create forwarding rule to '
                    '%s:%d. Retrying with another port.', http_error,
                    ip_address, port)
2687
2688
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and store it on gcp.health_check.

    On failure the error is recorded in gcp.errors and a GcpResource with a
    None url is stored instead.
    """
    try:
        response = gcp.compute.healthChecks().get(
            project=gcp.project, healthCheck=health_check_name).execute()
        health_check_url = response['selfLink']
    except Exception as e:
        gcp.errors.append(e)
        health_check_url = None
    gcp.health_check = GcpResource(health_check_name, health_check_url)
2697
2698
def get_health_check_firewall_rule(gcp, firewall_name):
    """Look up an existing firewall rule; store it on gcp.

    On failure the error is recorded in gcp.errors and a GcpResource with a
    None url is stored instead.
    """
    try:
        result = gcp.compute.firewalls().get(project=gcp.project,
                                             firewall=firewall_name).execute()
        gcp.health_check_firewall_rule = GcpResource(firewall_name,
                                                     result['selfLink'])
    except Exception as e:
        gcp.errors.append(e)
        gcp.health_check_firewall_rule = GcpResource(firewall_name, None)
2708
2709
def get_backend_service(gcp, backend_service_name, record_error=True):
    """Look up an existing backend service, append it to gcp.backend_services.

    On failure a GcpResource with a None url is appended instead, and the
    error is recorded in gcp.errors when record_error is True. Returns the
    appended resource either way.
    """
    backend_service_url = None
    try:
        response = gcp.compute.backendServices().get(
            project=gcp.project, backendService=backend_service_name).execute()
        backend_service_url = response['selfLink']
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
    backend_service = GcpResource(backend_service_name, backend_service_url)
    gcp.backend_services.append(backend_service)
    return backend_service
2721
2722
def get_url_map(gcp, url_map_name, record_error=True):
    """Look up an existing URL map and append it to gcp.url_maps.

    Unlike get_backend_service, nothing is appended on failure; the error is
    recorded in gcp.errors when record_error is True.
    """
    try:
        result = gcp.compute.urlMaps().get(project=gcp.project,
                                           urlMap=url_map_name).execute()
        url_map = GcpResource(url_map_name, result['selfLink'])
        gcp.url_maps.append(url_map)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
2732
2733
def get_target_proxy(gcp, target_proxy_name, record_error=True):
    """Look up an existing target proxy and append it to gcp.target_proxies.

    Queries targetGrpcProxies under the alpha API, targetHttpProxies
    otherwise. Nothing is appended on failure; the error is recorded in
    gcp.errors when record_error is True.
    """
    try:
        if gcp.alpha_compute:
            result = gcp.alpha_compute.targetGrpcProxies().get(
                project=gcp.project,
                targetGrpcProxy=target_proxy_name).execute()
        else:
            result = gcp.compute.targetHttpProxies().get(
                project=gcp.project,
                targetHttpProxy=target_proxy_name).execute()
        target_proxy = GcpResource(target_proxy_name, result['selfLink'])
        gcp.target_proxies.append(target_proxy)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
2749
2750
def get_global_forwarding_rule(gcp, forwarding_rule_name, record_error=True):
    """Look up an existing forwarding rule, append to gcp.global_forwarding_rules.

    Nothing is appended on failure; the error is recorded in gcp.errors when
    record_error is True.
    """
    try:
        result = gcp.compute.globalForwardingRules().get(
            project=gcp.project, forwardingRule=forwarding_rule_name).execute()
        global_forwarding_rule = GcpResource(forwarding_rule_name,
                                             result['selfLink'])
        gcp.global_forwarding_rules.append(global_forwarding_rule)
    except Exception as e:
        if record_error:
            gcp.errors.append(e)
2761
2762
def get_instance_template(gcp, template_name):
    """Look up an existing instance template; store on gcp.instance_template.

    On failure the error is recorded in gcp.errors and a GcpResource with a
    None url is stored instead.
    """
    try:
        response = gcp.compute.instanceTemplates().get(
            project=gcp.project, instanceTemplate=template_name).execute()
        template_url = response['selfLink']
    except Exception as e:
        gcp.errors.append(e)
        template_url = None
    gcp.instance_template = GcpResource(template_name, template_url)
2771
2772
def get_instance_group(gcp, zone, instance_group_name):
    """Look up an existing instance group, register it, and return it.

    Side effect: records the group's first named port as gcp.service_port.
    On failure the exception is recorded in gcp.errors and a placeholder
    group with a None URL is registered and returned.
    """
    try:
        response = gcp.compute.instanceGroups().get(
            project=gcp.project, zone=zone,
            instanceGroup=instance_group_name).execute()
        gcp.service_port = response['namedPorts'][0]['port']
        group = InstanceGroup(instance_group_name, response['selfLink'], zone)
    except Exception as e:
        gcp.errors.append(e)
        group = InstanceGroup(instance_group_name, None, zone)
    gcp.instance_groups.append(group)
    return group
2786
2787
def delete_global_forwarding_rule(gcp, forwarding_rule_to_delete=None):
    """Delete one global forwarding rule and drop it from the GCP state.

    A falsy argument is a no-op. HTTP errors are logged and swallowed so
    that cleanup of the remaining resources can continue.
    """
    if not forwarding_rule_to_delete:
        return
    try:
        logger.debug('Deleting forwarding rule %s',
                     forwarding_rule_to_delete.name)
        op = gcp.compute.globalForwardingRules().delete(
            project=gcp.project,
            forwardingRule=forwarding_rule_to_delete.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
        if forwarding_rule_to_delete not in gcp.global_forwarding_rules:
            logger.debug(
                'Forwarding rule %s does not exist in gcp.global_forwarding_rules',
                forwarding_rule_to_delete.name)
        else:
            gcp.global_forwarding_rules.remove(forwarding_rule_to_delete)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2807
2808
def delete_global_forwarding_rules(gcp):
    """Delete every global forwarding rule tracked on the GCP state."""
    # Iterate a snapshot: each deletion mutates the tracked list.
    for rule in list(gcp.global_forwarding_rules):
        delete_global_forwarding_rule(gcp, rule)
2813
2814
def delete_target_proxy(gcp, proxy_to_delete=None):
    """Delete one target proxy and drop it from the GCP state.

    Deletes a gRPC proxy when the alpha API is in use, an HTTP proxy
    otherwise. A falsy argument is a no-op. HTTP errors are logged and
    swallowed so cleanup can continue.
    """
    if not proxy_to_delete:
        return
    try:
        if gcp.alpha_compute:
            logger.debug('Deleting grpc proxy %s', proxy_to_delete.name)
            op = gcp.alpha_compute.targetGrpcProxies().delete(
                project=gcp.project,
                targetGrpcProxy=proxy_to_delete.name).execute(
                    num_retries=_GCP_API_RETRIES)
        else:
            logger.debug('Deleting http proxy %s', proxy_to_delete.name)
            op = gcp.compute.targetHttpProxies().delete(
                project=gcp.project,
                targetHttpProxy=proxy_to_delete.name).execute(
                    num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
        if proxy_to_delete not in gcp.target_proxies:
            logger.debug('Gcp proxy %s does not exist in gcp.target_proxies',
                         proxy_to_delete.name)
        else:
            gcp.target_proxies.remove(proxy_to_delete)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2839
2840
def delete_target_proxies(gcp):
    """Delete every target proxy tracked on the GCP state."""
    # Iterate a snapshot: each deletion mutates the tracked list.
    for proxy in list(gcp.target_proxies):
        delete_target_proxy(gcp, proxy)
2845
2846
def delete_url_map(gcp, url_map_to_delete=None):
    """Delete one URL map and drop it from the GCP state.

    A falsy argument is a no-op. HTTP errors are logged and swallowed so
    cleanup can continue.
    """
    if not url_map_to_delete:
        return
    try:
        logger.debug('Deleting url map %s', url_map_to_delete.name)
        op = gcp.compute.urlMaps().delete(
            project=gcp.project,
            urlMap=url_map_to_delete.name).execute(num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
        if url_map_to_delete not in gcp.url_maps:
            logger.debug('Url map %s does not exist in gcp.url_maps',
                         url_map_to_delete.name)
        else:
            gcp.url_maps.remove(url_map_to_delete)
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2863
2864
def delete_url_maps(gcp):
    """Delete every URL map tracked on the GCP state."""
    # Iterate a snapshot: each deletion mutates the tracked list.
    for url_map in list(gcp.url_maps):
        delete_url_map(gcp, url_map)
2869
2870
def delete_backend_service(gcp, backend_service):
    """Delete one backend service and wait for the operation to finish.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting backend service %s', backend_service.name)
        op = gcp.compute.backendServices().delete(
            project=gcp.project, backendService=backend_service.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2880
2881
def delete_backend_services(gcp):
    """Delete every backend service tracked on the GCP state."""
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
2885
2886
def delete_firewall(gcp):
    """Delete the health-check firewall rule tracked on the GCP state.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting firewall %s',
                     gcp.health_check_firewall_rule.name)
        op = gcp.compute.firewalls().delete(
            project=gcp.project,
            firewall=gcp.health_check_firewall_rule.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2898
2899
def delete_health_check(gcp):
    """Delete the health check tracked on the GCP state.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting health check %s', gcp.health_check.name)
        op = gcp.compute.healthChecks().delete(
            project=gcp.project, healthCheck=gcp.health_check.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2909
2910
def delete_instance_groups(gcp):
    """Delete every managed instance group tracked on the GCP state.

    Each deletion waits for its zone operation. HTTP errors are logged
    and swallowed so the remaining groups are still attempted.
    """
    for group in gcp.instance_groups:
        try:
            logger.debug('Deleting instance group %s %s', group.name,
                         group.zone)
            op = gcp.compute.instanceGroupManagers().delete(
                project=gcp.project,
                zone=group.zone,
                instanceGroupManager=group.name).execute(
                    num_retries=_GCP_API_RETRIES)
            wait_for_zone_operation(gcp,
                                    group.zone,
                                    op['name'],
                                    timeout_sec=_WAIT_FOR_BACKEND_SEC)
        except googleapiclient.errors.HttpError as http_error:
            logger.info('Delete failed: %s', http_error)
2927
2928
def delete_instance_template(gcp):
    """Delete the instance template tracked on the GCP state.

    HTTP errors are logged and swallowed so cleanup can continue.
    """
    try:
        logger.debug('Deleting instance template %s',
                     gcp.instance_template.name)
        op = gcp.compute.instanceTemplates().delete(
            project=gcp.project,
            instanceTemplate=gcp.instance_template.name).execute(
                num_retries=_GCP_API_RETRIES)
        wait_for_global_operation(gcp, op['name'])
    except googleapiclient.errors.HttpError as http_error:
        logger.info('Delete failed: %s', http_error)
2940
2941
def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          max_rate=1,
                          circuit_breakers=None):
    """Point a backend service at the given instance groups.

    max_rate is only meaningful when balancing_mode is 'RATE'; for other
    modes the field is sent as None. Blocks until the patch operation
    completes.
    """
    compute_to_use = gcp.alpha_compute if gcp.alpha_compute else gcp.compute
    backends = []
    for instance_group in instance_groups:
        backends.append({
            'group': instance_group.url,
            'balancingMode': balancing_mode,
            'maxRate': max_rate if balancing_mode == 'RATE' else None
        })
    config = {
        'backends': backends,
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', config)
    op = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              op['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
2967
2968
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and wait until it reaches new_size.

    NOTE(review): the resize zone operation itself is bounded by a fixed
    360s wait; timeout_sec only bounds the subsequent wait for the group
    to actually reach the expected size.
    """
    op = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            op['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
2984
2985
def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None,
                                  url_map=None):
    '''Change url_map's backend service.

    Only one of backend_service and services_with_weights can be not None:
    backend_service becomes the path matcher's default service, while
    services_with_weights (a dict of service -> weight) becomes a weighted
    default route action. route_rules, if given, are installed verbatim.
    Patches the given url_map, or gcp.url_maps[0] when url_map is None,
    and blocks until the operation completes.
    '''
    # BUG FIX: this docstring used to appear *after* the first statements,
    # which made it a dead string expression rather than a docstring.
    if url_map:
        url_map_name = url_map.name
    else:
        url_map_name = gcp.url_maps[0].name
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute

    if backend_service and services_with_weights:
        raise ValueError(
            'both backend_service and service_with_weights are not None.')

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in list(services_with_weights.items())]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.urlMaps().patch(
        project=gcp.project, urlMap=url_map_name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
3029
3030
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll every 2s until the group has exactly expected_size instances.

    Raises an Exception if the size is not reached within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            break
        if time.time() > deadline:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)
3043
3044
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a global compute operation every 2s until it reports DONE.

    Raises an Exception carrying the operation's error payload if it
    finished with an error, or a timeout Exception if it does not reach
    DONE within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        response = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if response['status'] == 'DONE':
            if 'error' in response:
                raise Exception(response['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
3060
3061
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a zonal compute operation every 2s until it reports DONE.

    Raises an Exception carrying the operation's error payload if it
    finished with an error, or a timeout Exception if it does not reach
    DONE within timeout_sec.
    """
    deadline = time.time() + timeout_sec
    while time.time() <= deadline:
        response = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if response['status'] == 'DONE':
            if 'error' in response:
                raise Exception(response['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
3078
3079
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll getHealth every 5s until all backends in the group are HEALTHY.

    Success requires every reported backend to be HEALTHY *and* the number
    of reported backends to match the group's current instance count.
    Raises an Exception containing the last getHealth response on timeout.
    """
    start_time = time.time()
    config = {'group': instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    # BUG FIX: initialize so the timeout raise below cannot hit an
    # unbound local if the loop body never executes.
    result = None
    while time.time() - start_time <= timeout_sec:
        # Diagnostic only: log each backend's own serving status; RPC
        # failures here are logged and ignored.
        for instance_name in instance_names:
            try:
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received GCP healthStatus: %s', result['healthStatus'])
            statuses = result['healthStatus']
            healthy = all(
                instance['healthState'] == 'HEALTHY' for instance in statuses)
            if healthy and expected_size == len(statuses):
                return
        else:
            logger.info('no healthStatus received from GCP')
        time.sleep(5)
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
3115
3116
def get_instance_names(gcp, instance_group):
    """Return the short names of all instances in the given group."""
    result = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in result:
        return []
    # listInstances() returns the full URL of each instance, but
    # compute.instances().get() wants the bare instance name, so keep
    # only the last path component.
    instance_names = [
        item['instance'].split('/')[-1] for item in result['items']
    ]
    logger.info('retrieved instance names: %s', instance_names)
    return instance_names
3137
3138
def clean_up(gcp):
    """Delete all GCP resources tracked on the given GcpState.

    NOTE(review): the order is deliberate — forwarding rules are removed
    before the target proxies, proxies before the url maps, and url maps
    before the backend services, presumably because each references the
    next; do not reorder.
    """
    delete_global_forwarding_rules(gcp)
    delete_target_proxies(gcp)
    delete_url_maps(gcp)
    delete_backend_services(gcp)
    # Firewall rule, health check, and instance template are optional
    # singletons; only delete the ones that were actually created/found.
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
3151
3152
class InstanceGroup(object):
    """A GCE managed instance group tracked by the test framework.

    Attributes:
        name: the instance group's name.
        url: the group's selfLink URL, or None if the lookup failed.
        zone: the GCE zone the group lives in.
    """

    def __init__(self, name, url, zone):
        self.name = name
        self.url = url
        self.zone = zone

    def __repr__(self):
        # Added for debuggability; equality semantics are untouched.
        return 'InstanceGroup(name=%r, url=%r, zone=%r)' % (self.name,
                                                            self.url,
                                                            self.zone)
3159
3160
class GcpResource(object):
    """A generic named GCP resource tracked by the test framework.

    Attributes:
        name: the resource's name.
        url: the resource's selfLink URL, or None if the lookup failed.
    """

    def __init__(self, name, url):
        self.name = name
        self.url = url

    def __repr__(self):
        # Added for debuggability; equality semantics are untouched.
        return 'GcpResource(name=%r, url=%r)' % (self.name, self.url)
3166
3167
class GcpState(object):
    """Mutable container for the GCP API clients and every resource the
    test run creates or looks up; consumed by the delete_*/clean_up
    helpers for teardown."""

    def __init__(self, compute, alpha_compute, project, project_num):
        # v1 compute API client; always set.
        self.compute = compute
        # Alpha compute API client, or None when alpha APIs are disabled.
        self.alpha_compute = alpha_compute
        # Project id string and numeric project number (from args).
        self.project = project
        self.project_num = project_num
        # Optional singleton resources (GcpResource or None).
        self.health_check = None
        self.health_check_firewall_rule = None
        # Lists of tracked resources, appended as they are created or
        # fetched; deletion helpers drain/iterate these.
        self.backend_services = []
        self.url_maps = []
        self.target_proxies = []
        self.global_forwarding_rules = []
        # Port served by backends, discovered from the instance group's
        # first named port (see get_instance_group).
        self.service_port = None
        # InstanceGroup / GcpResource placeholders filled in later.
        self.instance_template = None
        self.instance_groups = []
        # Exceptions collected by get_* helpers when reusing resources.
        self.errors = []
3185
3186
# Record start time (UTC rendered in the local timezone) and the local
# timezone itself, to help correlate logs across machines.
logging.debug(
    "script start time: %s",
    datetime.datetime.now(
        datetime.timezone.utc).astimezone().strftime("%Y-%m-%dT%H:%M:%S %Z"))
logging.debug("logging local timezone: %s",
              datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo)
# Build the compute API clients: either from user-supplied discovery
# documents, or via the standard discovery service. The alpha client is
# only built when alpha (non-stable) APIs are allowed.
alpha_compute = None
if args.compute_discovery_document:
    with open(args.compute_discovery_document, 'r') as discovery_doc:
        compute = googleapiclient.discovery.build_from_document(
            discovery_doc.read())
    if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
        with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
            alpha_compute = googleapiclient.discovery.build_from_document(
                discovery_doc.read())
else:
    compute = googleapiclient.discovery.build('compute', 'v1')
    if not args.only_stable_gcp_apis:
        alpha_compute = googleapiclient.discovery.build('compute', 'alpha')

# Per-test-case results keyed by test name, and the list of failed tests.
test_results = {}
failed_tests = []
3209try:
3210    gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
3211    gcp_suffix = args.gcp_suffix
3212    health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
3213    if not args.use_existing_gcp_resources:
3214        if args.keep_gcp_resources:
3215            # Auto-generating a unique suffix in case of conflict should not be
3216            # combined with --keep_gcp_resources, as the suffix actually used
3217            # for GCP resources will not match the provided --gcp_suffix value.
3218            num_attempts = 1
3219        else:
3220            num_attempts = 5
3221        for i in range(num_attempts):
3222            try:
3223                logger.info('Using GCP suffix %s', gcp_suffix)
3224                create_health_check(gcp, health_check_name)
3225                break
3226            except googleapiclient.errors.HttpError as http_error:
3227                gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
3228                health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
3229                logger.exception('HttpError when creating health check')
3230        if gcp.health_check is None:
3231            raise Exception('Failed to create health check name after %d '
3232                            'attempts' % num_attempts)
3233    firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
3234    backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
3235    alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
3236    extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
3237    more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
3238    url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
3239    url_map_name_2 = url_map_name + '2'
3240    service_host_name = _BASE_SERVICE_HOST + gcp_suffix
3241    target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
3242    target_proxy_name_2 = target_proxy_name + '2'
3243    forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
3244    forwarding_rule_name_2 = forwarding_rule_name + '2'
3245    template_name = _BASE_TEMPLATE_NAME + gcp_suffix
3246    instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
3247    same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
3248    secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
3249    potential_service_ports = list(args.service_port_range)
3250    random.shuffle(potential_service_ports)
3251    if args.use_existing_gcp_resources:
3252        logger.info('Reusing existing GCP resources')
3253        get_health_check(gcp, health_check_name)
3254        get_health_check_firewall_rule(gcp, firewall_name)
3255        backend_service = get_backend_service(gcp, backend_service_name)
3256        alternate_backend_service = get_backend_service(
3257            gcp, alternate_backend_service_name)
3258        extra_backend_service = get_backend_service(gcp,
3259                                                    extra_backend_service_name,
3260                                                    record_error=False)
3261        more_extra_backend_service = get_backend_service(
3262            gcp, more_extra_backend_service_name, record_error=False)
3263        get_url_map(gcp, url_map_name)
3264        get_target_proxy(gcp, target_proxy_name)
3265        get_global_forwarding_rule(gcp, forwarding_rule_name)
3266        get_url_map(gcp, url_map_name_2, record_error=False)
3267        get_target_proxy(gcp, target_proxy_name_2, record_error=False)
3268        get_global_forwarding_rule(gcp,
3269                                   forwarding_rule_name_2,
3270                                   record_error=False)
3271        get_instance_template(gcp, template_name)
3272        instance_group = get_instance_group(gcp, args.zone, instance_group_name)
3273        same_zone_instance_group = get_instance_group(
3274            gcp, args.zone, same_zone_instance_group_name)
3275        secondary_zone_instance_group = get_instance_group(
3276            gcp, args.secondary_zone, secondary_zone_instance_group_name)
3277        if gcp.errors:
3278            raise Exception(gcp.errors)
3279    else:
3280        create_health_check_firewall_rule(gcp, firewall_name)
3281        backend_service = add_backend_service(gcp, backend_service_name)
3282        alternate_backend_service = add_backend_service(
3283            gcp, alternate_backend_service_name)
3284        create_url_map(gcp, url_map_name, backend_service, service_host_name)
3285        create_target_proxy(gcp, target_proxy_name)
3286        create_global_forwarding_rule(gcp, forwarding_rule_name,
3287                                      potential_service_ports)
3288        if not gcp.service_port:
3289            raise Exception(
3290                'Failed to find a valid ip:port for the forwarding rule')
3291        if gcp.service_port != _DEFAULT_SERVICE_PORT:
3292            patch_url_map_host_rule_with_port(gcp, url_map_name,
3293                                              backend_service,
3294                                              service_host_name)
3295        startup_script = get_startup_script(args.path_to_server_binary,
3296                                            gcp.service_port)
3297        create_instance_template(gcp, template_name, args.network,
3298                                 args.source_image, args.machine_type,
3299                                 startup_script)
3300        instance_group = add_instance_group(gcp, args.zone, instance_group_name,
3301                                            _INSTANCE_GROUP_SIZE)
3302        patch_backend_service(gcp, backend_service, [instance_group])
3303        same_zone_instance_group = add_instance_group(
3304            gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
3305        secondary_zone_instance_group = add_instance_group(
3306            gcp, args.secondary_zone, secondary_zone_instance_group_name,
3307            _INSTANCE_GROUP_SIZE)
3308
3309    wait_for_healthy_backends(gcp, backend_service, instance_group)
3310
3311    if args.test_case:
3312        client_env = dict(os.environ)
3313        if original_grpc_trace:
3314            client_env['GRPC_TRACE'] = original_grpc_trace
3315        if original_grpc_verbosity:
3316            client_env['GRPC_VERBOSITY'] = original_grpc_verbosity
3317        bootstrap_server_features = []
3318
3319        if gcp.service_port == _DEFAULT_SERVICE_PORT:
3320            server_uri = service_host_name
3321        else:
3322            server_uri = service_host_name + ':' + str(gcp.service_port)
3323        if args.xds_v3_support:
3324            client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
3325            bootstrap_server_features.append('xds_v3')
3326        if args.bootstrap_file:
3327            bootstrap_path = os.path.abspath(args.bootstrap_file)
3328        else:
3329            with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
3330                bootstrap_file.write(
3331                    _BOOTSTRAP_TEMPLATE.format(
3332                        node_id='projects/%s/networks/%s/nodes/%s' %
3333                        (gcp.project_num, args.network.split('/')[-1],
3334                         uuid.uuid1()),
3335                        server_features=json.dumps(
3336                            bootstrap_server_features)).encode('utf-8'))
3337                bootstrap_path = bootstrap_file.name
3338        client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
3339        client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
3340        client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
3341        client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
3342        for test_case in args.test_case:
3343            if test_case in _V3_TEST_CASES and not args.xds_v3_support:
3344                logger.info('skipping test %s due to missing v3 support',
3345                            test_case)
3346                continue
3347            if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
3348                logger.info('skipping test %s due to missing alpha support',
3349                            test_case)
3350                continue
3351            if test_case in [
3352                    'api_listener', 'forwarding_rule_port_match',
3353                    'forwarding_rule_default_port'
3354            ] and CLIENT_HOSTS:
3355                logger.info(
3356                    'skipping test %s because test configuration is'
3357                    'not compatible with client processes on existing'
3358                    'client hosts', test_case)
3359                continue
3360            if test_case == 'forwarding_rule_default_port':
3361                server_uri = service_host_name
3362            result = jobset.JobResult()
3363            log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
3364            if not os.path.exists(log_dir):
3365                os.makedirs(log_dir)
3366            test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
3367            test_log_file = open(test_log_filename, 'w+')
3368            client_process = None
3369
3370            if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
3371                rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
3372            else:
3373                rpcs_to_send = '--rpc="UnaryCall"'
3374
3375            if test_case in _TESTS_TO_SEND_METADATA:
3376                metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
3377                    keyE=_TEST_METADATA_KEY,
3378                    valueE=_TEST_METADATA_VALUE_EMPTY,
3379                    keyU=_TEST_METADATA_KEY,
3380                    valueU=_TEST_METADATA_VALUE_UNARY,
3381                    keyNU=_TEST_METADATA_NUMERIC_KEY,
3382                    valueNU=_TEST_METADATA_NUMERIC_VALUE)
3383            else:
3384                # Setting the arg explicitly to empty with '--metadata=""'
3385                # makes C# client fail
3386                # (see https://github.com/commandlineparser/commandline/issues/412),
3387                # so instead we just rely on clients using the default when
3388                # metadata arg is not specified.
3389                metadata_to_send = ''
3390
3391            # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
3392            # in the client. This means we will ignore intermittent RPC
3393            # failures (but this framework still checks that the final result
3394            # is as expected).
3395            #
3396            # Reason for disabling this is, the resources are shared by
3397            # multiple tests, and a change in previous test could be delayed
3398            # until the second test starts. The second test may see
3399            # intermittent failures because of that.
3400            #
3401            # A fix is to not share resources between tests (though that does
3402            # mean the tests will be significantly slower due to creating new
3403            # resources).
            fail_on_failed_rpc = ''

            try:
                # Launch the test client locally only when no remote client
                # hosts were configured; remote clients are managed outside
                # this loop.
                if not CLIENT_HOSTS:
                    # Fill in the per-test placeholders of the client command
                    # template supplied on the command line.
                    client_cmd_formatted = args.client_cmd.format(
                        server_uri=server_uri,
                        stats_port=args.stats_port,
                        qps=args.qps,
                        fail_on_failed_rpc=fail_on_failed_rpc,
                        rpcs_to_send=rpcs_to_send,
                        metadata_to_send=metadata_to_send)
                    logger.debug('running client: %s', client_cmd_formatted)
                    client_cmd = shlex.split(client_cmd_formatted)
                    # Client stdout/stderr are both captured into the per-test
                    # log file so failures can be inspected afterwards.
                    client_process = subprocess.Popen(client_cmd,
                                                      env=client_env,
                                                      stderr=subprocess.STDOUT,
                                                      stdout=test_log_file)
                # Dispatch to the handler for the requested test case. A few
                # cases (api_listener, forwarding_rule_*) return a new
                # server_uri because they recreate the forwarding rule.
                if test_case == 'backends_restart':
                    test_backends_restart(gcp, backend_service, instance_group)
                elif test_case == 'change_backend_service':
                    test_change_backend_service(gcp, backend_service,
                                                instance_group,
                                                alternate_backend_service,
                                                same_zone_instance_group)
                elif test_case == 'gentle_failover':
                    test_gentle_failover(gcp, backend_service, instance_group,
                                         secondary_zone_instance_group)
                elif test_case == 'load_report_based_failover':
                    test_load_report_based_failover(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'ping_pong':
                    test_ping_pong(gcp, backend_service, instance_group)
                elif test_case == 'remove_instance_group':
                    test_remove_instance_group(gcp, backend_service,
                                               instance_group,
                                               same_zone_instance_group)
                elif test_case == 'round_robin':
                    test_round_robin(gcp, backend_service, instance_group)
                elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
                    test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
                    test_secondary_locality_gets_requests_on_primary_failure(
                        gcp, backend_service, instance_group,
                        secondary_zone_instance_group)
                elif test_case == 'traffic_splitting':
                    test_traffic_splitting(gcp, backend_service, instance_group,
                                           alternate_backend_service,
                                           same_zone_instance_group)
                elif test_case == 'path_matching':
                    test_path_matching(gcp, backend_service, instance_group,
                                       alternate_backend_service,
                                       same_zone_instance_group)
                elif test_case == 'header_matching':
                    test_header_matching(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'circuit_breaking':
                    test_circuit_breaking(gcp, backend_service, instance_group,
                                          same_zone_instance_group)
                elif test_case == 'timeout':
                    test_timeout(gcp, backend_service, instance_group)
                elif test_case == 'fault_injection':
                    test_fault_injection(gcp, backend_service, instance_group)
                elif test_case == 'api_listener':
                    server_uri = test_api_listener(gcp, backend_service,
                                                   instance_group,
                                                   alternate_backend_service)
                elif test_case == 'forwarding_rule_port_match':
                    server_uri = test_forwarding_rule_port_match(
                        gcp, backend_service, instance_group)
                elif test_case == 'forwarding_rule_default_port':
                    server_uri = test_forwarding_rule_default_port(
                        gcp, backend_service, instance_group)
                elif test_case == 'metadata_filter':
                    test_metadata_filter(gcp, backend_service, instance_group,
                                         alternate_backend_service,
                                         same_zone_instance_group)
                elif test_case == 'csds':
                    test_csds(gcp, backend_service, instance_group, server_uri)
                else:
                    # Unknown test case names are a configuration error; abort
                    # the whole run rather than silently skipping.
                    logger.error('Unknown test case: %s', test_case)
                    sys.exit(1)
                # poll() returning non-None means the client died while the
                # test was running — treat that as a test failure even if the
                # case's own assertions passed.
                if client_process and client_process.poll() is not None:
                    raise Exception(
                        'Client process exited prematurely with exit code %d' %
                        client_process.returncode)
                result.state = 'PASSED'
                result.returncode = 0
            except Exception as e:
                # Record the failure but (by default) keep running the
                # remaining test cases.
                logger.exception('Test case %s failed', test_case)
                failed_tests.append(test_case)
                result.state = 'FAILED'
                result.message = str(e)
                if args.halt_after_fail:
                    # Stop the test suite if one case failed.
                    raise
            finally:
                if client_process:
                    if client_process.returncode:
                        # Client already exited on its own with an error code;
                        # nothing to terminate, just log it.
                        logger.info('Client exited with code %d' %
                                    client_process.returncode)
                    else:
                        # Client is still running (returncode is None) or
                        # exited cleanly; make sure it is shut down.
                        client_process.terminate()
                test_log_file.close()
                # Workaround for Python 3, as report_utils will invoke decode() on
                # result.message, which has a default value of ''.
                result.message = result.message.encode('UTF-8')
                test_results[test_case] = [result]
                if args.log_client_output:
                    logger.info('Client output:')
                    with open(test_log_filename, 'r') as client_output:
                        logger.info(client_output.read())
3519        if not os.path.exists(_TEST_LOG_BASE_DIR):
3520            os.makedirs(_TEST_LOG_BASE_DIR)
3521        report_utils.render_junit_xml_report(test_results,
3522                                             os.path.join(
3523                                                 _TEST_LOG_BASE_DIR,
3524                                                 _SPONGE_XML_NAME),
3525                                             suite_name='xds_tests',
3526                                             multi_target=True)
3527        if failed_tests:
3528            logger.error('Test case(s) %s failed', failed_tests)
3529            sys.exit(1)
finally:
    # Decide whether the GCP resources created for this run should survive
    # the script's exit.
    keep_resources = args.keep_gcp_resources
    if args.halt_after_fail and failed_tests:
        # A failure halted the suite early; keep the resources so the failed
        # state can be inspected.
        logger.info(
            'Halt after fail triggered, exiting without cleaning up resources')
        keep_resources = True
    if not keep_resources:
        logger.info('Cleaning up GCP resources. This may take some time.')
        clean_up(gcp)
3539