# Copyright 2021 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """Tests for pw_console.console_app""" import asyncio import builtins import inspect import io import sys import threading import unittest from unittest.mock import MagicMock, call from prompt_toolkit.application import create_app_session from prompt_toolkit.output import ( ColorDepth, # inclusive-language: ignore DummyOutput as FakeOutput, ) from pw_console.console_app import ConsoleApp from pw_console.console_prefs import ConsolePrefs from pw_console.repl_pane import ReplPane from pw_console.pw_ptpython_repl import PwPtPythonRepl _PYTHON_3_8 = sys.version_info >= ( 3, 8, ) if _PYTHON_3_8: from unittest import IsolatedAsyncioTestCase # type: ignore # pylint: disable=no-name-in-module class TestReplPane(IsolatedAsyncioTestCase): """Tests for ReplPane.""" maxDiff = None def test_repl_code_return_values(self) -> None: """Test stdout, return values, and exceptions can be returned from running user repl code.""" app = MagicMock() global_vars = { '__name__': '__main__', '__package__': None, '__doc__': None, '__builtins__': builtins, } pw_ptpython_repl = PwPtPythonRepl( get_globals=lambda: global_vars, get_locals=lambda: global_vars, color_depth=ColorDepth.DEPTH_8_BIT, ) repl_pane = ReplPane( application=app, python_repl=pw_ptpython_repl, ) # Check pw_ptpython_repl has a reference to the parent repl_pane. self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane) # Define a function, should return nothing. code = inspect.cleandoc( """ def run(): print('The answer is ', end='') return 1+1+4+16+20 """ ) temp_stdout = io.StringIO() temp_stderr = io.StringIO() # pylint: disable=protected-access result = asyncio.run( pw_ptpython_repl._run_user_code(code, temp_stdout, temp_stderr) ) self.assertEqual( result, {'stdout': '', 'stderr': '', 'result': None} ) temp_stdout = io.StringIO() temp_stderr = io.StringIO() # Check stdout and return value result = asyncio.run( pw_ptpython_repl._run_user_code( 'run()', temp_stdout, temp_stderr ) ) self.assertEqual( result, {'stdout': 'The answer is ', 'stderr': '', 'result': 42} ) temp_stdout = io.StringIO() temp_stderr = io.StringIO() # Check for repl exception result = asyncio.run( pw_ptpython_repl._run_user_code( 'return "blah"', temp_stdout, temp_stderr ) ) self.assertIn( "SyntaxError: 'return' outside function", pw_ptpython_repl._last_exception, # type: ignore ) async def test_user_thread(self) -> None: """Test user code thread.""" with create_app_session(output=FakeOutput()): # Setup Mocks prefs = ConsolePrefs( project_file=False, project_user_file=False, user_file=False ) prefs.set_code_theme('default') app = ConsoleApp( color_depth=ColorDepth.DEPTH_8_BIT, prefs=prefs ) app.start_user_code_thread() pw_ptpython_repl = app.pw_ptpython_repl repl_pane = app.repl_pane # Mock update_output_buffer to track number of update calls repl_pane.update_output_buffer = MagicMock( # type: ignore wraps=repl_pane.update_output_buffer ) # Mock complete callback pw_ptpython_repl.user_code_complete_callback = ( # type: ignore MagicMock( wraps=pw_ptpython_repl.user_code_complete_callback ) ) # Repl done flag for tests user_code_done = threading.Event() # Run some code code = inspect.cleandoc( """ import time def run(): for i in range(2): time.sleep(0.5) print(i) print('The answer is ', end='') return 1+1+4+16+20 """ ) input_buffer = MagicMock(text=code) # pylint: disable=protected-access pw_ptpython_repl._accept_handler(input_buffer) # pylint: enable=protected-access # Get last executed code object. user_code1 = repl_pane.executed_code[-1] # Wait for repl code to finish. user_code1.future.add_done_callback( lambda future: user_code_done.set() ) # Wait for stdout monitoring to complete. if user_code1.stdout_check_task: await user_code1.stdout_check_task # Wait for test done callback. user_code_done.wait() # Check user_code1 results # NOTE: Avoid using assert_has_calls. Thread timing can make the # test flaky. expected_calls = [ # Initial exec start call('pw_ptpython_repl._accept_handler'), # Code finishes call('repl_pane.append_result_to_executed_code'), # Complete callback call('pw_ptpython_repl.user_code_complete_callback'), ] for expected_call in expected_calls: self.assertIn( expected_call, repl_pane.update_output_buffer.mock_calls ) user_code_complete_callback = ( pw_ptpython_repl.user_code_complete_callback ) user_code_complete_callback.assert_called_once() self.assertIsNotNone(user_code1) self.assertTrue(user_code1.future.done()) self.assertEqual(user_code1.input, code) self.assertEqual(user_code1.output, None) # stdout / stderr may be '' or None self.assertFalse(user_code1.stdout) self.assertFalse(user_code1.stderr) # Reset mocks user_code_done.clear() pw_ptpython_repl.user_code_complete_callback.reset_mock() repl_pane.update_output_buffer.reset_mock() # Run some code input_buffer = MagicMock(text='run()') # pylint: disable=protected-access pw_ptpython_repl._accept_handler(input_buffer) # pylint: enable=protected-access # Get last executed code object. user_code2 = repl_pane.executed_code[-1] # Wait for repl code to finish. user_code2.future.add_done_callback( lambda future: user_code_done.set() ) # Wait for stdout monitoring to complete. if user_code2.stdout_check_task: await user_code2.stdout_check_task # Wait for test done callback. user_code_done.wait() # Check user_code2 results # NOTE: Avoid using assert_has_calls. Thread timing can make the # test flaky. expected_calls = [ # Initial exec start call('pw_ptpython_repl._accept_handler'), # Periodic checks, should be a total of 4: # Code should take 1.0 second to run. # Periodic checks every 0.3 seconds # 1.0 / 0.3 = 3.33 (4) checks call('repl_pane.periodic_check'), call('repl_pane.periodic_check'), call('repl_pane.periodic_check'), # Code finishes call('repl_pane.append_result_to_executed_code'), # Complete callback call('pw_ptpython_repl.user_code_complete_callback'), # Final periodic check call('repl_pane.periodic_check'), ] for expected_call in expected_calls: self.assertIn( expected_call, repl_pane.update_output_buffer.mock_calls ) # pylint: disable=line-too-long pw_ptpython_repl.user_code_complete_callback.assert_called_once() # pylint: enable=line-too-long self.assertIsNotNone(user_code2) self.assertTrue(user_code2.future.done()) self.assertEqual(user_code2.input, 'run()') self.assertEqual(user_code2.output, '42') self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ') self.assertFalse(user_code2.stderr) # Reset mocks user_code_done.clear() pw_ptpython_repl.user_code_complete_callback.reset_mock() repl_pane.update_output_buffer.reset_mock() if __name__ == '__main__': unittest.main()