# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Client for uploading test results and artifacts to a ResultSink server."""
from __future__ import absolute_import
import base64
import json
import logging
import os

import requests  # pylint: disable=import-error
from lib.results import result_types

# Longest summaryHtml (in characters) sent inline; longer summaries are
# uploaded as an artifact instead.
HTML_SUMMARY_MAX = 4096

_HTML_SUMMARY_ARTIFACT = '<text-artifact artifact-id="HTML Summary" />'
_TEST_LOG_ARTIFACT = '<text-artifact artifact-id="Test Log" />'

# Maps result_types to the luci test-result.proto.
# https://godoc.org/go.chromium.org/luci/resultdb/proto/v1#TestStatus
RESULT_MAP = {
    result_types.UNKNOWN: 'ABORT',
    result_types.PASS: 'PASS',
    result_types.FAIL: 'FAIL',
    result_types.CRASH: 'CRASH',
    result_types.TIMEOUT: 'ABORT',
    result_types.SKIP: 'SKIP',
    result_types.NOTRUN: 'SKIP',
}


def TryInitClient():
  """Tries to initialize a result_sink_client object.

  Assumes that rdb stream is already running.

  Reads the JSON file named by the LUCI_CONTEXT environment variable and uses
  its 'result_sink' section to configure the client. Only missing keys are
  tolerated (the env var, the 'result_sink' section, or the sink's 'address' /
  'auth_token' entries); I/O and JSON parsing errors propagate to the caller.

  Returns:
    A ResultSinkClient for the result_sink server else returns None.
  """
  try:
    with open(os.environ['LUCI_CONTEXT']) as f:
      sink = json.load(f)['result_sink']
    # Kept inside the try block: a sink section lacking 'address' or
    # 'auth_token' raises KeyError in the constructor and must yield None.
    return ResultSinkClient(sink)
  except KeyError:
    return None


