1# Copyright 2021 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# Prerequisites: 16# Make sure to run the setup in scripts/setup_external_accounts.sh 17# and copy the logged constant strings (_AUDIENCE_OIDC, _AUDIENCE_AWS) 18# into this file before running this test suite. 19# Once that is done, this test can be run indefinitely. 20# 21# The only requirement for this test suite to run is to set the environment 22# variable GOOGLE_APPLICATION_CREDENTIALS to point to the expected service 23# account keys whose email is referred to in the setup script. 24# 25# This script follows the following logic. 26# OIDC provider (file-sourced and url-sourced credentials): 27# Use the service account keys to generate a Google ID token using the 28# iamcredentials generateIdToken API, using the default STS audience. 29# This will use the service account client ID as the sub field of the token. 30# This OIDC token will be used as the external subject token to be exchanged 31# for a Google access token via GCP STS endpoint and then to impersonate the 32# original service account key. 33 34 35import json 36import os 37import socket 38from tempfile import NamedTemporaryFile 39import threading 40 41import sys 42import google.auth 43from googleapiclient import discovery 44from six.moves import BaseHTTPServer 45from google.oauth2 import service_account 46import pytest 47from mock import patch 48 49# Populate values from the output of scripts/setup_external_accounts.sh. 
_AUDIENCE_OIDC = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/oidc-73wslmxn"
_AUDIENCE_AWS = "//iam.googleapis.com/projects/79992041559/locations/global/workloadIdentityPools/pool-73wslmxn/providers/aws-73wslmxn"
_ROLE_AWS = "arn:aws:iam::077071391996:role/ci-python-test"


def dns_access_direct(request, project_id):
    """Fetch the Cloud DNS project resource with a hand-built HTTP request.

    Args:
        request: HTTP request callable. It is handed to ``google.auth.default``
            and then invoked with ``url`` and ``headers`` keyword arguments.
        project_id: Project identifier interpolated into the DNS API URL.

    Returns:
        ``response.data`` when the response status is 200, otherwise ``None``.
    """
    # First, get the default credentials.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform.read-only"],
        request=request,
    )

    # Apply the default credentials to the headers to make the request.
    headers = {}
    credentials.apply(headers)
    response = request(
        url="https://dns.googleapis.com/dns/v1/projects/{}".format(project_id),
        headers=headers,
    )

    if response.status == 200:
        return response.data
    # Callers assert on truthiness, so a non-200 answer is reported as None.
    return None


def dns_access_client_library(_, project_id):
    """Fetch the Cloud DNS project resource through the discovery client.

    Args:
        _: Unused; present so both access strategies share one signature.
        project_id: Project identifier passed to ``projects().get``.

    Returns:
        Whatever ``request.execute()`` returns for the project lookup.
    """
    service = discovery.build("dns", "v1")
    request = service.projects().get(project=project_id)
    return request.execute()


@pytest.fixture(params=[dns_access_direct, dns_access_client_library])
def dns_access(request, http_request, service_account_info):
    """Yield a zero-argument callable bound to one of the DNS access strategies."""
    # Fill in the fixtures on the functions,
    # so that we don't have to fill in the parameters manually.
    def wrapper():
        return request.param(http_request, service_account_info["project_id"])

    yield wrapper


@pytest.fixture
def oidc_credentials(service_account_file, http_request):
    """Yield refreshed ID token credentials targeting ``_AUDIENCE_OIDC``."""
    result = service_account.IDTokenCredentials.from_service_account_file(
        service_account_file, target_audience=_AUDIENCE_OIDC
    )
    result.refresh(http_request)
    yield result


@pytest.fixture
def service_account_info(service_account_file):
    """Yield the parsed JSON content of the service account key file."""
    with open(service_account_file) as f:
        yield json.load(f)


