1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import json
16import os
17
18import mock
19import pytest
20
21from google.auth import _credentials_async as credentials
22from google.auth import _default_async as _default
23from google.auth import app_engine
24from google.auth import compute_engine
25from google.auth import environment_vars
26from google.auth import exceptions
27from google.oauth2 import _service_account_async as service_account
28import google.oauth2.credentials
29from tests import test__default as test_default
30
# Shared stand-in for a credentials object. It is spec'd against
# CredentialsWithQuotaProject, and with_quota_project() hands back the very
# same mock so tests can keep comparing results against MOCK_CREDENTIALS.
MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS

# Reusable patcher (applied as a decorator further down) that replaces
# google.auth._default_async.load_credentials_from_file with an autospec'd
# mock returning (MOCK_CREDENTIALS, mock.sentinel.project_id).
LOAD_FILE_PATCH = mock.patch(
    "google.auth._default_async.load_credentials_from_file",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
39
40
def test_load_credentials_from_missing_file():
    """Expect a "not found" DefaultCredentialsError for an empty file path."""
    with pytest.raises(exceptions.DefaultCredentialsError, match=r"not found"):
        _default.load_credentials_from_file("")
46
47
def test_load_credentials_from_file_invalid_json(tmpdir):
    """Expect a "not a valid json file" error for a file holding broken JSON."""
    broken = tmpdir.join("invalid.json")
    broken.write("{")
    path = str(broken)

    with pytest.raises(
        exceptions.DefaultCredentialsError, match=r"not a valid json file"
    ):
        _default.load_credentials_from_file(path)
56
57
def test_load_credentials_from_file_invalid_type(tmpdir):
    """Expect a "does not have a valid type" error for an unknown "type" value."""
    payload = json.dumps({"type": "not-a-real-type"})
    target = tmpdir.join("invalid.json")
    target.write(payload)

    with pytest.raises(
        exceptions.DefaultCredentialsError, match=r"does not have a valid type"
    ):
        _default.load_credentials_from_file(str(target))
66
67
def test_load_credentials_from_file_authorized_user():
    """The authorized-user fixture loads as async user credentials, no project."""
    loaded = _default.load_credentials_from_file(test_default.AUTHORIZED_USER_FILE)
    creds, project = loaded

    assert project is None
    assert isinstance(creds, google.oauth2._credentials_async.Credentials)
74
75
def test_load_credentials_from_file_no_type(tmpdir):
    """Expect an invalid-type error that mentions "Type is None".

    The client_secrets.json fixture is valid JSON but is not a loadable
    credentials type.
    """
    secrets_path = test_default.CLIENT_SECRETS_FILE

    with pytest.raises(exceptions.DefaultCredentialsError) as raised:
        _default.load_credentials_from_file(secrets_path)

    for expected in (r"does not have a valid type", r"Type is None"):
        assert raised.match(expected)
84
85
def test_load_credentials_from_file_authorized_user_bad_format(tmpdir):
    """An authorized_user file carrying only its "type" key fails to load."""
    incomplete = tmpdir.join("authorized_user_bad.json")
    incomplete.write(json.dumps({"type": "authorized_user"}))
    path = str(incomplete)

    with pytest.raises(exceptions.DefaultCredentialsError) as raised:
        _default.load_credentials_from_file(path)

    for expected in (r"Failed to load authorized user", r"missing fields"):
        assert raised.match(expected)
95
96
def test_load_credentials_from_file_authorized_user_cloud_sdk():
    """Cloud SDK user credentials warn only when no quota project is present.

    Loading the plain Cloud SDK fixture must emit the "Cloud SDK" UserWarning;
    loading the fixture that carries a quota project id must not.
    """
    import warnings

    with pytest.warns(UserWarning, match="Cloud SDK"):
        creds, project = _default.load_credentials_from_file(
            test_default.AUTHORIZED_USER_CLOUD_SDK_FILE
        )
    assert isinstance(creds, google.oauth2._credentials_async.Credentials)
    assert project is None

    # No warning if the json file has quota project id. Record everything that
    # is emitted so the absence of the Cloud SDK warning is actually verified
    # rather than just assumed.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        creds, project = _default.load_credentials_from_file(
            test_default.AUTHORIZED_USER_CLOUD_SDK_WITH_QUOTA_PROJECT_ID_FILE
        )
    assert not [w for w in caught if "Cloud SDK" in str(w.message)]
    assert isinstance(creds, google.oauth2._credentials_async.Credentials)
    assert project is None
