1# Copyright 2021 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import copy 16 17import mock 18import pytest 19 20from google.auth import exceptions 21from google.oauth2 import reauth 22 23 24MOCK_REQUEST = mock.Mock() 25CHALLENGES_RESPONSE_TEMPLATE = { 26 "status": "CHALLENGE_REQUIRED", 27 "sessionId": "123", 28 "challenges": [ 29 { 30 "status": "READY", 31 "challengeId": 1, 32 "challengeType": "PASSWORD", 33 "securityKey": {}, 34 } 35 ], 36} 37CHALLENGES_RESPONSE_AUTHENTICATED = { 38 "status": "AUTHENTICATED", 39 "sessionId": "123", 40 "encodedProofOfReauthToken": "new_rapt_token", 41} 42 43 44class MockChallenge(object): 45 def __init__(self, name, locally_eligible, challenge_input): 46 self.name = name 47 self.is_locally_eligible = locally_eligible 48 self.challenge_input = challenge_input 49 50 def obtain_challenge_input(self, metadata): 51 return self.challenge_input 52 53 54def test_is_interactive(): 55 with mock.patch("sys.stdin.isatty", return_value=True): 56 assert reauth.is_interactive() 57 58 59def test__get_challenges(): 60 with mock.patch( 61 "google.oauth2._client._token_endpoint_request" 62 ) as mock_token_endpoint_request: 63 reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token") 64 mock_token_endpoint_request.assert_called_with( 65 MOCK_REQUEST, 66 reauth._REAUTH_API + ":start", 67 {"supportedChallengeTypes": ["SAML"]}, 68 access_token="token", 69 use_json=True, 70 ) 71 72 73def test__get_challenges_with_scopes(): 74 with mock.patch( 75 "google.oauth2._client._token_endpoint_request" 76 ) as mock_token_endpoint_request: 77 reauth._get_challenges( 78 MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"] 79 ) 80 mock_token_endpoint_request.assert_called_with( 81 MOCK_REQUEST, 82 reauth._REAUTH_API + ":start", 83 { 84 "supportedChallengeTypes": ["SAML"], 85 "oauthScopesForDomainPolicyLookup": ["scope"], 86 }, 87 access_token="token", 88 use_json=True, 89 ) 90 91 92def test__send_challenge_result(): 93 with mock.patch( 94 "google.oauth2._client._token_endpoint_request" 95 ) as mock_token_endpoint_request: 96 reauth._send_challenge_result( 97 MOCK_REQUEST, "123", "1", {"credential": "password"}, "token" 98 ) 99 mock_token_endpoint_request.assert_called_with( 100 MOCK_REQUEST, 101 reauth._REAUTH_API + "/123:continue", 102 { 103 "sessionId": "123", 104 "challengeId": "1", 105 "action": "RESPOND", 106 "proposalResponse": {"credential": "password"}, 107 }, 108 access_token="token", 109 use_json=True, 110 ) 111 112 113def test__run_next_challenge_not_ready(): 114 challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE) 115 challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED" 116 assert ( 117 reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None 118 ) 119 120 121def test__run_next_challenge_not_supported(): 122 challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE) 123 challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED" 124 with pytest.raises(exceptions.ReauthFailError) as excinfo: 125 reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") 126 assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED") 127 128 129def test__run_next_challenge_not_locally_eligible(): 130 mock_challenge = MockChallenge("PASSWORD", False, "challenge_input") 131 with mock.patch( 132 "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge} 133 ): 134 with pytest.raises(exceptions.ReauthFailError) as excinfo: 135 reauth._run_next_challenge( 136 CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token" 137 ) 138 assert excinfo.match(r"Challenge PASSWORD is not locally eligible") 139 140 141def test__run_next_challenge_no_challenge_input(): 142 mock_challenge = MockChallenge("PASSWORD", True, None) 143 with mock.patch( 144 "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge} 145 ): 146 assert ( 147 reauth._run_next_challenge( 148 CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token" 149 ) 150 is None 151 ) 152 153 154def test__run_next_challenge_success(): 155 mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"}) 156 with mock.patch( 157 "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge} 158 ): 159 with mock.patch( 160 "google.oauth2.reauth._send_challenge_result" 161 ) as mock_send_challenge_result: 162 reauth._run_next_challenge( 163 CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token" 164 ) 165 mock_send_challenge_result.assert_called_with( 166 MOCK_REQUEST, "123", 1, {"credential": "password"}, "token" 167 ) 168 169 170def test__obtain_rapt_authenticated(): 171 with mock.patch( 172 "google.oauth2.reauth._get_challenges", 173 return_value=CHALLENGES_RESPONSE_AUTHENTICATED, 174 ): 175 assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token" 176 177 178def test__obtain_rapt_authenticated_after_run_next_challenge(): 179 with mock.patch( 180 "google.oauth2.reauth._get_challenges", 181 return_value=CHALLENGES_RESPONSE_TEMPLATE, 182 ): 183 with mock.patch( 184 "google.oauth2.reauth._run_next_challenge", 185 side_effect=[ 186 CHALLENGES_RESPONSE_TEMPLATE, 187 CHALLENGES_RESPONSE_AUTHENTICATED, 188 ], 189 ): 190 with mock.patch("google.oauth2.reauth.is_interactive", return_value=True): 191 assert ( 192 reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token" 193 ) 194 195 196def test__obtain_rapt_unsupported_status(): 197 challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE) 198 challenges_response["status"] = "STATUS_UNSPECIFIED" 199 with mock.patch( 200 "google.oauth2.reauth._get_challenges", return_value=challenges_response 201 ): 202 with pytest.raises(exceptions.ReauthFailError) as excinfo: 203 reauth._obtain_rapt(MOCK_REQUEST, "token", None) 204 assert excinfo.match(r"API error: STATUS_UNSPECIFIED") 205 206 207def test__obtain_rapt_not_interactive(): 208 with mock.patch( 209 "google.oauth2.reauth._get_challenges", 210 return_value=CHALLENGES_RESPONSE_TEMPLATE, 211 ): 212 with mock.patch("google.oauth2.reauth.is_interactive", return_value=False): 213 with pytest.raises(exceptions.ReauthFailError) as excinfo: 214 reauth._obtain_rapt(MOCK_REQUEST, "token", None) 215 assert excinfo.match(r"not in an interactive session") 216 217 218def test__obtain_rapt_not_authenticated(): 219 with mock.patch( 220 "google.oauth2.reauth._get_challenges", 221 return_value=CHALLENGES_RESPONSE_TEMPLATE, 222 ): 223 with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0): 224 with pytest.raises(exceptions.ReauthFailError) as excinfo: 225 reauth._obtain_rapt(MOCK_REQUEST, "token", None) 226 assert excinfo.match(r"Reauthentication failed") 227 228 229def test_get_rapt_token(): 230 with mock.patch( 231 "google.oauth2._client.refresh_grant", return_value=("token", None, None, None) 232 ) as mock_refresh_grant: 233 with mock.patch( 234 "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token" 235 ) as mock_obtain_rapt: 236 assert ( 237 reauth.get_rapt_token( 238 MOCK_REQUEST, 239 "client_id", 240 "client_secret", 241 "refresh_token", 242 "token_uri", 243 ) 244 == "new_rapt_token" 245 ) 246 mock_refresh_grant.assert_called_with( 247 request=MOCK_REQUEST, 248 client_id="client_id", 249 client_secret="client_secret", 250 refresh_token="refresh_token", 251 token_uri="token_uri", 252 scopes=[reauth._REAUTH_SCOPE], 253 ) 254 mock_obtain_rapt.assert_called_with( 255 MOCK_REQUEST, "token", requested_scopes=None 256 ) 257 258 259def test_refresh_grant_failed(): 260 with mock.patch( 261 "google.oauth2._client._token_endpoint_request_no_throw" 262 ) as mock_token_request: 263 mock_token_request.return_value = (False, {"error": "Bad request"}) 264 with pytest.raises(exceptions.RefreshError) as excinfo: 265 reauth.refresh_grant( 266 MOCK_REQUEST, 267 "token_uri", 268 "refresh_token", 269 "client_id", 270 "client_secret", 271 scopes=["foo", "bar"], 272 rapt_token="rapt_token", 273 enable_reauth_refresh=True, 274 ) 275 assert excinfo.match(r"Bad request") 276 mock_token_request.assert_called_with( 277 MOCK_REQUEST, 278 "token_uri", 279 { 280 "grant_type": "refresh_token", 281 "client_id": "client_id", 282 "client_secret": "client_secret", 283 "refresh_token": "refresh_token", 284 "scope": "foo bar", 285 "rapt": "rapt_token", 286 }, 287 ) 288 289 290def test_refresh_grant_success(): 291 with mock.patch( 292 "google.oauth2._client._token_endpoint_request_no_throw" 293 ) as mock_token_request: 294 mock_token_request.side_effect = [ 295 (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}), 296 (True, {"access_token": "access_token"}), 297 ] 298 with mock.patch( 299 "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token" 300 ): 301 assert reauth.refresh_grant( 302 MOCK_REQUEST, 303 "token_uri", 304 "refresh_token", 305 "client_id", 306 "client_secret", 307 enable_reauth_refresh=True, 308 ) == ( 309 "access_token", 310 "refresh_token", 311 None, 312 {"access_token": "access_token"}, 313 "new_rapt_token", 314 ) 315 316 317def test_refresh_grant_reauth_refresh_disabled(): 318 with mock.patch( 319 "google.oauth2._client._token_endpoint_request_no_throw" 320 ) as mock_token_request: 321 mock_token_request.side_effect = [ 322 (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}), 323 (True, {"access_token": "access_token"}), 324 ] 325 with pytest.raises(exceptions.RefreshError) as excinfo: 326 reauth.refresh_grant( 327 MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret" 328 ) 329 assert excinfo.match(r"Reauthentication is needed") 330