1# Lint as: python2, python3 2# Copyright (c) 2018 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import socket 7import telnetlib 8 9from autotest_lib.client.common_lib import error 10 11SHORT_TIMEOUT = 2 12LONG_TIMEOUT = 30 13 14def _ascii_string(uc_string): 15 """Returns ascii string given unicode string. 16 17 @param uc_string: Unicode string 18 19 """ 20 return str(uc_string).encode('ASCII') 21 22class TelnetHelper(object): 23 """Helper class to run basic string commands on a telnet host.""" 24 25 def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""): 26 self._tn = None 27 28 self._tx_cmd_separator = tx_cmd_separator 29 self._rx_cmd_separator = rx_cmd_separator 30 self._prompt = prompt 31 32 def open(self, hostname, port=22): 33 """Opens telnet connection to attenuator host. 34 35 @param hostname: Valid hostname 36 @param port: Optional port number, defaults to 22 37 38 """ 39 if self._tn: 40 self._tn.close() 41 42 self._tn = telnetlib.Telnet() 43 44 try: 45 self._tn.open(hostname, port, LONG_TIMEOUT) 46 except socket.timeout as e: 47 raise error.TestError("Timed out while opening telnet connection") 48 49 def is_open(self): 50 """Returns true if telnet connection is open.""" 51 return bool(self._tn) 52 53 def close(self): 54 """Closes telnet connection.""" 55 if self._tn: 56 self._tn.close() 57 self._tn = None 58 59 def cmd(self, cmd_str, wait_ret=True): 60 """Run command on attenuator. 61 62 @param cmd_str: Command to run 63 @param wait_ret: Wait for command output or not 64 @returns command output 65 """ 66 if not isinstance(cmd_str, str): 67 raise error.TestError("Invalid command string %s" % cmd_str) 68 69 if not self.is_open(): 70 raise error.TestError("Telnet connection not open for commands") 71 72 cmd_str.strip(self._tx_cmd_separator) 73 try: 74 self._tn.read_until(_ascii_string(self._prompt), SHORT_TIMEOUT) 75 except EOFError as e: 76 raise error.TestError("Connection closed. EOFError (%s)" % e) 77 78 try: 79 self._tn.write(_ascii_string(cmd_str + self._tx_cmd_separator)) 80 except socket.error as e: 81 raise error.TestError("Connection closed. Socket error (%s)." % e) 82 83 if wait_ret is False: 84 return None 85 86 try: 87 match_channel_idx, _, ret_text = self._tn.expect( 88 [_ascii_string("\S+" + self._rx_cmd_separator)], SHORT_TIMEOUT) 89 except EOFError as e: 90 raise error.TestError("Connection closed. EOFError (%s)" % e) 91 92 if match_channel_idx == -1: 93 raise error.TestError("Telnet command failed to return valid data. " 94 "Data returned: %s" % ret_text) 95 96 ret_text = ret_text.decode() 97 ret_text = ret_text.strip() 98 99 return ret_text