// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import { Status } from 'pigweedjs/pw_status';
import { Message } from 'google-protobuf';

import {
  BidirectionalStreamingCall,
  Callback,
  ClientStreamingCall,
  ServerStreamingCall,
  UnaryCall,
} from './call';
import { Channel, Method, MethodType } from './descriptors';
import { PendingCalls, Rpc } from './rpc_classes';

/** Default `maxResponses` handed to streaming calls when the caller gives none. */
const DEFAULT_MAX_STREAM_RESPONSES = 16_384;

/** Shared default for every optional callback parameter in this file. */
const doNothing: Callback = () => {
  // Do nothing.
};

/**
 * Creates the stub class that corresponds to `method.type`.
 *
 * @param rpcs Pending-call registry handed to every call the stub creates.
 * @param channel Channel used to build the stub's `Rpc`.
 * @param method Descriptor of the method the stub represents.
 * @returns A stub whose concrete class matches `method.type`.
 * @throws Error if `method.type` is not one of the four known method types.
 */
export function methodStubFactory(
  rpcs: PendingCalls,
  channel: Channel,
  method: Method,
): MethodStub {
  switch (method.type) {
    case MethodType.BIDIRECTIONAL_STREAMING:
      return new BidirectionalStreamingMethodStub(rpcs, channel, method);
    case MethodType.CLIENT_STREAMING:
      return new ClientStreamingMethodStub(rpcs, channel, method);
    case MethodType.SERVER_STREAMING:
      return new ServerStreamingMethodStub(rpcs, channel, method);
    case MethodType.UNARY:
      return new UnaryMethodStub(rpcs, channel, method);
    default:
      // Fail loudly instead of falling off the end and returning `undefined`
      // from a function whose signature promises a MethodStub.
      throw new Error(`Unsupported method type: ${String(method.type)}`);
  }
}

/**
 * Base class for all method stubs. Stores the method descriptor and the
 * pending-call registry, and builds the `Rpc` (channel + service + method)
 * that every call created by a subclass is bound to.
 */
export abstract class MethodStub {
  readonly method: Method;
  readonly rpcs: PendingCalls;
  readonly rpc: Rpc;
  private channel: Channel;

  constructor(rpcs: PendingCalls, channel: Channel, method: Method) {
    this.method = method;
    this.rpcs = rpcs;
    this.channel = channel;
    this.rpc = new Rpc(channel, method.service, method);
  }

  /** The id of the underlying method descriptor. */
  get id(): number {
    return this.method.id;
  }
}

/** Stub for unary methods: one request message, `maxResponses` fixed at 1. */
export class UnaryMethodStub extends MethodStub {
  /**
   * Creates a `UnaryCall` and invokes it with `request`.
   *
   * @param request Request message passed to `UnaryCall.invoke`.
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @returns The invoked call.
   */
  invoke(
    request: Message,
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
  ): UnaryCall {
    const call = new UnaryCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      /*maxResponses=*/ 1,
    );
    call.invoke(request);
    return call;
  }

  /**
   * Same as `invoke`, except that `true` is passed as the second argument to
   * the call's `invoke`.
   * NOTE(review): that flag presumably makes the call tolerate errors while
   * starting; confirm against `./call`.
   *
   * @param request Request message passed to `UnaryCall.invoke`.
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @returns The opened call.
   */
  open(
    request: Message,
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
  ): UnaryCall {
    const call = new UnaryCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      /*maxResponses=*/ 1,
    );
    call.invoke(request, true);
    return call;
  }

  /**
   * Invokes the method with default callbacks and awaits `complete(timeout)`.
   *
   * @param request Request message to send.
   * @param timeout Forwarded unchanged to `UnaryCall.complete`.
   * @returns The status and the response message reported by the call.
   */
  async call(request: Message, timeout?: number): Promise<[Status, Message]> {
    return await this.invoke(request).complete(timeout);
  }
}

