1#!/usr/bin/env python
2#
3# This file is part of pySerial - Cross platform serial port support for Python
4# (C) 2016 Chris Liechti <[email protected]>
5#
6# SPDX-License-Identifier:    BSD-3-Clause
7"""\
8Test asyncio related functionality.
9"""
10
11import os
12import unittest
13import serial
14
15# on which port should the tests be performed:
16PORT = '/dev/ttyUSB0'
17
18try:
19    import asyncio
20    import serial.aio
21except (ImportError, SyntaxError):
22    # not compatible with python 2.x
23    pass
24else:
25
26    @unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
27    class Test_asyncio(unittest.TestCase):
28        """Test asyncio related functionality"""
29
30        def setUp(self):
31            self.loop = asyncio.get_event_loop()
32            # create a closed serial port
33
34        def tearDown(self):
35            self.loop.close()
36
37        def test_asyncio(self):
38            TEXT = b'hello world\n'
39            received = []
40            actions = []
41
42            class Output(asyncio.Protocol):
43                def connection_made(self, transport):
44                    self.transport = transport
45                    actions.append('open')
46                    transport.serial.rts = False
47                    transport.write(TEXT)
48
49                def data_received(self, data):
50                    #~ print('data received', repr(data))
51                    received.append(data)
52                    if b'\n' in data:
53                        self.transport.close()
54
55                def connection_lost(self, exc):
56                    actions.append('close')
57                    asyncio.get_event_loop().stop()
58
59                def pause_writing(self):
60                    actions.append('pause')
61                    print(self.transport.get_write_buffer_size())
62
63                def resume_writing(self):
64                    actions.append('resume')
65                    print(self.transport.get_write_buffer_size())
66
67            coro = serial.aio.create_serial_connection(self.loop, Output, PORT, baudrate=115200)
68            self.loop.run_until_complete(coro)
69            self.loop.run_forever()
70            self.assertEqual(b''.join(received), TEXT)
71            self.assertEqual(actions, ['open', 'close'])
72
73
if __name__ == '__main__':
    import sys

    out = sys.stdout
    out.write(__doc__)
    # an optional first command-line argument overrides the default port
    cli_args = sys.argv[1:]
    if cli_args:
        PORT = cli_args[0]
    out.write("Testing port: {!r}\n".format(PORT))
    # replace the consumed arguments so that unittest only sees the
    # verbose switch
    sys.argv[1:] = ['-v']
    # executed from the command line: run all tests of this module
    unittest.main()
83