xref: /aosp_15_r20/external/pigweed/pw_rpc/ts/client_test.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/* eslint-env browser */
16
17import { Status } from 'pigweedjs/pw_status';
18import { MessageCreator } from 'pigweedjs/pw_protobuf_compiler';
19import { Message } from 'google-protobuf';
20import {
21  PacketType,
22  RpcPacket,
23} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
24import { ProtoCollection } from 'pigweedjs/protos/collection';
25import { Request, Response } from 'pigweedjs/protos/pw_rpc/ts/test_pb';
26
27import { Client } from './client';
28import { Channel, Method, Service } from './descriptors';
29import {
30  BidirectionalStreamingMethodStub,
31  ClientStreamingMethodStub,
32  ServerStreamingMethodStub,
33  UnaryMethodStub,
34} from './method';
35import * as packets from './packets';
36
37const LEGACY_OPEN_CALL_ID = 0;
38const OPEN_CALL_ID = 2 ** 32 - 1;
39
/**
 * Covers `Client` channel lookup, method-stub lookup by fully qualified name,
 * and the status `processPacket` returns (plus any packet it sends back) for
 * malformed or unexpected input.
 */
describe('Client', () => {
  let protoCollection: ProtoCollection;
  let client: Client;
  let lastPacketSent: RpcPacket;

  beforeEach(() => {
    protoCollection = new ProtoCollection();
    // Only channel 1 is given an output callback, so packets sent on it are
    // captured by `savePacket`.
    const channels = [new Channel(1, savePacket), new Channel(5)];
    client = Client.fromProtoSet(channels, protoCollection);
  });

  /** Channel 1 output callback: decodes and stores the most recent packet. */
  function savePacket(packetBytes: Uint8Array): void {
    lastPacketSent = RpcPacket.deserializeBinary(packetBytes);
  }

  it('channel returns undefined for empty list', () => {
    const channels: Channel[] = [];
    const emptyChannelClient = Client.fromProtoSet(channels, protoCollection);
    expect(emptyChannelClient.channel()).toBeUndefined();
  });

  it('fetches channel or returns undefined', () => {
    expect(client.channel(1)!.channel.id).toEqual(1);
    expect(client.channel(5)!.channel.id).toEqual(5);
    expect(client.channel()!.channel.id).toEqual(1);
    expect(client.channel(2)).toBeUndefined();
  });

  it('ChannelClient fetches method by name', () => {
    const channel = client.channel()!;
    const stub = channel.methodStub('pw.rpc.test1.TheTestService.SomeUnary')!;
    expect(stub.method.name).toEqual('SomeUnary');
  });

  it('ChannelClient for unknown name returns undefined', () => {
    const channel = client.channel()!;
    expect(channel.methodStub('')).toBeUndefined();
    expect(
      channel.methodStub('pw.rpc.test1.Garbage.SomeUnary'),
    ).toBeUndefined();
    expect(
      channel.methodStub('pw.rpc.test1.TheTestService.Garbage'),
    ).toBeUndefined();
  });

  it('processPacket with invalid proto data', () => {
    const textEncoder = new TextEncoder();
    const data = textEncoder.encode('NOT a packet!');
    expect(client.processPacket(data)).toEqual(Status.DATA_LOSS);
  });

  it('processPacket not for client', () => {
    const packet = new RpcPacket();
    packet.setType(PacketType.REQUEST);
    const processStatus = client.processPacket(packet.serializeBinary());
    expect(processStatus).toEqual(Status.INVALID_ARGUMENT);
  });

  it('processPacket for unrecognized channel', () => {
    const packet = packets.encodeResponse([123, 456, 789, 456], new Request());
    expect(client.processPacket(packet)).toEqual(Status.NOT_FOUND);
  });

  it('processPacket for unrecognized service', () => {
    const packet = packets.encodeResponse([1, 456, 789, 456], new Request());
    // Process the packet exactly once; the assertions below inspect the
    // packet sent in reply to this single call.
    expect(client.processPacket(packet)).toEqual(Status.OK);

    expect(lastPacketSent.getChannelId()).toEqual(1);
    expect(lastPacketSent.getServiceId()).toEqual(456);
    expect(lastPacketSent.getMethodId()).toEqual(789);
    expect(lastPacketSent.getCallId()).toEqual(456);
    expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
    expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
  });

  it('processPacket for unrecognized method', () => {
    const service = client.services.values().next().value;

    const packet = packets.encodeResponse(
      [1, service.id, 789, 456],
      new Request(),
    );
    expect(client.processPacket(packet)).toEqual(Status.OK);

    expect(lastPacketSent.getChannelId()).toEqual(1);
    expect(lastPacketSent.getServiceId()).toEqual(service.id);
    expect(lastPacketSent.getMethodId()).toEqual(789);
    expect(lastPacketSent.getCallId()).toEqual(456);
    expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
    expect(lastPacketSent.getStatus()).toEqual(Status.NOT_FOUND);
  });

  it('processPacket for non-pending method', () => {
    const service = client.services.values().next().value;
    const method = service.methods.values().next().value;

    const packet = packets.encodeResponse(
      [1, service.id, method.id, 456],
      new Request(),
    );
    expect(client.processPacket(packet)).toEqual(Status.OK);

    expect(lastPacketSent.getChannelId()).toEqual(1);
    expect(lastPacketSent.getServiceId()).toEqual(service.id);
    expect(lastPacketSent.getMethodId()).toEqual(method.id);
    expect(lastPacketSent.getType()).toEqual(PacketType.CLIENT_ERROR);
    expect(lastPacketSent.getStatus()).toEqual(Status.FAILED_PRECONDITION);
  });
});
152
153describe('RPC', () => {
154  let protoCollection: ProtoCollection;
155  let client: Client;
156  let lastPacketSent: RpcPacket | undefined;
157  let requests: RpcPacket[] = [];
158  let nextPackets: [Uint8Array, Status][] = [];
159  let responseLock = false;
160  let sendResponsesAfterPackets = 0;
161  let outputException: Error | undefined;
162
163  beforeEach(async () => {
164    protoCollection = new ProtoCollection();
165    const channels = [
166      new Channel(1, handlePacket),
167      new Channel(2, () => {
168        // Do nothing.
169      }),
170    ];
171    client = Client.fromProtoSet(channels, protoCollection);
172    lastPacketSent = undefined;
173    requests = [];
174    nextPackets = [];
175    responseLock = false;
176    sendResponsesAfterPackets = 0;
177    outputException = undefined;
178  });
179
180  function newRequest(magicNumber = 123): Message {
181    const request = new Request();
182    request.setMagicNumber(magicNumber);
183    return request;
184  }
185
186  function newResponse(payload = '._.'): Message {
187    const response = new Response();
188    response.setPayload(payload);
189    return response;
190  }
191
192  function enqueueResponse(
193    channelId: number,
194    method: Method,
195    status: Status,
196    callId: number,
197    response?: Message,
198  ) {
199    const packet = new RpcPacket();
200    packet.setType(PacketType.RESPONSE);
201    packet.setChannelId(channelId);
202    packet.setServiceId(method.service.id);
203    packet.setMethodId(method.id);
204    packet.setCallId(callId);
205    packet.setStatus(status);
206    if (response === undefined) {
207      packet.setPayload(new Uint8Array(0));
208    } else {
209      packet.setPayload(response.serializeBinary());
210    }
211    nextPackets.push([packet.serializeBinary(), Status.OK]);
212  }
213
214  function enqueueServerStream(
215    channelId: number,
216    method: Method,
217    response: Message,
218    callId: number,
219    status: Status = Status.OK,
220  ) {
221    const packet = new RpcPacket();
222    packet.setType(PacketType.SERVER_STREAM);
223    packet.setChannelId(channelId);
224    packet.setServiceId(method.service.id);
225    packet.setMethodId(method.id);
226    packet.setCallId(callId);
227    packet.setPayload(response.serializeBinary());
228    packet.setStatus(status);
229    nextPackets.push([packet.serializeBinary(), status]);
230  }
231
232  function enqueueError(
233    channelId: number,
234    method: Method,
235    status: Status,
236    processStatus: Status,
237    callId: number,
238  ) {
239    const packet = new RpcPacket();
240    packet.setType(PacketType.SERVER_ERROR);
241    packet.setChannelId(channelId);
242    packet.setServiceId(method.service.id);
243    packet.setMethodId(method.id);
244    packet.setCallId(callId);
245    packet.setStatus(status);
246
247    nextPackets.push([packet.serializeBinary(), processStatus]);
248  }
249
250  function lastRequest(): RpcPacket {
251    if (requests.length == 0) {
252      throw Error('Tried to fetch request from empty list');
253    }
254    return requests[requests.length - 1];
255  }
256
257  function sentPayload(messageType: typeof Message): any {
258    return messageType.deserializeBinary(lastRequest().getPayload_asU8());
259  }
260
261  function handlePacket(data: Uint8Array): void {
262    if (outputException !== undefined) {
263      throw outputException;
264    }
265    requests.push(packets.decode(data));
266
267    if (sendResponsesAfterPackets > 1) {
268      sendResponsesAfterPackets -= 1;
269      return;
270    }
271
272    processEnqueuedPackets();
273  }
274
275  function processEnqueuedPackets(): void {
276    // Avoid infinite recursion when processing a packet causes another packet
277    // to send.
278    if (responseLock) return;
279    responseLock = true;
280    for (const [packet, status] of nextPackets) {
281      expect(client.processPacket(packet)).toEqual(status);
282    }
283    nextPackets = [];
284    responseLock = false;
285  }
286
287  describe('Unary', () => {
288    let unaryStub: UnaryMethodStub;
289
290    beforeEach(async () => {
291      unaryStub = client
292        .channel()
293        ?.methodStub(
294          'pw.rpc.test1.TheTestService.SomeUnary',
295        ) as UnaryMethodStub;
296    });
297
298    const openCallIds = [
299      ['OPEN_CALL_ID', OPEN_CALL_ID],
300      ['LEGACY_OPEN_CALL_ID', LEGACY_OPEN_CALL_ID],
301    ];
302    openCallIds.forEach(([idName, callId]) => {
303      it(`matches responses with ${idName} to requests with arbitrary IDs`, async () => {
304        const promisedResponse = unaryStub.call(newRequest(6));
305        enqueueResponse(
306          1,
307          unaryStub.method,
308          Status.ABORTED,
309          OPEN_CALL_ID,
310          newResponse('is unrequested'),
311        );
312
313        processEnqueuedPackets();
314        const [status, response] = await promisedResponse;
315
316        expect(sentPayload(Request).getMagicNumber()).toEqual(6);
317        expect(status).toEqual(Status.ABORTED);
318        expect(response).toEqual(newResponse('is unrequested'));
319      });
320    });
321
322    it('blocking call', async () => {
323      for (let i = 0; i < 3; i++) {
324        enqueueResponse(
325          1,
326          unaryStub.method,
327          Status.ABORTED,
328          unaryStub.rpcs.nextCallId,
329          newResponse('0_o'),
330        );
331        const [status, response] = await unaryStub.call(newRequest(6));
332
333        expect(sentPayload(Request).getMagicNumber()).toEqual(6);
334        expect(status).toEqual(Status.ABORTED);
335        expect(response).toEqual(newResponse('0_o'));
336      }
337    });
338
339    it('nonblocking call', () => {
340      for (let i = 0; i < 3; i++) {
341        const response = newResponse('hello world');
342        enqueueResponse(
343          1,
344          unaryStub.method,
345          Status.ABORTED,
346          unaryStub.rpcs.nextCallId,
347          response,
348        );
349
350        const onNext = jest.fn();
351        const onCompleted = jest.fn();
352        const onError = jest.fn();
353        const call = unaryStub.invoke(
354          newRequest(5),
355          onNext,
356          onCompleted,
357          onError,
358        );
359
360        expect(sentPayload(Request).getMagicNumber()).toEqual(5);
361        expect(onNext).toHaveBeenCalledWith(response);
362        expect(onError).not.toHaveBeenCalled();
363        expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED);
364      }
365    });
366
367    it('open', () => {
368      outputException = Error('Error should be ignored');
369
370      for (let i = 0; i < 3; i++) {
371        const response = newResponse('hello world');
372        enqueueResponse(
373          1,
374          unaryStub.method,
375          Status.ABORTED,
376          unaryStub.rpcs.nextCallId,
377          response,
378        );
379
380        const onNext = jest.fn();
381        const onCompleted = jest.fn();
382        const onError = jest.fn();
383        unaryStub.open(newRequest(5), onNext, onCompleted, onError);
384        expect(requests).toHaveLength(0);
385
386        processEnqueuedPackets();
387
388        expect(onNext).toHaveBeenCalledWith(response);
389        expect(onError).not.toHaveBeenCalled();
390        expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED);
391      }
392    });
393
394    it('nonblocking concurrent call', () => {
395      // Start several calls to the same method
396      const callsAndCallbacks = [];
397      for (let i = 0; i < 3; i++) {
398        const onNext = jest.fn();
399        const onCompleted = jest.fn();
400        const onError = jest.fn();
401
402        const call = unaryStub.invoke(
403          newRequest(5),
404          onNext,
405          onCompleted,
406          onError,
407        );
408        callsAndCallbacks.push([call, onNext, onCompleted, onError]);
409
410        expect(sentPayload(Request).getMagicNumber()).toEqual(5);
411      }
412      // Respond only to the last call
413      const [lastCall, lastCallback] = callsAndCallbacks.pop();
414      const lastResponse = newResponse('last payload');
415
416      enqueueResponse(
417        1,
418        unaryStub.method,
419        Status.OK,
420        lastCall.callId,
421        lastResponse,
422      );
423      processEnqueuedPackets();
424
425      expect(lastCallback).toHaveBeenCalledWith(lastResponse);
426      for (const i in callsAndCallbacks) {
427        const [_call, onNext, onCompleted, onError] = callsAndCallbacks[i];
428        expect(onNext).toBeCalledTimes(0);
429        expect(onCompleted).toBeCalledTimes(0);
430        expect(onError).toBeCalledTimes(0);
431      }
432    });
433
434    it('blocking server error', async () => {
435      for (let i = 0; i < 3; i++) {
436        enqueueError(
437          1,
438          unaryStub.method,
439          Status.NOT_FOUND,
440          Status.OK,
441          unaryStub.rpcs.nextCallId,
442        );
443
444        try {
445          await unaryStub.call(newRequest());
446          fail('call expected to fail');
447        } catch (e: any) {
448          expect(e.status).toBe(Status.NOT_FOUND);
449        }
450      }
451    });
452
453    it('nonblocking call cancel', () => {
454      for (let i = 0; i < 3; i++) {
455        const onNext = jest.fn();
456        const call = unaryStub.invoke(newRequest(), onNext);
457
458        expect(requests.length).toBeGreaterThan(0);
459        requests = [];
460
461        expect(call.cancel()).toBe(true);
462        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR);
463        expect(lastRequest().getStatus()).toEqual(Status.CANCELLED);
464
465        expect(call.cancel()).toBe(false);
466        expect(onNext).not.toHaveBeenCalled();
467      }
468    });
469
470    it('blocking call with timeout', async () => {
471      try {
472        await unaryStub.call(newRequest(), 10);
473        fail('Promise should not be resolve');
474      } catch (err: any) {
475        expect(err.timeoutMs).toEqual(10);
476      }
477    });
478
479    it('nonblocking exception in callback', () => {
480      const errorCallback = () => {
481        throw Error('Something went wrong!');
482      };
483
484      enqueueResponse(
485        1,
486        unaryStub.method,
487        Status.OK,
488        unaryStub.rpcs.nextCallId,
489      );
490      const call = unaryStub.invoke(newRequest(), errorCallback);
491      expect(call.callbackException!.name).toEqual('Error');
492      expect(call.callbackException!.message).toEqual('Something went wrong!');
493    });
494  });
495
496  describe('ServerStreaming', () => {
497    let serverStreaming: ServerStreamingMethodStub;
498
499    beforeEach(async () => {
500      serverStreaming = client
501        .channel()
502        ?.methodStub(
503          'pw.rpc.test1.TheTestService.SomeServerStreaming',
504        ) as ServerStreamingMethodStub;
505    });
506
507    it('non-blocking call', () => {
508      const response1 = newResponse('!!!');
509      const response2 = newResponse('?');
510
511      for (let i = 0; i < 3; i++) {
512        enqueueServerStream(
513          1,
514          serverStreaming.method,
515          response1,
516          serverStreaming.rpcs.nextCallId,
517        );
518        enqueueServerStream(
519          1,
520          serverStreaming.method,
521          response2,
522          serverStreaming.rpcs.nextCallId,
523        );
524        enqueueResponse(
525          1,
526          serverStreaming.method,
527          Status.ABORTED,
528          serverStreaming.rpcs.nextCallId,
529        );
530
531        const onNext = jest.fn();
532        const onCompleted = jest.fn();
533        const onError = jest.fn();
534        serverStreaming.invoke(newRequest(4), onNext, onCompleted, onError);
535
536        expect(onNext).toHaveBeenCalledWith(response1);
537        expect(onNext).toHaveBeenCalledWith(response2);
538        expect(onError).not.toHaveBeenCalled();
539        expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED);
540
541        expect(
542          sentPayload(serverStreaming.method.requestType).getMagicNumber(),
543        ).toEqual(4);
544      }
545    });
546
547    it('open', () => {
548      outputException = Error('Error should be ignored');
549      const response1 = newResponse('!!!');
550      const response2 = newResponse('?');
551
552      for (let i = 0; i < 3; i++) {
553        enqueueServerStream(
554          1,
555          serverStreaming.method,
556          response1,
557          serverStreaming.rpcs.nextCallId,
558        );
559        enqueueServerStream(
560          1,
561          serverStreaming.method,
562          response2,
563          serverStreaming.rpcs.nextCallId,
564        );
565        enqueueResponse(
566          1,
567          serverStreaming.method,
568          Status.ABORTED,
569          serverStreaming.rpcs.nextCallId,
570        );
571
572        const onNext = jest.fn();
573        const onCompleted = jest.fn();
574        const onError = jest.fn();
575        const call = serverStreaming.open(
576          newRequest(3),
577          onNext,
578          onCompleted,
579          onError,
580        );
581
582        expect(requests).toHaveLength(0);
583        processEnqueuedPackets();
584
585        expect(onNext).toHaveBeenCalledWith(response1);
586        expect(onNext).toHaveBeenCalledWith(response2);
587        expect(onError).not.toHaveBeenCalled();
588        expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED);
589      }
590    });
591
592    it('blocking timeout', async () => {
593      try {
594        await serverStreaming.call(newRequest(), 10);
595        fail('Promise should not be resolve');
596      } catch (err: any) {
597        expect(err.timeoutMs).toEqual(10);
598      }
599    });
600
601    it('non-blocking cancel', () => {
602      const testResponse = newResponse('!!!');
603      enqueueServerStream(
604        1,
605        serverStreaming.method,
606        testResponse,
607        serverStreaming.rpcs.nextCallId,
608      );
609
610      const onNext = jest.fn();
611      const onCompleted = jest.fn();
612      const onError = jest.fn();
613      let call = serverStreaming.invoke(newRequest(3), onNext);
614      expect(onNext).toHaveBeenNthCalledWith(1, testResponse);
615
616      // onNext.calls.reset();
617
618      call.cancel();
619      expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR);
620      expect(lastRequest().getStatus()).toEqual(Status.CANCELLED);
621
622      // Ensure the RPC can be called after being cancelled.
623      enqueueServerStream(
624        1,
625        serverStreaming.method,
626        testResponse,
627        serverStreaming.rpcs.nextCallId,
628      );
629      enqueueResponse(
630        1,
631        serverStreaming.method,
632        Status.OK,
633        serverStreaming.rpcs.nextCallId,
634      );
635      call = serverStreaming.invoke(newRequest(), onNext, onCompleted, onError);
636      expect(onNext).toHaveBeenNthCalledWith(2, testResponse);
637      expect(onError).not.toHaveBeenCalled();
638      expect(onCompleted).toHaveBeenCalledWith(Status.OK);
639    });
640  });
641
642  describe('ClientStreaming', () => {
643    let clientStreaming: ClientStreamingMethodStub;
644
645    beforeEach(async () => {
646      clientStreaming = client
647        .channel()
648        ?.methodStub(
649          'pw.rpc.test1.TheTestService.SomeClientStreaming',
650        ) as ClientStreamingMethodStub;
651    });
652
653    it('non-blocking call', () => {
654      const testResponse = newResponse('-.-');
655
656      for (let i = 0; i < 3; i++) {
657        const onNext = jest.fn();
658        const stream = clientStreaming.invoke(onNext);
659        expect(stream.completed).toBe(false);
660
661        stream.send(newRequest(31));
662        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
663        expect(sentPayload(Request).getMagicNumber()).toEqual(31);
664        expect(stream.completed).toBe(false);
665
666        // Enqueue the server response to be sent after the next message.
667        enqueueResponse(
668          1,
669          clientStreaming.method,
670          Status.OK,
671          stream.callId,
672          testResponse,
673        );
674
675        stream.send(newRequest(32));
676        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
677        expect(sentPayload(Request).getMagicNumber()).toEqual(32);
678
679        expect(onNext).toHaveBeenCalledWith(testResponse);
680        expect(stream.completed).toBe(true);
681        expect(stream.status).toEqual(Status.OK);
682        expect(stream.error).toBeUndefined();
683      }
684    });
685
686    it('open', () => {
687      outputException = Error('Error should be ignored');
688      const response = newResponse('!!!');
689
690      for (let i = 0; i < 3; i++) {
691        enqueueResponse(
692          1,
693          clientStreaming.method,
694          Status.OK,
695          clientStreaming.rpcs.nextCallId,
696          response,
697        );
698
699        const onNext = jest.fn();
700        const onCompleted = jest.fn();
701        const onError = jest.fn();
702        const call = clientStreaming.open(onNext, onCompleted, onError);
703        expect(requests).toHaveLength(0);
704
705        processEnqueuedPackets();
706
707        expect(onNext).toHaveBeenCalledWith(response);
708        expect(onError).not.toHaveBeenCalled();
709        expect(onCompleted).toHaveBeenCalledWith(Status.OK);
710      }
711    });
712
713    it('blocking timeout', async () => {
714      try {
715        await clientStreaming.call([newRequest()], 10);
716        fail('Promise should not be resolve');
717      } catch (err: any) {
718        expect(err.timeoutMs).toEqual(10);
719      }
720    });
721
722    it('non-blocking call ended by client', () => {
723      const testResponse = newResponse('0.o');
724      for (let i = 0; i < 3; i++) {
725        const onNext = jest.fn();
726        const stream = clientStreaming.invoke(onNext);
727        expect(stream.completed).toBe(false);
728
729        stream.send(newRequest(31));
730        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
731        expect(sentPayload(Request).getMagicNumber()).toEqual(31);
732        expect(stream.completed).toBe(false);
733
734        // Enqueue the server response to be sent after the next message.
735        enqueueResponse(
736          1,
737          clientStreaming.method,
738          Status.OK,
739          stream.callId,
740          testResponse,
741        );
742
743        stream.finishAndWait();
744        expect(lastRequest().getType()).toEqual(
745          PacketType.CLIENT_REQUEST_COMPLETION,
746        );
747
748        expect(onNext).toHaveBeenCalledWith(testResponse);
749        expect(stream.completed).toBe(true);
750        expect(stream.status).toEqual(Status.OK);
751        expect(stream.error).toBeUndefined();
752      }
753    });
754
755    it('non-blocking call cancelled', () => {
756      for (let i = 0; i < 3; i++) {
757        const stream = clientStreaming.invoke();
758        stream.send(newRequest());
759
760        expect(stream.cancel()).toBe(true);
761        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_ERROR);
762        expect(lastRequest().getStatus()).toEqual(Status.CANCELLED);
763        expect(stream.cancel()).toBe(false);
764        expect(stream.completed).toBe(true);
765        expect(stream.error).toEqual(Status.CANCELLED);
766      }
767    });
768
769    it('non-blocking call server error', async () => {
770      for (let i = 0; i < 3; i++) {
771        const stream = clientStreaming.invoke();
772        enqueueError(
773          1,
774          clientStreaming.method,
775          Status.INVALID_ARGUMENT,
776          Status.OK,
777          stream.callId,
778        );
779
780        stream.send(newRequest());
781
782        await stream
783          .finishAndWait()
784          .then(() => {
785            fail('Promise should not be resolved');
786          })
787          .catch((reason) => {
788            expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
789          });
790      }
791    });
792
793    it('non-blocking call server error after stream end', async () => {
794      for (let i = 0; i < 3; i++) {
795        const stream = clientStreaming.invoke();
796        // Error will be sent in response to the CLIENT_REQUEST_COMPLETION packet.
797        enqueueError(
798          1,
799          clientStreaming.method,
800          Status.INVALID_ARGUMENT,
801          Status.OK,
802          stream.callId,
803        );
804
805        await stream
806          .finishAndWait()
807          .then(() => {
808            fail('Promise should not be resolved');
809          })
810          .catch((reason) => {
811            expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
812          });
813      }
814    });
815
816    it('non-blocking call send after cancelled', () => {
817      expect.assertions(2);
818      const stream = clientStreaming.invoke();
819      expect(stream.cancel()).toBe(true);
820
821      try {
822        stream.send(newRequest());
823      } catch (e) {
824        console.log(e);
825        expect(e.status).toEqual(Status.CANCELLED);
826      }
827      // expect(() => stream.send(newRequest())).toThrowError(
828      //   error => error.status === Status.CANCELLED
829      // );
830    });
831
832    it('non-blocking finish after completed', async () => {
833      const enqueuedResponse = newResponse('?!');
834      enqueueResponse(
835        1,
836        clientStreaming.method,
837        Status.UNAVAILABLE,
838        clientStreaming.rpcs.nextCallId,
839        enqueuedResponse,
840      );
841
842      const stream = clientStreaming.invoke();
843      const result = await stream.finishAndWait();
844      expect(result[1]).toEqual(enqueuedResponse);
845
846      expect(await stream.finishAndWait()).toEqual(result);
847      expect(await stream.finishAndWait()).toEqual(result);
848    });
849
850    it('non-blocking finish after error', async () => {
851      enqueueError(
852        1,
853        clientStreaming.method,
854        Status.UNAVAILABLE,
855        Status.OK,
856        clientStreaming.rpcs.nextCallId,
857      );
858      const stream = clientStreaming.invoke();
859
860      for (let i = 0; i < 3; i++) {
861        await stream
862          .finishAndWait()
863          .then(() => {
864            fail('Promise should not be resolved');
865          })
866          .catch((reason) => {
867            expect(reason.status).toEqual(Status.UNAVAILABLE);
868            expect(stream.error).toEqual(Status.UNAVAILABLE);
869            expect(stream.response).toBeUndefined();
870          });
871      }
872    });
873  });
874
875  describe('BidirectionalStreaming', () => {
876    let bidiStreaming: BidirectionalStreamingMethodStub;
877
878    beforeEach(async () => {
879      bidiStreaming = client
880        .channel()
881        ?.methodStub(
882          'pw.rpc.test1.TheTestService.SomeBidiStreaming',
883        ) as BidirectionalStreamingMethodStub;
884    });
885
886    it('blocking call', async () => {
887      const testRequests = [newRequest(123), newRequest(456)];
888
889      sendResponsesAfterPackets = 3;
890      enqueueResponse(
891        1,
892        bidiStreaming.method,
893        Status.NOT_FOUND,
894        bidiStreaming.rpcs.nextCallId,
895      );
896
897      const results = await bidiStreaming.call(testRequests);
898      expect(results[0]).toEqual(Status.NOT_FOUND);
899      expect(results[1]).toEqual([]);
900    });
901
902    it('blocking server error', async () => {
903      const testRequests = [newRequest(123)];
904      enqueueError(
905        1,
906        bidiStreaming.method,
907        Status.NOT_FOUND,
908        Status.OK,
909        bidiStreaming.rpcs.nextCallId,
910      );
911
912      await bidiStreaming
913        .call(testRequests)
914        .then(() => {
915          fail('Promise should not be resolved');
916        })
917        .catch((reason) => {
918          expect(reason.status).toEqual(Status.NOT_FOUND);
919        });
920    });
921
922    it('non-blocking call', () => {
923      const rep1 = newResponse('!!!');
924      const rep2 = newResponse('?');
925
926      for (let i = 0; i < 3; i++) {
927        const testResponses: Array<Message> = [];
928        const stream = bidiStreaming.invoke((response) => {
929          testResponses.push(response);
930        });
931        expect(stream.completed).toBe(false);
932
933        stream.send(newRequest(55));
934        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
935        expect(sentPayload(Request).getMagicNumber()).toEqual(55);
936        expect(stream.completed).toBe(false);
937        expect(testResponses).toEqual([]);
938
939        enqueueServerStream(1, bidiStreaming.method, rep1, stream.callId);
940        enqueueServerStream(1, bidiStreaming.method, rep2, stream.callId);
941
942        stream.send(newRequest(66));
943        expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM);
944        expect(sentPayload(Request).getMagicNumber()).toEqual(66);
945        expect(stream.completed).toBe(false);
946        expect(testResponses).toEqual([rep1, rep2]);
947
948        enqueueResponse(1, bidiStreaming.method, Status.OK, stream.callId);
949
950        stream.send(newRequest(77));
951        expect(stream.completed).toBe(true);
952        expect(testResponses).toEqual([rep1, rep2]);
953        expect(stream.status).toEqual(Status.OK);
954        expect(stream.error).toBeUndefined();
955      }
956    });
957
958    it('open', () => {
959      outputException = Error('Error should be ignored');
960      const response1 = newResponse('!!!');
961      const response2 = newResponse('?');
962
963      for (let i = 0; i < 3; i++) {
964        enqueueServerStream(
965          1,
966          bidiStreaming.method,
967          response1,
968          bidiStreaming.rpcs.nextCallId,
969        );
970        enqueueServerStream(
971          1,
972          bidiStreaming.method,
973          response2,
974          bidiStreaming.rpcs.nextCallId,
975        );
976        enqueueResponse(
977          1,
978          bidiStreaming.method,
979          Status.OK,
980          bidiStreaming.rpcs.nextCallId,
981        );
982
983        const onNext = jest.fn();
984        const onCompleted = jest.fn();
985        const onError = jest.fn();
986        const call = bidiStreaming.open(onNext, onCompleted, onError);
987        expect(requests).toHaveLength(0);
988
989        processEnqueuedPackets();
990
991        expect(onNext).toHaveBeenCalledWith(response1);
992        expect(onNext).toHaveBeenCalledWith(response2);
993        expect(onError).not.toHaveBeenCalled();
994        expect(onCompleted).toHaveBeenCalledWith(Status.OK);
995      }
996    });
997
998    it('blocking timeout', async () => {
999      try {
1000        await bidiStreaming.call([newRequest()], 10);
1001        fail('Promise should not be resolve');
1002      } catch (err: any) {
1003        expect(err.timeoutMs).toEqual(10);
1004      }
1005    });
1006
1007    it('non-blocking server error', async () => {
1008      const response = newResponse('!!!');
1009
1010      for (let i = 0; i < 3; i++) {
1011        const testResponses: Array<Message> = [];
1012        const stream = bidiStreaming.invoke((response) => {
1013          testResponses.push(response);
1014        });
1015        expect(stream.completed).toBe(false);
1016
1017        enqueueServerStream(1, bidiStreaming.method, response, stream.callId);
1018
1019        stream.send(newRequest(55));
1020        expect(stream.completed).toBe(false);
1021        expect(testResponses).toEqual([response]);
1022
1023        enqueueError(
1024          1,
1025          bidiStreaming.method,
1026          Status.OUT_OF_RANGE,
1027          Status.OK,
1028          stream.callId,
1029        );
1030
1031        stream.send(newRequest(999));
1032        expect(stream.completed).toBe(true);
1033        expect(testResponses).toEqual([response]);
1034        expect(stream.status).toBeUndefined();
1035        expect(stream.error).toEqual(Status.OUT_OF_RANGE);
1036
1037        await stream
1038          .finishAndWait()
1039          .then(() => {
1040            fail('Promise should not be resolved');
1041          })
1042          .catch((reason) => {
1043            expect(reason.status).toEqual(Status.OUT_OF_RANGE);
1044          });
1045      }
1046    });
1047    it('non-blocking server error after stream end', async () => {
1048      for (let i = 0; i < 3; i++) {
1049        const stream = bidiStreaming.invoke();
1050
1051        // Error is sent in response to CLIENT_REQUEST_COMPLETION packet.
1052        enqueueError(
1053          1,
1054          bidiStreaming.method,
1055          Status.INVALID_ARGUMENT,
1056          Status.OK,
1057          stream.callId,
1058        );
1059
1060        await stream
1061          .finishAndWait()
1062          .then(() => {
1063            fail('Promise should not be resolved');
1064          })
1065          .catch((reason) => {
1066            expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
1067          });
1068      }
1069    });
1070
1071    it('non-blocking send after cancelled', async () => {
1072      const stream = bidiStreaming.invoke();
1073      expect(stream.cancel()).toBe(true);
1074
1075      try {
1076        stream.send(newRequest());
1077        fail('send should have failed');
1078      } catch (e: any) {
1079        expect(e.status).toBe(Status.CANCELLED);
1080      }
1081    });
1082
1083    it('non-blocking finish after completed', async () => {
1084      const response = newResponse('!?');
1085      enqueueServerStream(
1086        1,
1087        bidiStreaming.method,
1088        response,
1089        bidiStreaming.rpcs.nextCallId,
1090      );
1091      enqueueResponse(
1092        1,
1093        bidiStreaming.method,
1094        Status.UNAVAILABLE,
1095        bidiStreaming.rpcs.nextCallId,
1096      );
1097
1098      const stream = bidiStreaming.invoke();
1099      const result = await stream.finishAndWait();
1100      expect(result[1]).toEqual([response]);
1101
1102      expect(await stream.finishAndWait()).toEqual(result);
1103      expect(await stream.finishAndWait()).toEqual(result);
1104    });
1105
1106    it('non-blocking finish after error', async () => {
1107      const response = newResponse('!?');
1108      enqueueServerStream(
1109        1,
1110        bidiStreaming.method,
1111        response,
1112        bidiStreaming.rpcs.nextCallId,
1113      );
1114      enqueueError(
1115        1,
1116        bidiStreaming.method,
1117        Status.UNAVAILABLE,
1118        Status.OK,
1119        bidiStreaming.rpcs.nextCallId,
1120      );
1121
1122      const stream = bidiStreaming.invoke();
1123
1124      for (let i = 0; i < 3; i++) {
1125        await stream
1126          .finishAndWait()
1127          .then(() => {
1128            fail('Promise should not be resolved');
1129          })
1130          .catch((reason) => {
1131            expect(reason.status).toEqual(Status.UNAVAILABLE);
1132            expect(stream.error).toEqual(Status.UNAVAILABLE);
1133          });
1134      }
1135    });
1136  });
1137});
1138
// Verifies that a method declared with custom request/response serializers
// routes outgoing requests and incoming responses through them.
describe('RPC with custom serializers', () => {
  let client: Client;
  // Every packet the client has written to channel 1, oldest first.
  let requests: RpcPacket[] = [];
  // Serialized packets waiting to be fed to the client, each paired with the
  // status client.processPacket() is expected to return for it.
  let nextPackets: [Uint8Array, Status][] = [];
  // True while processEnqueuedPackets() is running; blocks re-entrant calls.
  let responseLock = false;
  // While greater than 1, handlePacket() counts down instead of delivering the
  // enqueued packets.
  let sendResponsesAfterPackets = 0;
  // When set, handlePacket() throws it instead of recording the packet.
  let outputException: Error | undefined;
  // Spies recording what the custom serializers were invoked with.
  const requestSerializeFn = jest.fn();
  const responseDeserializeFn = jest.fn();

  beforeEach(() => {
    const channels = [
      new Channel(1, handlePacket),
      new Channel(2, () => {
        // Do nothing.
      }),
    ];
    const services = [
      new Service('pw.rpc.test1.TheTestService', [
        {
          name: 'SomeUnary',
          requestType: Request,
          responseType: Response,
          customRequestSerializer: {
            serialize: (msg) => {
              requestSerializeFn(msg);
              return msg.serializeBinary();
            },
            deserialize: Request.deserializeBinary,
          },
          customResponseSerializer: {
            serialize: (msg) => {
              return msg.serializeBinary();
            },
            deserialize: (bytes) => {
              responseDeserializeFn(bytes);
              return Response.deserializeBinary(bytes);
            },
          },
        },
      ]),
    ];
    client = new Client(channels, services);
    requests = [];
    nextPackets = [];
    responseLock = false;
    sendResponsesAfterPackets = 0;
    outputException = undefined;
    // The spies live for the whole describe block; drop calls recorded by
    // earlier tests so assertions only see the current test's activity.
    requestSerializeFn.mockClear();
    responseDeserializeFn.mockClear();
  });

  // Returns the most recently sent packet; throws if nothing was sent yet.
  function lastRequest(): RpcPacket {
    if (requests.length === 0) {
      throw Error('Tried to fetch request from empty list');
    }
    return requests[requests.length - 1];
  }

  // Builds a Request message carrying the given magic number.
  function newRequest(magicNumber = 123): Message {
    const request = new Request();
    request.setMagicNumber(magicNumber);
    return request;
  }

  // Builds a Response message carrying the given payload string.
  function newResponse(payload = '._.'): Message {
    const response = new Response();
    response.setPayload(payload);
    return response;
  }

  // Output handler for channel 1: records the decoded outgoing packet and then
  // delivers the enqueued packets, unless an exception is configured or the
  // sendResponsesAfterPackets countdown is still running.
  function handlePacket(data: Uint8Array): void {
    if (outputException !== undefined) {
      throw outputException;
    }
    requests.push(packets.decode(data));

    if (sendResponsesAfterPackets > 1) {
      sendResponsesAfterPackets -= 1;
      return;
    }

    processEnqueuedPackets();
  }

  // Feeds every enqueued packet to the client, checking the status returned by
  // processPacket() for each, and then empties the queue.
  function processEnqueuedPackets(): void {
    // Avoid infinite recursion when processing a packet causes another packet
    // to send.
    if (responseLock) return;
    responseLock = true;
    try {
      for (const [packet, status] of nextPackets) {
        expect(client.processPacket(packet)).toEqual(status);
      }
    } finally {
      // Always release the lock and drain the queue, so a failed expectation
      // cannot leave packet processing disabled for the rest of the test.
      nextPackets = [];
      responseLock = false;
    }
  }

  // Enqueues a RESPONSE packet for the given call. The payload is the
  // serialized `response`, or empty when no response is supplied. The packet
  // is expected to be processed with Status.OK.
  function enqueueResponse(
    channelId: number,
    method: Method,
    status: Status,
    callId: number,
    response?: Message,
  ) {
    const packet = new RpcPacket();
    packet.setType(PacketType.RESPONSE);
    packet.setChannelId(channelId);
    packet.setServiceId(method.service.id);
    packet.setMethodId(method.id);
    packet.setCallId(callId);
    packet.setStatus(status);
    if (response === undefined) {
      packet.setPayload(new Uint8Array(0));
    } else {
      packet.setPayload(response.serializeBinary());
    }
    nextPackets.push([packet.serializeBinary(), Status.OK]);
  }

  // Decodes the payload of the most recently sent packet as `messageType`.
  function sentPayload(messageType: typeof Message): any {
    return messageType.deserializeBinary(lastRequest().getPayload_asU8());
  }

  describe('Unary', () => {
    let unaryStub: UnaryMethodStub;

    beforeEach(() => {
      unaryStub = client
        .channel()
        ?.methodStub(
          'pw.rpc.test1.TheTestService.SomeUnary',
        ) as UnaryMethodStub;
    });

    it('blocking call', async () => {
      for (let i = 0; i < 3; i++) {
        // Start each round with clean spies; otherwise calls recorded in an
        // earlier round would satisfy this round's assertions.
        requestSerializeFn.mockClear();
        responseDeserializeFn.mockClear();

        enqueueResponse(
          1,
          unaryStub.method,
          Status.ABORTED,
          unaryStub.rpcs.nextCallId,
          newResponse('0_o'),
        );
        const [status, response] = await unaryStub.call(newRequest(6));

        expect(sentPayload(Request).getMagicNumber()).toEqual(6);
        expect(status).toEqual(Status.ABORTED);
        expect(response).toEqual(newResponse('0_o'));
        // Both custom serializers must have been used for this round trip.
        expect(requestSerializeFn).toHaveBeenCalledWith(newRequest(6));
        expect(responseDeserializeFn).toHaveBeenCalledWith(
          newResponse('0_o').serializeBinary(),
        );
      }
    });
  });
});
1295