#!/usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2016 Chris Liechti <[email protected]>
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
Test asyncio related functionality.
"""

import os
import unittest
import serial

# on which port should the tests be performed:
PORT = '/dev/ttyUSB0'

try:
    import asyncio
    import serial.aio
except (ImportError, SyntaxError):
    # not compatible with python 2.x
    pass
else:

    @unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
    class Test_asyncio(unittest.TestCase):
        """Test asyncio related functionality"""

        def setUp(self):
            """Create a fresh event loop that belongs to this test only."""
            # A private loop is used on purpose: tearDown() closes it, and
            # closing the shared default loop would hand an already-closed
            # loop to whatever test asks for the default loop next.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

        def tearDown(self):
            """Close the per-test loop and unregister it as the current loop."""
            self.loop.close()
            # Do not leave a closed loop registered as the current one.
            asyncio.set_event_loop(None)

        def test_asyncio(self):
            """Write one line to PORT and expect to read the same bytes back.

            The test can only pass if whatever is attached to PORT returns the
            written data (e.g. a loopback), since the received bytes are
            compared against the bytes that were sent.
            """
            TEXT = b'hello world\n'
            received = []
            actions = []
            # Captured so the protocol stops exactly the loop this test runs,
            # without looking it up through global asyncio state.
            loop = self.loop

            class Output(asyncio.Protocol):
                """Protocol that sends TEXT on connect and records callbacks."""

                def connection_made(self, transport):
                    self.transport = transport
                    actions.append('open')
                    transport.serial.rts = False
                    transport.write(TEXT)

                def data_received(self, data):
                    #~ print('data received', repr(data))
                    received.append(data)
                    # A newline marks the end of the sent line: close the port.
                    if b'\n' in data:
                        self.transport.close()

                def connection_lost(self, exc):
                    actions.append('close')
                    # Ends the run_forever() call in the test body below.
                    loop.stop()

                def pause_writing(self):
                    # Recorded so the final assertion on `actions` fails if
                    # flow control kicks in during this short transfer.
                    actions.append('pause')
                    print(self.transport.get_write_buffer_size())

                def resume_writing(self):
                    actions.append('resume')
                    print(self.transport.get_write_buffer_size())

            coro = serial.aio.create_serial_connection(self.loop, Output, PORT, baudrate=115200)
            self.loop.run_until_complete(coro)
            # Runs until Output.connection_lost() stops the loop.
            self.loop.run_forever()
            self.assertEqual(b''.join(received), TEXT)
            self.assertEqual(actions, ['open', 'close'])


if __name__ == '__main__':
    import sys
    sys.stdout.write(__doc__)
    # An optional first command-line argument overrides the default port.
    if len(sys.argv) > 1:
        PORT = sys.argv[1]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # Replace the arguments so unittest does not parse the port name and
    # runs in verbose mode.
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()