# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os

import mock
import pytest

from google.auth import _credentials_async as credentials
from google.auth import _default_async as _default
from google.auth import app_engine
from google.auth import compute_engine
from google.auth import environment_vars
from google.auth import exceptions
from google.oauth2 import _service_account_async as service_account
import google.oauth2.credentials
from tests import test__default as test_default

# Shared stand-in credentials; with_quota_project() hands back the same mock
# so identity assertions (``is MOCK_CREDENTIALS``) keep working after it is
# called.
MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS

# Decorator that replaces load_credentials_from_file in _default_async with a
# mock returning (MOCK_CREDENTIALS, mock.sentinel.project_id).
LOAD_FILE_PATCH = mock.patch(
    "google.auth._default_async.load_credentials_from_file",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)


def test_load_credentials_from_missing_file():
    """An empty path raises DefaultCredentialsError mentioning "not found"."""
    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        _default.load_credentials_from_file("")

    assert excinfo.match(r"not found")


def test_load_credentials_from_file_invalid_json(tmpdir):
    """A file containing malformed JSON is rejected as not valid json."""
    jsonfile = tmpdir.join("invalid.json")
    jsonfile.write("{")

    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        _default.load_credentials_from_file(str(jsonfile))

    assert excinfo.match(r"not a valid json file")


def test_load_credentials_from_file_invalid_type(tmpdir):
    """A JSON file with an unknown "type" value is rejected."""
    jsonfile = tmpdir.join("invalid.json")
    jsonfile.write(json.dumps({"type": "not-a-real-type"}))

    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        _default.load_credentials_from_file(str(jsonfile))

    assert excinfo.match(r"does not have a valid type")


def test_load_credentials_from_file_authorized_user():
    """The authorized-user fixture loads async user credentials, no project."""
    credentials, project_id = _default.load_credentials_from_file(
        test_default.AUTHORIZED_USER_FILE
    )
    assert isinstance(credentials, google.oauth2._credentials_async.Credentials)
    assert project_id is None


def test_load_credentials_from_file_no_type(tmpdir):
    """Valid JSON without a loadable credentials type is rejected."""
    # use the client_secrets.json, which is valid json but not a
    # loadable credentials type
    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        _default.load_credentials_from_file(test_default.CLIENT_SECRETS_FILE)

    assert excinfo.match(r"does not have a valid type")
    assert excinfo.match(r"Type is None")


def test_load_credentials_from_file_authorized_user_bad_format(tmpdir):
    """An authorized_user file with only a "type" key reports missing fields."""
    filename = tmpdir.join("authorized_user_bad.json")
    filename.write(json.dumps({"type": "authorized_user"}))

    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        _default.load_credentials_from_file(str(filename))

    assert excinfo.match(r"Failed to load authorized user")
    assert excinfo.match(r"missing fields")


def test_load_credentials_from_file_authorized_user_cloud_sdk():
    """Cloud SDK user credentials warn unless the file has a quota project."""
    import warnings

    with pytest.warns(UserWarning, match="Cloud SDK"):
        credentials, project_id = _default.load_credentials_from_file(
            test_default.AUTHORIZED_USER_CLOUD_SDK_FILE
        )
    assert isinstance(credentials, google.oauth2._credentials_async.Credentials)
    assert project_id is None

    # No warning if the json file has quota project id.
    # Record warnings so the claim above is actually verified; only the
    # "Cloud SDK" UserWarning is checked so unrelated warnings from
    # dependencies cannot make this test flaky.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        credentials, project_id = _default.load_credentials_from_file(
            test_default.AUTHORIZED_USER_CLOUD_SDK_WITH_QUOTA_PROJECT_ID_FILE
        )
    cloud_sdk_warnings = [
        w
        for w in caught
        if issubclass(w.category, UserWarning) and "Cloud SDK" in str(w.message)
    ]
    assert not cloud_sdk_warnings
    assert isinstance(credentials, google.oauth2._credentials_async.Credentials)
    assert project_id is None


