xref: /aosp_15_r20/external/pigweed/pw_rpc/ts/method.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2021 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15import { Status } from 'pigweedjs/pw_status';
16import { Message } from 'google-protobuf';
17
18import {
19  BidirectionalStreamingCall,
20  Callback,
21  ClientStreamingCall,
22  ServerStreamingCall,
23  UnaryCall,
24} from './call';
25import { Channel, Method, MethodType } from './descriptors';
26import { PendingCalls, Rpc } from './rpc_classes';
27
/**
 * Value used for the `maxResponses` argument of streaming calls when the
 * caller of `invoke()`/`open()` does not pass one explicitly.
 */
const DEFAULT_MAX_STREAM_RESPONSES = 16_384;
29
/**
 * Builds the stub class that corresponds to `method.type`.
 *
 * @param rpcs - `PendingCalls` instance forwarded to the stub constructor.
 * @param channel - Channel forwarded to the stub constructor.
 * @param method - Method descriptor; its `type` selects which stub is created.
 * @returns A freshly constructed stub for the given method.
 * @throws Error when `method.type` is none of the four handled `MethodType`
 *   values. Without this the function would fall off the end of the switch
 *   and hand `undefined` to a caller that was promised a `MethodStub`.
 */
export function methodStubFactory(
  rpcs: PendingCalls,
  channel: Channel,
  method: Method,
): MethodStub {
  switch (method.type) {
    case MethodType.BIDIRECTIONAL_STREAMING:
      return new BidirectionalStreamingMethodStub(rpcs, channel, method);
    case MethodType.CLIENT_STREAMING:
      return new ClientStreamingMethodStub(rpcs, channel, method);
    case MethodType.SERVER_STREAMING:
      return new ServerStreamingMethodStub(rpcs, channel, method);
    case MethodType.UNARY:
      return new UnaryMethodStub(rpcs, channel, method);
    default:
      // Only reachable with a descriptor whose type is outside the enum at
      // runtime (e.g. produced by mismatched or untyped code).
      throw new Error(`Unsupported RPC method type: ${String(method.type)}`);
  }
}
46
/**
 * Base class shared by the per-method-type stubs. It stores the method
 * descriptor, the `PendingCalls` instance and the channel, and builds the
 * `Rpc` that concrete stubs pass to the calls they create.
 */
export abstract class MethodStub {
  /** Descriptor of the method this stub targets. */
  readonly method: Method;
  /** `PendingCalls` instance given at construction. */
  readonly rpcs: PendingCalls;
  /** `Rpc` built from the channel, the method's service and the method. */
  readonly rpc: Rpc;
  /** Channel given at construction; only stored here, not read by this class. */
  private channel: Channel;

  /**
   * @param rpcs - `PendingCalls` instance to expose as `this.rpcs`.
   * @param channel - Channel used to build `this.rpc`.
   * @param method - Method descriptor; `method.service` is also passed to `Rpc`.
   */
  constructor(rpcs: PendingCalls, channel: Channel, method: Method) {
    this.method = method;
    this.rpcs = rpcs;
    this.channel = channel;

    const { service } = method;
    this.rpc = new Rpc(channel, service, method);
  }

  /** The `id` of the underlying method descriptor. */
  get id(): number {
    const { id } = this.method;
    return id;
  }
}
64
/**
 * Stub for a unary method. Every call it creates is a `UnaryCall` constructed
 * with a `maxResponses` of 1.
 */
export class UnaryMethodStub extends MethodStub {
  /**
   * Creates a `UnaryCall`, starts it with `request`, and returns it.
   *
   * @param request - Request message passed to `UnaryCall.invoke`.
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @returns The started call.
   */
  invoke(
    request: Message,
    onNext: Callback = () => {
      /* no-op */
    },
    onCompleted: Callback = () => {
      /* no-op */
    },
    onError: Callback = () => {
      /* no-op */
    },
  ): UnaryCall {
    const pending = this.buildCall(onNext, onCompleted, onError);
    pending.invoke(request);
    return pending;
  }

