1// Copyright 2022 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15/* eslint-env browser */ 16 17import { Status } from 'pigweedjs/pw_status'; 18import { MessageCreator } from 'pigweedjs/pw_protobuf_compiler'; 19import { Message } from 'google-protobuf'; 20import { 21 PacketType, 22 RpcPacket, 23} from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; 24import { ProtoCollection } from 'pigweedjs/protos/collection'; 25import { Request, Response } from 'pigweedjs/protos/pw_rpc/ts/test_pb'; 26 27import { Client } from './client'; 28import { Channel, Method, Service } from './descriptors'; 29import { 30 BidirectionalStreamingMethodStub, 31 ClientStreamingMethodStub, 32 ServerStreamingMethodStub, 33 UnaryMethodStub, 34} from './method'; 35import * as packets from './packets'; 36 37const LEGACY_OPEN_CALL_ID = 0; 38const OPEN_CALL_ID = 2 ** 32 - 1; 39 40describe('Client', () => { 41 let protoCollection: ProtoCollection; 42 let client: Client; 43 let lastPacketSent: RpcPacket; 44 45 beforeEach(() => { 46 protoCollection = new ProtoCollection(); 47 const channels = [new Channel(1, savePacket), new Channel(5)]; 48 client = Client.fromProtoSet(channels, protoCollection); 49 }); 50 51 function savePacket(packetBytes: Uint8Array): void { 52 lastPacketSent = RpcPacket.deserializeBinary(packetBytes); 53 } 54 55 it('channel returns undefined for empty list', () => { 56 const channels = Array<Channel>(); 57 const emptyChannelClient = Client.fromProtoSet(channels, protoCollection); 58 expect(emptyChannelClient.channel()).toBeUndefined(); 59 }); 60 61 it('fetches channel or returns undefined', () => { 62 expect(client.channel(1)!.channel.id).toEqual(1); 63 expect(client.channel(5)!.channel.id).toEqual(5); 64 expect(client.channel()!.channel.id).toEqual(1); 65 expect(client.channel(2)).toBeUndefined(); 66 }); 67 68 it('ChannelClient fetches method by name', () => { 69 const channel = client.channel()!; 70 const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!; 71 expect(stub.method.name).toEqual('SomeUnary'); 72 }); 73 74 it('ChannelClient for unknown name returns undefined', () => { 75 const channel = client.channel()!; 76 expect(channel.methodStub('')).toBeUndefined(); 77 expect( 78 channel.methodStub('pw.rpc.test1.Garbage.SomeUnary'), 79 ).toBeUndefined(); 80 expect( 81 channel.methodStub('pw.rpc.test1.TheTestService.Garbage'), 82 ).toBeUndefined(); 83 }); 84 85 it('processPacket with invalid proto data', () => { 86 const textEncoder = new TextEncoder(); 87 const data = textEncoder.encode('NOT a packet!'); 88 expect(client.processPacket(data)).toEqual(Status.DATA_LOSS); 89 }); 90 91 it('processPacket not for client', () => { 92 const packet = new RpcPacket(); 93 packet.setType(PacketType.REQUEST); 94 const processStatus = client.processPacket(packet.serializeBinary()); 95 expect(processStatus).toEqual(Status.INVALID_ARGUMENT); 96 }); 97 98 it('processPacket for unrecognized channel', () => { 99 const packet = packets.encodeResponse([123, 456, 789, 456], new Request()); 100 expect(client.processPacket(packet)).toEqual(Status.NOT_FOUND); 101 }); 102 103 it('processPacket for unrecognized service', () => { 104 const packet = packets.encodeResponse([1, 456, 789, 456], new Request()); 105 const status = client.processPacket(packet); 106 expect(client.processPacket(packet)).toEqual(Status.OK); 107 108 expect(lastPacketSent.getChannelId()).toEqual(1); 109 expect(lastPacketSent.getServiceId()).toEqual(456); 110 expect(lastPacketSent.getMethodId()).toEqual(789); 111 expect(lastPacketSent.getCallId()).toEqual(456); 112 expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR); 113 expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND); 114 }); 115 116 it('processPacket for unrecognized method', () => { 117 const service = client.services.values().next().value; 118 119 const packet = packets.encodeResponse( 120 [1, service.id, 789, 456], 121 new Request(), 122 ); 123 const status = client.processPacket(packet); 124 expect(client.processPacket(packet)).toEqual(Status.OK); 125 126 expect(lastPacketSent.getChannelId()).toEqual(1); 127 expect(lastPacketSent.getServiceId()).toEqual(service.id); 128 expect(lastPacketSent.getMethodId()).toEqual(789); 129 expect(lastPacketSent.getCallId()).toEqual(456); 130 expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR); 131 expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND); 132 }); 133 134 it('processPacket for non-pending method', () => { 135 const service = client.services.values().next().value; 136 const method = service.methods.values().next().value; 137 138 const packet = packets.encodeResponse( 139 [1, service.id, method.id, 456], 140 new Request(), 141 ); 142 const status = client.processPacket(packet); 143 expect(client.processPacket(packet)).toEqual(Status.OK); 144 145 expect(lastPacketSent.getChannelId()).toEqual(1); 146 expect(lastPacketSent.getServiceId()).toEqual(service.id); 147 expect(lastPacketSent.getMethodId()).toEqual(method.id); 148 expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR); 149 expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION); 150 }); 151}); 152 153describe('RPC', () => { 154 let protoCollection: ProtoCollection; 155 let client: Client; 156 let lastPacketSent: RpcPacket | undefined; 157 let requests: RpcPacket[] = []; 158 let nextPackets: [Uint8Array, Status][] = []; 159 let responseLock = false; 160 let sendResponsesAfterPackets = 0; 161 let outputException: Error | undefined; 162 163 beforeEach(async () => { 164 protoCollection = new ProtoCollection(); 165 const channels = [ 166 new Channel(1, handlePacket), 167 new Channel(2, () => { 168 // Do nothing. 169 }), 170 ]; 171 client = Client.fromProtoSet(channels, protoCollection); 172 lastPacketSent = undefined; 173 requests = []; 174 nextPackets = []; 175 responseLock = false; 176 sendResponsesAfterPackets = 0; 177 outputException = undefined; 178 }); 179 180 function newRequest(magicNumber = 123): Message { 181 const request = new Request(); 182 request.setMagicNumber(magicNumber); 183 return request; 184 } 185 186 function newResponse(payload = '._.'): Message { 187 const response = new Response(); 188 response.setPayload(payload); 189 return response; 190 } 191 192 function enqueueResponse( 193 channelId: number, 194 method: Method, 195 status: Status, 196 callId: number, 197 response?: Message, 198 ) { 199 const packet = new RpcPacket(); 200 packet.setType(PacketType.RESPONSE); 201 packet.setChannelId(channelId); 202 packet.setServiceId(method.service.id); 203 packet.setMethodId(method.id); 204 packet.setCallId(callId); 205 packet.setStatus(status); 206 if (response === undefined) { 207 packet.setPayload(new Uint8Array(0)); 208 } else { 209 packet.setPayload(response.serializeBinary()); 210 } 211 nextPackets.push([packet.serializeBinary(), Status.OK]); 212 } 213 214 function enqueueServerStream( 215 channelId: number, 216 method: Method, 217 response: Message, 218 callId: number, 219 status: Status = Status.OK, 220 ) { 221 const packet = new RpcPacket(); 222 packet.setType(PacketType.SERVER_STREAM); 223 packet.setChannelId(channelId); 224 packet.setServiceId(method.service.id); 225 packet.setMethodId(method.id); 226 packet.setCallId(callId); 227 packet.setPayload(response.serializeBinary()); 228 packet.setStatus(status); 229 nextPackets.push([packet.serializeBinary(), status]); 230 } 231 232 function enqueueError( 233 channelId: number, 234 method: Method, 235 status: Status, 236 processStatus: Status, 237 callId: number, 238 ) { 239 const packet = new RpcPacket(); 240 packet.setType(PacketType.SERVER_ERROR); 241 packet.setChannelId(channelId); 242 packet.setServiceId(method.service.id); 243 packet.setMethodId(method.id); 244 packet.setCallId(callId); 245 packet.setStatus(status); 246 247 nextPackets.push([packet.serializeBinary(), processStatus]); 248 } 249 250 function lastRequest(): RpcPacket { 251 if (requests.length == 0) { 252 throw Error('Tried to fetch request from empty list'); 253 } 254 return requests[requests.length - 1]; 255 } 256 257 function sentPayload(messageType: typeof Message): any { 258 return messageType.deserializeBinary(lastRequest().getPayload_asU8()); 259 } 260 261 function handlePacket(data: Uint8Array): void { 262 if (outputException !== undefined) { 263 throw outputException; 264 } 265 requests.push(packets.decode(data)); 266 267 if (sendResponsesAfterPackets > 1) { 268 sendResponsesAfterPackets -= 1; 269 return; 270 } 271 272 processEnqueuedPackets(); 273 } 274 275 function processEnqueuedPackets(): void { 276 // Avoid infinite recursion when processing a packet causes another packet 277 // to send. 278 if (responseLock) return; 279 responseLock = true; 280 for (const [packet, status] of nextPackets) { 281 expect(client.processPacket(packet)).toEqual(status); 282 } 283 nextPackets = []; 284 responseLock = false; 285 } 286 287 describe('Unary', () => { 288 let unaryStub: UnaryMethodStub; 289 290 beforeEach(async () => { 291 unaryStub = client 292 .channel() 293 ?.methodStub( 294 'pw.rpc.test1.TheTestService.SomeUnary', 295 ) as UnaryMethodStub; 296 }); 297 298 const openCallIds = [ 299 ['OPEN_CALL_ID', OPEN_CALL_ID], 300 ['LEGACY_OPEN_CALL_ID', LEGACY_OPEN_CALL_ID], 301 ]; 302 openCallIds.forEach(([idName, callId]) => { 303 it(`matches responses with ${idName} to requests with arbitrary IDs`, async () => { 304 const promisedResponse = unaryStub.call(newRequest(6)); 305 enqueueResponse( 306 1, 307 unaryStub.method, 308 Status.ABORTED, 309 OPEN_CALL_ID, 310 newResponse('is unrequested'), 311 ); 312 313 processEnqueuedPackets(); 314 const [status, response] = await promisedResponse; 315 316 expect(sentPayload(Request).getMagicNumber()).toEqual(6); 317 expect(status).toEqual(Status.ABORTED); 318 expect(response).toEqual(newResponse('is unrequested')); 319 }); 320 }); 321 322 it('blocking call', async () => { 323 for (let i = 0; i < 3; i++) { 324 enqueueResponse( 325 1, 326 unaryStub.method, 327 Status.ABORTED, 328 unaryStub.rpcs.nextCallId, 329 newResponse('0_o'), 330 ); 331 const [status, response] = await unaryStub.call(newRequest(6)); 332 333 expect(sentPayload(Request).getMagicNumber()).toEqual(6); 334 expect(status).toEqual(Status.ABORTED); 335 expect(response).toEqual(newResponse('0_o')); 336 } 337 }); 338 339 it('nonblocking call', () => { 340 for (let i = 0; i < 3; i++) { 341 const response = newResponse('hello world'); 342 enqueueResponse( 343 1, 344 unaryStub.method, 345 Status.ABORTED, 346 unaryStub.rpcs.nextCallId, 347 response, 348 ); 349 350 const onNext = jest.fn(); 351 const onCompleted = jest.fn(); 352 const onError = jest.fn(); 353 const call = unaryStub.invoke( 354 newRequest(5), 355 onNext, 356 onCompleted, 357 onError, 358 ); 359 360 expect(sentPayload(Request).getMagicNumber()).toEqual(5); 361 expect(onNext).toHaveBeenCalledWith(response); 362 expect(onError).not.toHaveBeenCalled(); 363 expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED); 364 } 365 }); 366 367 it('open', () => { 368 outputException = Error('Error should be ignored'); 369 370 for (let i = 0; i < 3; i++) { 371 const response = newResponse('hello world'); 372 enqueueResponse( 373 1, 374 unaryStub.method, 375 Status.ABORTED, 376 unaryStub.rpcs.nextCallId, 377 response, 378 ); 379 380 const onNext = jest.fn(); 381 const onCompleted = jest.fn(); 382 const onError = jest.fn(); 383 unaryStub.open(newRequest(5), onNext, onCompleted, onError); 384 expect(requests).toHaveLength(0); 385 386 processEnqueuedPackets(); 387 388 expect(onNext).toHaveBeenCalledWith(response); 389 expect(onError).not.toHaveBeenCalled(); 390 expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED); 391 } 392 }); 393 394 it('nonblocking concurrent call', () => { 395 // Start several calls to the same method 396 const callsAndCallbacks = []; 397 for (let i = 0; i < 3; i++) { 398 const onNext = jest.fn(); 399 const onCompleted = jest.fn(); 400 const onError = jest.fn(); 401 402 const call = unaryStub.invoke( 403 newRequest(5), 404 onNext, 405 onCompleted, 406 onError, 407 ); 408 callsAndCallbacks.push([call, onNext, onCompleted, onError]); 409 410 expect(sentPayload(Request).getMagicNumber()).toEqual(5); 411 } 412 // Respond only to the last call 413 const [lastCall, lastCallback] = callsAndCallbacks.pop(); 414 const lastResponse = newResponse('last payload'); 415 416 enqueueResponse( 417 1, 418 unaryStub.method, 419 Status.OK, 420 lastCall.callId, 421 lastResponse, 422 ); 423 processEnqueuedPackets(); 424 425 expect(lastCallback).toHaveBeenCalledWith(lastResponse); 426 for (const i in callsAndCallbacks) { 427 const [_call, onNext, onCompleted, onError] = callsAndCallbacks[i]; 428 expect(onNext).toBeCalledTimes(0); 429 expect(onCompleted).toBeCalledTimes(0); 430 expect(onError).toBeCalledTimes(0); 431 } 432 }); 433 434 it('blocking server error', async () => { 435 for (let i = 0; i < 3; i++) { 436 enqueueError( 437 1, 438 unaryStub.method, 439 Status.NOT_FOUND, 440 Status.OK, 441 unaryStub.rpcs.nextCallId, 442 ); 443 444 try { 445 await unaryStub.call(newRequest()); 446 fail('call expected to fail'); 447 } catch (e: any) { 448 expect(e.status).toBe(Status.NOT_FOUND); 449 } 450 } 451 }); 452 453 it('nonblocking call cancel', () => { 454 for (let i = 0; i < 3; i++) { 455 const onNext = jest.fn(); 456 const call = unaryStub.invoke(newRequest(), onNext); 457 458 expect(requests.length).toBeGreaterThan(0); 459 requests = []; 460 461 expect(call.cancel()).toBe(true); 462 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR); 463 expect(lastRequest().getStatus()).toEqual(Status.CANCELLED); 464 465 expect(call.cancel()).toBe(false); 466 expect(onNext).not.toHaveBeenCalled(); 467 } 468 }); 469 470 it('blocking call with timeout', async () => { 471 try { 472 await unaryStub.call(newRequest(), 10); 473 fail('Promise should not be resolve'); 474 } catch (err: any) { 475 expect(err.timeoutMs).toEqual(10); 476 } 477 }); 478 479 it('nonblocking exception in callback', () => { 480 const errorCallback = () => { 481 throw Error('Something went wrong!'); 482 }; 483 484 enqueueResponse( 485 1, 486 unaryStub.method, 487 Status.OK, 488 unaryStub.rpcs.nextCallId, 489 ); 490 const call = unaryStub.invoke(newRequest(), errorCallback); 491 expect(call.callbackException!.name).toEqual('Error'); 492 expect(call.callbackException!.message).toEqual('Something went wrong!'); 493 }); 494 }); 495 496 describe('ServerStreaming', () => { 497 let serverStreaming: ServerStreamingMethodStub; 498 499 beforeEach(async () => { 500 serverStreaming = client 501 .channel() 502 ?.methodStub( 503 'pw.rpc.test1.TheTestService.SomeServerStreaming', 504 ) as ServerStreamingMethodStub; 505 }); 506 507 it('non-blocking call', () => { 508 const response1 = newResponse('!!!'); 509 const response2 = newResponse('?'); 510 511 for (let i = 0; i < 3; i++) { 512 enqueueServerStream( 513 1, 514 serverStreaming.method, 515 response1, 516 serverStreaming.rpcs.nextCallId, 517 ); 518 enqueueServerStream( 519 1, 520 serverStreaming.method, 521 response2, 522 serverStreaming.rpcs.nextCallId, 523 ); 524 enqueueResponse( 525 1, 526 serverStreaming.method, 527 Status.ABORTED, 528 serverStreaming.rpcs.nextCallId, 529 ); 530 531 const onNext = jest.fn(); 532 const onCompleted = jest.fn(); 533 const onError = jest.fn(); 534 serverStreaming.invoke(newRequest(4), onNext, onCompleted, onError); 535 536 expect(onNext).toHaveBeenCalledWith(response1); 537 expect(onNext).toHaveBeenCalledWith(response2); 538 expect(onError).not.toHaveBeenCalled(); 539 expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED); 540 541 expect( 542 sentPayload(serverStreaming.method.requestType).getMagicNumber(), 543 ).toEqual(4); 544 } 545 }); 546 547 it('open', () => { 548 outputException = Error('Error should be ignored'); 549 const response1 = newResponse('!!!'); 550 const response2 = newResponse('?'); 551 552 for (let i = 0; i < 3; i++) { 553 enqueueServerStream( 554 1, 555 serverStreaming.method, 556 response1, 557 serverStreaming.rpcs.nextCallId, 558 ); 559 enqueueServerStream( 560 1, 561 serverStreaming.method, 562 response2, 563 serverStreaming.rpcs.nextCallId, 564 ); 565 enqueueResponse( 566 1, 567 serverStreaming.method, 568 Status.ABORTED, 569 serverStreaming.rpcs.nextCallId, 570 ); 571 572 const onNext = jest.fn(); 573 const onCompleted = jest.fn(); 574 const onError = jest.fn(); 575 const call = serverStreaming.open( 576 newRequest(3), 577 onNext, 578 onCompleted, 579 onError, 580 ); 581 582 expect(requests).toHaveLength(0); 583 processEnqueuedPackets(); 584 585 expect(onNext).toHaveBeenCalledWith(response1); 586 expect(onNext).toHaveBeenCalledWith(response2); 587 expect(onError).not.toHaveBeenCalled(); 588 expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED); 589 } 590 }); 591 592 it('blocking timeout', async () => { 593 try { 594 await serverStreaming.call(newRequest(), 10); 595 fail('Promise should not be resolve'); 596 } catch (err: any) { 597 expect(err.timeoutMs).toEqual(10); 598 } 599 }); 600 601 it('non-blocking cancel', () => { 602 const testResponse = newResponse('!!!'); 603 enqueueServerStream( 604 1, 605 serverStreaming.method, 606 testResponse, 607 serverStreaming.rpcs.nextCallId, 608 ); 609 610 const onNext = jest.fn(); 611 const onCompleted = jest.fn(); 612 const onError = jest.fn(); 613 let call = serverStreaming.invoke(newRequest(3), onNext); 614 expect(onNext).toHaveBeenNthCalledWith(1, testResponse); 615 616 // onNext.calls.reset(); 617 618 call.cancel(); 619 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR); 620 expect(lastRequest().getStatus()).toEqual(Status.CANCELLED); 621 622 // Ensure the RPC can be called after being cancelled. 623 enqueueServerStream( 624 1, 625 serverStreaming.method, 626 testResponse, 627 serverStreaming.rpcs.nextCallId, 628 ); 629 enqueueResponse( 630 1, 631 serverStreaming.method, 632 Status.OK, 633 serverStreaming.rpcs.nextCallId, 634 ); 635 call = serverStreaming.invoke(newRequest(), onNext, onCompleted, onError); 636 expect(onNext).toHaveBeenNthCalledWith(2, testResponse); 637 expect(onError).not.toHaveBeenCalled(); 638 expect(onCompleted).toHaveBeenCalledWith(Status.OK); 639 }); 640 }); 641 642 describe('ClientStreaming', () => { 643 let clientStreaming: ClientStreamingMethodStub; 644 645 beforeEach(async () => { 646 clientStreaming = client 647 .channel() 648 ?.methodStub( 649 'pw.rpc.test1.TheTestService.SomeClientStreaming', 650 ) as ClientStreamingMethodStub; 651 }); 652 653 it('non-blocking call', () => { 654 const testResponse = newResponse('-.-'); 655 656 for (let i = 0; i < 3; i++) { 657 const onNext = jest.fn(); 658 const stream = clientStreaming.invoke(onNext); 659 expect(stream.completed).toBe(false); 660 661 stream.send(newRequest(31)); 662 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM); 663 expect(sentPayload(Request).getMagicNumber()).toEqual(31); 664 expect(stream.completed).toBe(false); 665 666 // Enqueue the server response to be sent after the next message. 667 enqueueResponse( 668 1, 669 clientStreaming.method, 670 Status.OK, 671 stream.callId, 672 testResponse, 673 ); 674 675 stream.send(newRequest(32)); 676 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM); 677 expect(sentPayload(Request).getMagicNumber()).toEqual(32); 678 679 expect(onNext).toHaveBeenCalledWith(testResponse); 680 expect(stream.completed).toBe(true); 681 expect(stream.status).toEqual(Status.OK); 682 expect(stream.error).toBeUndefined(); 683 } 684 }); 685 686 it('open', () => { 687 outputException = Error('Error should be ignored'); 688 const response = newResponse('!!!'); 689 690 for (let i = 0; i < 3; i++) { 691 enqueueResponse( 692 1, 693 clientStreaming.method, 694 Status.OK, 695 clientStreaming.rpcs.nextCallId, 696 response, 697 ); 698 699 const onNext = jest.fn(); 700 const onCompleted = jest.fn(); 701 const onError = jest.fn(); 702 const call = clientStreaming.open(onNext, onCompleted, onError); 703 expect(requests).toHaveLength(0); 704 705 processEnqueuedPackets(); 706 707 expect(onNext).toHaveBeenCalledWith(response); 708 expect(onError).not.toHaveBeenCalled(); 709 expect(onCompleted).toHaveBeenCalledWith(Status.OK); 710 } 711 }); 712 713 it('blocking timeout', async () => { 714 try { 715 await clientStreaming.call([newRequest()], 10); 716 fail('Promise should not be resolve'); 717 } catch (err: any) { 718 expect(err.timeoutMs).toEqual(10); 719 } 720 }); 721 722 it('non-blocking call ended by client', () => { 723 const testResponse = newResponse('0.o'); 724 for (let i = 0; i < 3; i++) { 725 const onNext = jest.fn(); 726 const stream = clientStreaming.invoke(onNext); 727 expect(stream.completed).toBe(false); 728 729 stream.send(newRequest(31)); 730 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM); 731 expect(sentPayload(Request).getMagicNumber()).toEqual(31); 732 expect(stream.completed).toBe(false); 733 734 // Enqueue the server response to be sent after the next message. 735 enqueueResponse( 736 1, 737 clientStreaming.method, 738 Status.OK, 739 stream.callId, 740 testResponse, 741 ); 742 743 stream.finishAndWait(); 744 expect(lastRequest().getType()).toEqual( 745 PacketType.CLIENT_REQUEST_COMPLETION, 746 ); 747 748 expect(onNext).toHaveBeenCalledWith(testResponse); 749 expect(stream.completed).toBe(true); 750 expect(stream.status).toEqual(Status.OK); 751 expect(stream.error).toBeUndefined(); 752 } 753 }); 754 755 it('non-blocking call cancelled', () => { 756 for (let i = 0; i < 3; i++) { 757 const stream = clientStreaming.invoke(); 758 stream.send(newRequest()); 759 760 expect(stream.cancel()).toBe(true); 761 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR); 762 expect(lastRequest().getStatus()).toEqual(Status.CANCELLED); 763 expect(stream.cancel()).toBe(false); 764 expect(stream.completed).toBe(true); 765 expect(stream.error).toEqual(Status.CANCELLED); 766 } 767 }); 768 769 it('non-blocking call server error', async () => { 770 for (let i = 0; i < 3; i++) { 771 const stream = clientStreaming.invoke(); 772 enqueueError( 773 1, 774 clientStreaming.method, 775 Status.INVALID_ARGUMENT, 776 Status.OK, 777 stream.callId, 778 ); 779 780 stream.send(newRequest()); 781 782 await stream 783 .finishAndWait() 784 .then(() => { 785 fail('Promise should not be resolved'); 786 }) 787 .catch((reason) => { 788 expect(reason.status).toEqual(Status.INVALID_ARGUMENT); 789 }); 790 } 791 }); 792 793 it('non-blocking call server error after stream end', async () => { 794 for (let i = 0; i < 3; i++) { 795 const stream = clientStreaming.invoke(); 796 // Error will be sent in response to the CLIENT_REQUEST_COMPLETION packet. 797 enqueueError( 798 1, 799 clientStreaming.method, 800 Status.INVALID_ARGUMENT, 801 Status.OK, 802 stream.callId, 803 ); 804 805 await stream 806 .finishAndWait() 807 .then(() => { 808 fail('Promise should not be resolved'); 809 }) 810 .catch((reason) => { 811 expect(reason.status).toEqual(Status.INVALID_ARGUMENT); 812 }); 813 } 814 }); 815 816 it('non-blocking call send after cancelled', () => { 817 expect.assertions(2); 818 const stream = clientStreaming.invoke(); 819 expect(stream.cancel()).toBe(true); 820 821 try { 822 stream.send(newRequest()); 823 } catch (e) { 824 console.log(e); 825 expect(e.status).toEqual(Status.CANCELLED); 826 } 827 // expect(() => stream.send(newRequest())).toThrowError( 828 // error => error.status === Status.CANCELLED 829 // ); 830 }); 831 832 it('non-blocking finish after completed', async () => { 833 const enqueuedResponse = newResponse('?!'); 834 enqueueResponse( 835 1, 836 clientStreaming.method, 837 Status.UNAVAILABLE, 838 clientStreaming.rpcs.nextCallId, 839 enqueuedResponse, 840 ); 841 842 const stream = clientStreaming.invoke(); 843 const result = await stream.finishAndWait(); 844 expect(result[1]).toEqual(enqueuedResponse); 845 846 expect(await stream.finishAndWait()).toEqual(result); 847 expect(await stream.finishAndWait()).toEqual(result); 848 }); 849 850 it('non-blocking finish after error', async () => { 851 enqueueError( 852 1, 853 clientStreaming.method, 854 Status.UNAVAILABLE, 855 Status.OK, 856 clientStreaming.rpcs.nextCallId, 857 ); 858 const stream = clientStreaming.invoke(); 859 860 for (let i = 0; i < 3; i++) { 861 await stream 862 .finishAndWait() 863 .then(() => { 864 fail('Promise should not be resolved'); 865 }) 866 .catch((reason) => { 867 expect(reason.status).toEqual(Status.UNAVAILABLE); 868 expect(stream.error).toEqual(Status.UNAVAILABLE); 869 expect(stream.response).toBeUndefined(); 870 }); 871 } 872 }); 873 }); 874 875 describe('BidirectionalStreaming', () => { 876 let bidiStreaming: BidirectionalStreamingMethodStub; 877 878 beforeEach(async () => { 879 bidiStreaming = client 880 .channel() 881 ?.methodStub( 882 'pw.rpc.test1.TheTestService.SomeBidiStreaming', 883 ) as BidirectionalStreamingMethodStub; 884 }); 885 886 it('blocking call', async () => { 887 const testRequests = [newRequest(123), newRequest(456)]; 888 889 sendResponsesAfterPackets = 3; 890 enqueueResponse( 891 1, 892 bidiStreaming.method, 893 Status.NOT_FOUND, 894 bidiStreaming.rpcs.nextCallId, 895 ); 896 897 const results = await bidiStreaming.call(testRequests); 898 expect(results[0]).toEqual(Status.NOT_FOUND); 899 expect(results[1]).toEqual([]); 900 }); 901 902 it('blocking server error', async () => { 903 const testRequests = [newRequest(123)]; 904 enqueueError( 905 1, 906 bidiStreaming.method, 907 Status.NOT_FOUND, 908 Status.OK, 909 bidiStreaming.rpcs.nextCallId, 910 ); 911 912 await bidiStreaming 913 .call(testRequests) 914 .then(() => { 915 fail('Promise should not be resolved'); 916 }) 917 .catch((reason) => { 918 expect(reason.status).toEqual(Status.NOT_FOUND); 919 }); 920 }); 921 922 it('non-blocking call', () => { 923 const rep1 = newResponse('!!!'); 924 const rep2 = newResponse('?'); 925 926 for (let i = 0; i < 3; i++) { 927 const testResponses: Array<Message> = []; 928 const stream = bidiStreaming.invoke((response) => { 929 testResponses.push(response); 930 }); 931 expect(stream.completed).toBe(false); 932 933 stream.send(newRequest(55)); 934 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM); 935 expect(sentPayload(Request).getMagicNumber()).toEqual(55); 936 expect(stream.completed).toBe(false); 937 expect(testResponses).toEqual([]); 938 939 enqueueServerStream(1, bidiStreaming.method, rep1, stream.callId); 940 enqueueServerStream(1, bidiStreaming.method, rep2, stream.callId); 941 942 stream.send(newRequest(66)); 943 expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM); 944 expect(sentPayload(Request).getMagicNumber()).toEqual(66); 945 expect(stream.completed).toBe(false); 946 expect(testResponses).toEqual([rep1, rep2]); 947 948 enqueueResponse(1, bidiStreaming.method, Status.OK, stream.callId); 949 950 stream.send(newRequest(77)); 951 expect(stream.completed).toBe(true); 952 expect(testResponses).toEqual([rep1, rep2]); 953 expect(stream.status).toEqual(Status.OK); 954 expect(stream.error).toBeUndefined(); 955 } 956 }); 957 958 it('open', () => { 959 outputException = Error('Error should be ignored'); 960 const response1 = newResponse('!!!'); 961 const response2 = newResponse('?'); 962 963 for (let i = 0; i < 3; i++) { 964 enqueueServerStream( 965 1, 966 bidiStreaming.method, 967 response1, 968 bidiStreaming.rpcs.nextCallId, 969 ); 970 enqueueServerStream( 971 1, 972 bidiStreaming.method, 973 response2, 974 bidiStreaming.rpcs.nextCallId, 975 ); 976 enqueueResponse( 977 1, 978 bidiStreaming.method, 979 Status.OK, 980 bidiStreaming.rpcs.nextCallId, 981 ); 982 983 const onNext = jest.fn(); 984 const onCompleted = jest.fn(); 985 const onError = jest.fn(); 986 const call = bidiStreaming.open(onNext, onCompleted, onError); 987 expect(requests).toHaveLength(0); 988 989 processEnqueuedPackets(); 990 991 expect(onNext).toHaveBeenCalledWith(response1); 992 expect(onNext).toHaveBeenCalledWith(response2); 993 expect(onError).not.toHaveBeenCalled(); 994 expect(onCompleted).toHaveBeenCalledWith(Status.OK); 995 } 996 }); 997 998 it('blocking timeout', async () => { 999 try { 1000 await bidiStreaming.call([newRequest()], 10); 1001 fail('Promise should not be resolve'); 1002 } catch (err: any) { 1003 expect(err.timeoutMs).toEqual(10); 1004 } 1005 }); 1006 1007 it('non-blocking server error', async () => { 1008 const response = newResponse('!!!'); 1009 1010 for (let i = 0; i < 3; i++) { 1011 const testResponses: Array<Message> = []; 1012 const stream = bidiStreaming.invoke((response) => { 1013 testResponses.push(response); 1014 }); 1015 expect(stream.completed).toBe(false); 1016 1017 enqueueServerStream(1, bidiStreaming.method, response, stream.callId); 1018 1019 stream.send(newRequest(55)); 1020 expect(stream.completed).toBe(false); 1021 expect(testResponses).toEqual([response]); 1022 1023 enqueueError( 1024 1, 1025 bidiStreaming.method, 1026 Status.OUT_OF_RANGE, 1027 Status.OK, 1028 stream.callId, 1029 ); 1030 1031 stream.send(newRequest(999)); 1032 expect(stream.completed).toBe(true); 1033 expect(testResponses).toEqual([response]); 1034 expect(stream.status).toBeUndefined(); 1035 expect(stream.error).toEqual(Status.OUT_OF_RANGE); 1036 1037 await stream 1038 .finishAndWait() 1039 .then(() => { 1040 fail('Promise should not be resolved'); 1041 }) 1042 .catch((reason) => { 1043 expect(reason.status).toEqual(Status.OUT_OF_RANGE); 1044 }); 1045 } 1046 }); 1047 it('non-blocking server error after stream end', async () => { 1048 for (let i = 0; i < 3; i++) { 1049 const stream = bidiStreaming.invoke(); 1050 1051 // Error is sent in response to CLIENT_REQUEST_COMPLETION packet. 1052 enqueueError( 1053 1, 1054 bidiStreaming.method, 1055 Status.INVALID_ARGUMENT, 1056 Status.OK, 1057 stream.callId, 1058 ); 1059 1060 await stream 1061 .finishAndWait() 1062 .then(() => { 1063 fail('Promise should not be resolved'); 1064 }) 1065 .catch((reason) => { 1066 expect(reason.status).toEqual(Status.INVALID_ARGUMENT); 1067 }); 1068 } 1069 }); 1070 1071 it('non-blocking send after cancelled', async () => { 1072 const stream = bidiStreaming.invoke(); 1073 expect(stream.cancel()).toBe(true); 1074 1075 try { 1076 stream.send(newRequest()); 1077 fail('send should have failed'); 1078 } catch (e: any) { 1079 expect(e.status).toBe(Status.CANCELLED); 1080 } 1081 }); 1082 1083 it('non-blocking finish after completed', async () => { 1084 const response = newResponse('!?'); 1085 enqueueServerStream( 1086 1, 1087 bidiStreaming.method, 1088 response, 1089 bidiStreaming.rpcs.nextCallId, 1090 ); 1091 enqueueResponse( 1092 1, 1093 bidiStreaming.method, 1094 Status.UNAVAILABLE, 1095 bidiStreaming.rpcs.nextCallId, 1096 ); 1097 1098 const stream = bidiStreaming.invoke(); 1099 const result = await stream.finishAndWait(); 1100 expect(result[1]).toEqual([response]); 1101 1102 expect(await stream.finishAndWait()).toEqual(result); 1103 expect(await stream.finishAndWait()).toEqual(result); 1104 }); 1105 1106 it('non-blocking finish after error', async () => { 1107 const response = newResponse('!?'); 1108 enqueueServerStream( 1109 1, 1110 bidiStreaming.method, 1111 response, 1112 bidiStreaming.rpcs.nextCallId, 1113 ); 1114 enqueueError( 1115 1, 1116 bidiStreaming.method, 1117 Status.UNAVAILABLE, 1118 Status.OK, 1119 bidiStreaming.rpcs.nextCallId, 1120 ); 1121 1122 const stream = bidiStreaming.invoke(); 1123 1124 for (let i = 0; i < 3; i++) { 1125 await stream 1126 .finishAndWait() 1127 .then(() => { 1128 fail('Promise should not be resolved'); 1129 }) 1130 .catch((reason) => { 1131 expect(reason.status).toEqual(Status.UNAVAILABLE); 1132 expect(stream.error).toEqual(Status.UNAVAILABLE); 1133 }); 1134 } 1135 }); 1136 }); 1137}); 1138 1139describe('RPC with custom serializers', () => { 1140 let client: Client; 1141 let lastPacketSent: RpcPacket | undefined; 1142 let requests: RpcPacket[] = []; 1143 let nextPackets: [Uint8Array, Status][] = []; 1144 let responseLock = false; 1145 let sendResponsesAfterPackets = 0; 1146 let outputException: Error | undefined; 1147 const requestSerializeFn = jest.fn(); 1148 const responseDeserializeFn = jest.fn(); 1149 1150 beforeEach(async () => { 1151 const channels = [ 1152 new Channel(1, handlePacket), 1153 new Channel(2, () => { 1154 // Do nothing. 1155 }), 1156 ]; 1157 const services = [ 1158 new Service('pw.rpc.test1.TheTestService', [ 1159 { 1160 name: 'SomeUnary', 1161 requestType: Request, 1162 responseType: Response, 1163 customRequestSerializer: { 1164 serialize: (msg) => { 1165 requestSerializeFn(msg); 1166 return msg.serializeBinary(); 1167 }, 1168 deserialize: Request.deserializeBinary, 1169 }, 1170 customResponseSerializer: { 1171 serialize: (msg) => { 1172 return msg.serializeBinary(); 1173 }, 1174 deserialize: (bytes) => { 1175 responseDeserializeFn(bytes); 1176 return Response.deserializeBinary(bytes); 1177 }, 1178 }, 1179 }, 1180 ]), 1181 ]; 1182 client = new Client(channels, services); 1183 lastPacketSent = undefined; 1184 requests = []; 1185 nextPackets = []; 1186 responseLock = false; 1187 sendResponsesAfterPackets = 0; 1188 outputException = undefined; 1189 }); 1190 1191 function lastRequest(): RpcPacket { 1192 if (requests.length == 0) { 1193 throw Error('Tried to fetch request from empty list'); 1194 } 1195 return requests[requests.length - 1]; 1196 } 1197 1198 function newRequest(magicNumber = 123): Message { 1199 const request = new Request(); 1200 request.setMagicNumber(magicNumber); 1201 return request; 1202 } 1203 1204 function newResponse(payload = '._.'): Message { 1205 const response = new Response(); 1206 response.setPayload(payload); 1207 return response; 1208 } 1209 1210 function handlePacket(data: Uint8Array): void { 1211 if (outputException !== undefined) { 1212 throw outputException; 1213 } 1214 requests.push(packets.decode(data)); 1215 1216 if (sendResponsesAfterPackets > 1) { 1217 sendResponsesAfterPackets -= 1; 1218 return; 1219 } 1220 1221 processEnqueuedPackets(); 1222 } 1223 1224 function processEnqueuedPackets(): void { 1225 // Avoid infinite recursion when processing a packet causes another packet 1226 // to send. 1227 if (responseLock) return; 1228 responseLock = true; 1229 for (const [packet, status] of nextPackets) { 1230 expect(client.processPacket(packet)).toEqual(status); 1231 } 1232 nextPackets = []; 1233 responseLock = false; 1234 } 1235 1236 function enqueueResponse( 1237 channelId: number, 1238 method: Method, 1239 status: Status, 1240 callId: number, 1241 response?: Message, 1242 ) { 1243 const packet = new RpcPacket(); 1244 packet.setType(PacketType.RESPONSE); 1245 packet.setChannelId(channelId); 1246 packet.setServiceId(method.service.id); 1247 packet.setMethodId(method.id); 1248 packet.setCallId(callId); 1249 packet.setStatus(status); 1250 if (response === undefined) { 1251 packet.setPayload(new Uint8Array(0)); 1252 } else { 1253 packet.setPayload(response.serializeBinary()); 1254 } 1255 nextPackets.push([packet.serializeBinary(), Status.OK]); 1256 } 1257 1258 function sentPayload(messageType: typeof Message): any { 1259 return messageType.deserializeBinary(lastRequest().getPayload_asU8()); 1260 } 1261 1262 describe('Unary', () => { 1263 let unaryStub: UnaryMethodStub; 1264 1265 beforeEach(async () => { 1266 unaryStub = client 1267 .channel() 1268 ?.methodStub( 1269 'pw.rpc.test1.TheTestService.SomeUnary', 1270 ) as UnaryMethodStub; 1271 }); 1272 1273 it('blocking call', async () => { 1274 for (let i = 0; i < 3; i++) { 1275 enqueueResponse( 1276 1, 1277 unaryStub.method, 1278 Status.ABORTED, 1279 unaryStub.rpcs.nextCallId, 1280 newResponse('0_o'), 1281 ); 1282 const [status, response] = await unaryStub.call(newRequest(6)); 1283 1284 expect(sentPayload(Request).getMagicNumber()).toEqual(6); 1285 expect(status).toEqual(Status.ABORTED); 1286 expect(response).toEqual(newResponse('0_o')); 1287 expect(requestSerializeFn).toBeCalledWith(newRequest(6)); 1288 expect(responseDeserializeFn).toBeCalledWith( 1289 newResponse('0_o').serializeBinary(), 1290 ); 1291 } 1292 }); 1293 }); 1294}); 1295