xref: /aosp_15_r20/external/autotest/server/site_tests/firmware_ECThermal/firmware_ECThermal.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7import six
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
12from functools import reduce
13
14
15class firmware_ECThermal(FirmwareTest):
16    """
17    Servo based EC thermal engine test.
18    """
19    version = 1
20
21    # Delay for waiting fan to start or stop
22    FAN_DELAY = 5
23
    # Delay for waiting device stressing to stabilize
25    STRESS_DELAY = 30
26
27    # Delay for stressing device with fan off to check temperature increase
28    STRESS_DELAY_NO_FAN = 12
29
30    # Margin for comparing servo based and ectool based CPU temperature
31    TEMP_MISMATCH_MARGIN = 3
32
33    # Minimum increase of CPU temperature when stressing DUT
34    TEMP_STRESS_INCREASE = 3
35
36    # Pseudo INT_MAX. Used as infinity when comparing temperature readings
37    INT_MAX = 10000
38
39    # Sensor type ID of ignored sensors
40    SENSOR_TYPE_IGNORED = 255
41
42    # PID of DUT stressing processes
43    _stress_pid = list()
44
45    def enable_auto_fan_control(self):
46        """Enable EC automatic fan speed control"""
47        # We use set_nocheck because servo reports current target
48        # RPM instead 'auto', and therefore servo.set always fails.
49        self.servo.set_nocheck('fan_target_rpm', 'auto')
50
51
52    def max_fan(self):
53        """Maximize fan speed"""
54        # We use set_nocheck because servo reports current target
55        # RPM instead 'max', and therefore servo.set always fails.
56        self.servo.set_nocheck('fan_target_rpm', 'max')
57
58
59    def turn_off_fan(self):
60        """Turn off fan"""
61        self.servo.set('fan_target_rpm', 'off')
62
63
64    def _get_setting_for_type(self, type_id):
65        """
66        Retrieve thermal setting for a given type of sensor
67
68        Args:
69          type_id: The ID of sensor type.
70
71        Returns:
72          A list containing thresholds in the following order:
73            Warning
74            CPU off
75            All power off
76            Fan speed thresholds
77        """
78        setting = list()
79        current_id = 0
80        while True:
81            try:
82                lines = self.faft_client.system.run_shell_command_get_output(
83                        'ectool thermalget %d %d' % (type_id, current_id))
84            except six.moves.xmlrpc_client.Fault:
85                break
86            pattern = re.compile('Threshold \d* [a-z ]* \d* is (\d*) K.')
87            for line in lines:
88                matched = pattern.match(line)
89                if matched is not None:
90                    # Convert degree K to degree C
91                    setting.append(int(matched.group(1)) - 273)
92            current_id = current_id + 1
93
94        if len(setting) == 0:
95            return None
96        return setting
97
98
99    def get_fan_steps(self):
100        """Retrieve fan step config from EC"""
101        num_steps = len(self._thermal_setting[0]) - 3
102        self._fan_steps = list()
103        expected_pat = (["Lowest speed: ([0-9-]+) RPM"] +
104                        ["\d+ K:\s+([0-9-]+) RPM"] * num_steps)
105        match = self.ec.send_command_get_output("thermalfan 0", expected_pat)
106        for m in match:
107            self._fan_steps.append(int(m[1]))
108
109        # Get the actual value of each fan step
110        for i in range(num_steps + 1):
111            if self._fan_steps[i] == 0:
112                continue
113            self.servo.set_nocheck('fan_target_rpm', "%d" % self._fan_steps[i])
114            self._fan_steps[i] = int(self.servo.get('fan_target_rpm'))
115
116        logging.info("Actual fan steps: %s", self._fan_steps)
117
118
119    def get_thermal_setting(self):
120        """Retrieve thermal engine setting from EC"""
121        self._thermal_setting = list()
122        type_id = 0
123        while True:
124            setting = self._get_setting_for_type(type_id)
125            if setting is None:
126                break
127            self._thermal_setting.append(setting)
128            type_id = type_id + 1
129        logging.info("Number of tempearture sensor types: %d", type_id)
130
131        # Get the number of temperature sensors
132        self._num_temp_sensor = 0
133        while True:
134            try:
135                self.faft_client.system.run_shell_command('ectool temps %d' %
136                                                   self._num_temp_sensor)
137                self._num_temp_sensor = self._num_temp_sensor + 1
138            except six.moves.xmlrpc_client.Fault:
139                break
140        logging.info("Number of temperature sensor: %d", self._num_temp_sensor)
141
142
143    def initialize(self, host, cmdline_args):
144        super(firmware_ECThermal, self).initialize(host, cmdline_args)
145        # Don't bother if there is no Chrome EC.
146        if not self.check_ec_capability():
147            raise error.TestNAError("Nothing needs to be tested on this device")
148        self.ec.send_command("chan 0")
149        try:
150            self.faft_client.system.run_shell_command('stop temp_metrics')
151        except six.moves.xmlrpc_client.Fault:
152            self._has_temp_metrics = False
153        else:
154            logging.info('Stopped temp_metrics')
155            self._has_temp_metrics = True
156        if self.check_ec_capability(['thermal']):
157            self.get_thermal_setting()
158            self.get_fan_steps()
159            self.enable_auto_fan_control()
160
161
162    def cleanup(self):
163        try:
164            if self.check_ec_capability(['thermal']):
165                self.enable_auto_fan_control()
166            if self._has_temp_metrics:
167                logging.info('Starting temp_metrics')
168                self.faft_client.system.run_shell_command('start temp_metrics')
169            self.ec.send_command("chan 0xffffffff")
170        except Exception as e:
171            logging.error("Caught exception: %s", str(e))
172        super(firmware_ECThermal, self).cleanup()
173
174
175    def _find_cpu_sensor_id(self):
176        """
177        This function find CPU temperature sensor using ectool.
178
179        Returns:
180          Integer ID of CPU temperature sensor.
181
182        Raises:
183          error.TestFail: Raised if we fail to find PECI temparture through
184            ectool.
185        """
186        for temp_id in range(self._num_temp_sensor):
187            lines = self.faft_client.system.run_shell_command_get_output(
188                    'ectool tempsinfo %d' % temp_id)
189            for line in lines:
190                matched = re.match('Sensor name: (.*)', line)
191                if matched is not None and matched.group(1) == 'PECI':
192                    return temp_id
193        raise error.TestFail('Cannot find CPU temperature sensor ID.')
194
195
196    def _get_temp_reading(self, sensor_id):
197        """
198        Get temperature reading on a sensor through ectool
199
200        Args:
201          sensor_id: Temperature sensor ID.
202
203        Returns:
204          Temperature reading in degree C.
205
206        Raises:
207          six.moves.xmlrpc_client.Fault: Raised when we fail to read
208          temperature.
209          error.TestError: Raised if ectool doesn't behave as we expected.
210        """
211        assert sensor_id < self._num_temp_sensor
212        pattern = re.compile('Reading temperature...(\d*)')
213        lines = self.faft_client.system.run_shell_command_get_output(
214                'ectool temps %d' % sensor_id)
215        for line in lines:
216            matched = pattern.match(line)
217            if matched is not None:
218                return int(matched.group(1)) - 273
219        # Should never reach here
220        raise error.TestError("Unexpected error occurred")
221
222
223    def check_temp_report(self):
224        """
225        Checker of temperature reporting.
226
227        This function reads CPU temperature from servo and ectool. If
228        the two readings mismatches by more than TEMP_MISMATCH_MARGIN,'
229        test fails.
230
231        Raises:
232          error.TestFail: Raised when temperature reading mismatches by
233            more than TEMP_MISMATCH_MARGIN.
234        """
235        cpu_temp_id = self._find_cpu_sensor_id()
236        logging.info("CPU temperature sensor ID is %d", cpu_temp_id)
237        ectool_cpu_temp = self._get_temp_reading(cpu_temp_id)
238        servo_cpu_temp = int(self.servo.get('cpu_temp'))
239        logging.info("CPU temperature from servo: %d C", servo_cpu_temp)
240        logging.info("CPU temperature from ectool: %d C", ectool_cpu_temp)
241        if abs(ectool_cpu_temp - servo_cpu_temp) > self.TEMP_MISMATCH_MARGIN:
242            raise error.TestFail(
243                    'CPU temperature readings from servo and ectool differ')
244
245
246    def _stress_dut(self, threads=4):
247        """
248        Stress DUT system.
249
250        By reading from /dev/urandom and writing to /dev/null, we can stress
251        DUT and cause CPU temperature to go up. We stress the system forever,
252        until _stop_stressing is called to kill the stress threads. This
253        function is non-blocking.
254
255        Args:
256          threads: Number of threads (processes) when stressing forever.
257
258        Returns:
259          A list of stress process IDs is returned.
260        """
261        logging.info("Stressing DUT with %d threads...", threads)
262        self.faft_client.system.run_shell_command('pkill dd')
263        stress_cmd = 'dd if=/dev/urandom of=/dev/null bs=1M'
264        # Grep for [d]d instead of dd to prevent getting the PID of grep
265        # itself.
266        pid_cmd = "ps -ef | grep '[d]d if=/dev/urandom' | awk '{print $2}'"
267        block = False
268        self._stress_pid = list()
269        for _ in range(threads):
270            self.faft_client.system.run_shell_command(stress_cmd, block)
271        lines = self.faft_client.system.run_shell_command_get_output(
272                    pid_cmd)
273        for line in lines:
274            logging.info("PID is %s", line)
275            self._stress_pid.append(int(line.strip()))
276        return self._stress_pid
277
278
279    def _stop_stressing(self):
280        """Stop stressing DUT system"""
281        stop_cmd = 'kill -9 %d'
282        for pid in self._stress_pid:
283            self.faft_client.system.run_shell_command(stop_cmd % pid)
284
285
286    def check_fan_off(self):
287        """
288        Checker of fan turned off.
289
290        The function first delay FAN_DELAY seconds to ensure fan stops.
291        Then it reads fan speed and return False if fan speed is non-zero.
292        Then it stresses the system a bit and check if the temperature
293        goes up by more than TEMP_STRESS_INCREASE.
294
295        Raises:
296          error.TestFail: Raised when temperature doesn't increase by more than
297            TEMP_STRESS_INCREASE.
298        """
299        time.sleep(self.FAN_DELAY)
300        fan_speed = self.servo.get('fan_actual_rpm')
301        if int(fan_speed) != 0:
302            raise error.TestFail("Fan is not turned off.")
303        logging.info("EC reports fan turned off.")
304        cpu_temp_before = int(self.servo.get('cpu_temp'))
305        logging.info("CPU temperature before stressing is %d C",
306                     cpu_temp_before)
307        self._stress_dut()
308        time.sleep(self.STRESS_DELAY_NO_FAN)
309        cpu_temp_after = int(self.servo.get('cpu_temp'))
310        self._stop_stressing()
311        logging.info("CPU temperature after stressing is %d C",
312                     cpu_temp_after)
313        if cpu_temp_after - cpu_temp_before < self.TEMP_STRESS_INCREASE:
314            raise error.TestFail(
315                    "CPU temperature did not go up by more than %d degrees" %
316                    self.TEMP_STRESS_INCREASE)
317
318
319    def _get_temp_sensor_type(self, sensor_id):
320        """
321        Get type of a given temperature sensor
322
323        Args:
324          sensor_id: Temperature sensor ID.
325
326        Returns:
327          Type ID of the temperature sensor.
328
329        Raises:
330          error.TestError: Raised when ectool doesn't behave as we expected.
331        """
332        assert sensor_id < self._num_temp_sensor
333        pattern = re.compile('Sensor type: (\d*)')
334        lines = self.faft_client.system.run_shell_command_get_output(
335                'ectool tempsinfo %d' % sensor_id)
336        for line in lines:
337            matched = pattern.match(line)
338            if matched is not None:
339                return int(matched.group(1))
340        # Should never reach here
341        raise error.TestError("Unexpected error occurred")
342
343
344    def _check_fan_speed_per_sensor(self, fan_speed, sensor_id):
345        """
346        Check if the given fan_speed is reasonable from the view of certain
347        temperature sensor. There could be three types of outcome:
348          1. Fan speed is higher than expected. This may be due to other
349             sensor sensing higher temperature and setting fan to higher
350             speed.
351          2. Fan speed is as expected.
352          3. Fan speed is lower than expected. In this case, EC is not
353             working as expected and an error should be raised.
354
355        Args:
356          fan_speed: The current fan speed in RPM.
357          sensor_id: The ID of temperature sensor.
358
359        Returns:
360          0x00: Fan speed is higher than expected.
361          0x01: Fan speed is as expected.
362          0x10: Fan speed is lower than expected.
363
364        Raises:
365          error.TestError: Raised when getting unexpected fan speed.
366        """
367        sensor_type = self._get_temp_sensor_type(sensor_id)
368        if sensor_type == self.SENSOR_TYPE_IGNORED:
369            # This sensor should be ignored
370            return 0x00
371
372        if self._thermal_setting[sensor_type][-1] == -273:
373            # The fan stepping for this type of sensor is disabled
374            return 0x00
375
376        try:
377            idx = self._fan_steps.index(fan_speed)
378        except:
379            raise error.TestError("Unexpected fan speed: %d" % fan_speed)
380
381        if idx == 0:
382            lower_bound = -self.INT_MAX
383            upper_bound = self._thermal_setting[sensor_type][3]
384        elif idx == len(self._fan_steps) - 1:
385            lower_bound = self._thermal_setting[sensor_type][idx + 2] - 3
386            upper_bound = self.INT_MAX
387        else:
388            lower_bound = self._thermal_setting[sensor_type][idx + 2] - 3
389            upper_bound = self._thermal_setting[sensor_type][idx + 3]
390
391        temp_reading = self._get_temp_reading(sensor_id)
392        logging.info("Sensor %d = %d C", sensor_id, temp_reading)
393        logging.info("  Expecting %d - %d C", lower_bound, upper_bound)
394        if temp_reading > upper_bound:
395            return 0x00
396        elif temp_reading < lower_bound:
397            return 0x10
398        else:
399            return 0x01
400
401
402    def check_auto_fan(self):
403        """
404        Checker of thermal engine automatic fan speed control.
405
406        Stress DUT system for a longer period to make temperature more stable
407        and check if fan speed is controlled as expected.
408
409        Raises:
410          error.TestFail: Raised when fan speed is not as expected.
411        """
412        self._stress_dut()
413        time.sleep(self.STRESS_DELAY)
414        fan_rpm = int(self.servo.get('fan_target_rpm'))
415        logging.info('Fan speed is %d RPM', fan_rpm)
416        try:
417            result = reduce(lambda x, y: x | y,
418                            [self._check_fan_speed_per_sensor(fan_rpm, x)
419                             for x in range(self._num_temp_sensor)])
420        finally:
421            self._stop_stressing()
422        if result == 0x00:
423            raise error.TestFail("Fan speed higher than expected")
424        if result == 0x10:
425            raise error.TestFail("Fan speed lower than expected")
426
427
428    def run_once(self):
429        """Execute the main body of the test.
430        """
431        if not self.check_ec_capability(['thermal']):
432            raise error.TestNAError("Nothing needs to be tested on this device")
433        logging.info("Checking host temperature report.")
434        self.check_temp_report()
435
436        self.turn_off_fan()
437        logging.info("Verifying fan is turned off.")
438        self.check_fan_off()
439
440        self.enable_auto_fan_control()
441        logging.info("Verifying automatic fan control functionality.")
442        self.check_auto_fan()
443