  /**
   * Same as `invoke()`, except the call is started with
   * `UnaryCall.invoke(request, true)`.
   *
   * NOTE(review): the meaning of the `true` flag is defined in `./call`;
   * presumably it relaxes error handling while opening the call. Confirm
   * there before relying on it.
   *
   * @param request - Request message passed to `UnaryCall.invoke`.
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @returns The started call.
   */
  open(
    request: Message,
    onNext: Callback = () => {
      /* no-op */
    },
    onCompleted: Callback = () => {
      /* no-op */
    },
    onError: Callback = () => {
      /* no-op */
    },
  ): UnaryCall {
    const pending = this.buildCall(onNext, onCompleted, onError);
    pending.invoke(request, true);
    return pending;
  }

  /**
   * Invokes the method with default callbacks and awaits
   * `UnaryCall.complete(timeout)`.
   *
   * @param request - Request message to send.
   * @param timeout - Optional timeout forwarded to `complete()`.
   * @returns The `[Status, Message]` pair produced by `complete()`.
   */
  async call(request: Message, timeout?: number): Promise<[Status, Message]> {
    const outcome = await this.invoke(request).complete(timeout);
    return outcome;
  }

  /** Constructs (without starting) the `UnaryCall` used by `invoke`/`open`. */
  private buildCall(
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback,
  ): UnaryCall {
    return new UnaryCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      /*maxResponses=*/ 1,
    );
  }
}
118
/**
 * Stub for a server-streaming method. Every call it creates is a
 * `ServerStreamingCall`.
 */
export class ServerStreamingMethodStub extends MethodStub {
  /**
   * Creates a `ServerStreamingCall`, starts it with `request`, and returns it.
   *
   * @param request - Optional request message passed to the call's `invoke`.
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses - Forwarded to the call constructor; defaults to
   *   `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The started call.
   */
  invoke(
    request?: Message,
    onNext: Callback = () => {
      // Do nothing.
    },
    onCompleted: Callback = () => {
      // Do nothing.
    },
    onError: Callback = () => {
      // Do nothing.
    },
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): ServerStreamingCall {
    const call = this.buildCall(onNext, onCompleted, onError, maxResponses);
    call.invoke(request);
    return call;
  }

  /**
   * Same as `invoke()`, except the call is started with
   * `invoke(request, true)`.
   *
   * This previously built a `UnaryCall` (with a streaming `maxResponses`),
   * unlike `invoke()` here and `open()` on the other stubs, which all use the
   * stub's own call class. It now builds a `ServerStreamingCall`.
   *
   * NOTE(review): the meaning of the `true` flag is defined in `./call`;
   * presumably it relaxes error handling while opening the call. Confirm
   * there before relying on it.
   *
   * @param request - Request message passed to the call's `invoke`.
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses - Forwarded to the call constructor; defaults to
   *   `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The started call.
   */
  open(
    request: Message,
    onNext: Callback = () => {
      // Do nothing.
    },
    onCompleted: Callback = () => {
      // Do nothing.
    },
    onError: Callback = () => {
      // Do nothing.
    },
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): ServerStreamingCall {
    const call = this.buildCall(onNext, onCompleted, onError, maxResponses);
    call.invoke(request, true);
    return call;
  }

  /**
   * Invokes the method with default callbacks and returns the promise from
   * `ServerStreamingCall.complete(timeout)`.
   *
   * @param request - Optional request message to send.
   * @param timeout - Optional timeout forwarded to `complete()`.
   * @returns A promise for the `[Status, Message[]]` pair from `complete()`.
   */
  call(request?: Message, timeout?: number): Promise<[Status, Message[]]> {
    return this.invoke(request).complete(timeout);
  }

  /** Constructs (without starting) the call used by `invoke`/`open`. */
  private buildCall(
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback,
    maxResponses: number,
  ): ServerStreamingCall {
    return new ServerStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      maxResponses,
    );
  }
}
174
/**
 * Stub for a client-streaming method. Every call it creates is a
 * `ClientStreamingCall` constructed with a `maxResponses` of 1.
 */