@pytest.fixture
def aws_oidc_credentials(
    service_account_file, service_account_info, authenticated_request
):
    """Yield an ID token obtained from the iamcredentials generateIdToken API.

    The token's audience is the service account's ``client_id``.
    """
    credentials = service_account.Credentials.from_service_account_file(
        service_account_file, scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    result = authenticated_request(credentials)(
        url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateIdToken".format(
            service_account_info["client_email"]
        ),
        method="POST",
        body=json.dumps(
            {"audience": service_account_info["client_id"], "includeEmail": True}
        ),
    )
    assert result.status == 200

    yield json.loads(result.data)["token"]


# Our external accounts tests involve setting up some preconditions, setting a
# credential file, and then making sure that our client libraries can work with
# the set credentials.
def get_project_dns(dns_access, credential_data):
    """Call ``dns_access`` with ``credential_data`` installed as the default credentials.

    Args:
        dns_access: Zero-argument callable performing the DNS lookup.
        credential_data: JSON-serializable credential configuration.

    Returns:
        The value returned by ``dns_access()``.
    """
    with NamedTemporaryFile() as credfile:
        credfile.write(json.dumps(credential_data).encode("utf-8"))
        # Flush so the file content is visible when it is re-read by path.
        credfile.flush()

        # patch.dict restores the previous environment on exit.
        with patch.dict(os.environ, {"GOOGLE_APPLICATION_CREDENTIALS": credfile.name}):
            # If our setup and credential file are correct,
            # discovery.build should be able to establish these as the default credentials.
            return dns_access()


def get_xml_value_by_tagname(data, tagname):
    """Return the text between the first ``<tagname>`` and its closing tag.

    Args:
        data: Text to search.
        tagname: Tag name without angle brackets.

    Returns:
        The substring enclosed by the first opening tag and the first
        closing tag that follows it.

    Raises:
        ValueError: If the opening tag, or a closing tag after it, is absent.
    """
    open_tag = "<{}>".format(tagname)
    close_tag = "</{}>".format(tagname)

    start = data.find(open_tag)
    if start < 0:
        raise ValueError("opening tag {} not found".format(open_tag))

    value_start = start + len(open_tag)
    end = data.find(close_tag, value_start)
    if end < 0:
        raise ValueError("closing tag {} not found".format(close_tag))

    return data[value_start:end]


# This test makes sure that setting an accessible credential file
# works to allow access to Google resources.
def test_file_based_external_account(
    oidc_credentials, service_account_info, dns_access
):
    with NamedTemporaryFile() as tmpfile:
        tmpfile.write(oidc_credentials.token.encode("utf-8"))
        tmpfile.flush()

        assert get_project_dns(
            dns_access,
            {
                "type": "external_account",
                "audience": _AUDIENCE_OIDC,
                "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
                "token_url": "https://sts.googleapis.com/v1/token",
                "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
                    oidc_credentials.service_account_email
                ),
                "credential_source": {
                    "file": tmpfile.name,
                },
            },
        )


