1// Copyright 2024 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14// 15// Package integration_test implements a client to exercise the pw_grpc server implementation 16package integration_test 17 18import ( 19 "bufio" 20 "context" 21 "fmt" 22 "hash/crc32" 23 "io" 24 "os/exec" 25 "strconv" 26 "strings" 27 "testing" 28 "time" 29 30 "google.golang.org/grpc" 31 "google.golang.org/grpc/codes" 32 "google.golang.org/grpc/credentials/insecure" 33 pb "google.golang.org/grpc/examples/features/proto/echo" 34 "google.golang.org/grpc/status" 35) 36 37const port = "3402" 38 39func TestUnaryEcho(t *testing.T) { 40 const num_connections = 1 41 cmd, reader, err := launchServer(t, num_connections) 42 if err != nil { 43 t.Errorf("Failed to launch %v", err) 44 } 45 defer cmd.Wait() 46 47 conn, echo_client, err := connectServer() 48 if err != nil { 49 t.Errorf("Failed to connect %v", err) 50 } 51 defer conn.Close() 52 go logServer(t, reader) 53 54 testRPC(t, func(t *testing.T, ctx context.Context, msg string) { 55 t.Logf("call UnaryEcho(%v)", msg) 56 resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: msg}) 57 if err != nil { 58 t.Logf("... failed with error: %v", err.Error()) 59 if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { 60 t.Errorf("Error unexpected %v", err) 61 } 62 } else { 63 t.Logf("... Recv %v", resp) 64 if resp.Message != msg { 65 t.Errorf("Unexpected response %v", resp) 66 } 67 } 68 }) 69} 70 71func TestFragmentedMessage(t *testing.T) { 72 // Test sending successively larger messages, larger than the maximum 73 // HTTP2 data frame size (16384), ensuring messages are fragmented across 74 // frames. 75 const num_connections = 1 76 cmd, reader, err := launchServer(t, num_connections) 77 if err != nil { 78 t.Errorf("Failed to launch %v", err) 79 } 80 defer cmd.Wait() 81 82 conn, echo_client, err := connectServer() 83 if err != nil { 84 t.Errorf("Failed to connect %v", err) 85 } 86 defer conn.Close() 87 go logServer(t, reader) 88 89 const num_calls = 4 90 for i := 0; i < num_calls; i++ { 91 t.Run(fmt.Sprintf("%d of %d", i+1, num_calls), func(t *testing.T) { 92 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 93 defer cancel() 94 95 msg := "crc32:" + strings.Repeat("testmessage!", 1500*(i+1)) 96 checksum := strconv.FormatUint(uint64(crc32.ChecksumIEEE([]byte(msg))), 10) 97 98 done := make(chan struct{}) 99 go func() { 100 t.Logf("call UnaryChecksum") 101 resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: msg}) 102 if err != nil { 103 t.Logf("... failed with error: %v", err.Error()) 104 if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { 105 t.Errorf("Error unexpected %v", err) 106 } 107 } else { 108 t.Logf("... Recv %v", resp) 109 if resp.Message != checksum { 110 t.Errorf("Unexpected response %v", resp) 111 } 112 } 113 close(done) 114 }() 115 <-done 116 }) 117 } 118} 119 120func TestMultipleConnections(t *testing.T) { 121 const num_connections = 3 122 cmd, reader, err := launchServer(t, num_connections) 123 if err != nil { 124 t.Errorf("Failed to launch %v", err) 125 } 126 defer cmd.Wait() 127 128 go logServer(t, reader) 129 130 for i := 0; i < num_connections; i++ { 131 t.Run(fmt.Sprintf("connection %d of %d", i+1, num_connections), func(t *testing.T) { 132 conn, echo_client, err := connectServer() 133 if err != nil { 134 t.Errorf("Failed to connect %v", err) 135 } 136 137 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 138 defer cancel() 139 140 resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: "message0"}) 141 if err != nil { 142 t.Errorf("... failed with error: %v", err.Error()) 143 } else { 144 t.Logf("... Recv %v", resp) 145 if resp.Message != "message0" { 146 t.Errorf("Unexpected response %v", resp) 147 } 148 } 149 150 conn.Close() 151 }) 152 } 153} 154 155func TestServerStreamingEcho(t *testing.T) { 156 const num_connections = 1 157 cmd, reader, err := launchServer(t, num_connections) 158 if err != nil { 159 t.Errorf("Failed to launch %v", err) 160 } 161 defer cmd.Wait() 162 163 conn, echo_client, err := connectServer() 164 if err != nil { 165 t.Errorf("Failed to connect %v", err) 166 } 167 defer conn.Close() 168 go logServer(t, reader) 169 170 testRPC(t, func(t *testing.T, ctx context.Context, msg string) { 171 t.Logf("call ServerStreamingEcho(%v)", msg) 172 client, err := echo_client.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: msg}) 173 if err != nil { 174 t.Errorf("... failed with error: %v", err) 175 return 176 } 177 for { 178 resp, err := client.Recv() 179 if err == io.EOF { 180 t.Logf("... completed") 181 return 182 } 183 if err != nil { 184 t.Logf("... Recv failed with error: %v", err) 185 if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { 186 t.Errorf("Error unexpected %v", err) 187 } 188 return 189 } 190 t.Logf("... Recv %v", resp) 191 if resp.Message != msg && resp.Message != "done" { 192 t.Errorf("Unexpected response %v", resp) 193 } 194 } 195 }) 196} 197 198func TestClientStreamingEcho(t *testing.T) { 199 const num_connections = 1 200 cmd, reader, err := launchServer(t, num_connections) 201 if err != nil { 202 t.Errorf("Failed to launch %v", err) 203 } 204 defer cmd.Wait() 205 206 conn, echo_client, err := connectServer() 207 if err != nil { 208 t.Errorf("Failed to connect %v", err) 209 } 210 defer conn.Close() 211 go logServer(t, reader) 212 213 testRPC(t, func(t *testing.T, ctx context.Context, msg string) { 214 t.Logf("call ClientStreamingEcho()") 215 client, err := echo_client.ClientStreamingEcho(ctx) 216 if err != nil { 217 t.Errorf("... failed with error: %v", err) 218 return 219 } 220 for i := 0; i < 3; i++ { 221 t.Logf("... Send %v", msg) 222 if err := client.Send(&pb.EchoRequest{Message: msg}); err != nil { 223 t.Errorf("... Send failed with error: %v", err) 224 return 225 } 226 } 227 if err := client.CloseSend(); err != nil { 228 t.Errorf("... CloseSend failed with error: %v", err) 229 return 230 } 231 resp, err := client.CloseAndRecv() 232 if err != nil { 233 t.Logf("... CloseAndRecv failed with error: %v", err) 234 if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { 235 t.Errorf("Error unexpected %v", err) 236 } 237 } else { 238 t.Logf("... CloseAndRecv %v", resp) 239 if resp.Message != "done" { 240 t.Errorf("Unexpected response %v", resp) 241 } 242 } 243 }) 244} 245 246func TestBidirectionalStreamingEcho(t *testing.T) { 247 const num_connections = 1 248 cmd, reader, err := launchServer(t, num_connections) 249 if err != nil { 250 t.Errorf("Failed to launch %v", err) 251 } 252 defer cmd.Wait() 253 254 conn, echo_client, err := connectServer() 255 if err != nil { 256 t.Errorf("Failed to connect %v", err) 257 } 258 defer conn.Close() 259 go logServer(t, reader) 260 261 testRPC(t, func(t *testing.T, ctx context.Context, msg string) { 262 t.Logf("call BidirectionalStreamingEcho()") 263 client, err := echo_client.BidirectionalStreamingEcho(ctx) 264 if err != nil { 265 t.Logf("... failed with error: %v", err) 266 return 267 } 268 for i := 0; i < 3; i++ { 269 t.Logf("... Send %v", msg) 270 if err := client.Send(&pb.EchoRequest{Message: msg}); err != nil { 271 t.Errorf("... Send failed with error: %v", err) 272 return 273 } 274 } 275 if err := client.CloseSend(); err != nil { 276 t.Logf("... CloseSend failed with error: %v", err) 277 return 278 } 279 for { 280 resp, err := client.Recv() 281 if err == io.EOF { 282 t.Logf("... completed") 283 return 284 } 285 if err != nil { 286 t.Logf("... Recv failed with error: %v", err) 287 if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { 288 t.Errorf("Error unexpected %v", err) 289 } 290 return 291 } 292 t.Logf("... Recv %v", resp) 293 if resp.Message != msg { 294 t.Errorf("Unexpected response %v", resp) 295 } 296 } 297 }) 298} 299 300func logServer(t *testing.T, reader *bufio.Reader) { 301 for { 302 line, err := reader.ReadString('\n') 303 if err != nil { 304 break 305 } 306 t.Logf("SERVER: %v", line) 307 } 308} 309 310func launchServer(t *testing.T, num_connections int) (*exec.Cmd, *bufio.Reader, error) { 311 cmd := exec.Command("./test_pw_rpc_server", port, strconv.Itoa(num_connections)) 312 313 output, err := cmd.StdoutPipe() 314 if err != nil { 315 t.Errorf("Failed to get stdout of server %v", err) 316 return nil, nil, err 317 } 318 319 if err := cmd.Start(); err != nil { 320 t.Errorf("Failed to launch server %v", err) 321 return nil, nil, err 322 } 323 324 reader := bufio.NewReader(output) 325 for { 326 line, _ := reader.ReadString('\n') 327 if strings.Contains(line, "Accept") { 328 break 329 } 330 } 331 332 return cmd, reader, nil 333} 334 335func connectServer() (*grpc.ClientConn, pb.EchoClient, error) { 336 addr := "localhost:" + port 337 338 conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) 339 if err != nil { 340 return nil, nil, err 341 } 342 343 echo_client := pb.NewEchoClient(conn) 344 return conn, echo_client, nil 345} 346 347func testRPC(t *testing.T, call func(t *testing.T, ctx context.Context, msg string)) { 348 const num_calls = 30 349 for i := 0; i < num_calls; i++ { 350 t.Run(fmt.Sprintf("%d of %d", i+1, num_calls), func(t *testing.T) { 351 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 352 defer cancel() 353 354 msg := fmt.Sprintf("message%d", i) 355 if i == num_calls-1 { 356 msg = "quiet" 357 } 358 359 done := make(chan struct{}) 360 go func() { 361 call(t, ctx, msg) 362 close(done) 363 }() 364 // Test cancellation. When we sent "quiet", the server won't echo anything 365 // back and instead will hold onto the request. Sleep a bit to make sure 366 // the server doesn't respond. Then cancel the request, which should 367 // complete the RPC. 368 if msg == "quiet" { 369 time.Sleep(100 * time.Millisecond) 370 cancel() 371 } 372 <-done 373 }) 374 } 375} 376