xref: /aosp_15_r20/external/autotest/server/cros/power/power_telemetry_logger.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Helper class for power measurement with telemetry devices."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import collections
13import csv
14import datetime
15from distutils import sysconfig
16import json
17import logging
18import numpy
19import os
20import re
21import shutil
22import six
23import string
24import subprocess
25import threading
26import time
27
28import powerlog
29
30from servo import measure_power
31
32from autotest_lib.client.common_lib import error
33from autotest_lib.client.cros.power import power_status
34from autotest_lib.client.cros.power import power_telemetry_utils as utils
35from autotest_lib.server.cros.power import power_dashboard
36
37
# If a sample has over 10% NaN values, the data might be very unreliable if
# interpolation is applied.
ACCEPTABLE_NAN_RATIO = 0.1

# If a sample has more than these NaN values in sequence, the data is also not
# reliable.
MAX_CONSECUTIVE_NAN_READINGS = 5

# If for over a second no values can be read, the data is also not reliable.
MAX_NAN_GAP_S = 1

# Dictionary to make passing the default arguments for loggers to the NaN
# interpolation utility easy.
INTERPOLATION_ARGS = {'max_nan_ratio': ACCEPTABLE_NAN_RATIO,
                      'max_sample_gap': MAX_CONSECUTIVE_NAN_READINGS,
                      'max_sample_time_gap': MAX_NAN_GAP_S}
54
def ts_processing(ts_str):
    """Convert an autotest debug-log timestamp into local epoch seconds.

    @param ts_str: a timestamp string from client.DEBUG file in local time
            zone, formatted as '%m/%d %H:%M:%S.%f ' (note the trailing space).
    @return float seconds since epoch. The current year is substituted because
            ts_str carries no year; this introduces error if the client side
            test runs across the turn of the year.
    """
    parsed = datetime.datetime.strptime(ts_str, '%m/%d %H:%M:%S.%f ')
    # TODO(mqg): fix the wrong year at turn of the year.
    parsed = parsed.replace(year=datetime.datetime.today().year)
    whole_seconds = time.mktime(parsed.timetuple())
    return whole_seconds + parsed.microsecond / 1e6
