# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The pipeline_worker functions of the build and test stage of the framework.

Part of the Chrome build flags optimization.

This module defines the helper and the worker. If duplicate tasks, for example
t1 and t2, need to be built/tested, only one of them, for example t1, is
built/tested. The helper waits for the result of t1 and sets the result of the
other task, t2 here, to be the same as that of t1. Setting the result of t2 to
be the same as that of t1 is referred to as resolving the result of t2.
The worker invokes the work method of the tasks that are not duplicates.
"""

__author__ = "[email protected] (Yuheng Long)"

import pipeline_process


def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
    """Helper that filters duplicate tasks.

    This method continuously pulls duplicate tasks from the helper_queue. The
    duplicate tasks need not be compiled/tested. This method also pulls
    completed tasks from the completed_queue and sets the result of each
    duplicate task to that of its corresponding finished task.

    Args:
      stage: The current stage of the pipeline, for example, build stage or
        test stage.
      done_dict: A dictionary of tasks that are done. The key of the dictionary
        is the identifier of the task. The value of the dictionary is the
        result of performing the corresponding task.
      helper_queue: A queue of duplicate tasks whose results need to be
        resolved. This is the communication channel between the
        pipeline_process and this helper process.
      completed_queue: A queue of tasks that have been built/tested. The
        results of these tasks are needed to resolve the results of the
        duplicate tasks. This is the communication channel between the workers
        and this helper process.
      result_queue: After the results of the duplicate tasks have been
        resolved, the duplicate tasks are sent to the next stage via this
        queue.
    """

    # The list of duplicate tasks, the results of which need to be resolved.
    waiting_list = []

    while True:
        # Pull a duplicate task from the helper queue.
        if not helper_queue.empty():
            task = helper_queue.get()

            if task == pipeline_process.POISONPILL:
                # Poison pill means no more duplicate tasks will arrive from
                # the helper queue.
                break

            # The task has not been performed before.
            assert not task.Done(stage)

            # The identifier of this task.
            identifier = task.GetIdentifier(stage)

            # If a duplicate task arrives before the corresponding result from
            # the completed_queue, it is put in the waiting list. If the
            # result arrives before the duplicate task, the duplicate task is
            # resolved right away.
            if identifier in done_dict:
                # This task has been encountered before and the result is
                # available. The result can be resolved right away.
                task.SetResult(stage, done_dict[identifier])
                result_queue.put(task)
            else:
                waiting_list.append(task)

        # Check and get completed tasks from completed_queue.
        GetResultFromCompletedQueue(
            stage, completed_queue, done_dict, waiting_list, result_queue
        )

    # Wait to resolve the results of the remaining duplicate tasks.
    while waiting_list:
        GetResultFromCompletedQueue(
            stage, completed_queue, done_dict, waiting_list, result_queue
        )


def GetResultFromCompletedQueue(
    stage, completed_queue, done_dict, waiting_list, result_queue
):
    """Pulls a result from the completed queue and resolves duplicate tasks.

    This helper method tries to pull a completed task from the completed
    queue. If it gets one, it records the result in done_dict and resolves the
    results of all the relevant duplicate tasks in the waiting list. Relevant
    tasks are the tasks that have the same identifier as the result just
    received from the completed_queue. If the queue is empty, the method
    returns without doing anything.

    Args:
      stage: The current stage of the pipeline, for example, build stage or
        test stage.
      completed_queue: A queue of tasks that have been performed. The results
        of these tasks are needed to resolve the results of the duplicate
        tasks. This is the communication channel between the workers and this
        method.
      done_dict: A dictionary of tasks that are done. The key of the dictionary
        is the identifier of the task. The value of the dictionary is the
        result of performing the corresponding task.
      waiting_list: The list of duplicate tasks, the results of which need to
        be resolved.
      result_queue: After the results of the duplicate tasks have been
        resolved, the duplicate tasks are sent to the next stage via this
        queue.
    """
    # Pull a completed task from the completed queue.
    if not completed_queue.empty():
        (identifier, result) = completed_queue.get()
        done_dict[identifier] = result

        tasks = [
            t for t in waiting_list if t.GetIdentifier(stage) == identifier
        ]
        for duplicate_task in tasks:
            duplicate_task.SetResult(stage, result)
            result_queue.put(duplicate_task)
            waiting_list.remove(duplicate_task)


def Worker(stage, task, helper_queue, result_queue):
    """Worker that performs the task.

    This method calls the work method of the input task and distributes the
    result to the helper and the next stage.

    Args:
      stage: The current stage of the pipeline, for example, build stage or
        test stage.
      task: Input task that needs to be performed.
      helper_queue: Queue that receives the (identifier, result) pair of the
        completed task. This is the communication channel between the worker
        and the helper.
      result_queue: Queue that receives the completed task. This is the
        communication channel between the worker and the next stage.
    """

    # The task has not been completed before.
    assert not task.Done(stage)

    task.Work(stage)
    helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
    result_queue.put(task)
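

# Illustration only, not part of the framework: a minimal, single-process
# sketch of how one completed result resolves the waiting duplicates. FakeTask,
# resolve_once, the "build" stage label, and the "-O2"/"-O3" identifiers are
# hypothetical stand-ins; the real tasks and queues are supplied by
# pipeline_process, which this sketch does not import. It uses queue.Queue
# instead of inter-process queues so that it can run on its own.

```python
import queue


class FakeTask:
    """Hypothetical stand-in for a pipeline task (assumption, not the real class)."""

    def __init__(self, identifier):
        self._identifier = identifier
        self._result = None

    def GetIdentifier(self, stage):
        return self._identifier

    def SetResult(self, stage, result):
        self._result = result

    def GetResult(self, stage):
        return self._result


def resolve_once(stage, completed_queue, done_dict, waiting_list, result_queue):
    """Consumes at most one completed result and resolves matching duplicates."""
    if completed_queue.empty():
        return
    identifier, result = completed_queue.get()
    done_dict[identifier] = result
    # Copy the matches first so the waiting list can be modified safely.
    matches = [t for t in waiting_list if t.GetIdentifier(stage) == identifier]
    for task in matches:
        task.SetResult(stage, result)
        result_queue.put(task)
        waiting_list.remove(task)


stage = "build"  # hypothetical stage label
completed_queue = queue.Queue()
result_queue = queue.Queue()
done_dict = {}

# Two duplicates of the same work item and one unrelated task are waiting.
dup_a, dup_b, other = FakeTask("-O2"), FakeTask("-O2"), FakeTask("-O3")
waiting_list = [dup_a, dup_b, other]

# A worker reports the result for the "-O2" identifier.
completed_queue.put(("-O2", "checksum-123"))
resolve_once(stage, completed_queue, done_dict, waiting_list, result_queue)

# Drain the tasks that were forwarded to the next stage.
resolved = []
while not result_queue.empty():
    resolved.append(result_queue.get())
```

# In this sketch both "-O2" duplicates receive the reported result and move on
# to the next stage, while the "-O3" task stays in the waiting list until its
# own result arrives.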