def test_load_credentials_from_file_authorized_user_cloud_sdk_with_scopes():
    """Scopes passed to the loader end up on the Cloud SDK user credentials."""
    with pytest.warns(UserWarning, match="Cloud SDK"):
        credentials, project_id = _default.load_credentials_from_file(
            test_default.AUTHORIZED_USER_CLOUD_SDK_FILE,
            scopes=["https://www.google.com/calendar/feeds"],
        )
    assert isinstance(credentials, google.oauth2._credentials_async.Credentials)
    assert project_id is None
    assert credentials.scopes == ["https://www.google.com/calendar/feeds"]


def test_load_credentials_from_file_authorized_user_cloud_sdk_with_quota_project():
    """An explicit quota_project_id is applied to the loaded user credentials."""
    credentials, project_id = _default.load_credentials_from_file(
        test_default.AUTHORIZED_USER_CLOUD_SDK_FILE, quota_project_id="project-foo"
    )

    assert isinstance(credentials, google.oauth2._credentials_async.Credentials)
    assert project_id is None
    assert credentials.quota_project_id == "project-foo"


def test_load_credentials_from_file_service_account():
    """The service-account fixture loads with the project id from the file."""
    credentials, project_id = _default.load_credentials_from_file(
        test_default.SERVICE_ACCOUNT_FILE
    )
    assert isinstance(credentials, service_account.Credentials)
    assert project_id == test_default.SERVICE_ACCOUNT_FILE_DATA["project_id"]


def test_load_credentials_from_file_service_account_with_scopes():
    """Scopes passed to the loader end up on the service-account credentials."""
    credentials, project_id = _default.load_credentials_from_file(
        test_default.SERVICE_ACCOUNT_FILE,
        scopes=["https://www.google.com/calendar/feeds"],
    )
    assert isinstance(credentials, service_account.Credentials)
    assert project_id == test_default.SERVICE_ACCOUNT_FILE_DATA["project_id"]
    assert credentials.scopes == ["https://www.google.com/calendar/feeds"]


def test_load_credentials_from_file_service_account_bad_format(tmpdir):
    """A service_account file with only a "type" key reports missing fields."""
    filename = tmpdir.join("serivce_account_bad.json")
    filename.write(json.dumps({"type": "service_account"}))

    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        _default.load_credentials_from_file(str(filename))

    assert excinfo.match(r"Failed to load service account")
    assert excinfo.match(r"missing fields")


@mock.patch.dict(os.environ, {}, clear=True)
def test__get_explicit_environ_credentials_no_env():
    """With an empty environment the explicit checker returns (None, None)."""
    assert _default._get_explicit_environ_credentials() == (None, None)


@pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
@LOAD_FILE_PATCH
def test__get_explicit_environ_credentials(load, quota_project_id, monkeypatch):
    """The path from the CREDENTIALS env var is loaded with the quota project."""
    monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")

    credentials, project_id = _default._get_explicit_environ_credentials(
        quota_project_id=quota_project_id
    )

    assert credentials is MOCK_CREDENTIALS
    assert project_id is mock.sentinel.project_id
    load.assert_called_with("filename", quota_project_id=quota_project_id)


@LOAD_FILE_PATCH
def test__get_explicit_environ_credentials_no_project_id(load, monkeypatch):
    """A loader result without a project id is passed through as None."""
    load.return_value = MOCK_CREDENTIALS, None
    monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")

    credentials, project_id = _default._get_explicit_environ_credentials()

    assert credentials is MOCK_CREDENTIALS
    assert project_id is None


@pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
@mock.patch("google.auth._default_async._get_gcloud_sdk_credentials", autospec=True)
def test__get_explicit_environ_credentials_fallback_to_gcloud(
    get_gcloud_creds, get_adc_path, quota_project_id, monkeypatch
):
    """An explicit path equal to the Cloud SDK ADC path uses the gcloud flow."""
    # Set explicit credentials path to cloud sdk credentials path.
    get_adc_path.return_value = "filename"
    monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")

    _default._get_explicit_environ_credentials(quota_project_id=quota_project_id)

    # Check we fall back to cloud sdk flow since explicit credentials path is
    # cloud sdk credentials path
    get_gcloud_creds.assert_called_with(quota_project_id=quota_project_id)