67
68
class PowerTelemetryLogger(object):
    """A helper class for power autotests requiring telemetry devices.

    Telemetry: external pieces of hardware which help with measuring power
    data on the Chrome device. This is not to be confused with library
    telemetry.core, which is a required library / dependency for autotests
    involving Chrome and / or ARC. Examples of power telemetry devices include
    Servo and Sweetberry.

    This logger class detects telemetry devices connected to the DUT. It will
    then start and stop the measurement, trim the excessive power telemetry
    device data and report the data back to the workstation and the dashboard.
    """

    DASHBOARD_UPLOAD_URL = 'http://chrome-power.appspot.com'
    # Default regex fragments matched against client DEBUG log lines to locate
    # the start and end of the client side test.
    DEFAULT_START = r'starting test\(run_once\(\)\), test details follow'
    DEFAULT_END = r'The test has completed successfully'

    def __init__(self, config, resultsdir, host):
        """Init PowerTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data: {'test': 'test_TestName.tag'}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.
        """
        logging.debug('%s initialize.', self.__class__.__name__)
        self._resultsdir = resultsdir
        self._host = host
        self._tagged_testname = config['test']
        self._pdash_note = config.get('pdash_note', '')

    def start_measurement(self):
        """Start power telemetry devices and remember the start timestamp."""
        self._start_measurement()
        logging.info('%s starts.', self.__class__.__name__)
        self._start_ts = time.time()

    def _start_measurement(self):
        """Start power telemetry devices."""
        raise NotImplementedError('Subclasses must implement '
                '_start_measurement.')

    def end_measurement(self, client_test_dir):
        """End power telemetry devices.

        End power measurement with telemetry devices, get the power telemetry
        device data, trim the data recorded outside of actual testing, and
        upload statistics to dashboard.

        @param client_test_dir: directory of the client side test.
        """
        self._end_measurement()
        logging.info('%s finishes.', self.__class__.__name__)
        checkpoint_logger = self._get_client_test_checkpoint_logger(
                client_test_dir)
        start_ts, end_ts = self._get_client_test_ts(client_test_dir)
        loggers = self._load_and_trim_data(start_ts, end_ts)
        # Call export after trimming to only export trimmed data.
        self._export_data_locally(client_test_dir,
                                  checkpoint_logger.checkpoint_data)
        self._upload_data(loggers, checkpoint_logger)

    def _end_measurement(self):
        """End power telemetry devices."""
        raise NotImplementedError('Subclasses must implement _end_measurement.')

    def _export_data_locally(self, client_test_dir, checkpoint_data=None):
        """Slot for the logger to export measurements locally."""
        raise NotImplementedError('Subclasses must implement '
                                  '_export_data_locally.')

    def _get_client_test_ts(self, client_test_dir):
        """Determine the start and end timestamp for the telemetry data.

        Power telemetry devices will run through the entire autotest, including
        the overhead time, but we only need the measurements of actual testing,
        so parse logs from client side test to determine the start and end
        timestamp for the telemetry data. power_log.json is preferred; the
        client DEBUG log is the fallback.

        @param client_test_dir: directory of the client side test.
        @return (start_ts, end_ts)
                start_ts: the start timestamp of the client side test in seconds
                          since epoch or None.
                end_ts: the end timestamp of the client side test in seconds
                        since epoch or None.
        """
        if not os.path.isdir(client_test_dir):
            logging.error('Cannot find client side test dir %s, no need to '
                          'trim power telemetry measurements.', client_test_dir)
            return (None, None)

        # Use timestamp in client side test power_log.json as start & end
        # timestamp.
        power_log_path = os.path.join(client_test_dir, 'results',
                                      'power_log.json')
        start_ts, end_ts = self._get_power_log_ts(power_log_path)
        if start_ts and end_ts:
            self._start_ts = start_ts
            return (start_ts, end_ts)

        # Parse timestamp in client side test debug log and use as start & end
        # timestamp.
        client_test_name = os.path.basename(client_test_dir)
        debug_file_path = os.path.join(client_test_dir, 'debug',
                                       '%s.DEBUG' % client_test_name)
        start_ts, end_ts = self._get_debug_log_ts(debug_file_path)
        if start_ts:
            self._start_ts = start_ts
        return (start_ts, end_ts)

    def _get_debug_log_ts(self, debug_file_path):
        """Parse client side test start and end timestamp from debug log.

        Custom start/end events (power_telemetry_utils markers, which log an
        explicit epoch timestamp) take precedence over the default autotest
        start/end lines.

        @param debug_file_path: path to client side test debug log.
        @return (start_ts, end_ts)
                start_ts: the start timestamp of the client side test in seconds
                          since epoch or None.
                end_ts: the end timestamp of the client side test in seconds
                        since epoch or None.
        """
        default_test_events = collections.defaultdict(dict)
        custom_test_events = collections.defaultdict(dict)
        default_test_events['start']['str'] = self.DEFAULT_START
        default_test_events['end']['str'] = self.DEFAULT_END
        custom_test_events['start']['str'] = utils.CUSTOM_START
        custom_test_events['end']['str'] = utils.CUSTOM_END
        for event in default_test_events:
            # Default lines carry a leading autotest timestamp to parse.
            default_test_events[event]['re'] = re.compile(r'([\d\s\./:]+).+' +
                    default_test_events[event]['str'])
            default_test_events[event]['match'] = False
        for event in custom_test_events:
            # Custom markers log the epoch timestamp after the marker string.
            custom_test_events[event]['re'] = re.compile(r'.*' +
                    custom_test_events[event]['str'] + r'\s+([\d\.]+)')
        events_ts = {
            'start': None,
            'end': None,
        }

        try:
            with open(debug_file_path, 'r') as debug_log:

                for line in debug_log:
                    for event in default_test_events:
                        match = default_test_events[event]['re'].match(line)
                        if match:
                            default_test_events[event]['ts'] = \
                                    ts_processing(match.group(1))
                            default_test_events[event]['match'] = True
                    for event in custom_test_events:
                        match = custom_test_events[event]['re'].match(line)
                        if match:
                            custom_test_events[event]['ts'] = \
                                    float(match.group(1))

            for event in default_test_events:
                if not default_test_events[event]['match']:
                    # Caught by the except clause below, which downgrades this
                    # to a warning and a (None, None) return.
                    # Bug fix: interpolate the event name; the original left
                    # the %s placeholder unfilled.
                    raise error.TestWarn('Cannot find %s timestamp in client '
                                         'side test debug log.' % event)

            for event in events_ts:
                # Custom timestamps, when present, override the defaults.
                events_ts[event] = default_test_events[event].get(
                        'ts', events_ts[event])
                events_ts[event] = custom_test_events[event].get(
                        'ts', events_ts[event])

            return (events_ts['start'], events_ts['end'])

        except Exception as exc:
            logging.warning('Client side test debug log %s does not contain '
                            'valid start and end timestamp, see exception: %s',
                            debug_file_path, exc)
            return (None, None)

    def _get_power_log_ts(self, power_log_path):
        """Parse client side test start and end timestamp from power_log.json.

        The file may contain several concatenated JSON objects; raw_decode is
        used to walk through all of them and the overall min start / max end
        is returned.

        @param power_log_path: path to client side test power_log.json.
        @return (start_ts, end_ts)
                start_ts: the start timestamp of the client side test in seconds
                          since epoch or None.
                end_ts: the end timestamp of the client side test in seconds
                        since epoch or None.
        """
        try:
            with open(power_log_path, 'r') as power_log:
                power_log_str = power_log.read()
                json_decoder = json.JSONDecoder()

                idx = 0
                start_ts = list()
                end_ts = list()
                while idx < len(power_log_str):
                    log_entry, idx = json_decoder.raw_decode(power_log_str, idx)
                    start_ts.append(log_entry['timestamp'])
                    end_ts.append(log_entry['timestamp'] +
                                  log_entry['power']['sample_duration'] *
                                  log_entry['power']['sample_count'])

                return (min(start_ts), max(end_ts))
        except Exception as exc:
            logging.warning('Client side test power_log %s does not contain '
                            'valid start and end timestamp, see exception: %s',
                            power_log_path, exc)
            return (None, None)

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by power telemetry devices. Trim data if
        necessary.

        @param start_ts: start timestamp in seconds since epoch, None if no
                         need to trim data.
        @param end_ts: end timestamp in seconds since epoch, None if no need to
                       trim data.
        @return a list of loggers, where each logger contains raw power data and
                statistics.

        logger format:
        {
            'sample_count' : 60,
            'sample_duration' : 60,
            'data' : {
                'domain_1' : [ 111.11, 123.45 , ... , 99.99 ],
                ...
                'domain_n' : [ 3999.99, 4242.42, ... , 4567.89 ]
            },
            'average' : {
                'domain_1' : 100.00,
                ...
                'domain_n' : 4300.00
            },
            'unit' : {
                'domain_1' : 'milliwatt',
                ...
                'domain_n' : 'milliwatt'
            },
            'type' : {
                'domain_1' : 'servod',
                ...
                'domain_n' : 'servod'
            },
        }
        """
        raise NotImplementedError('Subclasses must implement '
                '_load_and_trim_data and return a list of loggers.')

    def _get_client_test_checkpoint_logger(self, client_test_dir):
        """Load the client-side test checkpoints.

        The key data we need is the checkpoint_logger.checkpoint_data object.
        This is a dictionary that contains for each key a list of [start, end]
        timestamps (seconds since epoch) for a checkpoint.
        Note: should there be issues loading the data, the checkpoint logger
        will still be returned, but it will be empty. Code that relies on the
        returned object here and wants to make sure its valid, needs to check
        against the |checkpoint_logger.checkpoint_data| being empty, as it
        will never be None.

        @param client_test_dir: directory of the client side test.
        @return CheckpointLogger object with client endpoints, or empty data.
        """
        client_test_resultsdir = os.path.join(client_test_dir, 'results')
        checkpoint_logger = power_status.get_checkpoint_logger_from_file(
                resultsdir=client_test_resultsdir)
        return checkpoint_logger

    def _upload_data(self, loggers, checkpoint_logger):
        """Upload the data to dashboard.

        @param loggers: a list of loggers, where each logger contains raw power
                        data and statistics.
        @param checkpoint_logger: CheckpointLogger object with client-side test
                                  checkpoint data.
        """

        for logger in loggers:
            pdash = power_dashboard.PowerTelemetryLoggerDashboard(
                    logger=logger, testname=self._tagged_testname,
                    host=self._host, start_ts=self._start_ts,
                    checkpoint_logger=checkpoint_logger,
                    resultsdir=self._resultsdir,
                    uploadurl=self.DASHBOARD_UPLOAD_URL, note=self._pdash_note)
            pdash.upload()
356
357
class PacTelemetryLogger(PowerTelemetryLogger):
    """This logger class measures power via pacman debugger."""

    def __init__(self, config, resultsdir, host):
        """Init PacTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data:
                       {'test': 'test_TestName.tag',
                        'config': PAC address and sense resistor .py file location,
                        'mapping: DUT power rail mapping csv file,
                        'gpio': gpio}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.

        @raises error.TestError if problem running pacman.py
        """
        super(PacTelemetryLogger, self).__init__(config, resultsdir, host)
        required_args = ['config', 'mapping', 'gpio']
        for arg in required_args:
            if arg not in config:
                msg = 'Missing required arguments for PacTelemetryLogger: %s' % arg
                raise error.TestError(msg)
        self._pac_config_file = config['config']
        self._pac_mapping_file = config['mapping']
        self._pac_gpio_file = config['gpio']
        self._resultsdir = resultsdir
        self.pac_path = self._get_pacman_install_path()
        self.pac_data_path = os.path.join(resultsdir, 'pac')

        os.makedirs(self.pac_data_path, exist_ok=True)

        # Check if pacman is able to run.
        try:
            subprocess.check_output('pacman.py', timeout=5, cwd=self.pac_path)
        except subprocess.CalledProcessError as e:
            msg = 'Error running pacman.py '\
                  'Check dependencies have been installed'
            logging.error(msg)
            logging.error(e.output)
            raise error.TestError(e)

    def _start_measurement(self):
        """Start a pacman thread with the given config, mapping, and gpio files."""

        # pacman's stdout/stderr are captured in pac.log for debugging.
        self._log = open(os.path.join(self.pac_data_path, "pac.log"), "a")

        self._pacman_args = [
                '--config', self._pac_config_file, '--mapping',
                self._pac_mapping_file, '--gpio', self._pac_gpio_file,
                '--output', self.pac_data_path
        ]

        logging.debug('Starting pacman process')
        cmds = ['pacman.py'] + self._pacman_args
        logging.debug(cmds)

        self._pacman_process = subprocess.Popen(cmds,
                                                cwd=self.pac_path,
                                                stdout=self._log,
                                                stderr=self._log)

    def _end_measurement(self):
        """Stop pacman thread. This will dump and process the accumulators."""
        # Signal 2 (SIGINT) makes pacman flush its accumulators on exit.
        self._pacman_process.send_signal(2)
        self._pacman_process.wait(timeout=10)
        self._load_and_trim_data(None, None)
        self._export_data_locally(self._resultsdir)

        self._log.close()

    def _get_pacman_install_path(self):
        """Return the absolute path of pacman on the host.

        @raises error.TestNAError if pacman is not in PATH
        """
        pac_path = shutil.which('pacman.py')
        # Bug fix: compare to None with 'is', not '==' (PEP 8).
        if pac_path is None:
            msg = 'Unable to locate pacman.py \n'\
                  'Check pacman.py is in PATH'
            logging.error(msg)
            raise error.TestNAError(msg)
        return os.path.dirname(pac_path)

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by power telemetry devices. Trim data if
        necessary. Also caches the parsed rows in |self._accumulator_data| for
        output_pacman_aggregates().

        @param start_ts: start timestamp in seconds since epoch, None if no
                         need to trim data.
        @param end_ts: end timestamp in seconds since epoch, None if no need to
                       trim data.
        @return a list of loggers, where each logger contains raw power data and
                statistics.

        @raises TestError when unable to locate or open pacman accumulator results

        logger format:
        {
            'sample_count' : 60,
            'sample_duration' : 60,
            'data' : {
                'domain_1' : [ 111.11, 123.45 , ... , 99.99 ],
                ...
                'domain_n' : [ 3999.99, 4242.42, ... , 4567.89 ]
            },
            'average' : {
                'domain_1' : 100.00,
                ...
                'domain_n' : 4300.00
            },
            'unit' : {
                'domain_1' : 'milliwatt',
                ...
                'domain_n' : 'milliwatt'
            },
            'type' : {
                'domain_1' : 'servod',
                ...
                'domain_n' : 'servod'
            },
        }
        """
        loggers = list()
        accumulator_path = os.path.join(self.pac_data_path, 'accumulatorData.csv')
        if not os.path.exists(accumulator_path):
            raise error.TestError('Unable to locate pacman results!')
        # Load resulting pacman csv file.
        try:
            with open(accumulator_path, 'r') as csvfile:
                reader = csv.reader(csvfile, delimiter=',')
                # Capture the first line as the column schema.
                schema = next(reader)
                # First column is an index.
                schema[0] = 'index'
                # Place each row into a dictionary keyed by the schema.
                self._accumulator_data = list()
                for row in reader:
                    measurement = dict(zip(schema, row))
                    self._accumulator_data.append(measurement)
        except OSError:
            raise error.TestError('Unable to open pacman accumulator results!')

        # Match required logger format. pacman reports one aggregate value per
        # rail, so sample_count is 1 and the duration is the accumulation time.
        log = {
                'sample_count': 1,
                'sample_duration': float(self._accumulator_data[0]['tAccum']),
                'data': {
                        x['Rail']: [float(x['Average Power (w)'])]
                        for x in self._accumulator_data
                },
                'average': {
                        x['Rail']: float(x['Average Power (w)'])
                        for x in self._accumulator_data
                },
                'unit': {x['Rail']: 'watts'
                         for x in self._accumulator_data},
                'type': {x['Rail']: 'pacman'
                         for x in self._accumulator_data},
        }
        loggers.append(log)
        return loggers

    def output_pacman_aggregates(self, test):
        """This outputs all the processed aggregate values to the results-chart.json

        @param test: the test.test object to use when outputting the
                    performance values to results-chart.json
        """
        for rail in self._accumulator_data:
            test.output_perf_value(rail['Rail'],
                                   float(rail['Average Power (w)']),
                                   units='watts',
                                   replace_existing_values=True)

    def _export_data_locally(self, client_test_dir, checkpoint_data=None):
        """Copy the pacman output directory into the test results.

        @param client_test_dir: destination parent directory.
        @param checkpoint_data: unused here; kept for interface compatibility.
        """
        self._local_pac_data_path = os.path.join(client_test_dir,
                                                 'pacman_data')
        shutil.copytree(self.pac_data_path, self._local_pac_data_path)

    def _upload_data(self, loggers, checkpoint_logger):
        """
        _upload_data is defined as a pass as a hot-fix to external partners' lack
        of access to the power_dashboard URL
        """
        pass