111
112
def test_load_credentials_from_file_authorized_user_cloud_sdk_with_scopes():
    """Scopes passed to the loader end up on the Cloud SDK user credentials."""
    wanted_scopes = ["https://www.google.com/calendar/feeds"]

    with pytest.warns(UserWarning, match="Cloud SDK"):
        creds, project = _default.load_credentials_from_file(
            test_default.AUTHORIZED_USER_CLOUD_SDK_FILE, scopes=wanted_scopes
        )

    assert isinstance(creds, google.oauth2._credentials_async.Credentials)
    assert project is None
    assert creds.scopes == ["https://www.google.com/calendar/feeds"]
122
123
def test_load_credentials_from_file_authorized_user_cloud_sdk_with_quota_project():
    """An explicit quota_project_id is carried onto the loaded credentials."""
    loaded = _default.load_credentials_from_file(
        test_default.AUTHORIZED_USER_CLOUD_SDK_FILE, quota_project_id="project-foo"
    )
    creds, project = loaded

    assert isinstance(creds, google.oauth2._credentials_async.Credentials)
    assert project is None
    assert creds.quota_project_id == "project-foo"
132
133
def test_load_credentials_from_file_service_account():
    """The service-account fixture yields async SA credentials and its project."""
    expected_project = test_default.SERVICE_ACCOUNT_FILE_DATA["project_id"]

    creds, project = _default.load_credentials_from_file(
        test_default.SERVICE_ACCOUNT_FILE
    )

    assert isinstance(creds, service_account.Credentials)
    assert project == expected_project
140
141
def test_load_credentials_from_file_service_account_with_scopes():
    """Scopes passed to the loader end up on the service-account credentials."""
    wanted_scopes = ["https://www.google.com/calendar/feeds"]

    creds, project = _default.load_credentials_from_file(
        test_default.SERVICE_ACCOUNT_FILE, scopes=wanted_scopes
    )

    assert isinstance(creds, service_account.Credentials)
    assert project == test_default.SERVICE_ACCOUNT_FILE_DATA["project_id"]
    assert creds.scopes == ["https://www.google.com/calendar/feeds"]
150
151
def test_load_credentials_from_file_service_account_bad_format(tmpdir):
    """A service_account file carrying only its "type" key fails to load."""
    incomplete = tmpdir.join("serivce_account_bad.json")
    incomplete.write(json.dumps({"type": "service_account"}))
    path = str(incomplete)

    with pytest.raises(exceptions.DefaultCredentialsError) as raised:
        _default.load_credentials_from_file(path)

    for expected in (r"Failed to load service account", r"missing fields"):
        assert raised.match(expected)
161
162
@mock.patch.dict(os.environ, {}, clear=True)
def test__get_explicit_environ_credentials_no_env():
    """With an emptied environment there are no explicit credentials."""
    found = _default._get_explicit_environ_credentials()
    assert found == (None, None)
166
167
@pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
@LOAD_FILE_PATCH
def test__get_explicit_environ_credentials(load, quota_project_id, monkeypatch):
    """The file named by the credentials env var is loaded with the quota project."""
    monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")

    found = _default._get_explicit_environ_credentials(
        quota_project_id=quota_project_id
    )
    creds, project = found

    assert creds is MOCK_CREDENTIALS
    assert project is mock.sentinel.project_id
    load.assert_called_with("filename", quota_project_id=quota_project_id)
180
181
@LOAD_FILE_PATCH
def test__get_explicit_environ_credentials_no_project_id(load, monkeypatch):
    """A loader result without a project id is passed through as None."""
    monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
    load.return_value = (MOCK_CREDENTIALS, None)

    creds, project = _default._get_explicit_environ_credentials()

    assert project is None
    assert creds is MOCK_CREDENTIALS
191
192
@pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
@mock.patch("google.auth._default_async._get_gcloud_sdk_credentials", autospec=True)
def test__get_explicit_environ_credentials_fallback_to_gcloud(
    get_gcloud_creds, get_adc_path, quota_project_id, monkeypatch
):
    """An explicit path equal to the Cloud SDK path defers to the Cloud SDK flow."""
    shared_path = "filename"
    # Make the explicit credentials path and the Cloud SDK credentials path
    # identical.
    monkeypatch.setenv(environment_vars.CREDENTIALS, shared_path)
    get_adc_path.return_value = shared_path

    _default._get_explicit_environ_credentials(quota_project_id=quota_project_id)

    # Because both paths coincide, the Cloud SDK helper must have been used.
    get_gcloud_creds.assert_called_with(quota_project_id=quota_project_id)
