1// Copyright 2019 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15// Package pw_target_runner implements a target runner gRPC server which queues 16// and distributes executables among a group of worker routines. 17package pw_target_runner 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "log" 24 "net" 25 "os" 26 "time" 27 28 "google.golang.org/grpc" 29 "google.golang.org/grpc/codes" 30 "google.golang.org/grpc/status" 31 32 pb "pigweed/proto/pw_target_runner/target_runner_pb" 33) 34 35var ( 36 errServerNotBound = errors.New("Server not bound to a port") 37 errServerNotRunning = errors.New("Server is not running") 38) 39 40// Server is a gRPC server that runs a TargetRunner service. 41type Server struct { 42 grpcServer *grpc.Server 43 listener net.Listener 44 tasksPassed uint32 45 tasksFailed uint32 46 startTime time.Time 47 active bool 48 workerPool *WorkerPool 49} 50 51// NewServer creates a gRPC server with a registered TargetRunner service. 52func NewServer() *Server { 53 s := &Server{ 54 grpcServer: grpc.NewServer(grpc.MaxRecvMsgSize(20 * 1024 * 1024)), 55 workerPool: newWorkerPool("ServerWorkerPool"), 56 } 57 58 pb.RegisterTargetRunnerServer(s.grpcServer, &pwTargetRunnerService{s}) 59 60 return s 61} 62 63// Bind starts a TCP listener on a specified port. 64func (s *Server) Bind(port int) error { 65 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) 66 if err != nil { 67 return err 68 } 69 s.listener = lis 70 return nil 71} 72 73// RegisterWorker adds a worker to the server's worker pool. 74func (s *Server) RegisterWorker(worker DeviceRunner) { 75 s.workerPool.RegisterWorker(worker) 76} 77 78// RunBinary runs an executable through a worker in the server, returning 79// the worker's response. The function blocks until the executable has been 80// processed. 81func (s *Server) RunBinary(path string) (*RunResponse, error) { 82 if !s.active { 83 return nil, errServerNotRunning 84 } 85 86 resChan := make(chan *RunResponse, 1) 87 defer close(resChan) 88 89 s.workerPool.QueueExecutable(&RunRequest{ 90 Path: path, 91 ResponseChannel: resChan, 92 }) 93 94 res := <-resChan 95 96 if res.Err != nil { 97 return nil, res.Err 98 } 99 100 if res.Status == pb.RunStatus_SUCCESS { 101 s.tasksPassed++ 102 } else { 103 s.tasksFailed++ 104 } 105 106 return res, nil 107} 108 109// Serve starts the gRPC server on its configured port. Bind must have been 110// called before this; an error is returned if it is not. This function blocks 111// until the server is terminated. 112func (s *Server) Serve() error { 113 if s.listener == nil { 114 return errServerNotBound 115 } 116 117 log.Printf("Starting gRPC server on %v\n", s.listener.Addr()) 118 119 s.startTime = time.Now() 120 s.active = true 121 s.workerPool.Start() 122 123 return s.grpcServer.Serve(s.listener) 124} 125 126// pwTargetRunnerService implements the pw.target_runner.TargetRunner gRPC 127// service. 128type pwTargetRunnerService struct { 129 server *Server 130} 131 132// RunBinary runs a single executable on-device and returns its result. 133func (s *pwTargetRunnerService) RunBinary( 134 ctx context.Context, 135 desc *pb.RunBinaryRequest, 136) (*pb.RunBinaryResponse, error) { 137 var path string 138 139 switch bin := desc.Binary.(type) { 140 case *pb.RunBinaryRequest_FilePath: 141 path = bin.FilePath 142 break 143 case *pb.RunBinaryRequest_TestBinary: 144 f, err := os.CreateTemp("", "pw_target_runner_") 145 if err != nil { 146 return nil, status.Errorf(codes.Internal, "Internal server error: %v", err) 147 } 148 149 defer os.Remove(f.Name()) 150 151 _, err = f.Write(bin.TestBinary) 152 if err != nil { 153 return nil, status.Errorf(codes.Internal, "Internal server error: %v", err) 154 } 155 156 err = os.Chmod(f.Name(), 0755) 157 if err != nil { 158 return nil, status.Errorf(codes.Internal, "Internal server error: %v", err) 159 } 160 161 path = f.Name() 162 break 163 default: 164 return nil, status.Error(codes.InvalidArgument, "No test path or binary provided") 165 } 166 167 runRes, err := s.server.RunBinary(path) 168 if err != nil { 169 return nil, status.Errorf(codes.Internal, "Internal server error: %v", err) 170 } 171 172 res := &pb.RunBinaryResponse{ 173 Result: runRes.Status, 174 QueueTimeNs: uint64(runRes.QueueTime), 175 RunTimeNs: uint64(runRes.RunTime), 176 Output: runRes.Output, 177 } 178 return res, nil 179} 180 181// Status returns information about the server. 182func (s *pwTargetRunnerService) Status( 183 ctx context.Context, 184 _ *pb.Empty, 185) (*pb.ServerStatus, error) { 186 resp := &pb.ServerStatus{ 187 UptimeNs: uint64(time.Since(s.server.startTime)), 188 TasksPassed: s.server.tasksPassed, 189 TasksFailed: s.server.tasksFailed, 190 } 191 192 return resp, nil 193} 194