xref: /aosp_15_r20/external/autotest/server/cros/network/telnet_helper.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import socket
7import telnetlib
8
9from autotest_lib.client.common_lib import error
10
11SHORT_TIMEOUT = 2
12LONG_TIMEOUT = 30
13
14def _ascii_string(uc_string):
15    """Returns ascii string given unicode string.
16
17    @param uc_string: Unicode string
18
19    """
20    return str(uc_string).encode('ASCII')
21
22class TelnetHelper(object):
23    """Helper class to run basic string commands on a telnet host."""
24
25    def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""):
26        self._tn = None
27
28        self._tx_cmd_separator = tx_cmd_separator
29        self._rx_cmd_separator = rx_cmd_separator
30        self._prompt = prompt
31
32    def open(self, hostname, port=22):
33        """Opens telnet connection to attenuator host.
34
35        @param hostname: Valid hostname
36        @param port: Optional port number, defaults to 22
37
38        """
39        if self._tn:
40            self._tn.close()
41
42        self._tn = telnetlib.Telnet()
43
44        try:
45            self._tn.open(hostname, port, LONG_TIMEOUT)
46        except socket.timeout as e:
47            raise error.TestError("Timed out while opening telnet connection")
48
49    def is_open(self):
50        """Returns true if telnet connection is open."""
51        return bool(self._tn)
52
53    def close(self):
54        """Closes telnet connection."""
55        if self._tn:
56            self._tn.close()
57            self._tn = None
58
59    def cmd(self, cmd_str, wait_ret=True):
60        """Run command on attenuator.
61
62        @param cmd_str: Command to run
63        @param wait_ret: Wait for command output or not
64        @returns command output
65        """
66        if not isinstance(cmd_str, str):
67            raise error.TestError("Invalid command string %s" % cmd_str)
68
69        if not self.is_open():
70            raise error.TestError("Telnet connection not open for commands")
71
72        cmd_str.strip(self._tx_cmd_separator)
73        try:
74            self._tn.read_until(_ascii_string(self._prompt), SHORT_TIMEOUT)
75        except EOFError as e:
76            raise error.TestError("Connection closed. EOFError (%s)" % e)
77
78        try:
79            self._tn.write(_ascii_string(cmd_str + self._tx_cmd_separator))
80        except socket.error as e:
81            raise error.TestError("Connection closed. Socket error (%s)." % e)
82
83        if wait_ret is False:
84            return None
85
86        try:
87            match_channel_idx, _, ret_text = self._tn.expect(
88                [_ascii_string("\S+" + self._rx_cmd_separator)], SHORT_TIMEOUT)
89        except EOFError as e:
90            raise error.TestError("Connection closed. EOFError (%s)" % e)
91
92        if match_channel_idx == -1:
93            raise error.TestError("Telnet command failed to return valid data. "
94                                  "Data returned: %s" % ret_text)
95
96        ret_text = ret_text.decode()
97        ret_text = ret_text.strip()
98
99        return ret_text