210
211
@pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
@LOAD_FILE_PATCH
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test__get_gcloud_sdk_credentials(get_adc_path, load, quota_project_id):
    """The Cloud SDK credentials path is loaded with the requested quota project."""
    sdk_path = test_default.SERVICE_ACCOUNT_FILE
    get_adc_path.return_value = sdk_path

    found = _default._get_gcloud_sdk_credentials(quota_project_id=quota_project_id)
    creds, project = found

    assert creds is MOCK_CREDENTIALS
    assert project is mock.sentinel.project_id
    load.assert_called_with(sdk_path, quota_project_id=quota_project_id)
229
230
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test__get_gcloud_sdk_credentials_non_existent(get_adc_path, tmpdir):
    """A Cloud SDK path that was never created yields no credentials."""
    missing_path = str(tmpdir.join("non-existent"))
    get_adc_path.return_value = missing_path

    found = _default._get_gcloud_sdk_credentials()
    creds, project = found

    assert creds is None
    assert project is None
242
243
@mock.patch(
    "google.auth._cloud_sdk.get_project_id",
    return_value=mock.sentinel.project_id,
    autospec=True,
)
@mock.patch("os.path.isfile", return_value=True, autospec=True)
@LOAD_FILE_PATCH
def test__get_gcloud_sdk_credentials_project_id(load, unused_isfile, get_project_id):
    """The Cloud SDK project is used when the credentials file has none."""
    # The loader reports no project ID, which makes the function consult the
    # Cloud SDK project instead.
    load.return_value = (MOCK_CREDENTIALS, None)

    found = _default._get_gcloud_sdk_credentials()
    creds, project = found

    assert creds == MOCK_CREDENTIALS
    assert project == mock.sentinel.project_id
    assert get_project_id.called
261
262
@mock.patch("google.auth._cloud_sdk.get_project_id", return_value=None, autospec=True)
@mock.patch("os.path.isfile", return_value=True, autospec=True)
@LOAD_FILE_PATCH
def test__get_gcloud_sdk_credentials_no_project_id(load, unused_isfile, get_project_id):
    """No project id is returned when neither the file nor the Cloud SDK has one.

    ``os.path.isfile`` is patched with ``autospec=True`` here as well, so this
    test uses the same signature-checked mock as
    ``test__get_gcloud_sdk_credentials_project_id``.
    """
    # Don't return a project ID from load file, make the function check
    # the Cloud SDK project.
    load.return_value = MOCK_CREDENTIALS, None

    credentials, project_id = _default._get_gcloud_sdk_credentials()

    assert credentials == MOCK_CREDENTIALS
    assert project_id is None
    assert get_project_id.called
276
277
278class _AppIdentityModule(object):
279    """The interface of the App Idenity app engine module.
280    See https://cloud.google.com/appengine/docs/standard/python/refdocs\
281    /google.appengine.api.app_identity.app_identity
282    """
283
284    def get_application_id(self):
285        raise NotImplementedError()
286
287
@pytest.fixture
def app_identity(monkeypatch):
    """Installs an autospec'd app_identity stand-in on google.auth.app_engine."""
    fake_module = mock.create_autospec(_AppIdentityModule, instance=True)
    monkeypatch.setattr(app_engine, "app_identity", fake_module)
    yield fake_module
294
295
@mock.patch.dict(os.environ, {environment_vars.LEGACY_APPENGINE_RUNTIME: "python27"})
def test__get_gae_credentials_gen1(app_identity):
    """A "python27" legacy runtime produces App Engine credentials."""
    app_identity.get_application_id.return_value = mock.sentinel.project

    found = _default._get_gae_credentials()
    creds, project = found

    assert isinstance(creds, app_engine.Credentials)
    assert project == mock.sentinel.project
305
306
@mock.patch.dict(os.environ, {"GAE_RUNTIME": "python37"})
def test__get_gae_credentials_gen2():
    """With only GAE_RUNTIME set, no App Engine credentials are returned."""
    creds, project = _default._get_gae_credentials()

    assert creds is None
    assert project is None
