1#!/usr/bin/env python3 2# Copyright 2018 The Chromium Authors 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5"""Custom swarming base trigger class. 6 7This base class consolidates custom swarming triggering logic, to allow one bot 8to conceptually span multiple Swarming configurations, while lumping all trigger 9calls under one logical step. It also gives the subclasses the ability to 10define their own logic for pruning the configurations they want to trigger 11jobs on and what configurations to use. 12 13See perf_device_triggerer.py for an example of how to use this base class. 14 15""" 16 17import copy 18import json 19import os 20import subprocess 21import sys 22import tempfile 23import time 24import logging 25import six 26 27SRC_DIR = os.path.dirname( 28 os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 29 30# .exe on Windows. 31EXECUTABLE_SUFFIX = '.exe' if sys.platform == 'win32' else '' 32 33SWARMING_GO = os.path.join(SRC_DIR, 'tools', 'luci-go', 34 'swarming' + EXECUTABLE_SUFFIX) 35 36_A_WEEK_IN_SECONDS = 60 * 60 * 24 * 7 37 38 39def _convert_to_go_swarming_args(args): 40 go_args = [] 41 i = 0 42 while i < len(args): 43 current_arg = args[i] 44 if current_arg == '--swarming': 45 current_arg = '--server' 46 go_args.append(current_arg) 47 i += 1 48 if current_arg == '--dimension': 49 go_args.append('{}={}'.format(args[i], args[i + 1])) 50 i += 2 51 return go_args 52 53 54def strip_unicode(obj): 55 """Recursively re-encodes strings as utf-8 inside |obj|. Returns the result. 56 """ 57 if isinstance(obj, six.text_type): 58 return obj.encode('utf-8', 'replace') 59 if isinstance(obj, list): 60 return list(map(strip_unicode, obj)) 61 62 if isinstance(obj, dict): 63 new_obj = type(obj)( 64 (strip_unicode(k), strip_unicode(v)) for k, v in obj.items()) 65 return new_obj 66 return obj 67 68 69class BaseTestTriggerer(object): # pylint: disable=useless-object-inheritance 70 def __init__(self): 71 self._bot_configs = None 72 self._bot_statuses = [] 73 self._total_bots = 0 74 75 def modify_args(self, 76 all_args, 77 bot_index, 78 shard_index, 79 total_shards, 80 temp_file, 81 shard_map=None): 82 """Modifies the given argument list. 83 84 Specifically, it does the following: 85 * Adds a --dump_json argument, to read in the results of the 86 individual trigger command. 87 * Adds the dimensions associated with the bot config at the given index. 88 * If the number of shards is greater than one, adds --env 89 arguments to set the GTEST_SHARD_INDEX and GTEST_TOTAL_SHARDS 90 environment variables to _shard_index_ and _total_shards_, 91 respectively. 92 93 The arguments are structured like this: 94 <args to swarming trigger> -- <args to bot running isolate> 95 This means we have to add arguments to specific locations in the argument 96 list, to either affect the trigger command, or what the bot runs. 97 98 """ 99 bot_args = ['--dump-json', temp_file] 100 if total_shards > 1: 101 bot_args.append('--env') 102 bot_args.append('GTEST_SHARD_INDEX=%s' % shard_index) 103 bot_args.append('--env') 104 bot_args.append('GTEST_TOTAL_SHARDS=%s' % total_shards) 105 if self._bot_configs: 106 for key, val in sorted(self._bot_configs[bot_index].items()): 107 bot_args.append('--dimension') 108 bot_args.append(key) 109 bot_args.append(val) 110 if '--' in all_args: 111 dash_ind = all_args.index('--') 112 additional_args = all_args[:dash_ind] + bot_args + all_args[ 113 dash_ind:] 114 else: 115 additional_args = all_args + bot_args 116 additional_args = self.append_additional_args(additional_args, 117 shard_index) 118 # crbug/1140389: debug print outs 119 logging.info('DEBUG: Before adding shardmap args: %s', additional_args) 120 if shard_map: 121 shard_map_str = json.dumps(shard_map, separators=(',', ':')) 122 shard_map_args = ['--use-dynamic-shards'] 123 shard_map_args.append('--dynamic-shardmap=%s' % shard_map_str) 124 additional_args += shard_map_args 125 return additional_args 126 127 def append_additional_args(self, args, shard_index): 128 """ Gives subclasses ability to append additional args if necessary 129 130 Base class just returns given args.""" 131 del shard_index # unused 132 return args 133 134 def parse_bot_configs(self, args): 135 try: 136 self._bot_configs = strip_unicode( 137 json.loads(args.multiple_trigger_configs)) 138 except Exception as e: 139 six.raise_from(ValueError( 140 'Error while parsing JSON from bot config string %s: %s' % 141 (args.multiple_trigger_configs, str(e))), e) 142 # Validate the input. 143 if not isinstance(self._bot_configs, list): 144 raise ValueError('Bot configurations must be a list, were: %s' % 145 args.multiple_trigger_configs) 146 if len(self._bot_configs) < 1: 147 raise ValueError( 148 'Bot configuration list must have at least one entry') 149 if not all(isinstance(entry, dict) for entry in self._bot_configs): 150 raise ValueError('Bot configurations must all be dictionaries') 151 152 def list_bots(self, 153 dimensions, 154 server='chromium-swarm.appspot.com'): 155 """List bots having specified bot dimensions. 156 157 Type of returned value is list of 158 https://source.chromium.org/search?q=%22class%20BotInfo(messages.Message)%22%20f:luci%2Fappengine%2Fswarming&ssfr=1 159 """ 160 161 args = [SWARMING_GO, 'bots', '-server', server] 162 163 for key in sorted(dimensions): 164 args.extend(['-dimension', '%s=%s' % (key, dimensions[key])]) 165 166 logging.info('Running Go `swarming` with args: %s', args) 167 168 with tempfile.NamedTemporaryFile(delete=False) as result_json: 169 result_json.close() 170 args.extend(['--json', result_json.name]) 171 subprocess.check_call(args) 172 with open(result_json.name) as f: 173 return json.load(f) 174 175 def list_tasks(self, tags, limit=None, 176 server='chromium-swarm.appspot.com'): 177 """List bots having specified task tags. 178 179 Type of returned value is list of 180 https://source.chromium.org/search?q=%22class%20TaskResult(messages.Message):%22%20f:luci%2Fappengine%2Fswarming&ssfr=1 181 """ 182 183 args = [SWARMING_GO, 'tasks', '-server', server] 184 185 for tag in sorted(tags): 186 args.extend(['-tag', tag]) 187 188 # If a query uses a general dimension value, e.g., os:Mac, it will take 189 # forever. We now limited the time range to be within a week. 190 start_epoch_time = int(time.time()) - _A_WEEK_IN_SECONDS 191 args.extend(['-start', str(start_epoch_time)]) 192 193 if limit is not None: 194 args.extend(['-limit', str(limit)]) 195 196 logging.info('Running Go `swarming` with args: %s', args) 197 198 with tempfile.NamedTemporaryFile(delete=False) as result_json: 199 result_json.close() 200 args.extend(['-json', result_json.name]) 201 subprocess.check_call(args) 202 with open(result_json.name) as f: 203 return json.load(f) 204 205 def remove_swarming_dimension(self, args, dimension): 206 for i in range(len(args)): 207 if args[i] == '--dimension' and args[i + 1] == dimension: 208 return args[:i] + args[i + 3:] 209 return args 210 211 def make_temp_file(self, prefix=None, suffix=None): 212 # This trick of closing the file handle is needed on Windows in order to 213 # make the file writeable. 214 h, temp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix) 215 os.close(h) 216 return temp_file 217 218 def delete_temp_file(self, temp_file): 219 os.remove(temp_file) 220 221 def read_json_from_temp_file(self, temp_file): 222 with open(temp_file) as f: 223 return json.load(f) 224 225 def read_encoded_json_from_temp_file(self, temp_file): 226 return strip_unicode(self.read_json_from_temp_file(temp_file)) 227 228 def write_json_to_file(self, merged_json, output_file): 229 with open(output_file, 'w') as f: 230 json.dump(merged_json, f) 231 232 def run_swarming_go(self, 233 args, 234 json_path, 235 shard_index, 236 shards, 237 merged_json=None): 238 239 logging.info('Running Go `swarming` with args: %s', args) 240 241 if merged_json is None: 242 merged_json = {} 243 244 if 'tasks' not in merged_json: 245 merged_json['tasks'] = {} 246 247 ret = subprocess.call([SWARMING_GO] + 248 _convert_to_go_swarming_args(args)) 249 result_json = self.read_json_from_temp_file(json_path) 250 251 tasks = {} 252 for task in result_json['tasks']: 253 k = task['request']['task_id'] 254 tasks[k] = task['request'] 255 invocation = task.get('task_result', {}).get('resultdb_info', 256 {}).get('invocation') 257 if invocation: 258 tasks[k]['invocation'] = invocation 259 260 for k, v in tasks.items(): 261 v['shard_index'] = shard_index 262 merged_json['tasks'][k + ':%d:%d' % (shard_index, shards)] = v 263 self.write_json_to_file(merged_json, json_path) 264 return ret 265 266 def prune_test_specific_configs(self, args): 267 # Ability for base class to further prune configs to 268 # run tests on. 269 pass 270 271 def select_config_indices(self, args): 272 # Main implementation for base class to determine which bot config to 273 # trigger for each shard. 274 # 275 # Returns a list of tuples (shard_index, bot_config_index). 276 # bot_config_index is an index into self._bot_configs 277 pass 278 279 def indices_to_trigger(self, args): 280 """Returns the indices of the swarming shards that should be 281 triggered.""" 282 if args.shard_index is None: 283 return list(range(args.shards)) 284 return [args.shard_index] 285 286 def generate_shard_map(self, args, buildername, selected_config): 287 """Returns shard map generated on runtime if needed.""" 288 pass # pylint: disable=unnecessary-pass 289 290 def trigger_tasks(self, args, remaining): 291 """Triggers tasks for each bot. 292 293 Args: 294 args: Parsed arguments which we need to use. 295 remaining: The remainder of the arguments, which should be passed to 296 swarming.py calls. 297 298 Returns: 299 Exit code for the script. 300 """ 301 if args.multiple_dimension_script_verbose: 302 logging.basicConfig(level=logging.DEBUG) 303 304 # crbug/1140389: debug print outs 305 logging.info('DEBUG: init: %s', remaining) 306 307 self.parse_bot_configs(args) 308 # Prunes config list to the exact set of configurations to trigger jobs 309 # on. This logic is specific to the base class if they want to prune 310 # list further. 311 self.prune_test_specific_configs(args) 312 313 # In the remaining arguments, find the Swarming dimensions that are 314 # specified by the bot configs and remove them, because for each shard, 315 # we're going to select one of the bot configs and put all of its 316 # Swarming dimensions on the command line. 317 filtered_remaining_args = copy.deepcopy(remaining) 318 for config in self._bot_configs: 319 for k in config.keys(): 320 filtered_remaining_args = self.remove_swarming_dimension( 321 filtered_remaining_args, k) 322 # crbug/1140389: debug print outs 323 logging.info('DEBUG: After filtered: %s', filtered_remaining_args) 324 325 merged_json = {} 326 #pylint: disable=assignment-from-no-return 327 selected_config = self.select_config_indices(args) 328 shard_map = self.generate_shard_map( 329 args, self._findBuilderName(filtered_remaining_args), 330 selected_config) 331 #pylint: enable=assignment-from-no-return 332 # Choose selected configs for this run of the test suite. 333 for shard_index, bot_index in selected_config: 334 # For each shard that we're going to distribute, do the following: 335 # 1. Pick which bot configuration to use. 336 # 2. Insert that bot configuration's dimensions as command line 337 # arguments, and invoke "swarming.py trigger". 338 # Holds the results of the swarming.py trigger call. 339 try: 340 json_temp = self.make_temp_file( 341 prefix='base_trigger_dimensions', suffix='.json') 342 # crbug/1140389: debug print outs 343 logging.info('DEBUG: Before modify args: %s', 344 filtered_remaining_args) 345 args_to_pass = self.modify_args(filtered_remaining_args, 346 bot_index, shard_index, 347 args.shards, json_temp, 348 shard_map) 349 # crbug/1140389: debug print outs 350 logging.info('DEBUG: Before calling swarming: %s', 351 args_to_pass) 352 ret = self.run_swarming_go(args_to_pass, json_temp, 353 shard_index, args.shards, 354 merged_json) 355 if ret: 356 sys.stderr.write('Failed to trigger a task, aborting\n') 357 return ret 358 finally: 359 self.delete_temp_file(json_temp) 360 self.write_json_to_file(merged_json, args.dump_json) 361 return 0 362 363 # pylint: disable=inconsistent-return-statements 364 def _findBuilderName(self, args): 365 args_length = len(args) 366 for i in range(args_length): 367 if (args[i] == '--tag' and i < args_length - 1 368 and args[i + 1].startswith('buildername:')): 369 return args[i + 1].split(':', 1)[1] 370 # pylint: enable=inconsistent-return-statements 371 372 @staticmethod 373 def setup_parser_contract(parser): 374 parser.add_argument( 375 '--multiple-trigger-configs', 376 type=str, 377 required=False, 378 help='The Swarming configurations to trigger tasks on, ' 379 'in the form of a JSON array of dictionaries (these are' 380 ' Swarming dimension_sets). At least one entry is' 381 'required if you dont override parse_bot_configs') 382 parser.add_argument('--multiple-dimension-script-verbose', 383 type=bool, 384 default=False, 385 help='Turn on verbose logging') 386 parser.add_argument( 387 '--dump-json', 388 required=True, 389 help='(Swarming Trigger Script API) Where to dump the' 390 ' resulting json which indicates which tasks were' 391 ' triggered for which shards.') 392 parser.add_argument( 393 '--shards', 394 type=int, 395 default=1, 396 help='How many shards to trigger. Duplicated from the' 397 ' `swarming.py trigger` command.') 398 parser.add_argument('--shard-index', 399 type=int, 400 default=None, 401 help='Which shard to trigger. Duplicated from the ' 402 '`swarming.py trigger` command.') 403 return parser 404