1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17
18import mock
19import pytest
20from six.moves import http_client
21from six.moves import urllib
22
23from google.auth import _helpers
24from google.auth import aws
25from google.auth import environment_vars
26from google.auth import exceptions
27from google.auth import transport
28
29
# Client ID/secret pair shared by the tests, plus its precomputed encoding.
CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password".
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="

# Service account whose generateAccessToken endpoint is used for impersonation.
SERVICE_ACCOUNT_EMAIL = "[email protected]"
SERVICE_ACCOUNT_IMPERSONATION_URL = (
    "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
    "/serviceAccounts/{}:generateAccessToken"
).format(SERVICE_ACCOUNT_EMAIL)

QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
SCOPES = ["scope1", "scope2"]

# Token endpoint and the identifiers passed to aws.Credentials.
TOKEN_URL = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"
AUDIENCE = (
    "//iam.googleapis.com/projects/123456/locations/global"
    "/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
)

# Metadata-style URLs referenced by the credential source configuration.
REGION_URL = (
    "http://169.254.169.254/latest/meta-data" "/placement/availability-zone"
)
SECURITY_CREDS_URL = (
    "http://169.254.169.254/latest/meta-data" "/iam/security-credentials"
)
# The "{region}" placeholder is intentionally left unformatted here.
CRED_VERIFICATION_URL = (
    "https://sts.{region}.amazonaws.com"
    "?Action=GetCallerIdentity&Version=2011-06-15"
)
49# Sample AWS security credentials to be used with tests that require a session token.
50ACCESS_KEY_ID = "ASIARD4OQDT6A77FR3CL"
51SECRET_ACCESS_KEY = "Y8AfSaucF37G4PpvfguKZ3/l7Id4uocLXxX0+VTx"
52TOKEN = "IQoJb3JpZ2luX2VjEIz//////////wEaCXVzLWVhc3QtMiJGMEQCIH7MHX/Oy/OB8OlLQa9GrqU1B914+iMikqWQW7vPCKlgAiA/Lsv8Jcafn14owfxXn95FURZNKaaphj0ykpmS+Ki+CSq0AwhlEAAaDDA3NzA3MTM5MTk5NiIMx9sAeP1ovlMTMKLjKpEDwuJQg41/QUKx0laTZYjPlQvjwSqS3OB9P1KAXPWSLkliVMMqaHqelvMF/WO/glv3KwuTfQsavRNs3v5pcSEm4SPO3l7mCs7KrQUHwGP0neZhIKxEXy+Ls//1C/Bqt53NL+LSbaGv6RPHaX82laz2qElphg95aVLdYgIFY6JWV5fzyjgnhz0DQmy62/Vi8pNcM2/VnxeCQ8CC8dRDSt52ry2v+nc77vstuI9xV5k8mPtnaPoJDRANh0bjwY5Sdwkbp+mGRUJBAQRlNgHUJusefXQgVKBCiyJY4w3Csd8Bgj9IyDV+Azuy1jQqfFZWgP68LSz5bURyIjlWDQunO82stZ0BgplKKAa/KJHBPCp8Qi6i99uy7qh76FQAqgVTsnDuU6fGpHDcsDSGoCls2HgZjZFPeOj8mmRhFk1Xqvkbjuz8V1cJk54d3gIJvQt8gD2D6yJQZecnuGWd5K2e2HohvCc8Fc9kBl1300nUJPV+k4tr/A5R/0QfEKOZL1/k5lf1g9CREnrM8LVkGxCgdYMxLQow1uTL+QU67AHRRSp5PhhGX4Rek+01vdYSnJCMaPhSEgcLqDlQkhk6MPsyT91QMXcWmyO+cAZwUPwnRamFepuP4K8k2KVXs/LIJHLELwAZ0ekyaS7CptgOqS7uaSTFG3U+vzFZLEnGvWQ7y9IPNQZ+Dffgh4p3vF4J68y9049sI6Sr5d5wbKkcbm8hdCDHZcv4lnqohquPirLiFQ3q7B17V9krMPu3mz1cg4Ekgcrn/E09NTsxAqD8NcZ7C7ECom9r+X3zkDOxaajW6hu3Az8hGlyylDaMiFfRbBJpTIlxp7jfa7CxikNgNtEKLH9iCzvuSg2vhA=="
# The output of json.dumps() can differ from one Python version to another,
# so the JSON payload is hardcoded.
55REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}'
56# Each tuple contains the following entries:
57# region, time, credentials, original_request, signed_request
58TEST_FIXTURES = [
59    # GET request (AWS botocore tests).
60    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req
61    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.sreq
62    (
63        "us-east-1",
64        "2011-09-09T23:36:00Z",
65        {
66            "access_key_id": "AKIDEXAMPLE",
67            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
68        },
69        {
70            "method": "GET",
71            "url": "https://host.foo.com",
72            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
73        },
74        {
75            "url": "https://host.foo.com",
76            "method": "GET",
77            "headers": {
78                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
79                "host": "host.foo.com",
80                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
81            },
82        },
83    ),
84    # GET request with relative path (AWS botocore tests).
85    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.req
86    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.sreq
87    (
88        "us-east-1",
89        "2011-09-09T23:36:00Z",
90        {
91            "access_key_id": "AKIDEXAMPLE",
92            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
93        },
94        {
95            "method": "GET",
96            "url": "https://host.foo.com/foo/bar/../..",
97            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
98        },
99        {
100            "url": "https://host.foo.com/foo/bar/../..",
101            "method": "GET",
102            "headers": {
103                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
104                "host": "host.foo.com",
105                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
106            },
107        },
108    ),
109    # GET request with /./ path (AWS botocore tests).
110    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.req
111    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.sreq
112    (
113        "us-east-1",
114        "2011-09-09T23:36:00Z",
115        {
116            "access_key_id": "AKIDEXAMPLE",
117            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
118        },
119        {
120            "method": "GET",
121            "url": "https://host.foo.com/./",
122            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
123        },
124        {
125            "url": "https://host.foo.com/./",
126            "method": "GET",
127            "headers": {
128                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
129                "host": "host.foo.com",
130                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
131            },
132        },
133    ),
134    # GET request with pointless dot path (AWS botocore tests).
135    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.req
136    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.sreq
137    (
138        "us-east-1",
139        "2011-09-09T23:36:00Z",
140        {
141            "access_key_id": "AKIDEXAMPLE",
142            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
143        },
144        {
145            "method": "GET",
146            "url": "https://host.foo.com/./foo",
147            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
148        },
149        {
150            "url": "https://host.foo.com/./foo",
151            "method": "GET",
152            "headers": {
153                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=910e4d6c9abafaf87898e1eb4c929135782ea25bb0279703146455745391e63a",
154                "host": "host.foo.com",
155                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
156            },
157        },
158    ),
159    # GET request with utf8 path (AWS botocore tests).
160    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.req
161    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.sreq
162    (
163        "us-east-1",
164        "2011-09-09T23:36:00Z",
165        {
166            "access_key_id": "AKIDEXAMPLE",
167            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
168        },
169        {
170            "method": "GET",
171            "url": "https://host.foo.com/%E1%88%B4",
172            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
173        },
174        {
175            "url": "https://host.foo.com/%E1%88%B4",
176            "method": "GET",
177            "headers": {
178                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=8d6634c189aa8c75c2e51e106b6b5121bed103fdb351f7d7d4381c738823af74",
179                "host": "host.foo.com",
180                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
181            },
182        },
183    ),
184    # GET request with duplicate query key (AWS botocore tests).
185    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.req
186    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.sreq
187    (
188        "us-east-1",
189        "2011-09-09T23:36:00Z",
190        {
191            "access_key_id": "AKIDEXAMPLE",
192            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
193        },
194        {
195            "method": "GET",
196            "url": "https://host.foo.com/?foo=Zoo&foo=aha",
197            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
198        },
199        {
200            "url": "https://host.foo.com/?foo=Zoo&foo=aha",
201            "method": "GET",
202            "headers": {
203                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=be7148d34ebccdc6423b19085378aa0bee970bdc61d144bd1a8c48c33079ab09",
204                "host": "host.foo.com",
205                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
206            },
207        },
208    ),
209    # GET request with duplicate out of order query key (AWS botocore tests).
210    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.req
211    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.sreq
212    (
213        "us-east-1",
214        "2011-09-09T23:36:00Z",
215        {
216            "access_key_id": "AKIDEXAMPLE",
217            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
218        },
219        {
220            "method": "GET",
221            "url": "https://host.foo.com/?foo=b&foo=a",
222            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
223        },
224        {
225            "url": "https://host.foo.com/?foo=b&foo=a",
226            "method": "GET",
227            "headers": {
228                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=feb926e49e382bec75c9d7dcb2a1b6dc8aa50ca43c25d2bc51143768c0875acc",
229                "host": "host.foo.com",
230                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
231            },
232        },
233    ),
234    # GET request with utf8 query (AWS botocore tests).
235    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.req
236    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.sreq
237    (
238        "us-east-1",
239        "2011-09-09T23:36:00Z",
240        {
241            "access_key_id": "AKIDEXAMPLE",
242            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
243        },
244        {
245            "method": "GET",
246            "url": "https://host.foo.com/?{}=bar".format(
247                urllib.parse.unquote("%E1%88%B4")
248            ),
249            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
250        },
251        {
252            "url": "https://host.foo.com/?{}=bar".format(
253                urllib.parse.unquote("%E1%88%B4")
254            ),
255            "method": "GET",
256            "headers": {
257                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=6fb359e9a05394cc7074e0feb42573a2601abc0c869a953e8c5c12e4e01f1a8c",
258                "host": "host.foo.com",
259                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
260            },
261        },
262    ),
263    # POST request with sorted headers (AWS botocore tests).
264    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.req
265    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.sreq
266    (
267        "us-east-1",
268        "2011-09-09T23:36:00Z",
269        {
270            "access_key_id": "AKIDEXAMPLE",
271            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
272        },
273        {
274            "method": "POST",
275            "url": "https://host.foo.com/",
276            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "ZOO": "zoobar"},
277        },
278        {
279            "url": "https://host.foo.com/",
280            "method": "POST",
281            "headers": {
282                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=b7a95a52518abbca0964a999a880429ab734f35ebbf1235bd79a5de87756dc4a",
283                "host": "host.foo.com",
284                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
285                "ZOO": "zoobar",
286            },
287        },
288    ),
289    # POST request with upper case header value from AWS Python test harness.
290    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.req
291    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.sreq
292    (
293        "us-east-1",
294        "2011-09-09T23:36:00Z",
295        {
296            "access_key_id": "AKIDEXAMPLE",
297            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
298        },
299        {
300            "method": "POST",
301            "url": "https://host.foo.com/",
302            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "zoo": "ZOOBAR"},
303        },
304        {
305            "url": "https://host.foo.com/",
306            "method": "POST",
307            "headers": {
308                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=273313af9d0c265c531e11db70bbd653f3ba074c1009239e8559d3987039cad7",
309                "host": "host.foo.com",
310                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
311                "zoo": "ZOOBAR",
312            },
313        },
314    ),
315    # POST request with header and no body (AWS botocore tests).
316    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
317    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.sreq
318    (
319        "us-east-1",
320        "2011-09-09T23:36:00Z",
321        {
322            "access_key_id": "AKIDEXAMPLE",
323            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
324        },
325        {
326            "method": "POST",
327            "url": "https://host.foo.com/",
328            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "p": "phfft"},
329        },
330        {
331            "url": "https://host.foo.com/",
332            "method": "POST",
333            "headers": {
334                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;p, Signature=debf546796015d6f6ded8626f5ce98597c33b47b9164cf6b17b4642036fcb592",
335                "host": "host.foo.com",
336                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
337                "p": "phfft",
338            },
339        },
340    ),
341    # POST request with body and no header (AWS botocore tests).
342    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.req
343    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.sreq
344    (
345        "us-east-1",
346        "2011-09-09T23:36:00Z",
347        {
348            "access_key_id": "AKIDEXAMPLE",
349            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
350        },
351        {
352            "method": "POST",
353            "url": "https://host.foo.com/",
354            "headers": {
355                "Content-Type": "application/x-www-form-urlencoded",
356                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
357            },
358            "data": "foo=bar",
359        },
360        {
361            "url": "https://host.foo.com/",
362            "method": "POST",
363            "headers": {
364                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=content-type;date;host, Signature=5a15b22cf462f047318703b92e6f4f38884e4a7ab7b1d6426ca46a8bd1c26cbc",
365                "host": "host.foo.com",
366                "Content-Type": "application/x-www-form-urlencoded",
367                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
368            },
369            "data": "foo=bar",
370        },
371    ),
372    # POST request with querystring (AWS botocore tests).
373    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.req
374    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.sreq
375    (
376        "us-east-1",
377        "2011-09-09T23:36:00Z",
378        {
379            "access_key_id": "AKIDEXAMPLE",
380            "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
381        },
382        {
383            "method": "POST",
384            "url": "https://host.foo.com/?foo=bar",
385            "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
386        },
387        {
388            "url": "https://host.foo.com/?foo=bar",
389            "method": "POST",
390            "headers": {
391                "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b6e3b79003ce0743a491606ba1035a804593b0efb1e20a11cba83f8c25a57a92",
392                "host": "host.foo.com",
393                "date": "Mon, 09 Sep 2011 23:36:00 GMT",
394            },
395        },
396    ),
397    # GET request with session token credentials.
398    (
399        "us-east-2",
400        "2020-08-11T06:55:22Z",
401        {
402            "access_key_id": ACCESS_KEY_ID,
403            "secret_access_key": SECRET_ACCESS_KEY,
404            "security_token": TOKEN,
405        },
406        {
407            "method": "GET",
408            "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
409        },
410        {
411            "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
412            "method": "GET",
413            "headers": {
414                "Authorization": "AWS4-HMAC-SHA256 Credential="
415                + ACCESS_KEY_ID
416                + "/20200811/us-east-2/ec2/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=631ea80cddfaa545fdadb120dc92c9f18166e38a5c47b50fab9fce476e022855",
417                "host": "ec2.us-east-2.amazonaws.com",
418                "x-amz-date": "20200811T065522Z",
419                "x-amz-security-token": TOKEN,
420            },
421        },
422    ),
423    # POST request with session token credentials.
424    (
425        "us-east-2",
426        "2020-08-11T06:55:22Z",
427        {
428            "access_key_id": ACCESS_KEY_ID,
429            "secret_access_key": SECRET_ACCESS_KEY,
430            "security_token": TOKEN,
431        },
432        {
433            "method": "POST",
434            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
435        },
436        {
437            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
438            "method": "POST",
439            "headers": {
440                "Authorization": "AWS4-HMAC-SHA256 Credential="
441                + ACCESS_KEY_ID
442                + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=73452984e4a880ffdc5c392355733ec3f5ba310d5e0609a89244440cadfe7a7a",
443                "host": "sts.us-east-2.amazonaws.com",
444                "x-amz-date": "20200811T065522Z",
445                "x-amz-security-token": TOKEN,
446            },
447        },
448    ),
449    # POST request with computed x-amz-date and no data.
450    (
451        "us-east-2",
452        "2020-08-11T06:55:22Z",
453        {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY},
454        {
455            "method": "POST",
456            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
457        },
458        {
459            "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
460            "method": "POST",
461            "headers": {
462                "Authorization": "AWS4-HMAC-SHA256 Credential="
463                + ACCESS_KEY_ID
464                + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date, Signature=d095ba304919cd0d5570ba8a3787884ee78b860f268ed040ba23831d55536d56",
465                "host": "sts.us-east-2.amazonaws.com",
466                "x-amz-date": "20200811T065522Z",
467            },
468        },
469    ),
470    # POST request with session token and additional headers/data.
471    (
472        "us-east-2",
473        "2020-08-11T06:55:22Z",
474        {
475            "access_key_id": ACCESS_KEY_ID,
476            "secret_access_key": SECRET_ACCESS_KEY,
477            "security_token": TOKEN,
478        },
479        {
480            "method": "POST",
481            "url": "https://dynamodb.us-east-2.amazonaws.com/",
482            "headers": {
483                "Content-Type": "application/x-amz-json-1.0",
484                "x-amz-target": "DynamoDB_20120810.CreateTable",
485            },
486            "data": REQUEST_PARAMS,
487        },
488        {
489            "url": "https://dynamodb.us-east-2.amazonaws.com/",
490            "method": "POST",
491            "headers": {
492                "Authorization": "AWS4-HMAC-SHA256 Credential="
493                + ACCESS_KEY_ID
494                + "/20200811/us-east-2/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-security-token;x-amz-target, Signature=fdaa5b9cc9c86b80fe61eaf504141c0b3523780349120f2bd8145448456e0385",
495                "host": "dynamodb.us-east-2.amazonaws.com",
496                "x-amz-date": "20200811T065522Z",
497                "Content-Type": "application/x-amz-json-1.0",
498                "x-amz-target": "DynamoDB_20120810.CreateTable",
499                "x-amz-security-token": TOKEN,
500            },
501            "data": REQUEST_PARAMS,
502        },
503    ),
504]
505
506
class TestRequestSigner(object):
    """Tests for ``aws.RequestSigner.get_request_options``."""

    @pytest.mark.parametrize(
        "region, time, credentials, original_request, signed_request", TEST_FIXTURES
    )
    @mock.patch("google.auth._helpers.utcnow")
    def test_get_request_options(
        self, utcnow, region, time, credentials, original_request, signed_request
    ):
        """Each fixture's request must sign to exactly its expected output."""
        # Pin the patched clock to the fixture's timestamp.
        frozen_now = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
        utcnow.return_value = frozen_now

        signer = aws.RequestSigner(region)
        # Fields absent from the fixture (e.g. "data", "headers") become None.
        url, method, data, headers = (
            original_request.get(field)
            for field in ("url", "method", "data", "headers")
        )
        actual_signed_request = signer.get_request_options(
            credentials, url, method, data, headers
        )

        assert actual_signed_request == signed_request

    def test_get_request_options_with_missing_scheme_url(self):
        """A URL without any scheme raises ValueError."""
        signer = aws.RequestSigner("us-east-2")
        credentials = {
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
        }

        with pytest.raises(ValueError) as excinfo:
            signer.get_request_options(credentials, "invalid", "POST")

        assert excinfo.match(r"Invalid AWS service URL")

    def test_get_request_options_with_invalid_scheme_url(self):
        """A URL using the ``http`` scheme raises ValueError."""
        signer = aws.RequestSigner("us-east-2")
        credentials = {
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
        }

        with pytest.raises(ValueError) as excinfo:
            signer.get_request_options(credentials, "http://invalid", "POST")

        assert excinfo.match(r"Invalid AWS service URL")

    def test_get_request_options_with_missing_hostname_url(self):
        """A URL with a scheme but no hostname raises ValueError."""
        signer = aws.RequestSigner("us-east-2")
        credentials = {
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
        }

        with pytest.raises(ValueError) as excinfo:
            signer.get_request_options(credentials, "https://", "POST")

        assert excinfo.match(r"Invalid AWS service URL")