export class ClientStreamingMethodStub extends MethodStub {
  /**
   * Creates a `ClientStreamingCall`, starts it with no request, and returns it.
   *
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @returns The started call.
   */
  invoke(
    onNext: Callback = () => {
      /* no-op */
    },
    onCompleted: Callback = () => {
      /* no-op */
    },
    onError: Callback = () => {
      /* no-op */
    },
  ): ClientStreamingCall {
    const stream = this.buildCall(onNext, onCompleted, onError);
    stream.invoke();
    return stream;
  }

  /**
   * Same as `invoke()`, except the call is started with
   * `invoke(undefined, true)`.
   *
   * NOTE(review): the meaning of the `true` flag is defined in `./call`;
   * presumably it relaxes error handling while opening the call. Confirm
   * there before relying on it.
   *
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @returns The started call.
   */
  open(
    onNext: Callback = () => {
      /* no-op */
    },
    onCompleted: Callback = () => {
      /* no-op */
    },
    onError: Callback = () => {
      /* no-op */
    },
  ): ClientStreamingCall {
    const stream = this.buildCall(onNext, onCompleted, onError);
    stream.invoke(undefined, true);
    return stream;
  }

  /**
   * Invokes the method with default callbacks and hands `requests` and
   * `timeout` to `ClientStreamingCall.finishAndWait`.
   *
   * @param requests - Request messages to pass on; defaults to an empty array.
   * @param timeout - Optional timeout forwarded to `finishAndWait()`.
   * @returns Whatever `finishAndWait()` resolves with.
   */
  async call(requests: Array<Message> = [], timeout?: number) {
    const stream = this.invoke();
    return stream.finishAndWait(requests, timeout);
  }

  /** Constructs (without starting) the call used by `invoke`/`open`. */
  private buildCall(
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback,
  ): ClientStreamingCall {
    return new ClientStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      /*maxResponses=*/ 1,
    );
  }
}
226
/**
 * Stub for a bidirectional-streaming method. Every call it creates is a
 * `BidirectionalStreamingCall`.
 */
export class BidirectionalStreamingMethodStub extends MethodStub {
  /**
   * Creates a `BidirectionalStreamingCall`, starts it with no request, and
   * returns it.
   *
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses - Forwarded to the call constructor; defaults to
   *   `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The started call.
   */
  invoke(
    onNext: Callback = () => {
      /* no-op */
    },
    onCompleted: Callback = () => {
      /* no-op */
    },
    onError: Callback = () => {
      /* no-op */
    },
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): BidirectionalStreamingCall {
    const stream = this.buildCall(onNext, onCompleted, onError, maxResponses);
    stream.invoke();
    return stream;
  }

  /**
   * Same as `invoke()`, except the call is started with
   * `invoke(undefined, true)`.
   *
   * NOTE(review): the meaning of the `true` flag is defined in `./call`;
   * presumably it relaxes error handling while opening the call. Confirm
   * there before relying on it.
   *
   * @param onNext - Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted - Callback forwarded to the call; defaults to a no-op.
   * @param onError - Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses - Forwarded to the call constructor; defaults to
   *   `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The started call.
   */
  open(
    onNext: Callback = () => {
      /* no-op */
    },
    onCompleted: Callback = () => {
      /* no-op */
    },
    onError: Callback = () => {
      /* no-op */
    },
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): BidirectionalStreamingCall {
    const stream = this.buildCall(onNext, onCompleted, onError, maxResponses);
    stream.invoke(undefined, true);
    return stream;
  }

  /**
   * Invokes the method with default callbacks and hands `requests` and
   * `timeout` to `BidirectionalStreamingCall.finishAndWait`.
   *
   * @param requests - Request messages to pass on; defaults to an empty array.
   * @param timeout - Optional timeout forwarded to `finishAndWait()`.
   * @returns Whatever `finishAndWait()` resolves with.
   */
  async call(requests: Array<Message> = [], timeout?: number) {
    const stream = this.invoke();
    return stream.finishAndWait(requests, timeout);
  }

  /** Constructs (without starting) the call used by `invoke`/`open`. */
  private buildCall(
    onNext: Callback,
    onCompleted: Callback,
    onError: Callback,
    maxResponses: number,
  ): BidirectionalStreamingCall {
    return new BidirectionalStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      maxResponses,
    );
  }
}
280