1// Copyright 2019 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15package pw_target_runner 16 17import ( 18 "errors" 19 "fmt" 20 "log" 21 "os" 22 "sync" 23 "sync/atomic" 24 "time" 25 26 pb "pigweed/proto/pw_target_runner/target_runner_pb" 27) 28 29// RunRequest represents a client request to run a single executable on-device. 30type RunRequest struct { 31 // Filesystem path to the executable. 32 Path string 33 34 // Channel to which the response is sent back. 35 ResponseChannel chan<- *RunResponse 36 37 // Time when the request was queued. Internal to the worker pool. 38 queueStart time.Time 39} 40 41// RunResponse is the response sent after a run request is processed. 42type RunResponse struct { 43 // Length of time that the run request was queued before being handled 44 // by a worker. Set by the worker pool. 45 QueueTime time.Duration 46 47 // Length of time the runner command took to run the executable. 48 // Set by the worker pool. 49 RunTime time.Duration 50 51 // Raw output of the execution. 52 Output []byte 53 54 // Result of the run. 55 Status pb.RunStatus 56 57 // Error that occurred during the run, if any. If this is not nil, none 58 // of the other fields in this struct are guaranteed to be valid. 59 Err error 60} 61 62// DeviceRunner represents a worker which handles run requests. 63type DeviceRunner interface { 64 // WorkerStart is the lifecycle hook called when the worker routine is 65 // started. Any resources required by the worker should be initialized 66 // here. 67 WorkerStart() error 68 69 // HandleRunRequest is the method called when an executable is scheduled 70 // to run on the worker by the worker pool. It processes the request, 71 // runs the executable, and returns an appropriate response. 72 HandleRunRequest(*RunRequest) *RunResponse 73 74 // WorkerExit is the lifecycle hook called before the worker exits. 75 // Should be used to clean up any resources used by the worker. 76 WorkerExit() 77} 78 79// WorkerPool represents a collection of device runners which run on-device 80// binaries. The worker pool distributes requests to run binaries among its 81// available workers. 82type WorkerPool struct { 83 activeWorkers uint32 84 logger *log.Logger 85 workers []DeviceRunner 86 waitGroup sync.WaitGroup 87 reqChannel chan *RunRequest 88 quitChannel chan bool 89} 90 91var ( 92 errWorkerPoolActive = errors.New("Worker pool is running") 93 errNoRegisteredWorkers = errors.New("No workers registered in pool") 94) 95 96// newWorkerPool creates an empty worker pool. 97func newWorkerPool(name string) *WorkerPool { 98 logPrefix := fmt.Sprintf("[%s] ", name) 99 return &WorkerPool{ 100 logger: log.New(os.Stdout, logPrefix, log.LstdFlags), 101 workers: make([]DeviceRunner, 0), 102 reqChannel: make(chan *RunRequest, 1024), 103 quitChannel: make(chan bool, 64), 104 } 105} 106 107// RegisterWorker adds a new worker to the pool. This cannot be done when the 108// pool is processing requests; Stop() must be called first. 109func (p *WorkerPool) RegisterWorker(worker DeviceRunner) error { 110 if p.Active() { 111 return errWorkerPoolActive 112 } 113 p.workers = append(p.workers, worker) 114 return nil 115} 116 117// Start launches all registered workers in the pool. 118func (p *WorkerPool) Start() error { 119 if p.Active() { 120 return errWorkerPoolActive 121 } 122 123 p.logger.Printf("Starting %d workers\n", len(p.workers)) 124 for _, worker := range p.workers { 125 p.waitGroup.Add(1) 126 atomic.AddUint32(&p.activeWorkers, 1) 127 go p.runWorker(worker) 128 } 129 130 return nil 131} 132 133// Stop terminates all running workers in the pool. The work queue is not 134// cleared; queued requests persist and can be processed by calling Start() 135// again. 136func (p *WorkerPool) Stop() { 137 if !p.Active() { 138 return 139 } 140 141 // Send N quit commands to the workers and wait for them to exit. 142 for i := uint32(0); i < p.activeWorkers; i++ { 143 p.quitChannel <- true 144 } 145 p.waitGroup.Wait() 146 147 p.logger.Println("All workers in pool stopped") 148} 149 150// Active returns true if any worker routines are currently running. 151func (p *WorkerPool) Active() bool { 152 return p.activeWorkers > 0 153} 154 155// QueueExecutable adds an executable to the worker pool's queue. If no workers 156// are registered in the pool, this operation fails and an immediate response is 157// sent back to the requester indicating the error. 158func (p *WorkerPool) QueueExecutable(req *RunRequest) { 159 if len(p.workers) == 0 { 160 p.logger.Printf("Attempt to queue executable %s with no active workers", req.Path) 161 req.ResponseChannel <- &RunResponse{ 162 Err: errNoRegisteredWorkers, 163 } 164 return 165 } 166 167 p.logger.Printf("Queueing executable %s\n", req.Path) 168 169 // Start tracking how long the request is queued. 170 req.queueStart = time.Now() 171 p.reqChannel <- req 172} 173 174// runWorker is a function run by the worker pool in a separate goroutine for 175// each of its registered workers. The function is responsible for calling the 176// appropriate worker lifecycle hooks and processing requests as they come in 177// through the worker pool's queue. 178func (p *WorkerPool) runWorker(worker DeviceRunner) { 179 defer func() { 180 atomic.AddUint32(&p.activeWorkers, ^uint32(0)) 181 p.waitGroup.Done() 182 }() 183 184 if err := worker.WorkerStart(); err != nil { 185 return 186 } 187 188processLoop: 189 for { 190 // Force the quit channel to be processed before the request 191 // channel by using a select statement with an empty default 192 // case to make the read non-blocking. If the quit channel is 193 // empty, the code will fall through to the main select below. 194 select { 195 case q, ok := <-p.quitChannel: 196 if q || !ok { 197 break processLoop 198 } 199 default: 200 } 201 202 select { 203 case q, ok := <-p.quitChannel: 204 if q || !ok { 205 break processLoop 206 } 207 case req, ok := <-p.reqChannel: 208 if !ok { 209 continue 210 } 211 212 queueTime := time.Since(req.queueStart) 213 214 runStart := time.Now() 215 res := worker.HandleRunRequest(req) 216 res.RunTime = time.Since(runStart) 217 218 res.QueueTime = queueTime 219 req.ResponseChannel <- res 220 } 221 } 222 223 worker.WorkerExit() 224} 225