571
572
573class TestCredentials(object):
    # Sample AWS region and role name used as canned values by the tests.
    AWS_REGION = "us-east-2"
    AWS_ROLE = "gcp-aws-role"
    # Sample credentials payload built from the module-level sample keys.
    # NOTE(review): presumably mirrors the JSON body served by the security
    # credentials URL -- confirm against the tests that consume it.
    AWS_SECURITY_CREDENTIALS_RESPONSE = {
        "AccessKeyId": ACCESS_KEY_ID,
        "SecretAccessKey": SECRET_ACCESS_KEY,
        "Token": TOKEN,
    }
    # Same timestamp as the session-token entries in TEST_FIXTURES.
    AWS_SIGNATURE_TIME = "2020-08-11T06:55:22Z"
    # Credential source configuration assembled from the module-level URLs.
    CREDENTIAL_SOURCE = {
        "environment_id": "aws1",
        "region_url": REGION_URL,
        "url": SECURITY_CREDS_URL,
        "regional_cred_verification_url": CRED_VERIFICATION_URL,
    }
    # Canned successful token response; "scope" is SCOPES joined by spaces.
    SUCCESS_RESPONSE = {
        "access_token": "ACCESS_TOKEN",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": " ".join(SCOPES),
    }
595
596    @classmethod
597    def make_serialized_aws_signed_request(
598        cls,
599        aws_security_credentials,
600        region_name="us-east-2",
601        url="https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
602    ):
603        """Utility to generate serialize AWS signed requests.
604        This makes it easy to assert generated subject tokens based on the
605        provided AWS security credentials, regions and AWS STS endpoint.
606        """
607        request_signer = aws.RequestSigner(region_name)
608        signed_request = request_signer.get_request_options(
609            aws_security_credentials, url, "POST"
610        )
611        reformatted_signed_request = {
612            "url": signed_request.get("url"),
613            "method": signed_request.get("method"),
614            "headers": [
615                {
616                    "key": "Authorization",
617                    "value": signed_request.get("headers").get("Authorization"),
618                },
619                {"key": "host", "value": signed_request.get("headers").get("host")},
620                {
621                    "key": "x-amz-date",
622                    "value": signed_request.get("headers").get("x-amz-date"),
623                },
624            ],
625        }
626        # Include security token if available.
627        if "security_token" in aws_security_credentials:
628            reformatted_signed_request.get("headers").append(
629                {
630                    "key": "x-amz-security-token",
631                    "value": signed_request.get("headers").get("x-amz-security-token"),
632                }
633            )
634        # Append x-goog-cloud-target-resource header.
635        reformatted_signed_request.get("headers").append(
636            {"key": "x-goog-cloud-target-resource", "value": AUDIENCE}
637        ),
638        return urllib.parse.quote(
639            json.dumps(
640                reformatted_signed_request, separators=(",", ":"), sort_keys=True
641            )
642        )
643
644    @classmethod
645    def make_mock_request(
646        cls,
647        region_status=None,
648        region_name=None,
649        role_status=None,
650        role_name=None,
651        security_credentials_status=None,
652        security_credentials_data=None,
653        token_status=None,
654        token_data=None,
655        impersonation_status=None,
656        impersonation_data=None,
657    ):
658        """Utility function to generate a mock HTTP request object.
659        This will facilitate testing various edge cases by specify how the
660        various endpoints will respond while generating a Google Access token
661        in an AWS environment.
662        """
663        responses = []
664        if region_status:
665            # AWS region request.
666            region_response = mock.create_autospec(transport.Response, instance=True)
667            region_response.status = region_status
668            if region_name:
669                region_response.data = "{}b".format(region_name).encode("utf-8")
670            responses.append(region_response)
671
672        if role_status:
673            # AWS role name request.
674            role_response = mock.create_autospec(transport.Response, instance=True)
675            role_response.status = role_status
676            if role_name:
677                role_response.data = role_name.encode("utf-8")
678            responses.append(role_response)
679
680        if security_credentials_status:
681            # AWS security credentials request.
682            security_credentials_response = mock.create_autospec(
683                transport.Response, instance=True
684            )
685            security_credentials_response.status = security_credentials_status
686            if security_credentials_data:
687                security_credentials_response.data = json.dumps(
688                    security_credentials_data
689                ).encode("utf-8")
690            responses.append(security_credentials_response)
691
692        if token_status:
693            # GCP token exchange request.
694            token_response = mock.create_autospec(transport.Response, instance=True)
695            token_response.status = token_status
696            token_response.data = json.dumps(token_data).encode("utf-8")
697            responses.append(token_response)
698
699        if impersonation_status:
700            # Service account impersonation request.
701            impersonation_response = mock.create_autospec(
702                transport.Response, instance=True
703            )
704            impersonation_response.status = impersonation_status
705            impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
706            responses.append(impersonation_response)
707
708        request = mock.create_autospec(transport.Request)
709        request.side_effect = responses
710
711        return request
712
713    @classmethod
714    def make_credentials(
715        cls,
716        credential_source,
717        client_id=None,
718        client_secret=None,
719        quota_project_id=None,
720        scopes=None,
721        default_scopes=None,
722        service_account_impersonation_url=None,
723    ):
724        return aws.Credentials(
725            audience=AUDIENCE,
726            subject_token_type=SUBJECT_TOKEN_TYPE,
727            token_url=TOKEN_URL,
728            service_account_impersonation_url=service_account_impersonation_url,
729            credential_source=credential_source,
730            client_id=client_id,
731            client_secret=client_secret,
732            quota_project_id=quota_project_id,
733            scopes=scopes,
734            default_scopes=default_scopes,
735        )
736
737    @classmethod
738    def assert_aws_metadata_request_kwargs(cls, request_kwargs, url, headers=None):
739        assert request_kwargs["url"] == url
740        # All used AWS metadata server endpoints use GET HTTP method.
741        assert request_kwargs["method"] == "GET"
742        if headers:
743            assert request_kwargs["headers"] == headers
744        else:
745            assert "headers" not in request_kwargs
746        # None of the endpoints used require any data in request.
747        assert "body" not in request_kwargs
748
749    @classmethod
750    def assert_token_request_kwargs(
751        cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
752    ):
753        assert request_kwargs["url"] == token_url
754        assert request_kwargs["method"] == "POST"
755        assert request_kwargs["headers"] == headers
756        assert request_kwargs["body"] is not None
757        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
758        assert len(body_tuples) == len(request_data.keys())
759        for (k, v) in body_tuples:
760            assert v.decode("utf-8") == request_data[k.decode("utf-8")]
761
762    @classmethod
763    def assert_impersonation_request_kwargs(
764        cls,
765        request_kwargs,
766        headers,
767        request_data,
768        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
769    ):
770        assert request_kwargs["url"] == service_account_impersonation_url
771        assert request_kwargs["method"] == "POST"
772        assert request_kwargs["headers"] == headers
773        assert request_kwargs["body"] is not None
774        body_json = json.loads(request_kwargs["body"].decode("utf-8"))
775        assert body_json == request_data
776
777    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
778    def test_from_info_full_options(self, mock_init):
779        credentials = aws.Credentials.from_info(
780            {
781                "audience": AUDIENCE,
782                "subject_token_type": SUBJECT_TOKEN_TYPE,
783                "token_url": TOKEN_URL,
784                "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
785                "client_id": CLIENT_ID,
786                "client_secret": CLIENT_SECRET,
787                "quota_project_id": QUOTA_PROJECT_ID,
788                "credential_source": self.CREDENTIAL_SOURCE,
789            }
790        )
791
792        # Confirm aws.Credentials instance initialized with the expected parameters.
793        assert isinstance(credentials, aws.Credentials)
794        mock_init.assert_called_once_with(
795            audience=AUDIENCE,
796            subject_token_type=SUBJECT_TOKEN_TYPE,
797            token_url=TOKEN_URL,
798            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
799            client_id=CLIENT_ID,
800            client_secret=CLIENT_SECRET,
801            credential_source=self.CREDENTIAL_SOURCE,
802            quota_project_id=QUOTA_PROJECT_ID,
803        )
804
805    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
806    def test_from_info_required_options_only(self, mock_init):
807        credentials = aws.Credentials.from_info(
808            {
809                "audience": AUDIENCE,
810                "subject_token_type": SUBJECT_TOKEN_TYPE,
811                "token_url": TOKEN_URL,
812                "credential_source": self.CREDENTIAL_SOURCE,
813            }
814        )
815
816        # Confirm aws.Credentials instance initialized with the expected parameters.
817        assert isinstance(credentials, aws.Credentials)
818        mock_init.assert_called_once_with(
819            audience=AUDIENCE,
820            subject_token_type=SUBJECT_TOKEN_TYPE,
821            token_url=TOKEN_URL,
822            service_account_impersonation_url=None,
823            client_id=None,
824            client_secret=None,
825            credential_source=self.CREDENTIAL_SOURCE,
826            quota_project_id=None,
827        )
828
829    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
830    def test_from_file_full_options(self, mock_init, tmpdir):
831        info = {
832            "audience": AUDIENCE,
833            "subject_token_type": SUBJECT_TOKEN_TYPE,
834            "token_url": TOKEN_URL,
835            "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
836            "client_id": CLIENT_ID,
837            "client_secret": CLIENT_SECRET,
838            "quota_project_id": QUOTA_PROJECT_ID,
839            "credential_source": self.CREDENTIAL_SOURCE,
840        }
841        config_file = tmpdir.join("config.json")
842        config_file.write(json.dumps(info))
843        credentials = aws.Credentials.from_file(str(config_file))
844
845        # Confirm aws.Credentials instance initialized with the expected parameters.
846        assert isinstance(credentials, aws.Credentials)
847        mock_init.assert_called_once_with(
848            audience=AUDIENCE,
849            subject_token_type=SUBJECT_TOKEN_TYPE,
850            token_url=TOKEN_URL,
851            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
852            client_id=CLIENT_ID,
853            client_secret=CLIENT_SECRET,
854            credential_source=self.CREDENTIAL_SOURCE,
855            quota_project_id=QUOTA_PROJECT_ID,
856        )
857
858    @mock.patch.object(aws.Credentials, "__init__", return_value=None)
859    def test_from_file_required_options_only(self, mock_init, tmpdir):
860        info = {
861            "audience": AUDIENCE,
862            "subject_token_type": SUBJECT_TOKEN_TYPE,
863            "token_url": TOKEN_URL,
864            "credential_source": self.CREDENTIAL_SOURCE,
865        }
866        config_file = tmpdir.join("config.json")
867        config_file.write(json.dumps(info))
868        credentials = aws.Credentials.from_file(str(config_file))
869
870        # Confirm aws.Credentials instance initialized with the expected parameters.
871        assert isinstance(credentials, aws.Credentials)
872        mock_init.assert_called_once_with(
873            audience=AUDIENCE,
874            subject_token_type=SUBJECT_TOKEN_TYPE,
875            token_url=TOKEN_URL,
876            service_account_impersonation_url=None,
877            client_id=None,
878            client_secret=None,
879            credential_source=self.CREDENTIAL_SOURCE,
880            quota_project_id=None,
881        )
882
883    def test_constructor_invalid_credential_source(self):
884        # Provide invalid credential source.
885        credential_source = {"unsupported": "value"}
886
887        with pytest.raises(ValueError) as excinfo:
888            self.make_credentials(credential_source=credential_source)
889
890        assert excinfo.match(r"No valid AWS 'credential_source' provided")
891
892    def test_constructor_invalid_environment_id(self):
893        # Provide invalid environment_id.
894        credential_source = self.CREDENTIAL_SOURCE.copy()
895        credential_source["environment_id"] = "azure1"
896
897        with pytest.raises(ValueError) as excinfo:
898            self.make_credentials(credential_source=credential_source)
899
900        assert excinfo.match(r"No valid AWS 'credential_source' provided")
901
902    def test_constructor_missing_cred_verification_url(self):
903        # regional_cred_verification_url is a required field.
904        credential_source = self.CREDENTIAL_SOURCE.copy()
905        credential_source.pop("regional_cred_verification_url")
906
907        with pytest.raises(ValueError) as excinfo:
908            self.make_credentials(credential_source=credential_source)
909
910        assert excinfo.match(r"No valid AWS 'credential_source' provided")
911
912    def test_constructor_invalid_environment_id_version(self):
913        # Provide an unsupported version.
914        credential_source = self.CREDENTIAL_SOURCE.copy()
915        credential_source["environment_id"] = "aws3"
916
917        with pytest.raises(ValueError) as excinfo:
918            self.make_credentials(credential_source=credential_source)
919
920        assert excinfo.match(r"aws version '3' is not supported in the current build.")
921
922    def test_info(self):
923        credentials = self.make_credentials(
924            credential_source=self.CREDENTIAL_SOURCE.copy()
925        )
926
927        assert credentials.info == {
928            "type": "external_account",
929            "audience": AUDIENCE,
930            "subject_token_type": SUBJECT_TOKEN_TYPE,
931            "token_url": TOKEN_URL,
932            "credential_source": self.CREDENTIAL_SOURCE,
933        }
934
935    def test_retrieve_subject_token_missing_region_url(self):
936        # When AWS_REGION envvar is not available, region_url is required for
937        # determining the current AWS region.
938        credential_source = self.CREDENTIAL_SOURCE.copy()
939        credential_source.pop("region_url")
940        credentials = self.make_credentials(credential_source=credential_source)
941
942        with pytest.raises(exceptions.RefreshError) as excinfo:
943            credentials.retrieve_subject_token(None)
944
945        assert excinfo.match(r"Unable to determine AWS region")
946
947    @mock.patch("google.auth._helpers.utcnow")
948    def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
949        self, utcnow
950    ):
951        utcnow.return_value = datetime.datetime.strptime(
952            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
953        )
954        request = self.make_mock_request(
955            region_status=http_client.OK,
956            region_name=self.AWS_REGION,
957            role_status=http_client.OK,
958            role_name=self.AWS_ROLE,
959            security_credentials_status=http_client.OK,
960            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
961        )
962        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
963
964        subject_token = credentials.retrieve_subject_token(request)
965
966        assert subject_token == self.make_serialized_aws_signed_request(
967            {
968                "access_key_id": ACCESS_KEY_ID,
969                "secret_access_key": SECRET_ACCESS_KEY,
970                "security_token": TOKEN,
971            }
972        )
973        # Assert region request.
974        self.assert_aws_metadata_request_kwargs(
975            request.call_args_list[0][1], REGION_URL
976        )
977        # Assert role request.
978        self.assert_aws_metadata_request_kwargs(
979            request.call_args_list[1][1], SECURITY_CREDS_URL
980        )
981        # Assert security credentials request.
982        self.assert_aws_metadata_request_kwargs(
983            request.call_args_list[2][1],
984            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
985            {"Content-Type": "application/json"},
986        )
987
988        # Retrieve subject_token again. Region should not be queried again.
989        new_request = self.make_mock_request(
990            role_status=http_client.OK,
991            role_name=self.AWS_ROLE,
992            security_credentials_status=http_client.OK,
993            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
994        )
995
996        credentials.retrieve_subject_token(new_request)
997
998        # Only 2 requests should be sent as the region is cached.
999        assert len(new_request.call_args_list) == 2
1000        # Assert role request.
1001        self.assert_aws_metadata_request_kwargs(
1002            new_request.call_args_list[0][1], SECURITY_CREDS_URL
1003        )
1004        # Assert security credentials request.
1005        self.assert_aws_metadata_request_kwargs(
1006            new_request.call_args_list[1][1],
1007            "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
1008            {"Content-Type": "application/json"},
1009        )
1010
1011    @mock.patch("google.auth._helpers.utcnow")
1012    def test_retrieve_subject_token_success_permanent_creds_no_environment_vars(
1013        self, utcnow
1014    ):
1015        # Simualte a permanent credential without a session token is
1016        # returned by the security-credentials endpoint.
1017        security_creds_response = self.AWS_SECURITY_CREDENTIALS_RESPONSE.copy()
1018        security_creds_response.pop("Token")
1019        utcnow.return_value = datetime.datetime.strptime(
1020            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1021        )
1022        request = self.make_mock_request(
1023            region_status=http_client.OK,
1024            region_name=self.AWS_REGION,
1025            role_status=http_client.OK,
1026            role_name=self.AWS_ROLE,
1027            security_credentials_status=http_client.OK,
1028            security_credentials_data=security_creds_response,
1029        )
1030        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1031
1032        subject_token = credentials.retrieve_subject_token(request)
1033
1034        assert subject_token == self.make_serialized_aws_signed_request(
1035            {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
1036        )
1037
1038    @mock.patch("google.auth._helpers.utcnow")
1039    def test_retrieve_subject_token_success_environment_vars(self, utcnow, monkeypatch):
1040        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
1041        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
1042        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
1043        monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
1044        utcnow.return_value = datetime.datetime.strptime(
1045            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1046        )
1047        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1048
1049        subject_token = credentials.retrieve_subject_token(None)
1050
1051        assert subject_token == self.make_serialized_aws_signed_request(
1052            {
1053                "access_key_id": ACCESS_KEY_ID,
1054                "secret_access_key": SECRET_ACCESS_KEY,
1055                "security_token": TOKEN,
1056            }
1057        )
1058
1059    @mock.patch("google.auth._helpers.utcnow")
1060    def test_retrieve_subject_token_success_environment_vars_with_default_region(
1061        self, utcnow, monkeypatch
1062    ):
1063        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
1064        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
1065        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
1066        monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, self.AWS_REGION)
1067        utcnow.return_value = datetime.datetime.strptime(
1068            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1069        )
1070        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1071
1072        subject_token = credentials.retrieve_subject_token(None)
1073
1074        assert subject_token == self.make_serialized_aws_signed_request(
1075            {
1076                "access_key_id": ACCESS_KEY_ID,
1077                "secret_access_key": SECRET_ACCESS_KEY,
1078                "security_token": TOKEN,
1079            }
1080        )
1081
1082    @mock.patch("google.auth._helpers.utcnow")
1083    def test_retrieve_subject_token_success_environment_vars_with_both_regions_set(
1084        self, utcnow, monkeypatch
1085    ):
1086        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
1087        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
1088        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
1089        monkeypatch.setenv(environment_vars.AWS_DEFAULT_REGION, "Malformed AWS Region")
1090        # This test makes sure that the AWS_REGION gets used over AWS_DEFAULT_REGION,
1091        # So, AWS_DEFAULT_REGION is set to something that would cause the test to fail,
1092        # And AWS_REGION is set to the a valid value, and it should succeed
1093        monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
1094        utcnow.return_value = datetime.datetime.strptime(
1095            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1096        )
1097        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1098
1099        subject_token = credentials.retrieve_subject_token(None)
1100
1101        assert subject_token == self.make_serialized_aws_signed_request(
1102            {
1103                "access_key_id": ACCESS_KEY_ID,
1104                "secret_access_key": SECRET_ACCESS_KEY,
1105                "security_token": TOKEN,
1106            }
1107        )
1108
1109    @mock.patch("google.auth._helpers.utcnow")
1110    def test_retrieve_subject_token_success_environment_vars_no_session_token(
1111        self, utcnow, monkeypatch
1112    ):
1113        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
1114        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
1115        monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
1116        utcnow.return_value = datetime.datetime.strptime(
1117            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1118        )
1119        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1120
1121        subject_token = credentials.retrieve_subject_token(None)
1122
1123        assert subject_token == self.make_serialized_aws_signed_request(
1124            {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
1125        )
1126
1127    @mock.patch("google.auth._helpers.utcnow")
1128    def test_retrieve_subject_token_success_environment_vars_except_region(
1129        self, utcnow, monkeypatch
1130    ):
1131        monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
1132        monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
1133        monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
1134        utcnow.return_value = datetime.datetime.strptime(
1135            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1136        )
1137        # Region will be queried since it is not found in envvars.
1138        request = self.make_mock_request(
1139            region_status=http_client.OK, region_name=self.AWS_REGION
1140        )
1141        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1142
1143        subject_token = credentials.retrieve_subject_token(request)
1144
1145        assert subject_token == self.make_serialized_aws_signed_request(
1146            {
1147                "access_key_id": ACCESS_KEY_ID,
1148                "secret_access_key": SECRET_ACCESS_KEY,
1149                "security_token": TOKEN,
1150            }
1151        )
1152
1153    def test_retrieve_subject_token_error_determining_aws_region(self):
1154        # Simulate error in retrieving the AWS region.
1155        request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
1156        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1157
1158        with pytest.raises(exceptions.RefreshError) as excinfo:
1159            credentials.retrieve_subject_token(request)
1160
1161        assert excinfo.match(r"Unable to retrieve AWS region")
1162
1163    def test_retrieve_subject_token_error_determining_aws_role(self):
1164        # Simulate error in retrieving the AWS role name.
1165        request = self.make_mock_request(
1166            region_status=http_client.OK,
1167            region_name=self.AWS_REGION,
1168            role_status=http_client.BAD_REQUEST,
1169        )
1170        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1171
1172        with pytest.raises(exceptions.RefreshError) as excinfo:
1173            credentials.retrieve_subject_token(request)
1174
1175        assert excinfo.match(r"Unable to retrieve AWS role name")
1176
1177    def test_retrieve_subject_token_error_determining_security_creds_url(self):
1178        # Simulate the security-credentials url is missing. This is needed for
1179        # determining the AWS security credentials when not found in envvars.
1180        credential_source = self.CREDENTIAL_SOURCE.copy()
1181        credential_source.pop("url")
1182        request = self.make_mock_request(
1183            region_status=http_client.OK, region_name=self.AWS_REGION
1184        )
1185        credentials = self.make_credentials(credential_source=credential_source)
1186
1187        with pytest.raises(exceptions.RefreshError) as excinfo:
1188            credentials.retrieve_subject_token(request)
1189
1190        assert excinfo.match(
1191            r"Unable to determine the AWS metadata server security credentials endpoint"
1192        )
1193
1194    def test_retrieve_subject_token_error_determining_aws_security_creds(self):
1195        # Simulate error in retrieving the AWS security credentials.
1196        request = self.make_mock_request(
1197            region_status=http_client.OK,
1198            region_name=self.AWS_REGION,
1199            role_status=http_client.OK,
1200            role_name=self.AWS_ROLE,
1201            security_credentials_status=http_client.BAD_REQUEST,
1202        )
1203        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1204
1205        with pytest.raises(exceptions.RefreshError) as excinfo:
1206            credentials.retrieve_subject_token(request)
1207
1208        assert excinfo.match(r"Unable to retrieve AWS security credentials")
1209
1210    @mock.patch("google.auth._helpers.utcnow")
1211    def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcnow):
1212        utcnow.return_value = datetime.datetime.strptime(
1213            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1214        )
1215        expected_subject_token = self.make_serialized_aws_signed_request(
1216            {
1217                "access_key_id": ACCESS_KEY_ID,
1218                "secret_access_key": SECRET_ACCESS_KEY,
1219                "security_token": TOKEN,
1220            }
1221        )
1222        token_headers = {
1223            "Content-Type": "application/x-www-form-urlencoded",
1224            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
1225        }
1226        token_request_data = {
1227            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
1228            "audience": AUDIENCE,
1229            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
1230            "scope": " ".join(SCOPES),
1231            "subject_token": expected_subject_token,
1232            "subject_token_type": SUBJECT_TOKEN_TYPE,
1233        }
1234        request = self.make_mock_request(
1235            region_status=http_client.OK,
1236            region_name=self.AWS_REGION,
1237            role_status=http_client.OK,
1238            role_name=self.AWS_ROLE,
1239            security_credentials_status=http_client.OK,
1240            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
1241            token_status=http_client.OK,
1242            token_data=self.SUCCESS_RESPONSE,
1243        )
1244        credentials = self.make_credentials(
1245            client_id=CLIENT_ID,
1246            client_secret=CLIENT_SECRET,
1247            credential_source=self.CREDENTIAL_SOURCE,
1248            quota_project_id=QUOTA_PROJECT_ID,
1249            scopes=SCOPES,
1250            # Default scopes should be ignored.
1251            default_scopes=["ignored"],
1252        )
1253
1254        credentials.refresh(request)
1255
1256        assert len(request.call_args_list) == 4
1257        # Fourth request should be sent to GCP STS endpoint.
1258        self.assert_token_request_kwargs(
1259            request.call_args_list[3][1], token_headers, token_request_data
1260        )
1261        assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
1262        assert credentials.quota_project_id == QUOTA_PROJECT_ID
1263        assert credentials.scopes == SCOPES
1264        assert credentials.default_scopes == ["ignored"]
1265
1266    @mock.patch("google.auth._helpers.utcnow")
1267    def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow):
1268        utcnow.return_value = datetime.datetime.strptime(
1269            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1270        )
1271        expected_subject_token = self.make_serialized_aws_signed_request(
1272            {
1273                "access_key_id": ACCESS_KEY_ID,
1274                "secret_access_key": SECRET_ACCESS_KEY,
1275                "security_token": TOKEN,
1276            }
1277        )
1278        token_headers = {
1279            "Content-Type": "application/x-www-form-urlencoded",
1280            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
1281        }
1282        token_request_data = {
1283            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
1284            "audience": AUDIENCE,
1285            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
1286            "scope": " ".join(SCOPES),
1287            "subject_token": expected_subject_token,
1288            "subject_token_type": SUBJECT_TOKEN_TYPE,
1289        }
1290        request = self.make_mock_request(
1291            region_status=http_client.OK,
1292            region_name=self.AWS_REGION,
1293            role_status=http_client.OK,
1294            role_name=self.AWS_ROLE,
1295            security_credentials_status=http_client.OK,
1296            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
1297            token_status=http_client.OK,
1298            token_data=self.SUCCESS_RESPONSE,
1299        )
1300        credentials = self.make_credentials(
1301            client_id=CLIENT_ID,
1302            client_secret=CLIENT_SECRET,
1303            credential_source=self.CREDENTIAL_SOURCE,
1304            quota_project_id=QUOTA_PROJECT_ID,
1305            scopes=None,
1306            # Default scopes should be used since user specified scopes are none.
1307            default_scopes=SCOPES,
1308        )
1309
1310        credentials.refresh(request)
1311
1312        assert len(request.call_args_list) == 4
1313        # Fourth request should be sent to GCP STS endpoint.
1314        self.assert_token_request_kwargs(
1315            request.call_args_list[3][1], token_headers, token_request_data
1316        )
1317        assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
1318        assert credentials.quota_project_id == QUOTA_PROJECT_ID
1319        assert credentials.scopes is None
1320        assert credentials.default_scopes == SCOPES
1321
1322    @mock.patch("google.auth._helpers.utcnow")
1323    def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow):
1324        utcnow.return_value = datetime.datetime.strptime(
1325            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1326        )
1327        expire_time = (
1328            _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
1329        ).isoformat("T") + "Z"
1330        expected_subject_token = self.make_serialized_aws_signed_request(
1331            {
1332                "access_key_id": ACCESS_KEY_ID,
1333                "secret_access_key": SECRET_ACCESS_KEY,
1334                "security_token": TOKEN,
1335            }
1336        )
1337        token_headers = {
1338            "Content-Type": "application/x-www-form-urlencoded",
1339            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
1340        }
1341        token_request_data = {
1342            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
1343            "audience": AUDIENCE,
1344            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
1345            "scope": "https://www.googleapis.com/auth/iam",
1346            "subject_token": expected_subject_token,
1347            "subject_token_type": SUBJECT_TOKEN_TYPE,
1348        }
1349        # Service account impersonation request/response.
1350        impersonation_response = {
1351            "accessToken": "SA_ACCESS_TOKEN",
1352            "expireTime": expire_time,
1353        }
1354        impersonation_headers = {
1355            "Content-Type": "application/json",
1356            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
1357            "x-goog-user-project": QUOTA_PROJECT_ID,
1358        }
1359        impersonation_request_data = {
1360            "delegates": None,
1361            "scope": SCOPES,
1362            "lifetime": "3600s",
1363        }
1364        request = self.make_mock_request(
1365            region_status=http_client.OK,
1366            region_name=self.AWS_REGION,
1367            role_status=http_client.OK,
1368            role_name=self.AWS_ROLE,
1369            security_credentials_status=http_client.OK,
1370            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
1371            token_status=http_client.OK,
1372            token_data=self.SUCCESS_RESPONSE,
1373            impersonation_status=http_client.OK,
1374            impersonation_data=impersonation_response,
1375        )
1376        credentials = self.make_credentials(
1377            client_id=CLIENT_ID,
1378            client_secret=CLIENT_SECRET,
1379            credential_source=self.CREDENTIAL_SOURCE,
1380            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
1381            quota_project_id=QUOTA_PROJECT_ID,
1382            scopes=SCOPES,
1383            # Default scopes should be ignored.
1384            default_scopes=["ignored"],
1385        )
1386
1387        credentials.refresh(request)
1388
1389        assert len(request.call_args_list) == 5
1390        # Fourth request should be sent to GCP STS endpoint.
1391        self.assert_token_request_kwargs(
1392            request.call_args_list[3][1], token_headers, token_request_data
1393        )
1394        # Fifth request should be sent to iamcredentials endpoint for service
1395        # account impersonation.
1396        self.assert_impersonation_request_kwargs(
1397            request.call_args_list[4][1],
1398            impersonation_headers,
1399            impersonation_request_data,
1400        )
1401        assert credentials.token == impersonation_response["accessToken"]
1402        assert credentials.quota_project_id == QUOTA_PROJECT_ID
1403        assert credentials.scopes == SCOPES
1404        assert credentials.default_scopes == ["ignored"]
1405
1406    @mock.patch("google.auth._helpers.utcnow")
1407    def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow):
1408        utcnow.return_value = datetime.datetime.strptime(
1409            self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
1410        )
1411        expire_time = (
1412            _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
1413        ).isoformat("T") + "Z"
1414        expected_subject_token = self.make_serialized_aws_signed_request(
1415            {
1416                "access_key_id": ACCESS_KEY_ID,
1417                "secret_access_key": SECRET_ACCESS_KEY,
1418                "security_token": TOKEN,
1419            }
1420        )
1421        token_headers = {
1422            "Content-Type": "application/x-www-form-urlencoded",
1423            "Authorization": "Basic " + BASIC_AUTH_ENCODING,
1424        }
1425        token_request_data = {
1426            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
1427            "audience": AUDIENCE,
1428            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
1429            "scope": "https://www.googleapis.com/auth/iam",
1430            "subject_token": expected_subject_token,
1431            "subject_token_type": SUBJECT_TOKEN_TYPE,
1432        }
1433        # Service account impersonation request/response.
1434        impersonation_response = {
1435            "accessToken": "SA_ACCESS_TOKEN",
1436            "expireTime": expire_time,
1437        }
1438        impersonation_headers = {
1439            "Content-Type": "application/json",
1440            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
1441            "x-goog-user-project": QUOTA_PROJECT_ID,
1442        }
1443        impersonation_request_data = {
1444            "delegates": None,
1445            "scope": SCOPES,
1446            "lifetime": "3600s",
1447        }
1448        request = self.make_mock_request(
1449            region_status=http_client.OK,
1450            region_name=self.AWS_REGION,
1451            role_status=http_client.OK,
1452            role_name=self.AWS_ROLE,
1453            security_credentials_status=http_client.OK,
1454            security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
1455            token_status=http_client.OK,
1456            token_data=self.SUCCESS_RESPONSE,
1457            impersonation_status=http_client.OK,
1458            impersonation_data=impersonation_response,
1459        )
1460        credentials = self.make_credentials(
1461            client_id=CLIENT_ID,
1462            client_secret=CLIENT_SECRET,
1463            credential_source=self.CREDENTIAL_SOURCE,
1464            service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
1465            quota_project_id=QUOTA_PROJECT_ID,
1466            scopes=None,
1467            # Default scopes should be used since user specified scopes are none.
1468            default_scopes=SCOPES,
1469        )
1470
1471        credentials.refresh(request)
1472
1473        assert len(request.call_args_list) == 5
1474        # Fourth request should be sent to GCP STS endpoint.
1475        self.assert_token_request_kwargs(
1476            request.call_args_list[3][1], token_headers, token_request_data
1477        )
1478        # Fifth request should be sent to iamcredentials endpoint for service
1479        # account impersonation.
1480        self.assert_impersonation_request_kwargs(
1481            request.call_args_list[4][1],
1482            impersonation_headers,
1483            impersonation_request_data,
1484        )
1485        assert credentials.token == impersonation_response["accessToken"]
1486        assert credentials.quota_project_id == QUOTA_PROJECT_ID
1487        assert credentials.scopes is None
1488        assert credentials.default_scopes == SCOPES
1489
1490    def test_refresh_with_retrieve_subject_token_error(self):
1491        request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
1492        credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
1493
1494        with pytest.raises(exceptions.RefreshError) as excinfo:
1495            credentials.refresh(request)
1496
1497        assert excinfo.match(r"Unable to retrieve AWS region")
1498