1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# Prerequisites:
16# Make sure to run the setup in scripts/setup_external_accounts.sh
17# and copy the logged constant strings (_AUDIENCE_OIDC, _AUDIENCE_AWS)
18# into this file before running this test suite.
19# Once that is done, this test can be run indefinitely.
20#
21# The only requirement for this test suite to run is to set the environment
22# variable GOOGLE_APPLICATION_CREDENTIALS to point to the expected service
23# account keys whose email is referred to in the setup script.
24#
25# This script follows the following logic.
26# OIDC provider (file-sourced and url-sourced credentials):
27# Use the service account keys to generate a Google ID token using the
28# iamcredentials generateIdToken API, using the default STS audience.
29# This will use the service account client ID as the sub field of the token.
30# This OIDC token will be used as the external subject token to be exchanged
31# for a Google access token via GCP STS endpoint and then to impersonate the
32# original service account key.
33
34
35import json
36import os
37import socket
38from tempfile import NamedTemporaryFile
39import threading
40
41import sys
42import google.auth
43from googleapiclient import discovery
44from six.moves import BaseHTTPServer
45from google.oauth2 import service_account
46import pytest
47from mock import patch
48
49# Populate values from the output of scripts/setup_external_accounts.sh.
50_AUDIENCE_OIDC = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/oidc-73wslmxn"
51_AUDIENCE_AWS = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/aws-73wslmxn"
52_ROLE_AWS = "arn:aws:iam::077071391996:role/ci-python-test"
53
54
def dns_access_direct(request, project_id):
    """Fetch the Cloud DNS project resource with a hand-built HTTP request.

    The application default credentials are resolved, applied to a fresh
    header dict, and the DNS REST endpoint is called through ``request``.

    Args:
        request: HTTP request callable, also handed to ``google.auth.default``.
        project_id: ID of the project whose DNS resource is fetched.

    Returns:
        The response body when the status is 200, otherwise ``None``.
    """
    read_only_scope = "https://www.googleapis.com/auth/cloud-platform.read-only"
    default_credentials, _ = google.auth.default(
        scopes=[read_only_scope], request=request
    )

    # Let the credentials write their authorization data into the headers.
    auth_headers = {}
    default_credentials.apply(auth_headers)

    project_url = "https://dns.googleapis.com/dns/v1/projects/{}".format(project_id)
    dns_response = request(url=project_url, headers=auth_headers)

    if dns_response.status != 200:
        return None
    return dns_response.data
72
73
def dns_access_client_library(_, project_id):
    """Fetch the Cloud DNS project resource through the discovery client.

    The first positional argument is ignored; it exists so this function has
    the same call signature as ``dns_access_direct``.

    Args:
        _: Unused.
        project_id: ID of the project whose DNS resource is fetched.

    Returns:
        The result of executing the ``projects().get`` request.
    """
    dns_service = discovery.build("dns", "v1")
    return dns_service.projects().get(project=project_id).execute()
78
79
@pytest.fixture(params=[dns_access_direct, dns_access_client_library])
def dns_access(request, http_request, service_account_info):
    """Yield a zero-argument callable for each DNS access strategy.

    The fixture is parametrized over both access functions; the yielded
    callable binds the ``http_request`` fixture and the project ID taken from
    the service account info, so tests do not pass them by hand.
    """
    access_fn = request.param

    def call_with_fixtures():
        project = service_account_info["project_id"]
        return access_fn(http_request, project)

    yield call_with_fixtures
88
89
@pytest.fixture
def oidc_credentials(service_account_file, http_request):
    """Yield refreshed ID token credentials targeting the OIDC audience.

    The credentials are built from the service account key file with
    ``_AUDIENCE_OIDC`` as target audience and refreshed once before use.
    """
    build_from_file = service_account.IDTokenCredentials.from_service_account_file
    id_token_credentials = build_from_file(
        service_account_file, target_audience=_AUDIENCE_OIDC
    )
    id_token_credentials.refresh(http_request)
    yield id_token_credentials
97
98
@pytest.fixture
def service_account_info(service_account_file):
    """Yield the parsed JSON content of the service account key file."""
    with open(service_account_file) as key_file:
        parsed_info = json.load(key_file)
        # Yield inside the ``with`` so the file is closed at fixture teardown.
        yield parsed_info