313
314
@mock.patch.dict(os.environ)
def test__get_gae_credentials_gen2_backwards_compat():
    """A "python37" legacy runtime value does not produce App Engine credentials."""
    # compat helpers may copy GAE_RUNTIME to APPENGINE_RUNTIME
    # for backwards compatibility with code that relies on it
    for variable in (environment_vars.LEGACY_APPENGINE_RUNTIME, "GAE_RUNTIME"):
        os.environ[variable] = "python37"

    creds, project = _default._get_gae_credentials()

    assert creds is None
    assert project is None
324
325
@mock.patch.dict(os.environ)
def test__get_gae_credentials_env_unset():
    """With neither App Engine runtime variable set, no credentials are returned.

    Both variables are removed inside a patched copy of ``os.environ`` so the
    test no longer depends on the environment it happens to run in; the
    original environment is restored when the test finishes.
    """
    os.environ.pop(environment_vars.LEGACY_APPENGINE_RUNTIME, None)
    os.environ.pop("GAE_RUNTIME", None)

    credentials, project_id = _default._get_gae_credentials()
    assert credentials is None
    assert project_id is None
332
333
@mock.patch.dict(os.environ)
def test__get_gae_credentials_no_app_engine():
    """No credentials are returned while google.auth.app_engine is unimportable."""
    # Covers both the case without and with LEGACY_APPENGINE_RUNTIME set.
    assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ

    # A None entry in sys.modules makes importing that module fail.
    with mock.patch.dict("sys.modules", {"google.auth.app_engine": None}):
        creds, project = _default._get_gae_credentials()
        assert creds is None
        assert project is None

        os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
        creds, project = _default._get_gae_credentials()
        assert creds is None
        assert project is None
350
351
@mock.patch.dict(os.environ)
@mock.patch.object(app_engine, "app_identity", new=None)
def test__get_gae_credentials_no_apis():
    """No credentials are returned while app_engine.app_identity is None."""
    legacy_var = environment_vars.LEGACY_APPENGINE_RUNTIME
    # Covers both the case without and with LEGACY_APPENGINE_RUNTIME set.
    assert legacy_var not in os.environ

    creds, project = _default._get_gae_credentials()
    assert creds is None
    assert project is None

    os.environ[legacy_var] = "python27"
    creds, project = _default._get_gae_credentials()
    assert creds is None
    assert project is None
366
367
@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=True, autospec=True
)
@mock.patch(
    "google.auth.compute_engine._metadata.get_project_id",
    return_value="example-project",
    autospec=True,
)
def test__get_gce_credentials(unused_get, unused_ping):
    """A successful ping yields Compute Engine credentials and the project id."""
    found = _default._get_gce_credentials()
    creds, project = found

    assert isinstance(creds, compute_engine.Credentials)
    assert project == "example-project"
381
382
@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=False, autospec=True
)
def test__get_gce_credentials_no_ping(unused_ping):
    """A failed ping yields neither credentials nor a project id."""
    found = _default._get_gce_credentials()
    creds, project = found

    assert creds is None
    assert project is None
391
392
@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=True, autospec=True
)
@mock.patch(
    "google.auth.compute_engine._metadata.get_project_id",
    side_effect=exceptions.TransportError(),
    autospec=True,
)
def test__get_gce_credentials_no_project_id(unused_get, unused_ping):
    """A TransportError from the project lookup still yields credentials."""
    found = _default._get_gce_credentials()
    creds, project = found

    assert isinstance(creds, compute_engine.Credentials)
    assert project is None
406
407
def test__get_gce_credentials_no_compute_engine():
    """No credentials are returned while compute_engine cannot be imported."""
    # A None entry in sys.modules makes importing that module fail.
    with mock.patch.dict("sys.modules", {"google.auth.compute_engine": None}):
        creds, project = _default._get_gce_credentials()
        assert creds is None
        assert project is None
416
417
@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=False, autospec=True
)
def test__get_gce_credentials_explicit_request(ping):
    """A request object passed in is forwarded to the metadata ping."""
    supplied_request = mock.sentinel.request

    _default._get_gce_credentials(supplied_request)

    ping.assert_called_with(request=supplied_request)
