xref: /aosp_15_r20/external/angle/build/util/lib/results/result_sink.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1# Copyright 2020 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4from __future__ import absolute_import
5import base64
6import json
7import logging
8import os
9
10import requests  # pylint: disable=import-error
11from lib.results import result_types
12
# Longest summaryHtml (measured with len()) that is sent inline with a test
# result; anything longer is uploaded as an artifact and referenced instead.
HTML_SUMMARY_MAX = 4096

# summaryHtml snippets that point at an uploaded artifact by its artifact id.
_HTML_SUMMARY_ARTIFACT = '<text-artifact artifact-id="HTML Summary" />'
_TEST_LOG_ARTIFACT = '<text-artifact artifact-id="Test Log" />'

# Translation table from result_types values to the TestStatus names used by
# the luci test-result.proto. Several local result types share one status.
# https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#TestStatus
RESULT_MAP = {
    result_types.UNKNOWN: 'ABORT',
    result_types.PASS: 'PASS',
    result_types.FAIL: 'FAIL',
    result_types.CRASH: 'CRASH',
    result_types.TIMEOUT: 'ABORT',
    result_types.SKIP: 'SKIP',
    result_types.NOTRUN: 'SKIP',
}
29
30
31def TryInitClient():
32  """Tries to initialize a result_sink_client object.
33
34  Assumes that rdb stream is already running.
35
36  Returns:
37    A ResultSinkClient for the result_sink server else returns None.
38  """
39  try:
40    with open(os.environ['LUCI_CONTEXT']) as f:
41      sink = json.load(f)['result_sink']
42      return ResultSinkClient(sink)
43  except KeyError:
44    return None
45
46
class ResultSinkClient(object):
  """A class to store the sink's post configurations and make post requests.

  This assumes that the rdb stream has been called already and that the
  server is listening.

  Instances can be used as context managers; leaving the `with` block closes
  the underlying session.
  """

  def __init__(self, context):
    """Derives the endpoint URLs and opens a session with the auth headers.

    Args:
      context: A dict providing the sink's 'address' and 'auth_token'.
    """
    base_url = 'http://%s/prpc/luci.resultsink.v1.Sink' % context['address']
    self.test_results_url = base_url + '/ReportTestResults'
    self.report_artifacts_url = base_url + '/ReportInvocationLevelArtifacts'
    self.update_invocation_url = base_url + '/UpdateInvocation'

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'ResultSink %s' % context['auth_token'],
    }
    # Every request made through the session carries these headers.
    self.session = requests.Session()
    self.session.headers.update(headers)

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()

  def close(self):
    """Closes the session backing the sink."""
    self.session.close()

  def _PostJson(self, url, payload):
    """Posts payload as a JSON body to url and raises on an error response."""
    res = self.session.post(url=url, data=json.dumps(payload))
    res.raise_for_status()

  def Post(self,
           test_id,
           status,
           duration,
           test_log,
           test_file,
           variant=None,
           artifacts=None,
           failure_reason=None,
           html_artifact=None,
           tags=None):
    """Uploads the test result to the ResultSink server.

    This assumes that the rdb stream has been called already and that
    server is ready listening.

    Args:
      test_id: A string representing the test's name.
      status: A string representing if the test passed, failed, etc... Must be
          a key of RESULT_MAP.
      duration: An int representing time in ms, or None to omit the duration.
      test_log: A string representing the test's output. When non-empty it is
          uploaded as the 'Test Log' artifact.
      test_file: A string representing the file location of the test. It is
          only reported when it starts with '//'.
      variant: An optional dict of variant key value pairs as the
          additional variant sent from test runners, which can override
          or add to the variants passed to `rdb stream` command.
      artifacts: An optional dict of artifacts to attach to the test. The
          dict is copied; the caller's object is left untouched.
      failure_reason: An optional string with the reason why the test failed.
          Should be None if the test did not fail.
      html_artifact: An optional html-formatted string to prepend to the test's
          log. Useful to encode click-able URL links in the test log, since that
          won't be formatted in the test_log.
      tags: An optional list of tuple of key name and value to prepend to the
          test's tags.

    Returns:
      N/A

    Raises:
      AssertionError: If status is not a key of RESULT_MAP.
      requests.exceptions.HTTPError: If the server answers with an error
          status.
    """
    assert status in RESULT_MAP
    expected = status in (result_types.PASS, result_types.SKIP)
    result_db_status = RESULT_MAP[status]

    tr = {
        'expected': expected,
        'status': result_db_status,
        'tags': [
            {
                'key': 'test_name',
                'value': test_id,
            },
            {
                # Status before getting mapped to result_db statuses.
                'key': 'raw_status',
                'value': status,
            },
        ],
        'testId': test_id,
        'testMetadata': {
            'name': test_id,
        },
    }

    if tags:
      tr['tags'].extend({
          'key': key_name,
          'value': value
      } for (key_name, value) in tags)

    if variant:
      tr['variant'] = {'def': variant}

    # Work on a copy: the 'HTML Summary' / 'Test Log' entries added below must
    # not leak into the dict owned by the caller.
    artifacts = dict(artifacts) if artifacts else {}
    tr['summaryHtml'] = html_artifact if html_artifact else ''

    # If over max supported length of html summary, replace with artifact
    # upload. The test log reference appended below counts towards the limit
    # whenever there is a log to reference.
    summary_len = len(tr['summaryHtml'])
    summary_too_long = summary_len > HTML_SUMMARY_MAX
    too_long_with_log = bool(test_log) and (
        summary_len + len(_TEST_LOG_ARTIFACT) > HTML_SUMMARY_MAX)
    if summary_too_long or too_long_with_log:
      b64_summary = base64.b64encode(tr['summaryHtml'].encode()).decode()
      artifacts['HTML Summary'] = {'contents': b64_summary}
      tr['summaryHtml'] = _HTML_SUMMARY_ARTIFACT

    if test_log:
      # Upload the original log without any modifications.
      b64_log = base64.b64encode(test_log.encode()).decode()
      artifacts['Test Log'] = {'contents': b64_log}
      tr['summaryHtml'] += _TEST_LOG_ARTIFACT

    if artifacts:
      tr['artifacts'] = artifacts
    if failure_reason:
      tr['failureReason'] = {
          'primaryErrorMessage': _TruncateToUTF8Bytes(failure_reason, 1024)
      }

    if duration is not None:
      # Duration must be formatted to avoid scientific notation in case
      # number is too small or too large. Result_db takes seconds, not ms.
      # The division is parenthesized so it happens before the % substitution.
      tr['duration'] = '%.9fs' % (duration / 1000.0)

    if test_file and str(test_file).startswith('//'):
      tr['testMetadata']['location'] = {
          'file_name': test_file,
          'repo': 'https://chromium.googlesource.com/chromium/src',
      }

    self._PostJson(self.test_results_url, {'testResults': [tr]})

  def ReportInvocationLevelArtifacts(self, artifacts):
    """Uploads invocation-level artifacts to the ResultSink server.

    This is for artifacts that don't apply to a single test but to the test
    invocation as a whole (eg: system logs).

    Args:
      artifacts: A dict of artifacts to attach to the invocation.

    Raises:
      requests.exceptions.HTTPError: If the server answers with an error
          status.
    """
    self._PostJson(self.report_artifacts_url, {'artifacts': artifacts})

  def UpdateInvocation(self, invocation, update_mask):
    """Update the invocation to the ResultSink server.

    Details can be found in the proto luci.resultsink.v1.UpdateInvocationRequest

    Args:
      invocation: a dict representation of luci.resultsink.v1.Invocation proto
      update_mask: a dict representation of google.protobuf.FieldMask proto

    Raises:
      requests.exceptions.HTTPError: If the server answers with an error
          status.
    """
    req = {
        'invocation': invocation,
        'update_mask': update_mask,
    }
    self._PostJson(self.update_invocation_url, req)

  def UpdateInvocationExtendedProperties(self, extended_properties, keys=None):
    """Update the extended_properties field of an invocation.

    Details can be found in the "extended_properties" field of the proto
    luci.resultdb.v1.Invocation.

    Args:
      extended_properties: a dict containing the content of extended_properties.
        The value in the dict shall be a dict containing a "@type" key
        representing the data schema, and corresponding data.
      keys: (Optional) a list of keys in extended_properties to add, replace,
        or remove. If a key exists in "keys", but not in "extended_properties",
        this is considered as deleting the key from the resultdb record side
        If None or empty, the keys in "extended_properties" dict will be used.
    """
    if not keys:
      keys = extended_properties.keys()
    # One field-mask path per key being added, replaced or removed.
    mask_paths = ['extended_properties.%s' % key for key in keys]
    invocation = {'extended_properties': extended_properties}
    update_mask = {'paths': mask_paths}
    self.UpdateInvocation(invocation, update_mask)