class ResultSinkClient(object):
  """A class to store the sink's post configurations and make post requests.

  This assumes that the rdb stream has been called already and that the
  server is listening.

  Instances can be used as context managers; leaving the `with` block closes
  the underlying HTTP session.
  """

  def __init__(self, context):
    """Builds the endpoint URLs and an authenticated HTTP session.

    Args:
      context: A dict with the sink's 'address' (host:port string) and
        'auth_token' entries.

    Raises:
      KeyError: If 'address' or 'auth_token' is missing from context.
    """
    base_url = 'http://%s/prpc/luci.resultsink.v1.Sink' % context['address']
    self.test_results_url = base_url + '/ReportTestResults'
    self.report_artifacts_url = base_url + '/ReportInvocationLevelArtifacts'
    self.update_invocation_url = base_url + '/UpdateInvocation'

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'ResultSink %s' % context['auth_token'],
    }
    # A single session is shared by every request so the headers only need to
    # be configured once.
    self.session = requests.Session()
    self.session.headers.update(headers)

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()

  def close(self):
    """Closes the session backing the sink."""
    self.session.close()

  def Post(self,
           test_id,
           status,
           duration,
           test_log,
           test_file,
           variant=None,
           artifacts=None,
           failure_reason=None,
           html_artifact=None,
           tags=None):
    """Uploads the test result to the ResultSink server.

    This assumes that the rdb stream has been called already and that the
    server is ready and listening.

    Args:
      test_id: A string representing the test's name.
      status: A string representing if the test passed, failed, etc...
        Must be one of the keys of RESULT_MAP.
      duration: An int representing time in ms, or None to omit the duration.
      test_log: A string representing the test's output.
      test_file: A string representing the file location of the test. Only
        recorded when it starts with '//'.
      variant: An optional dict of variant key value pairs as the
        additional variant sent from test runners, which can override
        or add to the variants passed to `rdb stream` command.
      artifacts: An optional dict of artifacts to attach to the test. The
        dict is copied; the caller's object is never modified.
      failure_reason: An optional string with the reason why the test failed.
        Should be None if the test did not fail.
      html_artifact: An optional html-formatted string to prepend to the test's
        log. Useful to encode click-able URL links in the test log, since that
        won't be formatted in the test_log.
      tags: An optional list of tuple of key name and value to prepend to the
        test's tags.

    Returns:
      N/A

    Raises:
      requests.exceptions.HTTPError: If the server answers with an error
        status code.
    """
    assert status in RESULT_MAP
    expected = status in (result_types.PASS, result_types.SKIP)
    result_db_status = RESULT_MAP[status]

    tr = {
        'expected':
        expected,
        'status':
        result_db_status,
        'tags': [
            {
                'key': 'test_name',
                'value': test_id,
            },
            {
                # Status before getting mapped to result_db statuses.
                'key': 'raw_status',
                'value': status,
            }
        ],
        'testId':
        test_id,
        'testMetadata': {
            'name': test_id,
        }
    }

    if tags:
      tr['tags'].extend({
          'key': key_name,
          'value': value
      } for (key_name, value) in tags)

    if variant:
      tr['variant'] = {'def': variant}

    # Work on a shallow copy: the 'HTML Summary' / 'Test Log' entries added
    # below must not leak into the dict owned by the caller.
    artifacts = dict(artifacts) if artifacts else {}
    summary_html = html_artifact if html_artifact else ''

    # If over max supported length of html summary, replace with artifact
    # upload. When a test log is present, room must also be left for the
    # log's artifact tag that gets appended to the summary below.
    summary_too_long = len(summary_html) > HTML_SUMMARY_MAX
    no_room_for_log_tag = bool(test_log) and (
        len(summary_html) + len(_TEST_LOG_ARTIFACT) > HTML_SUMMARY_MAX)
    if no_room_for_log_tag or summary_too_long:
      b64_summary = base64.b64encode(summary_html.encode()).decode()
      artifacts['HTML Summary'] = {'contents': b64_summary}
      summary_html = _HTML_SUMMARY_ARTIFACT

    if test_log:
      # Upload the original log without any modifications.
      b64_log = base64.b64encode(test_log.encode()).decode()
      artifacts['Test Log'] = {'contents': b64_log}
      summary_html += _TEST_LOG_ARTIFACT

    tr['summaryHtml'] = summary_html

    if artifacts:
      tr['artifacts'] = artifacts
    if failure_reason:
      tr['failureReason'] = {
          'primaryErrorMessage': _TruncateToUTF8Bytes(failure_reason, 1024)
      }

    if duration is not None:
      # Duration must be formatted to avoid scientific notation in case
      # number is too small or too large. Result_db takes seconds, not ms.
      # Need to use float() otherwise it does substitution first then divides.
      tr['duration'] = '%.9fs' % float(duration / 1000.0)

    if test_file and str(test_file).startswith('//'):
      tr['testMetadata']['location'] = {
          'file_name': test_file,
          'repo': 'https://chromium.googlesource.com/chromium/src',
      }

    res = self.session.post(url=self.test_results_url,
                            data=json.dumps({'testResults': [tr]}))
    res.raise_for_status()

  def ReportInvocationLevelArtifacts(self, artifacts):
    """Uploads invocation-level artifacts to the ResultSink server.

    This is for artifacts that don't apply to a single test but to the test
    invocation as a whole (eg: system logs).

    Args:
      artifacts: A dict of artifacts to attach to the invocation.

    Raises:
      requests.exceptions.HTTPError: If the server answers with an error
        status code.
    """
    req = {'artifacts': artifacts}
    res = self.session.post(url=self.report_artifacts_url, data=json.dumps(req))
    res.raise_for_status()

  def UpdateInvocation(self, invocation, update_mask):
    """Update the invocation to the ResultSink server.

    Details can be found in the proto luci.resultsink.v1.UpdateInvocationRequest

    Args:
      invocation: a dict representation of luci.resultsink.v1.Invocation proto
      update_mask: a dict representation of google.protobuf.FieldMask proto

    Raises:
      requests.exceptions.HTTPError: If the server answers with an error
        status code.
    """
    req = {
        'invocation': invocation,
        'update_mask': update_mask,
    }
    res = self.session.post(url=self.update_invocation_url,
                            data=json.dumps(req))
    res.raise_for_status()

  def UpdateInvocationExtendedProperties(self, extended_properties, keys=None):
    """Update the extended_properties field of an invocation.

    Details can be found in the "extended_properties" field of the proto
    luci.resultdb.v1.Invocation.

    Args:
      extended_properties: a dict containing the content of extended_properties.
        The value in the dict shall be a dict containing a "@type" key
        representing the data schema, and corresponding data.
      keys: (Optional) a list of keys in extended_properties to add, replace,
        or remove. If a key exists in "keys", but not in "extended_properties",
        this is considered as deleting the key from the resultdb record side
        If None (or empty), the keys in "extended_properties" dict will be
        used.
    """
    if not keys:
      keys = extended_properties.keys()
    mask_paths = ['extended_properties.%s' % key for key in keys]
    invocation = {'extended_properties': extended_properties}
    update_mask = {'paths': mask_paths}
    self.UpdateInvocation(invocation, update_mask)


def _TruncateToUTF8Bytes(s, length):
  """Truncates a string to a given number of bytes when encoded as UTF-8.

  Ensures the given string does not take more than length bytes when encoded
  as UTF-8. Adds trailing ellipsis (...) if truncation occurred. A truncated
  string may end up encoding to a length slightly shorter than length because
  only whole Unicode codepoints are dropped.

  If length is smaller than the 3 bytes needed for the ellipsis, a string that
  needs truncating is reduced to just the ellipsis.

  Args:
    s: The string to truncate.
    length: the length (in bytes) to truncate to.

  Returns:
    s itself when it already fits, otherwise the truncated text followed
    by '...'.
  """
  try:
    encoded = s.encode('utf-8')
  # When encode throws UnicodeDecodeError in py2, it usually means the str is
  # already encoded and has non-ascii chars. So skip re-encoding it.
  except UnicodeDecodeError:
    encoded = s
  if len(encoded) > length:
    # Truncate, leaving space for trailing ellipsis (...). The bound is
    # clamped at zero: a negative slice end would count from the end of the
    # string and keep almost all of it.
    encoded = encoded[:max(length - 3, 0)]
    # Truncating the string encoded as UTF-8 may have left the final codepoint
    # only partially present. Pass 'ignore' to acknowledge and ensure this is
    # dropped.
    return encoded.decode('utf-8', 'ignore') + "..."
  return s