103
104
@pytest.fixture
def aws_oidc_credentials(
    service_account_file, service_account_info, authenticated_request
):
    """Yield a Google ID token minted via iamcredentials ``generateIdToken``.

    The service account key is used to authenticate the call; the requested
    token uses the service account ``client_id`` as its audience and includes
    the email. The fixture asserts that the API call returned HTTP 200.
    """
    sa_credentials = service_account.Credentials.from_service_account_file(
        service_account_file, scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    send = authenticated_request(sa_credentials)

    generate_id_token_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateIdToken".format(
        service_account_info["client_email"]
    )
    payload = json.dumps(
        {"audience": service_account_info["client_id"], "includeEmail": True}
    )

    response = send(url=generate_id_token_url, method="POST", body=payload)
    assert response.status == 200

    token = json.loads(response.data)["token"]
    yield token
124
125
126# Our external accounts tests involve setting up some preconditions, setting a
127# credential file, and then making sure that our client libraries can work with
128# the set credentials.
def get_project_dns(dns_access, credential_data):
    """Run ``dns_access`` with ``credential_data`` as the default credentials.

    The credential configuration is serialized as JSON into a temporary file
    and GOOGLE_APPLICATION_CREDENTIALS points at that file for the duration
    of the call.

    Args:
        dns_access: Zero-argument callable performing the DNS lookup.
        credential_data: JSON-serializable credential configuration.

    Returns:
        Whatever ``dns_access()`` returns.
    """
    with NamedTemporaryFile() as credfile:
        credfile.write(json.dumps(credential_data).encode("utf-8"))
        # Flush so that readers opening the file by name see the full content.
        credfile.flush()

        # patch.dict restores os.environ on exit, so the previous value of the
        # variable does not have to be saved or reinstated manually.
        with patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": credfile.name}):
            # If our setup and credential file are correct, the access
            # function should pick these up as the default credentials.
            return dns_access()
139
140
def get_xml_value_by_tagname(data, tagname):
    """Return the text between the first ``<tagname>`` and its closing tag.

    This is a plain substring search, not an XML parser: attributes, nesting
    of the same tag and entity decoding are not handled.

    Args:
        data: Text to search.
        tagname: Element name without angle brackets.

    Returns:
        The text found between the opening and the closing tag.

    Raises:
        ValueError: If the opening tag, or a closing tag after it, is missing.
    """
    open_tag = "<{}>".format(tagname)
    close_tag = "</{}>".format(tagname)

    # str.find reports a miss as -1, which lets us raise a descriptive error
    # instead of the generic "substring not found" from str.index.
    tag_start = data.find(open_tag)
    if tag_start < 0:
        raise ValueError("opening tag {} not found".format(open_tag))

    value_start = tag_start + len(open_tag)
    value_end = data.find(close_tag, value_start)
    if value_end < 0:
        raise ValueError("closing tag {} not found".format(close_tag))

    return data[value_start:value_end]
147
148
# This test makes sure that setting an accessible credential file
# works to allow access to Google resources.
def test_file_based_external_account(
    oidc_credentials, service_account_info, dns_access
):
    """Check DNS access with a file-sourced external account configuration.

    The OIDC token is written to a temporary file which the credential
    configuration names as its ``credential_source``.
    """
    with NamedTemporaryFile() as token_file:
        token_file.write(oidc_credentials.token.encode("utf-8"))
        token_file.flush()

        impersonation_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
            oidc_credentials.service_account_email
        )
        credential_config = {
            "type": "external_account",
            "audience": _AUDIENCE_OIDC,
            "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
            "token_url": "https://sts.googleapis.com/v1/token",
            "service_account_impersonation_url": impersonation_url,
            "credential_source": {"file": token_file.name},
        }

        project_dns = get_project_dns(dns_access, credential_config)
        assert project_dns
