1# Copyright 2016 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16import json 17import os 18 19import mock 20import pytest 21import six 22from six.moves import http_client 23from six.moves import urllib 24 25from google.auth import _helpers 26from google.auth import crypt 27from google.auth import exceptions 28from google.auth import jwt 29from google.auth import transport 30from google.oauth2 import _client 31 32 33DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") 34 35with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: 36 PRIVATE_KEY_BYTES = fh.read() 37 38SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") 39 40SCOPES_AS_LIST = [ 41 "https://www.googleapis.com/auth/pubsub", 42 "https://www.googleapis.com/auth/logging.write", 43] 44SCOPES_AS_STRING = ( 45 "https://www.googleapis.com/auth/pubsub" 46 " https://www.googleapis.com/auth/logging.write" 47) 48 49 50def test__handle_error_response(): 51 response_data = {"error": "help", "error_description": "I'm alive"} 52 53 with pytest.raises(exceptions.RefreshError) as excinfo: 54 _client._handle_error_response(response_data) 55 56 assert excinfo.match(r"help: I\'m alive") 57 58 59def test__handle_error_response_non_json(): 60 response_data = {"foo": "bar"} 61 62 with pytest.raises(exceptions.RefreshError) as excinfo: 63 _client._handle_error_response(response_data) 64 65 assert excinfo.match(r"{\"foo\": \"bar\"}") 66 67 68@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 69def test__parse_expiry(unused_utcnow): 70 result = _client._parse_expiry({"expires_in": 500}) 71 assert result == datetime.datetime.min + datetime.timedelta(seconds=500) 72 73 74def test__parse_expiry_none(): 75 assert _client._parse_expiry({}) is None 76 77 78def make_request(response_data, status=http_client.OK): 79 response = mock.create_autospec(transport.Response, instance=True) 80 response.status = status 81 response.data = json.dumps(response_data).encode("utf-8") 82 request = mock.create_autospec(transport.Request) 83 request.return_value = response 84 return request 85 86 87def test__token_endpoint_request(): 88 request = make_request({"test": "response"}) 89 90 result = _client._token_endpoint_request( 91 request, "http://example.com", {"test": "params"} 92 ) 93 94 # Check request call 95 request.assert_called_with( 96 method="POST", 97 url="http://example.com", 98 headers={"Content-Type": "application/x-www-form-urlencoded"}, 99 body="test=params".encode("utf-8"), 100 ) 101 102 # Check result 103 assert result == {"test": "response"} 104 105 106def test__token_endpoint_request_use_json(): 107 request = make_request({"test": "response"}) 108 109 result = _client._token_endpoint_request( 110 request, 111 "http://example.com", 112 {"test": "params"}, 113 access_token="access_token", 114 use_json=True, 115 ) 116 117 # Check request call 118 request.assert_called_with( 119 method="POST", 120 url="http://example.com", 121 headers={ 122 "Content-Type": "application/json", 123 "Authorization": "Bearer access_token", 124 }, 125 body=b'{"test": "params"}', 126 ) 127 128 # Check result 129 assert result == {"test": "response"} 130 131 132def test__token_endpoint_request_error(): 133 request = make_request({}, status=http_client.BAD_REQUEST) 134 135 with pytest.raises(exceptions.RefreshError): 136 _client._token_endpoint_request(request, "http://example.com", {}) 137 138 139def test__token_endpoint_request_internal_failure_error(): 140 request = make_request( 141 {"error_description": "internal_failure"}, status=http_client.BAD_REQUEST 142 ) 143 144 with pytest.raises(exceptions.RefreshError): 145 _client._token_endpoint_request( 146 request, "http://example.com", {"error_description": "internal_failure"} 147 ) 148 149 request = make_request( 150 {"error": "internal_failure"}, status=http_client.BAD_REQUEST 151 ) 152 153 with pytest.raises(exceptions.RefreshError): 154 _client._token_endpoint_request( 155 request, "http://example.com", {"error": "internal_failure"} 156 ) 157 158 159def verify_request_params(request, params): 160 request_body = request.call_args[1]["body"].decode("utf-8") 161 request_params = urllib.parse.parse_qs(request_body) 162 163 for key, value in six.iteritems(params): 164 assert request_params[key][0] == value 165 166 167@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 168def test_jwt_grant(utcnow): 169 request = make_request( 170 {"access_token": "token", "expires_in": 500, "extra": "data"} 171 ) 172 173 token, expiry, extra_data = _client.jwt_grant( 174 request, "http://example.com", "assertion_value" 175 ) 176 177 # Check request call 178 verify_request_params( 179 request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"} 180 ) 181 182 # Check result 183 assert token == "token" 184 assert expiry == utcnow() + datetime.timedelta(seconds=500) 185 assert extra_data["extra"] == "data" 186 187 188def test_jwt_grant_no_access_token(): 189 request = make_request( 190 { 191 # No access token. 192 "expires_in": 500, 193 "extra": "data", 194 } 195 ) 196 197 with pytest.raises(exceptions.RefreshError): 198 _client.jwt_grant(request, "http://example.com", "assertion_value") 199 200 201def test_id_token_jwt_grant(): 202 now = _helpers.utcnow() 203 id_token_expiry = _helpers.datetime_to_secs(now) 204 id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8") 205 request = make_request({"id_token": id_token, "extra": "data"}) 206 207 token, expiry, extra_data = _client.id_token_jwt_grant( 208 request, "http://example.com", "assertion_value" 209 ) 210 211 # Check request call 212 verify_request_params( 213 request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"} 214 ) 215 216 # Check result 217 assert token == id_token 218 # JWT does not store microseconds 219 now = now.replace(microsecond=0) 220 assert expiry == now 221 assert extra_data["extra"] == "data" 222 223 224def test_id_token_jwt_grant_no_access_token(): 225 request = make_request( 226 { 227 # No access token. 228 "expires_in": 500, 229 "extra": "data", 230 } 231 ) 232 233 with pytest.raises(exceptions.RefreshError): 234 _client.id_token_jwt_grant(request, "http://example.com", "assertion_value") 235 236 237@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 238def test_refresh_grant(unused_utcnow): 239 request = make_request( 240 { 241 "access_token": "token", 242 "refresh_token": "new_refresh_token", 243 "expires_in": 500, 244 "extra": "data", 245 } 246 ) 247 248 token, refresh_token, expiry, extra_data = _client.refresh_grant( 249 request, 250 "http://example.com", 251 "refresh_token", 252 "client_id", 253 "client_secret", 254 rapt_token="rapt_token", 255 ) 256 257 # Check request call 258 verify_request_params( 259 request, 260 { 261 "grant_type": _client._REFRESH_GRANT_TYPE, 262 "refresh_token": "refresh_token", 263 "client_id": "client_id", 264 "client_secret": "client_secret", 265 "rapt": "rapt_token", 266 }, 267 ) 268 269 # Check result 270 assert token == "token" 271 assert refresh_token == "new_refresh_token" 272 assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500) 273 assert extra_data["extra"] == "data" 274 275 276@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 277def test_refresh_grant_with_scopes(unused_utcnow): 278 request = make_request( 279 { 280 "access_token": "token", 281 "refresh_token": "new_refresh_token", 282 "expires_in": 500, 283 "extra": "data", 284 "scope": SCOPES_AS_STRING, 285 } 286 ) 287 288 token, refresh_token, expiry, extra_data = _client.refresh_grant( 289 request, 290 "http://example.com", 291 "refresh_token", 292 "client_id", 293 "client_secret", 294 SCOPES_AS_LIST, 295 ) 296 297 # Check request call. 298 verify_request_params( 299 request, 300 { 301 "grant_type": _client._REFRESH_GRANT_TYPE, 302 "refresh_token": "refresh_token", 303 "client_id": "client_id", 304 "client_secret": "client_secret", 305 "scope": SCOPES_AS_STRING, 306 }, 307 ) 308 309 # Check result. 310 assert token == "token" 311 assert refresh_token == "new_refresh_token" 312 assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500) 313 assert extra_data["extra"] == "data" 314 315 316def test_refresh_grant_no_access_token(): 317 request = make_request( 318 { 319 # No access token. 320 "refresh_token": "new_refresh_token", 321 "expires_in": 500, 322 "extra": "data", 323 } 324 ) 325 326 with pytest.raises(exceptions.RefreshError): 327 _client.refresh_grant( 328 request, "http://example.com", "refresh_token", "client_id", "client_secret" 329 ) 330