424
425
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_early_out(unused_get):
    """default_async returns what the explicit-environment lookup produced."""
    outcome = _default.default_async()
    assert outcome == (MOCK_CREDENTIALS, mock.sentinel.project_id)
433
434
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_explict_project_id(unused_get, monkeypatch):
    """A project set via the PROJECT env var replaces the discovered project id."""
    monkeypatch.setenv(environment_vars.PROJECT, "explicit-env")

    outcome = _default.default_async()

    assert outcome == (MOCK_CREDENTIALS, "explicit-env")
443
444
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_explict_legacy_project_id(unused_get, monkeypatch):
    """A project set via LEGACY_PROJECT replaces the discovered project id."""
    monkeypatch.setenv(environment_vars.LEGACY_PROJECT, "explicit-env")

    outcome = _default.default_async()

    assert outcome == (MOCK_CREDENTIALS, "explicit-env")
453
454
@mock.patch("logging.Logger.warning", autospec=True)
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gcloud_sdk_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gae_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gce_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
def test_default_without_project_id(
    unused_gce, unused_gae, unused_sdk, unused_explicit, logger_warning
):
    """Credentials without any project id are returned and a warning is logged."""
    outcome = _default.default_async()

    assert outcome == (MOCK_CREDENTIALS, None)
    # autospec on the unbound method means the logger instance is the first
    # recorded argument, followed by two further positional arguments.
    logger_warning.assert_called_with(mock.ANY, mock.ANY, mock.ANY)
481
482
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(None, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gcloud_sdk_credentials",
    return_value=(None, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gae_credentials",
    return_value=(None, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gce_credentials",
    return_value=(None, None),
    autospec=True,
)
def test_default_fail(unused_gce, unused_gae, unused_sdk, unused_explicit):
    """default_async raises DefaultCredentialsError when every lookup finds nothing."""
    # The call itself must raise; wrapping it in ``assert`` added nothing,
    # because the assertion is never evaluated once the exception is raised.
    with pytest.raises(exceptions.DefaultCredentialsError):
        _default.default_async()
506
507
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
@mock.patch(
    "google.auth._credentials_async.with_scopes_if_required",
    return_value=MOCK_CREDENTIALS,
    autospec=True,
)
def test_default_scoped(with_scopes, unused_get):
    """Requested scopes are applied through with_scopes_if_required exactly once."""
    requested = ["one", "two"]

    outcome = _default.default_async(scopes=requested)
    creds, project = outcome

    assert creds == with_scopes.return_value
    assert project == mock.sentinel.project_id
    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, requested)
526
527
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_no_app_engine_compute_engine_module(unused_get):
    """
    google.auth.compute_engine and google.auth.app_engine are optional so that
    this package can be used without them. This checks that default_async
    still returns the explicit credentials when both modules are unimportable.
    """
    # A None entry in sys.modules makes importing that module fail.
    hidden_modules = {
        "google.auth.compute_engine": None,
        "google.auth.app_engine": None,
    }

    with mock.patch.dict("sys.modules", hidden_modules):
        outcome = _default.default_async()
        assert outcome == (MOCK_CREDENTIALS, mock.sentinel.project_id)
545
546
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test_default_warning_without_quota_project_id_for_user_creds(get_adc_path):
    """Cloud SDK user credentials without a quota project trigger a UserWarning."""
    sdk_user_file = test_default.AUTHORIZED_USER_CLOUD_SDK_FILE
    get_adc_path.return_value = sdk_user_file

    with pytest.warns(UserWarning, match="Cloud SDK"):
        # The result is unpacked only to confirm that a pair comes back.
        credentials, project_id = _default.default_async(quota_project_id=None)
555
556
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test_default_no_warning_with_quota_project_id_for_user_creds(get_adc_path):
    """Supplying a quota project suppresses the Cloud SDK user-credentials warning.

    Warnings raised during the call are recorded and the test fails if any of
    them mentions "Cloud SDK"; previously nothing was asserted at all.
    """
    import warnings

    get_adc_path.return_value = test_default.AUTHORIZED_USER_CLOUD_SDK_FILE

    with warnings.catch_warnings(record=True) as caught:
        # "always" disables de-duplication so a repeated warning is not hidden.
        warnings.simplefilter("always")
        _default.default_async(quota_project_id="project-foo")

    assert not [w for w in caught if "Cloud SDK" in str(w.message)]
564