# This test makes sure that setting up an http server to provide credentials
# works to allow access to Google resources.
def test_url_based_external_account(dns_access, oidc_credentials, service_account_info):
    """Check URL-sourced external account credentials against a local server.

    A temporary HTTP server hands out the OIDC token as JSON at ``/token``
    when the request carries the expected custom header; the credential
    configuration passed to ``get_project_dns`` points at that server.
    """

    class TestResponseHandler(BaseHTTPServer.BaseHTTPRequestHandler):
        def _send_json(self, status, payload):
            # Every branch of do_GET answers with a JSON body; only the
            # status code and the payload differ.
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(json.dumps(payload).encode("utf-8"))

        def do_GET(self):
            # .get() yields None for an absent header, so a missing header
            # always lands in the 400 branch regardless of how item access
            # on the headers object treats unknown keys.
            if self.headers.get("my-header") != "expected-value":
                self._send_json(400, {"error": "missing header"})
            elif self.path != "/token":
                self._send_json(400, {"error": "incorrect token path"})
            else:
                self._send_json(200, {"access_token": oidc_credentials.token})

    class TestHTTPServer(BaseHTTPServer.HTTPServer, object):
        def __init__(self):
            # Bind to port 0 so the OS picks a free port for the listening
            # socket itself. Probing with a separate socket would leave that
            # socket open and allow the port to be taken before the real bind.
            super(TestHTTPServer, self).__init__(("", 0), TestResponseHandler)
            self.port = self.socket.getsockname()[1]

        # This makes sure that the server gets shut down when this variable leaves its "with" block
        # The python3 HttpServer has __enter__ and __exit__ methods, but python2 does not.
        # By redefining the __enter__ and __exit__ methods, we ensure that python2 and python3 act similarly
        def __exit__(self, *args):
            # Stop the serve_forever loop, then release the listening socket.
            self.shutdown()
            self.server_close()

        def __enter__(self):
            return self

    with TestHTTPServer() as server:
        threading.Thread(target=server.serve_forever).start()

        assert get_project_dns(
            dns_access,
            {
                "type": "external_account",
                "audience": _AUDIENCE_OIDC,
                "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
                "token_url": "https://sts.googleapis.com/v1/token",
                "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
                    oidc_credentials.service_account_email
                ),
                "credential_source": {
                    "url": "http://localhost:{}/token".format(server.port),
                    "headers": {"my-header": "expected-value"},
                    "format": {
                        "type": "json",
                        "subject_token_field_name": "access_token",
                    },
                },
            },
        )


# AWS provider tests for AWS credentials
# The test suite will also run tests for AWS credentials. This works as
# follows. (Note prerequisite setup is needed. This is documented in
# setup_external_accounts.sh).
# - iamcredentials:generateIdToken is used to generate a Google ID token using
#   the service account access token. The service account client_id is used as
#   audience.
# - AWS STS AssumeRoleWithWebIdentity API is used to exchange this token for
#   temporary AWS security credentials for a specified AWS ARN role.
# - AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN
#   environment variables are set using these credentials before the test is
#   run simulating an AWS VM.
# - The test can now be run.
def test_aws_based_external_account(
    aws_oidc_credentials, service_account_info, dns_access, http_request
):
    """Check AWS-sourced external account credentials.

    The Google ID token is exchanged at AWS STS for temporary security
    credentials, which are exported through the AWS_* environment variables
    while the DNS lookup runs with an AWS credential configuration.
    """
    assume_role_url = (
        "https://sts.amazonaws.com/"
        "?Action=AssumeRoleWithWebIdentity"
        "&Version=2011-06-15"
        "&DurationSeconds=3600"
        "&RoleSessionName=python-test"
        "&RoleArn={}"
        "&WebIdentityToken={}"
    ).format(_ROLE_AWS, aws_oidc_credentials)

    sts_response = http_request(url=assume_role_url)
    assert sts_response.status == 200

    # The returned data is in XML, but loading an XML parser would be overkill.
    # Searching the return text manually for the start and finish tag.
    xml_body = sts_response.data.decode("utf-8")

    aws_environment = {
        "AWS_REGION": "us-east-2",
        "AWS_ACCESS_KEY_ID": get_xml_value_by_tagname(xml_body, "AccessKeyId"),
        "AWS_SECRET_ACCESS_KEY": get_xml_value_by_tagname(xml_body, "SecretAccessKey"),
        "AWS_SESSION_TOKEN": get_xml_value_by_tagname(xml_body, "SessionToken"),
    }

    with patch.dict(os.environ, aws_environment):
        impersonation_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken".format(
            service_account_info["client_email"]
        )
        credential_source = {
            "environment_id": "aws1",
            "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
        }
        credential_config = {
            "type": "external_account",
            "audience": _AUDIENCE_AWS,
            "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
            "token_url": "https://sts.googleapis.com/v1/token",
            "service_account_impersonation_url": impersonation_url,
            "credential_source": credential_source,
        }
        assert get_project_dns(dns_access, credential_config)