xref: /aosp_15_r20/external/autotest/client/cros/power/power_telemetry_utils.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Helper class for power autotests requiring telemetry devices."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import logging
13import time
14
15import numpy
16from six.moves import range
17
# Messages logged by start_measurement() / end_measurement() to mark a custom
# measurement window.
CUSTOM_START = 'PowerTelemetryLogger custom start.'
CUSTOM_END = 'PowerTelemetryLogger custom end.'
# Number of decimal places interpolated values are rounded to.
INTERPOLATION_RESOLUTION = 6


class TelemetryUtilsError(Exception):
    """Error class for issues using these utilities."""


def interpolate_missing_data(data, max_nan_ratio=None, max_sample_gap=None,
                             max_sample_time_gap=None, timeline=None):
    """Interpolate missing power readings in data.

    Missing readings are the NaN entries of |data|. They are filled in with
    numpy.interp over the sample index, i.e. linearly between the closest
    valid readings; NaN before the first / after the last valid reading take
    the value of that reading (numpy.interp's default edge handling).

    @param data: array of values
    @param max_nan_ratio: optional, float, max acceptable ratio of NaN samples
                          to total samples. A falsy value (None, 0) disables
                          the check.
    @param max_sample_gap: optional, int, a run of this many (or more) NaN in
                           a row is rejected. A falsy value disables the check.
    @param max_sample_time_gap: optional, float, max measurement gap in
                                seconds between the valid samples surrounding
                                a NaN run. A falsy value disables the check.
                                Note: supplying max_sample_time_gap requires
                                timeline
    @param timeline: array of same size as |data| with timeline info for each
                     sample

    @returns: list, array |data| with missing values interpolated, each value
              rounded to INTERPOLATION_RESOLUTION decimal places.
    @raises: TelemetryUtilsError if
              - |max_sample_time_gap| is supplied without |timeline|
              - the ratio of NaN is higher than |max_nan_ratio| (if supplied)
              - a NaN gap has |max_sample_gap| or more samples (if supplied)
              - a NaN gap takes more time in |timeline| than
                |max_sample_time_gap| (if supplied)
              - |data| is empty or all values in |data| are NaN.
    """
    if max_sample_time_gap is not None and timeline is None:
        # These are mutually required.
        raise TelemetryUtilsError('Supplying max_sample_time_gap requires a '
                                  'timeline.')
    data = numpy.array(data)
    nan_data = numpy.isnan(data)
    # Skip the ratio for empty input (it would divide by zero); the
    # "no valid readings" check below reports that case instead.
    if max_nan_ratio and len(data):
        nan_ratio = float(numpy.count_nonzero(nan_data)) / len(data)
        if nan_ratio > max_nan_ratio:
            # There are too many errors in this source.
            # Throw an error so the user has a chance to adjust their power
            # collection setup.
            raise TelemetryUtilsError('NaN ratio of %.02f '
                                      ' - Max is %.02f.' % (nan_ratio,
                                                            max_nan_ratio))
    if max_sample_gap is not None or max_sample_time_gap is not None:
        # Index of the first NaN of the gap currently being scanned, or None
        # when the loop is not inside a gap.
        gap_start = None
        # Append a non-NaN stub so a gap that runs to the end of |data| is
        # still closed (and validated) by the loop.
        for i, isnan in enumerate(numpy.append(nan_data, False)):
            if isnan:
                if gap_start is None:
                    gap_start = i
                continue
            if gap_start is None:
                continue
            # A gap [gap_start, i) has just ended.
            consecutive_nans = i - gap_start
            if max_sample_gap and consecutive_nans >= max_sample_gap:
                # Reject if there are too many consecutive failures.
                raise TelemetryUtilsError('Too many consecutive NaN samples'
                                          ': %d.' % consecutive_nans)
            # The time span of a gap can only be measured when a valid sample
            # exists on both sides of it; gaps touching either end of |data|
            # are not time-checked.
            if max_sample_time_gap and gap_start != 0 and i != len(data):
                sample_time_gap = timeline[i] - timeline[gap_start - 1]
                if sample_time_gap > max_sample_time_gap:
                    raise TelemetryUtilsError('Excessively long sample gap '
                                              'of %.02fs. Longest '
                                              'permissible gap is %.02fs.'
                                              % (sample_time_gap,
                                                 max_sample_time_gap))
            # Always leave the gap, including when the time check was
            # skipped; otherwise later gaps would be measured from a stale
            # start index.
            gap_start = None
    # At this point the data passed all validations required.
    valid = ~nan_data
    all_idx = numpy.arange(len(data))
    # Index with the boolean mask directly. Wrapping it in a list makes numpy
    # treat it as a 2-D index, which fails on current numpy releases.
    sample_idx = all_idx[valid]
    if not len(sample_idx):
        raise TelemetryUtilsError('Data has no valid readings. Cannot '
                                  'interpolate.')
    sample_vals = data[valid]
    output = numpy.interp(all_idx, sample_idx, sample_vals)
    return [round(x, INTERPOLATION_RESOLUTION) for x in output]
104
def log_event_ts(message=None, timestamp=None, offset=0):
    """Log the event and timestamp for parsing later.

    The record is emitted at debug level as "<message> <timestamp>". Nothing
    is logged when |message| is empty or None.

    @param message: description of the event.
    @param timestamp: timestamp for the event, if not provided (None), default
           to current time. Local seconds since epoch.
    @param offset: offset in seconds from the provided timestamp, or offset from
           current time if timestamp is not provided. Can be positive or
           negative.
    """
    if not message:
        return
    # Compare against None rather than truthiness so that an explicit
    # timestamp of 0 is honored instead of being replaced by the current time.
    base = time.time() if timestamp is None else timestamp
    logging.debug("%s %s", message, base + offset)
122
def start_measurement(timestamp=None, offset=0):
    """Mark the start of power telemetry measurement.

    Logs the CUSTOM_START marker through log_event_ts().

    Optional. Call at most once from a client side test that runs inside a
    power measurement wrapper test, to pinpoint where power telemetry data
    should begin. PowerTelemetryLogger trims off data recorded before this
    point. Without this call, power telemetry data starts right before the
    client side test.

    @param timestamp: timestamp for the start of measurement, if not provided,
           default to current time. Local seconds since epoch.
    @param offset: offset in seconds from the provided timestamp, or offset from
           current time if timestamp is not provided. Can be positive or
           negative.
    """
    log_event_ts(message=CUSTOM_START, timestamp=timestamp, offset=offset)
138
def end_measurement(timestamp=None, offset=0):
    """Mark the end of power telemetry measurement.

    Logs the CUSTOM_END marker through log_event_ts().

    Optional. Call at most once from a client side test that runs inside a
    power measurement wrapper test, to pinpoint where power telemetry data
    should stop. PowerTelemetryLogger trims off data recorded after this
    point. Without this call, power telemetry data ends right after the
    client side test.

    @param timestamp: timestamp for the end of measurement, if not provided,
           default to current time. Local seconds since epoch.
    @param offset: offset in seconds from the provided timestamp, or offset from
           current time if timestamp is not provided. Can be positive or
           negative.
    """
    log_event_ts(message=CUSTOM_END, timestamp=timestamp, offset=offset)
154