xref: /aosp_15_r20/external/pigweed/pw_target_runner/go/server.go (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2019 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15// Package pw_target_runner implements a target runner gRPC server which queues
16// and distributes executables among a group of worker routines.
17package pw_target_runner
18
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "pigweed/proto/pw_target_runner/target_runner_pb"
)
34
// errServerNotBound is returned by Serve when no listener has been set up
// through Bind.
var errServerNotBound = errors.New("Server not bound to a port")

// errServerNotRunning is returned by RunBinary when the server has not been
// marked active by Serve.
var errServerNotRunning = errors.New("Server is not running")
39
// Server is a gRPC server that runs a TargetRunner service. It holds the
// listener it is bound to, the worker pool that executables are queued on, and
// the pass/fail counters and start time reported by the Status RPC.
type Server struct {
	grpcServer  *grpc.Server // gRPC server the TargetRunner service is registered on.
	listener    net.Listener // Set by Bind; nil until a port has been bound.
	tasksPassed uint32       // Runs that completed with RunStatus_SUCCESS.
	tasksFailed uint32       // Runs that completed with any other status.
	startTime   time.Time    // Recorded by Serve; used to compute uptime.
	active      bool         // Set to true by Serve; RunBinary rejects work while false.
	workerPool  *WorkerPool  // Pool that queued executables are handed to.
}
50
51// NewServer creates a gRPC server with a registered TargetRunner service.
52func NewServer() *Server {
53	s := &Server{
54		grpcServer: grpc.NewServer(grpc.MaxRecvMsgSize(20 * 1024 * 1024)),
55		workerPool: newWorkerPool("ServerWorkerPool"),
56	}
57
58	pb.RegisterTargetRunnerServer(s.grpcServer, &pwTargetRunnerService{s})
59
60	return s
61}
62
63// Bind starts a TCP listener on a specified port.
64func (s *Server) Bind(port int) error {
65	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
66	if err != nil {
67		return err
68	}
69	s.listener = lis
70	return nil
71}
72
73// RegisterWorker adds a worker to the server's worker pool.
74func (s *Server) RegisterWorker(worker DeviceRunner) {
75	s.workerPool.RegisterWorker(worker)
76}
77
78// RunBinary runs an executable through a worker in the server, returning
79// the worker's response. The function blocks until the executable has been
80// processed.
81func (s *Server) RunBinary(path string) (*RunResponse, error) {
82	if !s.active {
83		return nil, errServerNotRunning
84	}
85
86	resChan := make(chan *RunResponse, 1)
87	defer close(resChan)
88
89	s.workerPool.QueueExecutable(&RunRequest{
90		Path:            path,
91		ResponseChannel: resChan,
92	})
93
94	res := <-resChan
95
96	if res.Err != nil {
97		return nil, res.Err
98	}
99
100	if res.Status == pb.RunStatus_SUCCESS {
101		s.tasksPassed++
102	} else {
103		s.tasksFailed++
104	}
105
106	return res, nil
107}
108
109// Serve starts the gRPC server on its configured port. Bind must have been
110// called before this; an error is returned if it is not. This function blocks
111// until the server is terminated.
112func (s *Server) Serve() error {
113	if s.listener == nil {
114		return errServerNotBound
115	}
116
117	log.Printf("Starting gRPC server on %v\n", s.listener.Addr())
118
119	s.startTime = time.Now()
120	s.active = true
121	s.workerPool.Start()
122
123	return s.grpcServer.Serve(s.listener)
124}
125
// pwTargetRunnerService implements the pw.target_runner.TargetRunner gRPC
// service. Its RPC handlers forward work to, and read statistics from, the
// Server it wraps.
type pwTargetRunnerService struct {
	server *Server // Server that runs binaries and holds the reported statistics.
}
131
132// RunBinary runs a single executable on-device and returns its result.
133func (s *pwTargetRunnerService) RunBinary(
134	ctx context.Context,
135	desc *pb.RunBinaryRequest,
136) (*pb.RunBinaryResponse, error) {
137	var path string
138
139	switch bin := desc.Binary.(type) {
140	case *pb.RunBinaryRequest_FilePath:
141		path = bin.FilePath
142		break
143	case *pb.RunBinaryRequest_TestBinary:
144		f, err := os.CreateTemp("", "pw_target_runner_")
145		if err != nil {
146			return nil, status.Errorf(codes.Internal, "Internal server error: %v", err)
147		}
148
149		defer os.Remove(f.Name())
150
151		_, err = f.Write(bin.TestBinary)
152		if err != nil {
153			return nil, status.Errorf(codes.Internal, "Internal server error: %v", err)
154		}
155
156		err = os.Chmod(f.Name(), 0755)
157		if err != nil {
158			return nil, status.Errorf(codes.Internal, "Internal server error: %v", err)
159		}
160
161		path = f.Name()
162		break
163	default:
164		return nil, status.Error(codes.InvalidArgument, "No test path or binary provided")
165	}
166
167	runRes, err := s.server.RunBinary(path)
168	if err != nil {
169		return nil, status.Errorf(codes.Internal, "Internal server error: %v", err)
170	}
171
172	res := &pb.RunBinaryResponse{
173		Result:      runRes.Status,
174		QueueTimeNs: uint64(runRes.QueueTime),
175		RunTimeNs:   uint64(runRes.RunTime),
176		Output:      runRes.Output,
177	}
178	return res, nil
179}
180
181// Status returns information about the server.
182func (s *pwTargetRunnerService) Status(
183	ctx context.Context,
184	_ *pb.Empty,
185) (*pb.ServerStatus, error) {
186	resp := &pb.ServerStatus{
187		UptimeNs:    uint64(time.Since(s.server.startTime)),
188		TasksPassed: s.server.tasksPassed,
189		TasksFailed: s.server.tasksFailed,
190	}
191
192	return resp, nil
193}
194