551
552
class ServodTelemetryLogger(PowerTelemetryLogger):
    """This logger class measures power by querying a servod instance."""

    # Default sampling rates (seconds) for the INA current sensors and the
    # battery voltage readings.
    DEFAULT_INA_RATE = 20.0
    DEFAULT_VBAT_RATE = 60.0

    def __init__(self, config, resultsdir, host):
        """Init ServodTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data:
                       {'test': 'test_TestName.tag',
                        'servo_host': host of servod instance,
                        'servo_port: port that the servod instance is on}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.
        """
        super(ServodTelemetryLogger, self).__init__(config, resultsdir, host)

        self._servo_host = host.servo._servo_host.hostname
        self._servo_port = host.servo._servo_host.servo_port
        self._ina_rate = float(config.get('ina_rate', self.DEFAULT_INA_RATE))
        self._vbat_rate = float(config.get('vbat_rate', self.DEFAULT_VBAT_RATE))
        self._pm = measure_power.PowerMeasurement(host=self._servo_host,
                                                  port=self._servo_port,
                                                  adc_rate=self._ina_rate,
                                                  vbat_rate=self._vbat_rate)

    def _start_measurement(self):
        """Start power measurement by querying servod."""
        setup_done = self._pm.MeasurePower()
        # Block the main thread until setup is done and measurement has started.
        setup_done.wait()

    def _end_measurement(self):
        """End querying servod."""
        self._pm.FinishMeasurement()

    def _export_data_locally(self, client_test_dir, checkpoint_data=None):
        """Output formatted text summaries to test results directory.

        @param client_test_dir: path to the client test output
        @param checkpoint_data: dict, checkpoint data. data is list of tuples
                                of (start,end) format for the timesteps
        """
        # At this point the PowerMeasurement unit has been processed. Dump its
        # formatted summaries into the results directory.
        power_summaries_dir = os.path.join(self._resultsdir, 'power_summaries')
        self._pm.SaveSummary(outdir=power_summaries_dir)
        # After the main summaries are exported, we also want to export one
        # for each checkpoint. As each checkpoint might contain multiple
        # entries, the checkpoint name is expanded by a digit.
        def export_checkpoint(name, start, end):
            """Helper to avoid code duplication for 0th and next cases."""
            self._pm.SaveTrimmedSummary(tag=name,
                                        tstart=start,
                                        tend=end,
                                        outdir=power_summaries_dir)

        if checkpoint_data:
            for checkpoint_name, checkpoint_list in checkpoint_data.items():
                # Export the first entry without any sort of name change.
                tstart, tend = checkpoint_list[0]
                export_checkpoint(checkpoint_name, tstart, tend)
                for suffix, checkpoint_element in enumerate(
                        checkpoint_list[1:], start=1):
                    # Export subsequent entries with a suffix.
                    tstart, tend = checkpoint_element
                    export_checkpoint('%s%d' % (checkpoint_name, suffix),
                                      tstart, tend)

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by servod. Trim data if necessary.

        @param start_ts: start timestamp in seconds since epoch, or None.
        @param end_ts: end timestamp in seconds since epoch, or None.
        @return a list of loggers in the format documented on the base class.
        @raises error.TestFail if NaN interpolation fails for any rail.
        """
        self._pm.ProcessMeasurement(start_ts, end_ts)

        summary = self._pm.GetSummary()
        raw_data = self._pm.GetRawData()

        loggers = list()

        # Domains in summary/raw_data that carry no power-data.
        metadata_domains = ['Sample_msecs', 'time', 'timeline']

        for source in summary:
            tl = raw_data[source]['timeline']
            samples = len(tl)
            # Strip the '_mw' suffix off rail names and drop metadata domains.
            data = {
                k[:-3] if k.endswith('_mw') else k: v
                for k, v in six.iteritems(raw_data[source])
                if k not in metadata_domains
            }

            # Add the timeline of this measurement to the interpolation
            # arguments. This is to detect and reject large measurement gaps.
            # See above for details or in power_telemetry_utils.
            # Bug fix: use a per-call copy instead of mutating the shared
            # module-level INTERPOLATION_ARGS dict, so state does not leak
            # between measurements.
            interpolation_args = dict(INTERPOLATION_ARGS)
            interpolation_args['timeline'] = tl

            try:
                # Smoothen out data to remove any NaN values by interpolating
                # the missing values. If too many values are NaN, or too many
                # values are NaN consecutively, fail the test.
                # Here r stands for rail and d stands for data.
                data = {r: utils.interpolate_missing_data(
                                d, **interpolation_args)
                        for r, d in six.iteritems(data)}
            except utils.TelemetryUtilsError as e:
                raise error.TestFail('Issue at source %s: %s' % (source,
                                                                 str(e)))

            ave = {
                k[:-3] if k.endswith('_mw') else k: v['mean']
                for k, v in six.iteritems(summary[source])
                if k not in metadata_domains
            }
            if samples > 1:
                # Having more than one sample allows the code to properly set a
                # sample duration.
                sample_duration = (tl[-1] - tl[0]) / (samples - 1)
            else:
                # In this case, it seems that there is only one sample as the
                # difference between start and end is 0. Use the entire duration
                # of the test as the sample start/end.
                sample_duration = end_ts - start_ts

            logger = {
                # All data domains should have same sample count.
                'sample_count': summary[source]['time']['count'],
                'sample_duration': sample_duration,
                'data': data,
                'average': ave,
                # TODO(mqg): hard code the units for now because we are only
                # dealing with power so far. When we start to work with voltage
                # or current, read the units from the .json files.
                'unit': {k: 'milliwatt' for k in data},
                'type': {k: 'servod' for k in data},
            }

            loggers.append(logger)

        return loggers
700
701
class PowerlogTelemetryLogger(PowerTelemetryLogger):
    """This logger class measures power with Sweetberry via powerlog tool.
    """

    # Default period, in seconds, between successive powerlog invocations.
    DEFAULT_SWEETBERRY_INTERVAL = 20.0
    # Directory holding the .board/.scenario files shipped with the servo
    # python package.
    SWEETBERRY_CONFIG_DIR = os.path.join(
            sysconfig.get_python_lib(standard_lib=False), 'servo', 'data')

    def __init__(self, config, resultsdir, host):
        """Init PowerlogTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data: {'test': 'test_TestName.tag'}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.
        """
        super(PowerlogTelemetryLogger, self).__init__(config, resultsdir, host)

        self._interval = float(config.get('sweetberry_interval',
                                          self.DEFAULT_SWEETBERRY_INTERVAL))
        self._logdir = os.path.join(resultsdir, 'sweetberry_log')
        self._end_flag = threading.Event()
        self._sweetberry_serial = config.get('sweetberry_serial', None)
        if 'sweetberry_config' in config:
            self._sweetberry_config = config['sweetberry_config']
        else:
            # Derive the config file name from the DUT board and hardware
            # revision, e.g. 'nocturne_rev2'.
            board = self._host.get_board().replace('board:', '')
            hardware_rev = self._host.get_hardware_revision()
            self._sweetberry_config = board + '_' + hardware_rev
        board_path, scenario_path = \
                self._get_sweetberry_config_path(self._sweetberry_config)
        self._sweetberry_thread = SweetberryThread(
                board=board_path,
                scenario=scenario_path,
                interval=self._interval,
                stats_json_dir=self._logdir,
                end_flag=self._end_flag,
                serial=self._sweetberry_serial)
        # Daemonize so a stuck Sweetberry thread cannot keep the process
        # alive. Attribute assignment replaces the deprecated setDaemon();
        # both spellings work on python2 and python3.
        self._sweetberry_thread.daemon = True

    def _start_measurement(self):
        """Start power measurement with Sweetberry via powerlog tool."""
        self._sweetberry_thread.start()

    def _export_data_locally(self, client_test_dir, checkpoint_data=None):
        """Output formatted text summaries locally."""
        #TODO(crbug.com/978665): implement this.
        pass

    def _end_measurement(self):
        """End querying Sweetberry."""
        self._end_flag.set()
        # Sweetberry thread should theoretically finish within 1 self._interval
        # but giving 2 here to be more lenient.
        self._sweetberry_thread.join(self._interval * 2)
        if self._sweetberry_thread.is_alive():
            logging.warning('%s %s thread did not finish. There might be extra '
                            'data at the end.', self.__class__.__name__,
                            self._sweetberry_thread.name)

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by powerlog tool. Trim data if necessary.

        @param start_ts: start timestamp in seconds since epoch, None if no
                         need to trim at the front.
        @param end_ts: end timestamp in seconds since epoch, None if no need
                       to trim at the end.
        @return list with a single logger dict, or None if no usable
                Sweetberry measurements were found.
        """
        if not os.path.exists(self._logdir):
            logging.error('Cannot find %s, no Sweetberry measurements exist, '
                          'not uploading to dashboard.', self._logdir)
            return

        trimmed_log_dirs = list()
        # Adding a padding to both start and end timestamp because the timestamp
        # of each data point is taken at the end of its corresponding interval.
        start_ts = start_ts + self._interval / 2 if start_ts else 0
        end_ts = end_ts + self._interval / 2 if end_ts else time.time()
        for dirname in os.listdir(self._logdir):
            # powerlog names each run directory 'sweetberry<timestamp>'.
            if dirname.startswith('sweetberry'):
                # Slice off the literal prefix. The previous
                # string.lstrip(dirname, 'sweetberry') call used a module
                # function that no longer exists in Python 3 (and lstrip
                # strips a character set, not a prefix).
                sweetberry_ts = float(dirname[len('sweetberry'):])
                if start_ts <= sweetberry_ts <= end_ts:
                    trimmed_log_dirs.append(dirname)

        data = collections.defaultdict(list)
        for sweetberry_file in sorted(trimmed_log_dirs):
            fname = os.path.join(self._logdir, sweetberry_file, 'summary.json')
            with open(fname, 'r') as f:
                d = json.load(f)
                for k, v in six.iteritems(d):
                    data[k].append(v['mean'])

        if not data:
            # Guard: next() below would raise StopIteration on an empty dict
            # when no log directory fell inside the [start_ts, end_ts] window.
            logging.error('No Sweetberry measurements within the test '
                          'window, not uploading to dashboard.')
            return

        logger = {
            # All data domains should have same sample count.
            'sample_count': len(next(six.itervalues(data))),
            'sample_duration': self._interval,
            'data': data,
            'average': {k: numpy.average(v) for k, v in six.iteritems(data)},
            # TODO(mqg): hard code the units for now because we are only dealing
            # with power so far. When we start to work with voltage or current,
            # read the units from the .json files.
            'unit': {k: 'milliwatt' for k in data},
            'type': {k: 'sweetberry' for k in data},
        }

        return [logger]

    def _get_sweetberry_config_path(self, filename):
        """Get the absolute path for Sweetberry board and scenario file.

        @param filename: string of Sweetberry config filename.
        @return a tuple of the path to Sweetberry board file and the path to
                Sweetberry scenario file.
        @raises error.TestError if board file or scenario file does not exist in
                file system.
        """
        board_path = os.path.join(self.SWEETBERRY_CONFIG_DIR,
                                  '%s.board' % filename)
        if not os.path.isfile(board_path):
            msg = 'Sweetberry board file %s does not exist.' % board_path
            raise error.TestError(msg)

        scenario_path = os.path.join(self.SWEETBERRY_CONFIG_DIR,
                                     '%s.scenario' % filename)
        if not os.path.isfile(scenario_path):
            msg = 'Sweetberry scenario file %s does not exist.' % scenario_path
            raise error.TestError(msg)
        return (board_path, scenario_path)