243
244
def _TruncateToUTF8Bytes(s, length):
  """Truncates a string to a given number of bytes when encoded as UTF-8.

  Ensures the given string does not take more than length bytes when encoded
  as UTF-8. Adds trailing ellipsis (...) if truncation occurred. A truncated
  string may end up encoding to a length slightly shorter than length because
  only whole Unicode codepoints are dropped. When length is smaller than the
  ellipsis itself, the ellipsis is clipped so the limit still holds.

  Args:
    s: The string to truncate.
    length: the length (in bytes) to truncate to.

  Returns:
    s itself when it already fits, otherwise the truncated text followed by
    the (possibly clipped) ellipsis.
  """
  suffix = '...'
  try:
    encoded = s.encode('utf-8')
  # When encode throws UnicodeDecodeError in py2, it usually means the str is
  # already encoded and has non-ascii chars. So skip re-encoding it.
  except UnicodeDecodeError:
    encoded = s
  if len(encoded) <= length:
    return s
  # Leave space for the trailing ellipsis. The budget is clamped at zero: a
  # negative slice bound would keep almost the whole string instead of
  # dropping it.
  keep = max(length - len(suffix), 0)
  # Truncating the string encoded as UTF-8 may have left the final codepoint
  # only partially present. Pass 'ignore' to acknowledge and ensure this is
  # dropped.
  truncated = encoded[:keep].decode('utf-8', 'ignore')
  return truncated + suffix[:max(length, 0)]
271