xref: /aosp_15_r20/external/autotest/server/cros/resource_monitor_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging
11import unittest
12import re
13import csv
14import common
15import os
16
17
18from autotest_lib.server.cros import resource_monitor
19from autotest_lib.server.hosts import abstract_ssh
20from autotest_lib.server import utils
21import six
22from six.moves import map
23from six.moves import range
24
class HostMock(abstract_ssh.AbstractSSHHost):
    """Mocks a host object.

    Emulates the `top`, `kill` and `rm` commands and tracks whether top is
    running and whether its raw output file exists / is being written, so
    that tests can check the sequence of commands issued against the host.
    Any other command is logged and ignored.
    """

    # Fake pid reported for the emulated top process.
    TOP_PID = '1234'

    def _initialize(self, test_env):
        """Set up the emulated host state.

        @param test_env object whose assert* methods are used to validate
                the commands run on this host (the tests in this file pass
                a unittest.TestCase).

        """
        self.top_is_running = False

        # Keep track of whether the top raw output file exists on the system,
        # and if it does, where it is.
        self.top_output_file_path = None

        # Keep track of whether the raw top output file is currently being
        # written to by top.
        self.top_output_file_is_open = False
        self.test_env = test_env


    def get_file(self, src, dest):
        """No-op stand-in for fetching a file; nothing is copied.

        @param src ignored.
        @param dest ignored.

        """
        pass


    def called_unsupported_command(self, command):
        """Raises assertion error when called.

        @param command string the unsupported command called.

        """
        raise AssertionError(
                "ResourceMonitor called unsupported command %s" % command)


    def _process_top(self, cmd_args, cmd_line):
        """Process top command.

        Checks that top is not already running and that no output file is
        present, then records the redirect target as the output file and
        marks top as running.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.
        @return string the fake pid of the emulated top process.

        """
        self.test_env.assertFalse(self.top_is_running,
                msg="Top must not already be running.")
        self.test_env.assertFalse(self.top_output_file_is_open,
                msg="The top output file should not be being written "
                "to before top is started")
        self.test_env.assertIsNone(self.top_output_file_path,
                msg="The top output file should not exist "
                "before top is started")
        try:
            # The output file is whatever token follows the ">" redirect;
            # a top invocation without a redirect target is not emulated.
            self.redirect_index = cmd_args.index(">")
            self.top_output_file_path = cmd_args[self.redirect_index + 1]
        except (ValueError, IndexError):
            self.called_unsupported_command(cmd_line)

        self.top_is_running = True
        self.top_output_file_is_open = True

        return HostMock.TOP_PID


    def _process_kill(self, cmd_args, cmd_line):
        """Process kill command.

        Checks that the pid being killed is the fake top pid and that top
        is running, then marks top as stopped and its output file as closed.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.

        """
        try:
            # Skip over a single leading option (e.g. a signal flag).
            if cmd_args[1].startswith('-'):
                pid_to_kill = cmd_args[2]
            else:
                pid_to_kill = cmd_args[1]
        except IndexError:
            # Always raises, so pid_to_kill is bound whenever we continue.
            self.called_unsupported_command(cmd_line)

        self.test_env.assertEqual(pid_to_kill, HostMock.TOP_PID,
                msg="Wrong pid (%r) killed . Top pid is %r." % (pid_to_kill,
                HostMock.TOP_PID))
        self.test_env.assertTrue(self.top_is_running,
                msg="Top must be running before we try to kill it")

        self.top_is_running = False
        self.top_output_file_is_open = False


    def _process_rm(self, cmd_args, cmd_line):
        """Process rm command.

        Checks that the file being removed is the top output file and that
        top has stopped writing to it, then marks the file as gone.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.

        """
        try:
            # Skip over a single leading option (e.g. -f).
            if cmd_args[1].startswith('-'):
                file_to_rm = cmd_args[2]
            else:
                file_to_rm = cmd_args[1]
        except IndexError:
            # Always raises, so file_to_rm is bound whenever we continue.
            self.called_unsupported_command(cmd_line)

        self.test_env.assertEqual(file_to_rm, self.top_output_file_path,
                msg="Tried to remove file that is not the top output file.")
        self.test_env.assertFalse(self.top_output_file_is_open,
                msg="Tried to remove top output file while top is still "
                "writing to it.")
        self.test_env.assertFalse(self.top_is_running,
                msg="Top was still running when we tried to remove "
                "the top output file.")
        self.test_env.assertIsNotNone(self.top_output_file_path)

        self.top_output_file_path = None


    def _run_single_cmd(self, cmd_line, *args, **kwargs):
        """Run a single command on host.

        Only `top`, `kill` and `rm` are emulated; anything else is logged
        and ignored. Extra positional/keyword arguments are accepted and
        ignored.

        @param cmd_line command to run on the host.
        @return the fake top pid for `top`, otherwise None.

        """
        # Make the input a little nicer: pad the redirect operator so it
        # always becomes its own token when splitting on whitespace.
        cmd_line = cmd_line.strip()
        cmd_line = re.sub(">", " > ", cmd_line)

        cmd_args = re.split(r"\s+", cmd_line)
        self.test_env.assertTrue(len(cmd_args) >= 1)
        command = cmd_args[0]
        if command == "top":
            return self._process_top(cmd_args, cmd_line)
        elif command == "kill":
            return self._process_kill(cmd_args, cmd_line)
        elif command == "rm":
            return self._process_rm(cmd_args, cmd_line)
        else:
            logging.warning("Called unemulated command %r", cmd_line)
            return None


    def run(self, cmd_line, *args, **kwargs):
        """Run command(s) on host.

        The command line is split on "&&" and each part is emulated in
        order. Extra arguments are accepted and ignored.

        @param cmd_line command to run on the host.
        @return CmdResult object with exit_status 0.

        """
        cmds = re.split("&&", cmd_line)
        for cmd in cmds:
            self._run_single_cmd(cmd)
        return utils.CmdResult(exit_status=0)


    def run_background(self, cmd_line, *args, **kwargs):
        """Run command in background on host.

        @param cmd_line command to run on the host.
        @return whatever the emulated command returns (the fake pid for
                `top`).

        """
        return self._run_single_cmd(cmd_line, *args, **kwargs)


    def is_monitoring(self):
        """Return true iff host is currently running top and writing output
            to a file.
        """
        return self.top_is_running and self.top_output_file_is_open and (
            self.top_output_file_path is not None)


    def monitoring_stopped(self):
        """Return true iff the host is not monitoring, i.e. the exact
            negation of is_monitoring().
        """
        return not self.is_monitoring()