831
832
class SweetberryThread(threading.Thread):
    """A thread that starts and ends Sweetberry measurement."""

    def __init__(self, board, scenario, interval, stats_json_dir, end_flag,
                 serial=None):
        """Initialize the Sweetberry thread.

        After start(), the thread repeatedly invokes the Sweetberry powerlog
        tool, once every [interval] seconds. Each invocation samples every
        rail listed in the [scenario] file and writes the per-rail averages as
        json into [stats_json_dir]. Rail resistor sizes come from the [board]
        file.

        See go/sweetberry and go/sweetberry-readme for more details.

        @param board: file name for Sweetberry board file.
        @param scenario: file name for Sweetberry scenario file.
        @param interval: time of each Sweetberry run cycle; print Sweetberry
                         data every <interval> seconds.
        @param stats_json_dir: directory to store Sweetberry stats in json.
        @param end_flag: event object, stop Sweetberry measurement when this is
                         set.
        @param serial: serial number of sweetberry.
        """
        threading.Thread.__init__(self, name='Sweetberry')
        self._end_flag = end_flag
        self._interval = interval
        # Base command-line arguments handed to powerlog on every cycle.
        self._argv = [
                '--board', board,
                '--config', scenario,
                '--save_stats_json', stats_json_dir,
                '--no_print_raw_data',
                '--mW',
        ]
        if serial:
            self._argv += ['--serial', serial]

    def run(self):
        """Start Sweetberry measurement until end_flag is set."""
        logging.debug('Sweetberry starts.')
        # TODO(mqg): in the future use more of powerlog components
        # explicitly, make a long call and harvest data from Sweetberry,
        # instead of using it like a command line tool now.
        begin = time.time()
        cycle = 0
        while not self._end_flag.is_set():
            cycle += 1
            # Aim each invocation at an absolute deadline so that drift from
            # the powerlog call itself does not accumulate across cycles.
            # NOTE(review): if one cycle overruns its deadline, the remaining
            # duration passed to powerlog goes negative — presumably powerlog
            # tolerates that; confirm before relying on it.
            deadline = begin + cycle * self._interval
            remaining = deadline - time.time()
            powerlog.main(self._argv + ['--seconds', str(remaining)])
        logging.debug('Sweetberry stops.')
883