@pytest.mark.parametrize("quota_project_id", [None, "project-foo"])
@LOAD_FILE_PATCH
@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test__get_gcloud_sdk_credentials(get_adc_path, load, quota_project_id):
    """The Cloud SDK ADC path is loaded with the requested quota project."""
    get_adc_path.return_value = test_default.SERVICE_ACCOUNT_FILE

    credentials, project_id = _default._get_gcloud_sdk_credentials(
        quota_project_id=quota_project_id
    )

    assert credentials is MOCK_CREDENTIALS
    assert project_id is mock.sentinel.project_id
    load.assert_called_with(
        test_default.SERVICE_ACCOUNT_FILE, quota_project_id=quota_project_id
    )


@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test__get_gcloud_sdk_credentials_non_existent(get_adc_path, tmpdir):
    """A Cloud SDK ADC path that does not exist yields (None, None)."""
    non_existent = tmpdir.join("non-existent")
    get_adc_path.return_value = str(non_existent)

    credentials, project_id = _default._get_gcloud_sdk_credentials()

    assert credentials is None
    assert project_id is None


@mock.patch(
    "google.auth._cloud_sdk.get_project_id",
    return_value=mock.sentinel.project_id,
    autospec=True,
)
@mock.patch("os.path.isfile", return_value=True, autospec=True)
@LOAD_FILE_PATCH
def test__get_gcloud_sdk_credentials_project_id(load, unused_isfile, get_project_id):
    """The Cloud SDK project id is used when the file does not supply one."""
    # Don't return a project ID from load file, make the function check
    # the Cloud SDK project.
    load.return_value = MOCK_CREDENTIALS, None

    credentials, project_id = _default._get_gcloud_sdk_credentials()

    assert credentials == MOCK_CREDENTIALS
    assert project_id == mock.sentinel.project_id
    assert get_project_id.called


@mock.patch("google.auth._cloud_sdk.get_project_id", return_value=None, autospec=True)
@mock.patch("os.path.isfile", return_value=True)
@LOAD_FILE_PATCH
def test__get_gcloud_sdk_credentials_no_project_id(load, unused_isfile, get_project_id):
    """Project id stays None when neither the file nor the Cloud SDK has one."""
    # Don't return a project ID from load file, make the function check
    # the Cloud SDK project.
    load.return_value = MOCK_CREDENTIALS, None

    credentials, project_id = _default._get_gcloud_sdk_credentials()

    assert credentials == MOCK_CREDENTIALS
    assert project_id is None
    assert get_project_id.called


class _AppIdentityModule(object):
    """The interface of the App Identity app engine module.

    See https://cloud.google.com/appengine/docs/standard/python/refdocs/google.appengine.api.app_identity.app_identity
    """

    def get_application_id(self):
        """Placeholder used only as an autospec template; never implemented."""
        raise NotImplementedError()


@pytest.fixture
def app_identity(monkeypatch):
    """Mocks the app_identity module for google.auth.app_engine."""
    app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True)
    monkeypatch.setattr(app_engine, "app_identity", app_identity_module)
    yield app_identity_module


@mock.patch.dict(os.environ)
def test__get_gae_credentials_gen1(app_identity):
    """With the legacy runtime set to python27, App Engine credentials load."""
    os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
    app_identity.get_application_id.return_value = mock.sentinel.project

    credentials, project_id = _default._get_gae_credentials()

    assert isinstance(credentials, app_engine.Credentials)
    assert project_id == mock.sentinel.project


@mock.patch.dict(os.environ)
def test__get_gae_credentials_gen2():
    """With only GAE_RUNTIME=python37 set, no App Engine credentials load."""
    os.environ["GAE_RUNTIME"] = "python37"
    credentials, project_id = _default._get_gae_credentials()
    assert credentials is None
    assert project_id is None


@mock.patch.dict(os.environ)
def test__get_gae_credentials_gen2_backwards_compat():
    """python37 in both runtime variables still yields no GAE credentials."""
    # compat helpers may copy GAE_RUNTIME to APPENGINE_RUNTIME
    # for backwards compatibility with code that relies on it
    os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python37"
    os.environ["GAE_RUNTIME"] = "python37"
    credentials, project_id = _default._get_gae_credentials()
    assert credentials is None
    assert project_id is None


