1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16
17import pytest
18
19from google.auth import _helpers
20from google.auth import credentials
21
22
class CredentialsImpl(credentials.Credentials):
    """Minimal concrete ``Credentials`` subclass used as a test fixture."""

    def refresh(self, request):
        """Store ``request`` as the token.

        This lets a test observe both that a refresh happened and which
        value it was called with.
        """
        self.token = request

    def with_quota_project(self, quota_project_id):
        """Not supported by this fixture; always raises NotImplementedError."""
        raise NotImplementedError()
29
30
def test_credentials_constructor():
    """A new credential has a falsy token/expiry and is neither expired nor valid."""
    # Named ``creds`` so the local does not shadow the imported
    # ``credentials`` module.
    creds = CredentialsImpl()

    assert not creds.token
    assert not creds.expiry
    assert not creds.expired
    assert not creds.valid
37
38
def test_expired_and_valid():
    """Check ``valid`` and ``expired`` as the expiry moves relative to now."""
    creds = CredentialsImpl()
    creds.token = "token"

    # A token with no expiry set is valid and not expired.
    assert creds.valid
    assert not creds.expired

    # Set the expiration to one second more than now plus the clock skew
    # accommodation. These credentials should be valid.
    margin = _helpers.REFRESH_THRESHOLD + datetime.timedelta(seconds=1)
    creds.expiry = datetime.datetime.utcnow() + margin

    assert creds.valid
    assert not creds.expired

    # Set the credentials expiration to now. Because of the clock skew
    # accommodation, these credentials should report as expired.
    creds.expiry = datetime.datetime.utcnow()

    assert not creds.valid
    assert creds.expired
63
64
def test_before_request():
    """``before_request`` refreshes only when needed and applies the token.

    ``CredentialsImpl.refresh`` stores the ``request`` argument as the token,
    so the token value reveals whether a refresh happened on a given call.
    Arguments are passed in the ``(request, method, url, headers)`` order.
    """
    creds = CredentialsImpl()
    request = "token"
    headers = {}

    # First call should call refresh, setting the token.
    creds.before_request(request, "GET", "http://example.com", headers)
    assert creds.valid
    assert creds.token == "token"
    assert headers["authorization"] == "Bearer token"

    request = "token2"
    headers = {}

    # Second call shouldn't call refresh: the token must still be the first
    # one, not "token2".
    creds.before_request(request, "GET", "http://example.com", headers)
    assert creds.valid
    assert creds.token == "token"
    assert headers["authorization"] == "Bearer token"
84
85
def test_anonymous_credentials_ctor():
    """Anonymous credentials have no token or expiry yet report as valid."""
    anonymous = credentials.AnonymousCredentials()

    assert anonymous.token is None
    assert anonymous.expiry is None
    assert not anonymous.expired
    assert anonymous.valid
92
93
def test_anonymous_credentials_refresh():
    """Refreshing anonymous credentials is expected to raise ValueError."""
    anonymous = credentials.AnonymousCredentials()
    # Any object will do as the request; the call is expected to fail.
    dummy_request = object()

    with pytest.raises(ValueError):
        anonymous.refresh(dummy_request)
99
100
def test_anonymous_credentials_apply_default():
    """``apply`` leaves headers untouched and rejects an explicit token."""
    anonymous = credentials.AnonymousCredentials()
    request_headers = {}

    # Without a token argument, no header is added.
    anonymous.apply(request_headers)
    assert request_headers == {}

    # Supplying a token is expected to raise ValueError.
    with pytest.raises(ValueError):
        anonymous.apply(request_headers, token="TOKEN")
108
109
def test_anonymous_credentials_before_request():
    """``before_request`` on anonymous credentials adds no headers."""
    anonymous = credentials.AnonymousCredentials()
    dummy_request = object()
    http_method = "GET"
    endpoint = "https://example.com/api/endpoint"
    request_headers = {}

    anonymous.before_request(dummy_request, http_method, endpoint, request_headers)

    assert request_headers == {}
118
119
class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl):
    """Fixture combining ``ReadOnlyScoped`` with ``CredentialsImpl``."""

    @property
    def requires_scopes(self):
        """Return the parent class's ``requires_scopes`` value unchanged."""
        parent = super(ReadOnlyScopedCredentialsImpl, self)
        return parent.requires_scopes
124
125
def test_readonly_scoped_credentials_constructor():
    """A new read-only scoped credential starts with ``_scopes`` set to None."""
    scoped = ReadOnlyScopedCredentialsImpl()

    assert scoped._scopes is None
129
130
def test_readonly_scoped_credentials_scopes():
    """``scopes`` mirrors ``_scopes`` and ``has_scopes`` checks membership."""
    scoped = ReadOnlyScopedCredentialsImpl()
    scoped._scopes = ["one", "two"]

    assert scoped.scopes == ["one", "two"]

    # Every subset of the assigned scopes must be reported as present.
    for granted in (["one"], ["two"], ["one", "two"]):
        assert scoped.has_scopes(granted)

    # A scope that was never assigned must be reported as absent.
    assert not scoped.has_scopes(["three"])
139
140
def test_readonly_scoped_credentials_requires_scopes():
    """Read-only scoped credentials do not report that they require scopes."""
    scoped = ReadOnlyScopedCredentialsImpl()

    assert not scoped.requires_scopes
144
145
class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
    """Fixture that reports it requires scopes until some are provided."""

    def __init__(self, scopes=None, default_scopes=None):
        """Initialise the base classes, then record the given scopes."""
        super(RequiresScopedCredentialsImpl, self).__init__()
        self._scopes = scopes
        self._default_scopes = default_scopes

    @property
    def requires_scopes(self):
        """True while ``self.scopes`` is falsy (None or empty)."""
        return not self.scopes

    def with_scopes(self, scopes, default_scopes=None):
        """Return a new instance of this fixture carrying the given scopes."""
        scoped_copy = RequiresScopedCredentialsImpl(
            scopes=scopes, default_scopes=default_scopes
        )
        return scoped_copy
160
161
def test_create_scoped_if_required_scoped():
    """Credentials that require scopes are replaced by a scoped copy."""
    wanted_scopes = ["one", "two"]
    original = RequiresScopedCredentialsImpl()

    result = credentials.with_scopes_if_required(original, wanted_scopes)

    # A different object comes back, and it now carries the scopes.
    assert result is not original
    assert not result.requires_scopes
    assert result.has_scopes(["one", "two"])
171
172
def test_create_scoped_if_required_not_scopes():
    """Credentials without scope support are returned as the same object."""
    original = CredentialsImpl()

    result = credentials.with_scopes_if_required(original, ["one", "two"])

    assert result is original
180