# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Pipeline process that encapsulates the actual content.

Part of the Chrome build flags optimization.

The actual stages include the builder and the executor.
"""

__author__ = "[email protected] (Yuheng Long)"

import multiprocessing


# Pick an integer at random.
POISONPILL = 975


class PipelineProcess(multiprocessing.Process):
    """A process that encapsulates the actual content pipeline stage.

    The actual pipeline stage can be the builder or the tester. This process
    continuously pulls tasks from the queue until a poison pill is received.
    Once a job is received, it is handed to the actual stage for processing.

    Each pipeline stage contains three modules.
    The first module continuously pulls tasks from the input queue. It
    searches the cache to check whether the task has been encountered before.
    If so, duplicate computation can be avoided.
    The second module consists of a pool of workers that do the actual work,
    e.g., the worker will compile the source code and get the image in the
    builder pipeline stage.
    The third module is a helper that puts the result cost into the cost field
    of the duplicate tasks. For example, if two tasks are equivalent, only one
    task, say t1, will be executed and the other task, say t2, will not be
    executed. The third module gets the result from t1 when it is available
    and sets the cost of t2 to be the same as that of t1.
    """

    def __init__(
        self,
        num_processes,
        name,
        cache,
        stage,
        task_queue,
        helper,
        worker,
        result_queue,
    ):
        """Set up input/output queue and the actual method to be called.

        Args:
            num_processes: Number of worker subprocesses this stage has.
            name: The name of this stage.
            cache: The computed tasks encountered before. It must provide a
                keys() method returning the identifiers already known.
            stage: An int value that specifies the stage for this pipeline
                stage, for example, build stage or test stage. This value
                will be used to retrieve the keys in different stages. I.e.,
                the flags set is the key in the build stage and the checksum
                is the key in the test stage. The key is used to detect
                duplicates.
            task_queue: The input task queue for this pipeline stage.
            helper: The method hosted by the helper module to fill up the
                cost of the duplicate tasks.
            worker: The method hosted by the worker pool to do the actual
                work, e.g., compile the image.
            result_queue: The output task queue for this pipeline stage.
        """

        multiprocessing.Process.__init__(self)

        self._name = name
        self._task_queue = task_queue
        self._result_queue = result_queue

        self._helper = helper
        self._worker = worker

        self._cache = cache
        self._stage = stage
        self._num_processes = num_processes

        # The queues used by the modules for communication.
        manager = multiprocessing.Manager()
        self._helper_queue = manager.Queue()
        self._work_queue = manager.Queue()

    def run(self):
        """Busy pulling the next task from the queue for execution.

        Once a job is pulled, this stage either forwards it to the helper
        (when its identifier has been seen before) or submits it to the
        worker pool.

        The process terminates on receiving the poison pill from the previous
        stage. The poison pill is forwarded to the result queue only after
        the worker pool and the helper process have finished, so that it is
        the last item this stage puts on the result queue.
        """

        # The worker pool.
        work_pool = multiprocessing.Pool(self._num_processes)

        # The helper process.
        helper_process = multiprocessing.Process(
            target=self._helper,
            args=(
                self._stage,
                self._cache,
                self._helper_queue,
                self._work_queue,
                self._result_queue,
            ),
        )
        helper_process.start()

        # Materialize the keys into a private list: a plain dict's keys() is
        # a read-only view on Python 3 and has no append(). A list (rather
        # than a set) keeps the original membership semantics without
        # requiring the identifiers to be hashable.
        seen_keys = list(self._cache.keys())

        while True:
            task = self._task_queue.get()
            if task == POISONPILL:
                # Poison pill means shutdown.
                break

            task_key = task.GetIdentifier(self._stage)
            if task_key in seen_keys:
                # The task has been encountered before. It will be sent to
                # the helper module for further processing.
                self._helper_queue.put(task)
            else:
                # Let the workers do the actual work.
                work_pool.apply_async(
                    self._worker,
                    args=(
                        self._stage,
                        task,
                        self._work_queue,
                        self._result_queue,
                    ),
                )
                seen_keys.append(task_key)

        # Shutdown the workers pool and the helper process.
        work_pool.close()
        work_pool.join()

        self._helper_queue.put(POISONPILL)
        helper_process.join()

        # Forward the poison pill only now: the workers and the helper are
        # handed the result queue, so sending the pill earlier could place it
        # ahead of results that are still being produced.
        self._result_queue.put(POISONPILL)