def test__get_gae_credentials_env_unset():
    """With neither runtime variable set, no App Engine credentials load."""
    assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ
    assert "GAE_RUNTIME" not in os.environ
    credentials, project_id = _default._get_gae_credentials()
    assert credentials is None
    assert project_id is None


@mock.patch.dict(os.environ)
def test__get_gae_credentials_no_app_engine():
    """No GAE credentials when google.auth.app_engine cannot be imported."""
    # test both with and without LEGACY_APPENGINE_RUNTIME setting
    assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ

    import sys

    # A None entry in sys.modules makes importing that module raise
    # ImportError for the duration of the patch.
    # NOTE(review): both calls are kept under the sys.modules patch to match
    # the comment above; the nesting was inferred - confirm against history.
    with mock.patch.dict(sys.modules, {"google.auth.app_engine": None}):
        credentials, project_id = _default._get_gae_credentials()
        assert credentials is None
        assert project_id is None

        os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
        credentials, project_id = _default._get_gae_credentials()
        assert credentials is None
        assert project_id is None


@mock.patch.dict(os.environ)
@mock.patch.object(app_engine, "app_identity", new=None)
def test__get_gae_credentials_no_apis():
    """No GAE credentials when app_engine.app_identity is None."""
    # test both with and without LEGACY_APPENGINE_RUNTIME setting
    assert environment_vars.LEGACY_APPENGINE_RUNTIME not in os.environ

    credentials, project_id = _default._get_gae_credentials()
    assert credentials is None
    assert project_id is None

    os.environ[environment_vars.LEGACY_APPENGINE_RUNTIME] = "python27"
    credentials, project_id = _default._get_gae_credentials()
    assert credentials is None
    assert project_id is None


@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=True, autospec=True
)
@mock.patch(
    "google.auth.compute_engine._metadata.get_project_id",
    return_value="example-project",
    autospec=True,
)
def test__get_gce_credentials(unused_get, unused_ping):
    """A successful ping yields GCE credentials and the metadata project id."""
    credentials, project_id = _default._get_gce_credentials()

    assert isinstance(credentials, compute_engine.Credentials)
    assert project_id == "example-project"


@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=False, autospec=True
)
def test__get_gce_credentials_no_ping(unused_ping):
    """A failed metadata ping yields (None, None)."""
    credentials, project_id = _default._get_gce_credentials()

    assert credentials is None
    assert project_id is None


@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=True, autospec=True
)
@mock.patch(
    "google.auth.compute_engine._metadata.get_project_id",
    side_effect=exceptions.TransportError(),
    autospec=True,
)
def test__get_gce_credentials_no_project_id(unused_get, unused_ping):
    """A TransportError from get_project_id leaves the project id as None."""
    credentials, project_id = _default._get_gce_credentials()

    assert isinstance(credentials, compute_engine.Credentials)
    assert project_id is None


def test__get_gce_credentials_no_compute_engine():
    """No GCE credentials when google.auth.compute_engine cannot be imported."""
    import sys

    with mock.patch.dict("sys.modules"):
        # A None entry makes importing the module raise ImportError.
        sys.modules["google.auth.compute_engine"] = None
        credentials, project_id = _default._get_gce_credentials()
        assert credentials is None
        assert project_id is None


@mock.patch(
    "google.auth.compute_engine._metadata.ping", return_value=False, autospec=True
)
def test__get_gce_credentials_explicit_request(ping):
    """An explicitly supplied request object is forwarded to the ping call."""
    _default._get_gce_credentials(mock.sentinel.request)
    ping.assert_called_with(request=mock.sentinel.request)


@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_early_out(unused_get):
    """default_async() returns the explicit-environment checker's result."""
    assert _default.default_async() == (MOCK_CREDENTIALS, mock.sentinel.project_id)


