1# Copyright 2016 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16 17import pytest 18 19from google.auth import _helpers 20from google.auth import credentials 21 22 23class CredentialsImpl(credentials.Credentials): 24 def refresh(self, request): 25 self.token = request 26 27 def with_quota_project(self, quota_project_id): 28 raise NotImplementedError() 29 30 31def test_credentials_constructor(): 32 credentials = CredentialsImpl() 33 assert not credentials.token 34 assert not credentials.expiry 35 assert not credentials.expired 36 assert not credentials.valid 37 38 39def test_expired_and_valid(): 40 credentials = CredentialsImpl() 41 credentials.token = "token" 42 43 assert credentials.valid 44 assert not credentials.expired 45 46 # Set the expiration to one second more than now plus the clock skew 47 # accomodation. These credentials should be valid. 48 credentials.expiry = ( 49 datetime.datetime.utcnow() 50 + _helpers.REFRESH_THRESHOLD 51 + datetime.timedelta(seconds=1) 52 ) 53 54 assert credentials.valid 55 assert not credentials.expired 56 57 # Set the credentials expiration to now. Because of the clock skew 58 # accomodation, these credentials should report as expired. 59 credentials.expiry = datetime.datetime.utcnow() 60 61 assert not credentials.valid 62 assert credentials.expired 63 64 65def test_before_request(): 66 credentials = CredentialsImpl() 67 request = "token" 68 headers = {} 69 70 # First call should call refresh, setting the token. 71 credentials.before_request(request, "http://example.com", "GET", headers) 72 assert credentials.valid 73 assert credentials.token == "token" 74 assert headers["authorization"] == "Bearer token" 75 76 request = "token2" 77 headers = {} 78 79 # Second call shouldn't call refresh. 80 credentials.before_request(request, "http://example.com", "GET", headers) 81 assert credentials.valid 82 assert credentials.token == "token" 83 assert headers["authorization"] == "Bearer token" 84 85 86def test_anonymous_credentials_ctor(): 87 anon = credentials.AnonymousCredentials() 88 assert anon.token is None 89 assert anon.expiry is None 90 assert not anon.expired 91 assert anon.valid 92 93 94def test_anonymous_credentials_refresh(): 95 anon = credentials.AnonymousCredentials() 96 request = object() 97 with pytest.raises(ValueError): 98 anon.refresh(request) 99 100 101def test_anonymous_credentials_apply_default(): 102 anon = credentials.AnonymousCredentials() 103 headers = {} 104 anon.apply(headers) 105 assert headers == {} 106 with pytest.raises(ValueError): 107 anon.apply(headers, token="TOKEN") 108 109 110def test_anonymous_credentials_before_request(): 111 anon = credentials.AnonymousCredentials() 112 request = object() 113 method = "GET" 114 url = "https://example.com/api/endpoint" 115 headers = {} 116 anon.before_request(request, method, url, headers) 117 assert headers == {} 118 119 120class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl): 121 @property 122 def requires_scopes(self): 123 return super(ReadOnlyScopedCredentialsImpl, self).requires_scopes 124 125 126def test_readonly_scoped_credentials_constructor(): 127 credentials = ReadOnlyScopedCredentialsImpl() 128 assert credentials._scopes is None 129 130 131def test_readonly_scoped_credentials_scopes(): 132 credentials = ReadOnlyScopedCredentialsImpl() 133 credentials._scopes = ["one", "two"] 134 assert credentials.scopes == ["one", "two"] 135 assert credentials.has_scopes(["one"]) 136 assert credentials.has_scopes(["two"]) 137 assert credentials.has_scopes(["one", "two"]) 138 assert not credentials.has_scopes(["three"]) 139 140 141def test_readonly_scoped_credentials_requires_scopes(): 142 credentials = ReadOnlyScopedCredentialsImpl() 143 assert not credentials.requires_scopes 144 145 146class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl): 147 def __init__(self, scopes=None, default_scopes=None): 148 super(RequiresScopedCredentialsImpl, self).__init__() 149 self._scopes = scopes 150 self._default_scopes = default_scopes 151 152 @property 153 def requires_scopes(self): 154 return not self.scopes 155 156 def with_scopes(self, scopes, default_scopes=None): 157 return RequiresScopedCredentialsImpl( 158 scopes=scopes, default_scopes=default_scopes 159 ) 160 161 162def test_create_scoped_if_required_scoped(): 163 unscoped_credentials = RequiresScopedCredentialsImpl() 164 scoped_credentials = credentials.with_scopes_if_required( 165 unscoped_credentials, ["one", "two"] 166 ) 167 168 assert scoped_credentials is not unscoped_credentials 169 assert not scoped_credentials.requires_scopes 170 assert scoped_credentials.has_scopes(["one", "two"]) 171 172 173def test_create_scoped_if_required_not_scopes(): 174 unscoped_credentials = CredentialsImpl() 175 scoped_credentials = credentials.with_scopes_if_required( 176 unscoped_credentials, ["one", "two"] 177 ) 178 179 assert scoped_credentials is unscoped_credentials 180