xref: /aosp_15_r20/external/pigweed/pw_grpc/integration_test.go (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2024 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//	https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14//
15// Package integration_test implements a client to exercise the pw_grpc server implementation
16package integration_test
17
18import (
19	"bufio"
20	"context"
21	"fmt"
22	"hash/crc32"
23	"io"
24	"os/exec"
25	"strconv"
26	"strings"
27	"testing"
28	"time"
29
30	"google.golang.org/grpc"
31	"google.golang.org/grpc/codes"
32	"google.golang.org/grpc/credentials/insecure"
33	pb "google.golang.org/grpc/examples/features/proto/echo"
34	"google.golang.org/grpc/status"
35)
36
// port is the port number handed to the test server on its command line and
// dialed on localhost by the test client.
const port = "3402"
38
39func TestUnaryEcho(t *testing.T) {
40	const num_connections = 1
41	cmd, reader, err := launchServer(t, num_connections)
42	if err != nil {
43		t.Errorf("Failed to launch %v", err)
44	}
45	defer cmd.Wait()
46
47	conn, echo_client, err := connectServer()
48	if err != nil {
49		t.Errorf("Failed to connect %v", err)
50	}
51	defer conn.Close()
52	go logServer(t, reader)
53
54	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
55		t.Logf("call UnaryEcho(%v)", msg)
56		resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: msg})
57		if err != nil {
58			t.Logf("... failed with error: %v", err.Error())
59			if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
60				t.Errorf("Error unexpected %v", err)
61			}
62		} else {
63			t.Logf("... Recv %v", resp)
64			if resp.Message != msg {
65				t.Errorf("Unexpected response %v", resp)
66			}
67		}
68	})
69}
70
71func TestFragmentedMessage(t *testing.T) {
72	// Test sending successively larger messages, larger than the maximum
73	// HTTP2 data frame size (16384), ensuring messages are fragmented across
74	// frames.
75	const num_connections = 1
76	cmd, reader, err := launchServer(t, num_connections)
77	if err != nil {
78		t.Errorf("Failed to launch %v", err)
79	}
80	defer cmd.Wait()
81
82	conn, echo_client, err := connectServer()
83	if err != nil {
84		t.Errorf("Failed to connect %v", err)
85	}
86	defer conn.Close()
87	go logServer(t, reader)
88
89	const num_calls = 4
90	for i := 0; i < num_calls; i++ {
91		t.Run(fmt.Sprintf("%d of %d", i+1, num_calls), func(t *testing.T) {
92			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
93			defer cancel()
94
95			msg := "crc32:" + strings.Repeat("testmessage!", 1500*(i+1))
96			checksum := strconv.FormatUint(uint64(crc32.ChecksumIEEE([]byte(msg))), 10)
97
98			done := make(chan struct{})
99			go func() {
100				t.Logf("call UnaryChecksum")
101				resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: msg})
102				if err != nil {
103					t.Logf("... failed with error: %v", err.Error())
104					if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
105						t.Errorf("Error unexpected %v", err)
106					}
107				} else {
108					t.Logf("... Recv %v", resp)
109					if resp.Message != checksum {
110						t.Errorf("Unexpected response %v", resp)
111					}
112				}
113				close(done)
114			}()
115			<-done
116		})
117	}
118}
119
120func TestMultipleConnections(t *testing.T) {
121	const num_connections = 3
122	cmd, reader, err := launchServer(t, num_connections)
123	if err != nil {
124		t.Errorf("Failed to launch %v", err)
125	}
126	defer cmd.Wait()
127
128	go logServer(t, reader)
129
130	for i := 0; i < num_connections; i++ {
131		t.Run(fmt.Sprintf("connection %d of %d", i+1, num_connections), func(t *testing.T) {
132			conn, echo_client, err := connectServer()
133			if err != nil {
134				t.Errorf("Failed to connect %v", err)
135			}
136
137			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
138			defer cancel()
139
140			resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: "message0"})
141			if err != nil {
142				t.Errorf("... failed with error: %v", err.Error())
143			} else {
144				t.Logf("... Recv %v", resp)
145				if resp.Message != "message0" {
146					t.Errorf("Unexpected response %v", resp)
147				}
148			}
149
150			conn.Close()
151		})
152	}
153}
154
155func TestServerStreamingEcho(t *testing.T) {
156	const num_connections = 1
157	cmd, reader, err := launchServer(t, num_connections)
158	if err != nil {
159		t.Errorf("Failed to launch %v", err)
160	}
161	defer cmd.Wait()
162
163	conn, echo_client, err := connectServer()
164	if err != nil {
165		t.Errorf("Failed to connect %v", err)
166	}
167	defer conn.Close()
168	go logServer(t, reader)
169
170	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
171		t.Logf("call ServerStreamingEcho(%v)", msg)
172		client, err := echo_client.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: msg})
173		if err != nil {
174			t.Errorf("... failed with error: %v", err)
175			return
176		}
177		for {
178			resp, err := client.Recv()
179			if err == io.EOF {
180				t.Logf("... completed")
181				return
182			}
183			if err != nil {
184				t.Logf("... Recv failed with error: %v", err)
185				if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
186					t.Errorf("Error unexpected %v", err)
187				}
188				return
189			}
190			t.Logf("... Recv %v", resp)
191			if resp.Message != msg && resp.Message != "done" {
192				t.Errorf("Unexpected response %v", resp)
193			}
194		}
195	})
196}
197
198func TestClientStreamingEcho(t *testing.T) {
199	const num_connections = 1
200	cmd, reader, err := launchServer(t, num_connections)
201	if err != nil {
202		t.Errorf("Failed to launch %v", err)
203	}
204	defer cmd.Wait()
205
206	conn, echo_client, err := connectServer()
207	if err != nil {
208		t.Errorf("Failed to connect %v", err)
209	}
210	defer conn.Close()
211	go logServer(t, reader)
212
213	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
214		t.Logf("call ClientStreamingEcho()")
215		client, err := echo_client.ClientStreamingEcho(ctx)
216		if err != nil {
217			t.Errorf("... failed with error: %v", err)
218			return
219		}
220		for i := 0; i < 3; i++ {
221			t.Logf("... Send %v", msg)
222			if err := client.Send(&pb.EchoRequest{Message: msg}); err != nil {
223				t.Errorf("... Send failed with error: %v", err)
224				return
225			}
226		}
227		if err := client.CloseSend(); err != nil {
228			t.Errorf("... CloseSend failed with error: %v", err)
229			return
230		}
231		resp, err := client.CloseAndRecv()
232		if err != nil {
233			t.Logf("... CloseAndRecv failed with error: %v", err)
234			if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
235				t.Errorf("Error unexpected %v", err)
236			}
237		} else {
238			t.Logf("... CloseAndRecv %v", resp)
239			if resp.Message != "done" {
240				t.Errorf("Unexpected response %v", resp)
241			}
242		}
243	})
244}
245
246func TestBidirectionalStreamingEcho(t *testing.T) {
247	const num_connections = 1
248	cmd, reader, err := launchServer(t, num_connections)
249	if err != nil {
250		t.Errorf("Failed to launch %v", err)
251	}
252	defer cmd.Wait()
253
254	conn, echo_client, err := connectServer()
255	if err != nil {
256		t.Errorf("Failed to connect %v", err)
257	}
258	defer conn.Close()
259	go logServer(t, reader)
260
261	testRPC(t, func(t *testing.T, ctx context.Context, msg string) {
262		t.Logf("call BidirectionalStreamingEcho()")
263		client, err := echo_client.BidirectionalStreamingEcho(ctx)
264		if err != nil {
265			t.Logf("... failed with error: %v", err)
266			return
267		}
268		for i := 0; i < 3; i++ {
269			t.Logf("... Send %v", msg)
270			if err := client.Send(&pb.EchoRequest{Message: msg}); err != nil {
271				t.Errorf("... Send failed with error: %v", err)
272				return
273			}
274		}
275		if err := client.CloseSend(); err != nil {
276			t.Logf("... CloseSend failed with error: %v", err)
277			return
278		}
279		for {
280			resp, err := client.Recv()
281			if err == io.EOF {
282				t.Logf("... completed")
283				return
284			}
285			if err != nil {
286				t.Logf("... Recv failed with error: %v", err)
287				if msg != "quiet" || status.Convert(err).Code() != codes.Canceled {
288					t.Errorf("Error unexpected %v", err)
289				}
290				return
291			}
292			t.Logf("... Recv %v", resp)
293			if resp.Message != msg {
294				t.Errorf("Unexpected response %v", resp)
295			}
296		}
297	})
298}
299
// logServer copies the server's output into the test log, one line per
// t.Logf call prefixed with "SERVER: ", and returns once reading from reader
// fails (including io.EOF).
func logServer(t *testing.T, reader *bufio.Reader) {
	for {
		line, err := reader.ReadString('\n')
		// ReadString returns whatever it read before an error, so a final
		// line without a trailing newline arrives together with the error
		// and must be logged before returning.
		if line != "" {
			t.Logf("SERVER: %v", line)
		}
		if err != nil {
			return
		}
	}
}
309
310func launchServer(t *testing.T, num_connections int) (*exec.Cmd, *bufio.Reader, error) {
311	cmd := exec.Command("./test_pw_rpc_server", port, strconv.Itoa(num_connections))
312
313	output, err := cmd.StdoutPipe()
314	if err != nil {
315		t.Errorf("Failed to get stdout of server %v", err)
316		return nil, nil, err
317	}
318
319	if err := cmd.Start(); err != nil {
320		t.Errorf("Failed to launch server %v", err)
321		return nil, nil, err
322	}
323
324	reader := bufio.NewReader(output)
325	for {
326		line, _ := reader.ReadString('\n')
327		if strings.Contains(line, "Accept") {
328			break
329		}
330	}
331
332	return cmd, reader, nil
333}
334
335func connectServer() (*grpc.ClientConn, pb.EchoClient, error) {
336	addr := "localhost:" + port
337
338	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
339	if err != nil {
340		return nil, nil, err
341	}
342
343	echo_client := pb.NewEchoClient(conn)
344	return conn, echo_client, nil
345}
346
// testRPC invokes call 30 times, each inside its own subtest named
// "<n> of 30" and with a fresh context that times out after 5 seconds.
//
// The first 29 invocations receive "message0" through "message28". The last
// one receives "quiet" and has its context cancelled 100ms after the call
// was started, which exercises RPC cancellation. Each subtest waits for its
// call to finish before returning.
func testRPC(t *testing.T, call func(t *testing.T, ctx context.Context, msg string)) {
	const numCalls = 30
	for i := 0; i < numCalls; i++ {
		t.Run(fmt.Sprintf("%d of %d", i+1, numCalls), func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			msg := fmt.Sprintf("message%d", i)
			if i == numCalls-1 {
				msg = "quiet"
			}

			done := make(chan struct{})
			go func() {
				// Deferred so the wait below is released even if call
				// leaves this goroutine early (e.g. via runtime.Goexit).
				defer close(done)
				call(t, ctx, msg)
			}()
			// Test cancellation. When we sent "quiet", the server won't echo
			// anything back and instead will hold onto the request. Sleep a
			// bit to make sure the server doesn't respond. Then cancel the
			// request, which should complete the RPC.
			if msg == "quiet" {
				time.Sleep(100 * time.Millisecond)
				cancel()
			}
			<-done
		})
	}
}
376