196
197
class ResourceMonitorTest(unittest.TestCase):
    """Exercises ResourceMonitor start/stop/cleanup against a HostMock."""

    def setUp(self):
        """Create the monitor configuration and a fresh mocked host."""
        self.topoutfile = '/tmp/resourcemonitorunittest-1234'
        self.monitor_period = 1
        self.rm_conf = resource_monitor.ResourceMonitorConfig(
                monitor_period=self.monitor_period,
                rawresult_output_filename=self.topoutfile)
        self.host = HostMock(self)


    def _new_monitor(self):
        """Build a ResourceMonitor bound to the mocked host and config.

        @return a new resource_monitor.ResourceMonitor.

        """
        return resource_monitor.ResourceMonitor(self.host, self.rm_conf)


    def test_normal_operation(self):
        """Repeated start/stop cycles with no exceptions behave correctly."""
        host = self.host
        self.assertFalse(host.is_monitoring())
        with self._new_monitor() as monitor:
            self.assertFalse(host.is_monitoring())
            for _ in range(3):
                monitor.start()
                self.assertTrue(host.is_monitoring())
                monitor.stop()
                self.assertTrue(host.monitoring_stopped())
        self.assertTrue(host.monitoring_stopped())


    def test_forgot_to_stop_monitor(self):
        """Leaving the context without calling stop() still cleans up."""
        host = self.host
        self.assertFalse(host.is_monitoring())
        with self._new_monitor() as monitor:
            self.assertFalse(host.is_monitoring())
            monitor.start()
            self.assertTrue(host.is_monitoring())
        self.assertTrue(host.monitoring_stopped())


    def test_unexpected_interruption_while_monitoring(self):
        """A KeyboardInterrupt raised while monitoring still cleans up."""
        host = self.host
        self.assertFalse(host.is_monitoring())

        with self._new_monitor() as monitor:
            self.assertFalse(host.is_monitoring())
            monitor.start()
            self.assertTrue(host.is_monitoring())
            raise KeyboardInterrupt

        self.assertTrue(host.monitoring_stopped())
246
247
class ResourceMonitorResultTest(unittest.TestCase):
    """Checks ResourceMonitorParsedResult against canned top output."""

    def setUp(self):
        """Locate the directory holding the test data files."""
        this_dir = os.path.dirname(os.path.realpath(__file__))
        self._res_dir = os.path.join(this_dir, 'res_resource_monitor')


    def _res_path(self, filename):
        """Return the path of a file inside the test data directory.

        @param filename string name of the file within the data directory.

        """
        return os.path.join(self._res_dir, filename)


    def run_with_test_data(self, testdata_file, testans_file):
        """Parse testdata_file and compare the result with testans_file.

        The first CSV row of testans_file must equal the parser's column
        list; every following row is turned into a UtilValues tuple and the
        resulting list must equal the parsed utilisation values.

        @param testdata_file string filename containing top output to test.
        @param testans_file string filename containing answers to the test.

        """
        result_cls = resource_monitor.ResourceMonitorParsedResult
        parsed_results = result_cls(testdata_file)
        with open(testans_file, "r") as testans:
            rows = csv.reader(testans)
            header = next(rows)
            self.assertEqual(list(header), result_cls._columns)
            expected = [result_cls.UtilValues._make(row) for row in rows]
            self.assertEqual(expected, parsed_results._utils_over_time)


    def test_full_data(self):
        """General test with many possible changes to input."""
        self.run_with_test_data(
                self._res_path('top_test_data.txt'),
                self._res_path('top_test_data_ans.csv'))


    def test_whitespace_ridden(self):
        """Tests resilience to arbitrary whitespace characters between fields"""
        self.run_with_test_data(
                self._res_path('top_whitespace_ridden.txt'),
                self._res_path('top_whitespace_ridden_ans.csv'))


    def test_field_order_changed(self):
        """Tests resilience to changes in the order of fields
            (e.g. if the Mem free/used fields swap places in the input).
        """
        self.run_with_test_data(
                self._res_path('top_field_order_changed.txt'),
                self._res_path('top_field_order_changed_ans.csv'))
302
303
# When executed directly as a script, run the tests via unittest's
# command-line entry point.
if __name__ == '__main__':
    unittest.main()
306