173
174
175# This test makes sure that setting up an http server to provide credentials
176# works to allow access to Google resources.
def test_url_based_external_account(dns_access, oidc_credentials, service_account_info):
    """Check DNS access with a URL-sourced external account configuration.

    A local HTTP server hands out the OIDC token as JSON on ``/token`` when
    the request carries the expected header; the credential configuration
    points its ``credential_source`` at that server.
    """

    class TestResponseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
        def _send_json(self, status, payload):
            # Shared response path: status line, JSON content type, JSON body.
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps(payload).encode("utf-8"))

        def do_GET(self):
            # .get() yields None for an absent header, so a request without
            # the header is answered with 400 rather than failing the lookup.
            if self.headers.get("my-header") != "expected-value":
                self._send_json(400, {"error": "missing header"})
            elif self.path != "/token":
                self._send_json(400, {"error": "incorrect token path"})
            else:
                self._send_json(200, {"access_token": oidc_credentials.token})

    class TestHTTPServer(BaseHTTPServer.HTTPServer, object):
        def __init__(self):
            # Binding to port 0 lets the OS choose a free port at bind time.
            # This avoids probing with a throwaway socket (which was never
            # closed) and the race between probing and the real bind.
            super(TestHTTPServer, self).__init__(("", 0), TestResponseHandler)
            self.port = self.server_address[1]

        # The python3 HTTPServer has __enter__ and __exit__ methods, but
        # python2 does not. Defining both here makes the two behave alike and
        # guarantees the server is stopped when the "with" block is left.
        def __enter__(self):
            return self

        def __exit__(self, *args):
            # Stop the serve_forever loop, then release the listening socket.
            self.shutdown()
            self.server_close()

    with TestHTTPServer() as server:
        server_thread = threading.Thread(target=server.serve_forever)
        # Daemon thread: a failure in the test must not keep the process alive.
        server_thread.daemon = True
        server_thread.start()

        assert get_project_dns(
            dns_access,
            {
                "type": "external_account",
                "audience": _AUDIENCE_OIDC,
                "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
                "token_url": "https://sts.googleapis.com/v1/token",
                "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
                    oidc_credentials.service_account_email
                ),
                "credential_source": {
                    "url": "http://localhost:{}/token".format(server.port),
                    "headers": {"my-header": "expected-value"},
                    "format": {
                        "type": "json",
                        "subject_token_field_name": "access_token",
                    },
                },
            },
        )
245
246
247# AWS provider tests for AWS credentials
248# The test suite will also run tests for AWS credentials. This works as
# follows. (Note prerequisite setup is needed. This is documented in
250# setup_external_accounts.sh).
251# - iamcredentials:generateIdToken is used to generate a Google ID token using
252#   the service account access token. The service account client_id is used as
253#   audience.
254# - AWS STS AssumeRoleWithWebIdentity API is used to exchange this token for
255#   temporary AWS security credentials for a specified AWS ARN role.
256# - AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN
257#   environment variables are set using these credentials before the test is
258#   run simulating an AWS VM.
259# - The test can now be run.
def test_aws_based_external_account(
    aws_oidc_credentials, service_account_info, dns_access, http_request
):
    """Check DNS access with an AWS-sourced external account configuration.

    The Google ID token is exchanged at AWS STS (AssumeRoleWithWebIdentity)
    for temporary AWS credentials, which are exported through the AWS_*
    environment variables while the DNS access is performed.
    """
    assume_role_url = (
        "https://sts.amazonaws.com/"
        "?Action=AssumeRoleWithWebIdentity"
        "&Version=2011-06-15"
        "&DurationSeconds=3600"
        "&RoleSessionName=python-test"
        "&RoleArn={}"
        "&WebIdentityToken={}"
    ).format(_ROLE_AWS, aws_oidc_credentials)

    sts_response = http_request(url=assume_role_url)
    assert sts_response.status == 200

    # The payload is XML; the few values needed are pulled out by tag name
    # instead of loading an XML parser.
    xml_text = sts_response.data.decode("utf-8")

    aws_environment = {
        "AWS_REGION": "us-east-2",
        "AWS_ACCESS_KEY_ID": get_xml_value_by_tagname(xml_text, "AccessKeyId"),
        "AWS_SECRET_ACCESS_KEY": get_xml_value_by_tagname(xml_text, "SecretAccessKey"),
        "AWS_SESSION_TOKEN": get_xml_value_by_tagname(xml_text, "SessionToken"),
    }

    with patch.dict(os.environ, aws_environment):
        impersonation_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
            service_account_info["client_email"]
        )
        credential_config = {
            "type": "external_account",
            "audience": _AUDIENCE_AWS,
            "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
            "token_url": "https://sts.googleapis.com/v1/token",
            "service_account_impersonation_url": impersonation_url,
            "credential_source": {
                "environment_id": "aws1",
                "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
            },
        }

        project_dns = get_project_dns(dns_access, credential_config)
        assert project_dns
306