1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import copy
16
17import mock
18import pytest
19
20from google.auth import exceptions
21from google.oauth2 import reauth
22
23
24MOCK_REQUEST = mock.Mock()
25CHALLENGES_RESPONSE_TEMPLATE = {
26    "status": "CHALLENGE_REQUIRED",
27    "sessionId": "123",
28    "challenges": [
29        {
30            "status": "READY",
31            "challengeId": 1,
32            "challengeType": "PASSWORD",
33            "securityKey": {},
34        }
35    ],
36}
37CHALLENGES_RESPONSE_AUTHENTICATED = {
38    "status": "AUTHENTICATED",
39    "sessionId": "123",
40    "encodedProofOfReauthToken": "new_rapt_token",
41}
42
43
44class MockChallenge(object):
45    def __init__(self, name, locally_eligible, challenge_input):
46        self.name = name
47        self.is_locally_eligible = locally_eligible
48        self.challenge_input = challenge_input
49
50    def obtain_challenge_input(self, metadata):
51        return self.challenge_input
52
53
54def test_is_interactive():
55    with mock.patch("sys.stdin.isatty", return_value=True):
56        assert reauth.is_interactive()
57
58
59def test__get_challenges():
60    with mock.patch(
61        "google.oauth2._client._token_endpoint_request"
62    ) as mock_token_endpoint_request:
63        reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token")
64        mock_token_endpoint_request.assert_called_with(
65            MOCK_REQUEST,
66            reauth._REAUTH_API + ":start",
67            {"supportedChallengeTypes": ["SAML"]},
68            access_token="token",
69            use_json=True,
70        )
71
72
73def test__get_challenges_with_scopes():
74    with mock.patch(
75        "google.oauth2._client._token_endpoint_request"
76    ) as mock_token_endpoint_request:
77        reauth._get_challenges(
78            MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"]
79        )
80        mock_token_endpoint_request.assert_called_with(
81            MOCK_REQUEST,
82            reauth._REAUTH_API + ":start",
83            {
84                "supportedChallengeTypes": ["SAML"],
85                "oauthScopesForDomainPolicyLookup": ["scope"],
86            },
87            access_token="token",
88            use_json=True,
89        )
90
91
92def test__send_challenge_result():
93    with mock.patch(
94        "google.oauth2._client._token_endpoint_request"
95    ) as mock_token_endpoint_request:
96        reauth._send_challenge_result(
97            MOCK_REQUEST, "123", "1", {"credential": "password"}, "token"
98        )
99        mock_token_endpoint_request.assert_called_with(
100            MOCK_REQUEST,
101            reauth._REAUTH_API + "/123:continue",
102            {
103                "sessionId": "123",
104                "challengeId": "1",
105                "action": "RESPOND",
106                "proposalResponse": {"credential": "password"},
107            },
108            access_token="token",
109            use_json=True,
110        )
111
112
113def test__run_next_challenge_not_ready():
114    challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
115    challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED"
116    assert (
117        reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None
118    )
119
120
121def test__run_next_challenge_not_supported():
122    challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
123    challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED"
124    with pytest.raises(exceptions.ReauthFailError) as excinfo:
125        reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token")
126    assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED")
127
128
129def test__run_next_challenge_not_locally_eligible():
130    mock_challenge = MockChallenge("PASSWORD", False, "challenge_input")
131    with mock.patch(
132        "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
133    ):
134        with pytest.raises(exceptions.ReauthFailError) as excinfo:
135            reauth._run_next_challenge(
136                CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
137            )
138        assert excinfo.match(r"Challenge PASSWORD is not locally eligible")
139
140
141def test__run_next_challenge_no_challenge_input():
142    mock_challenge = MockChallenge("PASSWORD", True, None)
143    with mock.patch(
144        "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
145    ):
146        assert (
147            reauth._run_next_challenge(
148                CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
149            )
150            is None
151        )
152
153
154def test__run_next_challenge_success():
155    mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"})
156    with mock.patch(
157        "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
158    ):
159        with mock.patch(
160            "google.oauth2.reauth._send_challenge_result"
161        ) as mock_send_challenge_result:
162            reauth._run_next_challenge(
163                CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
164            )
165            mock_send_challenge_result.assert_called_with(
166                MOCK_REQUEST, "123", 1, {"credential": "password"}, "token"
167            )
168
169
170def test__obtain_rapt_authenticated():
171    with mock.patch(
172        "google.oauth2.reauth._get_challenges",
173        return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
174    ):
175        assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
176
177
178def test__obtain_rapt_authenticated_after_run_next_challenge():
179    with mock.patch(
180        "google.oauth2.reauth._get_challenges",
181        return_value=CHALLENGES_RESPONSE_TEMPLATE,
182    ):
183        with mock.patch(
184            "google.oauth2.reauth._run_next_challenge",
185            side_effect=[
186                CHALLENGES_RESPONSE_TEMPLATE,
187                CHALLENGES_RESPONSE_AUTHENTICATED,
188            ],
189        ):
190            with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
191                assert (
192                    reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
193                )
194
195
196def test__obtain_rapt_unsupported_status():
197    challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
198    challenges_response["status"] = "STATUS_UNSPECIFIED"
199    with mock.patch(
200        "google.oauth2.reauth._get_challenges", return_value=challenges_response
201    ):
202        with pytest.raises(exceptions.ReauthFailError) as excinfo:
203            reauth._obtain_rapt(MOCK_REQUEST, "token", None)
204        assert excinfo.match(r"API error: STATUS_UNSPECIFIED")
205
206
207def test__obtain_rapt_not_interactive():
208    with mock.patch(
209        "google.oauth2.reauth._get_challenges",
210        return_value=CHALLENGES_RESPONSE_TEMPLATE,
211    ):
212        with mock.patch("google.oauth2.reauth.is_interactive", return_value=False):
213            with pytest.raises(exceptions.ReauthFailError) as excinfo:
214                reauth._obtain_rapt(MOCK_REQUEST, "token", None)
215            assert excinfo.match(r"not in an interactive session")
216
217
218def test__obtain_rapt_not_authenticated():
219    with mock.patch(
220        "google.oauth2.reauth._get_challenges",
221        return_value=CHALLENGES_RESPONSE_TEMPLATE,
222    ):
223        with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
224            with pytest.raises(exceptions.ReauthFailError) as excinfo:
225                reauth._obtain_rapt(MOCK_REQUEST, "token", None)
226            assert excinfo.match(r"Reauthentication failed")
227
228
229def test_get_rapt_token():
230    with mock.patch(
231        "google.oauth2._client.refresh_grant", return_value=("token", None, None, None)
232    ) as mock_refresh_grant:
233        with mock.patch(
234            "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token"
235        ) as mock_obtain_rapt:
236            assert (
237                reauth.get_rapt_token(
238                    MOCK_REQUEST,
239                    "client_id",
240                    "client_secret",
241                    "refresh_token",
242                    "token_uri",
243                )
244                == "new_rapt_token"
245            )
246            mock_refresh_grant.assert_called_with(
247                request=MOCK_REQUEST,
248                client_id="client_id",
249                client_secret="client_secret",
250                refresh_token="refresh_token",
251                token_uri="token_uri",
252                scopes=[reauth._REAUTH_SCOPE],
253            )
254            mock_obtain_rapt.assert_called_with(
255                MOCK_REQUEST, "token", requested_scopes=None
256            )
257
258
259def test_refresh_grant_failed():
260    with mock.patch(
261        "google.oauth2._client._token_endpoint_request_no_throw"
262    ) as mock_token_request:
263        mock_token_request.return_value = (False, {"error": "Bad request"})
264        with pytest.raises(exceptions.RefreshError) as excinfo:
265            reauth.refresh_grant(
266                MOCK_REQUEST,
267                "token_uri",
268                "refresh_token",
269                "client_id",
270                "client_secret",
271                scopes=["foo", "bar"],
272                rapt_token="rapt_token",
273                enable_reauth_refresh=True,
274            )
275        assert excinfo.match(r"Bad request")
276        mock_token_request.assert_called_with(
277            MOCK_REQUEST,
278            "token_uri",
279            {
280                "grant_type": "refresh_token",
281                "client_id": "client_id",
282                "client_secret": "client_secret",
283                "refresh_token": "refresh_token",
284                "scope": "foo bar",
285                "rapt": "rapt_token",
286            },
287        )
288
289
290def test_refresh_grant_success():
291    with mock.patch(
292        "google.oauth2._client._token_endpoint_request_no_throw"
293    ) as mock_token_request:
294        mock_token_request.side_effect = [
295            (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}),
296            (True, {"access_token": "access_token"}),
297        ]
298        with mock.patch(
299            "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token"
300        ):
301            assert reauth.refresh_grant(
302                MOCK_REQUEST,
303                "token_uri",
304                "refresh_token",
305                "client_id",
306                "client_secret",
307                enable_reauth_refresh=True,
308            ) == (
309                "access_token",
310                "refresh_token",
311                None,
312                {"access_token": "access_token"},
313                "new_rapt_token",
314            )
315
316
317def test_refresh_grant_reauth_refresh_disabled():
318    with mock.patch(
319        "google.oauth2._client._token_endpoint_request_no_throw"
320    ) as mock_token_request:
321        mock_token_request.side_effect = [
322            (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}),
323            (True, {"access_token": "access_token"}),
324        ]
325        with pytest.raises(exceptions.RefreshError) as excinfo:
326            reauth.refresh_grant(
327                MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret"
328            )
329        assert excinfo.match(r"Reauthentication is needed")
330