@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_explict_project_id(unused_get, monkeypatch):
    """The PROJECT env var overrides the project id from the checker."""
    monkeypatch.setenv(environment_vars.PROJECT, "explicit-env")
    assert _default.default_async() == (MOCK_CREDENTIALS, "explicit-env")


@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_explict_legacy_project_id(unused_get, monkeypatch):
    """The LEGACY_PROJECT env var overrides the project id from the checker."""
    monkeypatch.setenv(environment_vars.LEGACY_PROJECT, "explicit-env")
    assert _default.default_async() == (MOCK_CREDENTIALS, "explicit-env")


@mock.patch("logging.Logger.warning", autospec=True)
@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gcloud_sdk_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gae_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gce_credentials",
    return_value=(MOCK_CREDENTIALS, None),
    autospec=True,
)
def test_default_without_project_id(
    unused_gce, unused_gae, unused_sdk, unused_explicit, logger_warning
):
    """Credentials without any project id are returned and a warning logged."""
    # Decorators apply bottom-up, so the mock arguments arrive in the reverse
    # of the decorator order above.
    assert _default.default_async() == (MOCK_CREDENTIALS, None)
    logger_warning.assert_called_with(mock.ANY, mock.ANY, mock.ANY)


@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(None, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gcloud_sdk_credentials",
    return_value=(None, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gae_credentials",
    return_value=(None, None),
    autospec=True,
)
@mock.patch(
    "google.auth._default_async._get_gce_credentials",
    return_value=(None, None),
    autospec=True,
)
def test_default_fail(unused_gce, unused_gae, unused_sdk, unused_explicit):
    """DefaultCredentialsError is raised when every checker finds nothing."""
    with pytest.raises(exceptions.DefaultCredentialsError):
        # The call itself must raise; asserting on its return value would
        # never be evaluated, so no ``assert`` is used here.
        _default.default_async()


@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
@mock.patch(
    "google.auth._credentials_async.with_scopes_if_required",
    return_value=MOCK_CREDENTIALS,
    autospec=True,
)
def test_default_scoped(with_scopes, unused_get):
    """Requested scopes are applied via with_scopes_if_required exactly once."""
    scopes = ["one", "two"]

    credentials, project_id = _default.default_async(scopes=scopes)

    assert credentials == with_scopes.return_value
    assert project_id == mock.sentinel.project_id
    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes)


@mock.patch(
    "google.auth._default_async._get_explicit_environ_credentials",
    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
    autospec=True,
)
def test_default_no_app_engine_compute_engine_module(unused_get):
    """
    google.auth.compute_engine and google.auth.app_engine are both optional
    to allow not including them when using this package. This verifies
    that default fails gracefully if these modules are absent
    """
    import sys

    with mock.patch.dict("sys.modules"):
        # None entries make importing these modules raise ImportError.
        sys.modules["google.auth.compute_engine"] = None
        sys.modules["google.auth.app_engine"] = None
        assert _default.default_async() == (MOCK_CREDENTIALS, mock.sentinel.project_id)


@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test_default_warning_without_quota_project_id_for_user_creds(get_adc_path):
    """Cloud SDK user credentials without a quota project emit a UserWarning."""
    get_adc_path.return_value = test_default.AUTHORIZED_USER_CLOUD_SDK_FILE

    with pytest.warns(UserWarning, match="Cloud SDK"):
        credentials, project_id = _default.default_async(quota_project_id=None)


@mock.patch(
    "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
)
def test_default_no_warning_with_quota_project_id_for_user_creds(get_adc_path):
    """Supplying a quota project suppresses the Cloud SDK UserWarning."""
    import warnings

    get_adc_path.return_value = test_default.AUTHORIZED_USER_CLOUD_SDK_FILE

    # Record warnings so the "no warning" promise in the test name is
    # actually checked. Only the "Cloud SDK" UserWarning is inspected so
    # unrelated warnings from dependencies cannot fail the test.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        credentials, project_id = _default.default_async(
            quota_project_id="project-foo"
        )

    cloud_sdk_warnings = [
        w
        for w in caught
        if issubclass(w.category, UserWarning) and "Cloud SDK" in str(w.message)
    ]
    assert not cloud_sdk_warnings