/** Stub for server-streaming methods: one optional request, many responses. */
export class ServerStreamingMethodStub extends MethodStub {
  /**
   * Creates a `ServerStreamingCall` and invokes it with `request`.
   *
   * @param request Optional request message passed to the call's `invoke`.
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses Forwarded to the call constructor; defaults to
   *     `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The invoked call.
   */
  invoke(
    request?: Message,
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): ServerStreamingCall {
    const call = new ServerStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      maxResponses,
    );
    call.invoke(request);
    return call;
  }

  /**
   * Same as `invoke`, except that `true` is passed as the second argument to
   * the call's `invoke`.
   * NOTE(review): that flag presumably makes the call tolerate errors while
   * starting; confirm against `./call`.
   *
   * The call is a `ServerStreamingCall`, the same class `invoke` creates, so
   * an opened stream exposes the streaming call API rather than the unary one.
   *
   * @param request Request message passed to the call's `invoke`.
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses Forwarded to the call constructor; defaults to
   *     `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The opened call.
   */
  open(
    request: Message,
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): ServerStreamingCall {
    const call = new ServerStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      maxResponses,
    );
    call.invoke(request, true);
    return call;
  }

  /**
   * Invokes the method with default callbacks and returns `complete(timeout)`.
   *
   * @param request Optional request message to send.
   * @param timeout Forwarded unchanged to `ServerStreamingCall.complete`.
   * @returns The status and the list of response messages reported by the call.
   */
  call(request?: Message, timeout?: number): Promise<[Status, Message[]]> {
    return this.invoke(request).complete(timeout);
  }
}

/** Stub for client-streaming methods: `maxResponses` fixed at 1. */
export class ClientStreamingMethodStub extends MethodStub {
  /**
   * Creates a `ClientStreamingCall` and invokes it without a request.
   *
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @returns The invoked call.
   */
  invoke(
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
  ): ClientStreamingCall {
    const call = new ClientStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      /*maxResponses=*/ 1,
    );
    call.invoke();
    return call;
  }

  /**
   * Same as `invoke`, except that the call's `invoke` receives
   * `(undefined, true)`.
   * NOTE(review): the `true` flag presumably makes the call tolerate errors
   * while starting; confirm against `./call`.
   *
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @returns The opened call.
   */
  open(
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
  ): ClientStreamingCall {
    const call = new ClientStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      /*maxResponses=*/ 1,
    );
    call.invoke(undefined, true);
    return call;
  }

  /**
   * Invokes the method with default callbacks and returns the result of
   * `finishAndWait(requests, timeout)` on the new call.
   *
   * @param requests Messages forwarded to `finishAndWait`; defaults to `[]`.
   * @param timeout Forwarded unchanged to `finishAndWait`.
   */
  async call(requests: Array<Message> = [], timeout?: number) {
    return this.invoke().finishAndWait(requests, timeout);
  }
}

/** Stub for bidirectional-streaming methods. */
export class BidirectionalStreamingMethodStub extends MethodStub {
  /**
   * Creates a `BidirectionalStreamingCall` and invokes it without a request.
   *
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses Forwarded to the call constructor; defaults to
   *     `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The invoked call.
   */
  invoke(
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): BidirectionalStreamingCall {
    const call = new BidirectionalStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      maxResponses,
    );
    call.invoke();
    return call;
  }

  /**
   * Same as `invoke`, except that the call's `invoke` receives
   * `(undefined, true)`.
   * NOTE(review): the `true` flag presumably makes the call tolerate errors
   * while starting; confirm against `./call`.
   *
   * @param onNext Callback forwarded to the call; defaults to a no-op.
   * @param onCompleted Callback forwarded to the call; defaults to a no-op.
   * @param onError Callback forwarded to the call; defaults to a no-op.
   * @param maxResponses Forwarded to the call constructor; defaults to
   *     `DEFAULT_MAX_STREAM_RESPONSES`.
   * @returns The opened call.
   */
  open(
    onNext: Callback = doNothing,
    onCompleted: Callback = doNothing,
    onError: Callback = doNothing,
    maxResponses: number = DEFAULT_MAX_STREAM_RESPONSES,
  ): BidirectionalStreamingCall {
    const call = new BidirectionalStreamingCall(
      this.rpcs,
      this.rpc,
      onNext,
      onCompleted,
      onError,
      maxResponses,
    );
    call.invoke(undefined, true);
    return call;
  }

  /**
   * Invokes the method with default callbacks and returns the result of
   * `finishAndWait(requests, timeout)` on the new call.
   *
   * @param requests Messages forwarded to `finishAndWait`; defaults to `[]`.
   * @param timeout Forwarded unchanged to `finishAndWait`.
   */
  async call(requests: Array<Message> = [], timeout?: number) {
    return this.invoke().finishAndWait(requests, timeout);
  }
}