xref: /aosp_15_r20/external/pigweed/pw_transfer/ts/client.ts (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/** Client for the pw_transfer service, which transmits data over pw_rpc. */
16
17import {
18  BidirectionalStreamingCall,
19  BidirectionalStreamingMethodStub,
20  ServiceClient,
21} from 'pigweedjs/pw_rpc';
22import { Status } from 'pigweedjs/pw_status';
23import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';
24
25import {
26  ReadTransfer,
27  ProgressCallback,
28  Transfer,
29  WriteTransfer,
30} from './transfer';
31
32type TransferDict = {
33  [key: number]: Transfer;
34};
35
36const DEFAULT_MAX_RETRIES = 3;
37const DEFAULT_RESPONSE_TIMEOUT_S = 2;
38const DEFAULT_INITIAL_RESPONSE_TIMEOUT = 4;
39
40/**
41 *  A manager for transmitting data through an RPC TransferService.
42 *
43 *  This should be initialized with an active Manager over an RPC channel. Only
44 *  one instance of this class should exist for a configured RPC TransferService
45 *  -- the Manager supports multiple simultaneous transfers.
46 *
47 *  When created, a Manager starts a separate thread in which transfer
48 *  communications and events are handled.
49 */
50export class Manager {
51  // Ongoing transfers in the service by ID
52  readTransfers: TransferDict = {};
53  writeTransfers: TransferDict = {};
54
55  // RPC streams for read and write transfers. These are shareable by
56  // multiple transfers of the same type.
57  private readStream?: BidirectionalStreamingCall;
58  private writeStream?: BidirectionalStreamingCall;
59
60  /**
61   * Initializes a Manager on top of a TransferService.
62   *
63   * Args:
64   * @param{ServiceClient} service: the pw_rpc transfer service
65   * client
66   * @param{number} defaultResponseTimeoutS: max time to wait between receiving
67   * packets
68   * @param{number} initialResponseTimeoutS: timeout for the first packet; may
69   * be longer to account for transfer handler initialization
70   * @param{number} maxRetries: number of times to retry after a timeout
71   */
72  constructor(
73    private service: ServiceClient,
74    private defaultResponseTimeoutS = DEFAULT_RESPONSE_TIMEOUT_S,
75    private initialResponseTimeoutS = DEFAULT_INITIAL_RESPONSE_TIMEOUT,
76    private maxRetries = DEFAULT_MAX_RETRIES,
77  ) {}
78
79  /**
80   * Receives ("downloads") data from the server.
81   *
82   * @throws Throws an error when the transfer fails to complete.
83   */
84  async read(
85    resourceId: number,
86    progressCallback?: ProgressCallback,
87  ): Promise<Uint8Array> {
88    if (resourceId in this.readTransfers) {
89      throw new Error(
90        `Read transfer for resource ${resourceId} already exists`,
91      );
92    }
93    const transfer = new ReadTransfer(
94      resourceId,
95      this.sendReadChunkCallback,
96      this.defaultResponseTimeoutS,
97      this.maxRetries,
98      progressCallback,
99    );
100
101    this.startReadTransfer(transfer);
102
103    const status = await transfer.done;
104
105    delete this.readTransfers[transfer.id];
106    if (status !== Status.OK) {
107      throw new TransferError(transfer.id, transfer.status);
108    }
109    return transfer.data;
110  }
111
112  /** Begins a new read transfer, opening the stream if it isn't. */
113  startReadTransfer(transfer: Transfer): void {
114    this.readTransfers[transfer.id] = transfer;
115
116    if (this.readStream === undefined) {
117      this.openReadStream();
118    }
119    console.debug(`Starting new read transfer ${transfer.id}`);
120    transfer.begin();
121  }
122
123  /**
124  Transmits (uploads) data to the server.
125   *
126   * @param{number} resourceId: ID of the resource to which to write.
127   * @param{Uint8Array} data: Data to send to the server.
128   */
129  async write(
130    resourceId: number,
131    data: Uint8Array,
132    progressCallback?: ProgressCallback,
133  ): Promise<void> {
134    const transfer = new WriteTransfer(
135      resourceId,
136      data,
137      this.sendWriteChunkCallback,
138      this.defaultResponseTimeoutS,
139      this.initialResponseTimeoutS,
140      this.maxRetries,
141      progressCallback,
142    );
143    this.startWriteTransfer(transfer);
144
145    const status = await transfer.done;
146
147    delete this.writeTransfers[transfer.id];
148    if (transfer.status !== Status.OK) {
149      throw new TransferError(transfer.id, transfer.status);
150    }
151  }
152
153  sendReadChunkCallback = (chunk: Chunk) => {
154    this.readStream!.send(chunk);
155  };
156
157  sendWriteChunkCallback = (chunk: Chunk) => {
158    this.writeStream!.send(chunk);
159  };
160
161  /** Begins a new write transfer, opening the stream if it isn't */
162  startWriteTransfer(transfer: Transfer): void {
163    this.writeTransfers[transfer.id] = transfer;
164
165    if (!this.writeStream) {
166      this.openWriteStream();
167    }
168
169    console.debug(`Starting new write transfer ${transfer.id}`);
170    transfer.begin();
171  }
172
173  private openReadStream(): void {
174    const readRpc = this.service.method(
175      'Read',
176    )! as BidirectionalStreamingMethodStub;
177    this.readStream = readRpc.invoke(
178      (chunk: Chunk) => {
179        this.handleChunk(this.readTransfers, chunk);
180      },
181      () => {
182        // Do nothing.
183      },
184      this.onReadError,
185    );
186  }
187
188  private openWriteStream(): void {
189    const writeRpc = this.service.method(
190      'Write',
191    )! as BidirectionalStreamingMethodStub;
192    this.writeStream = writeRpc.invoke(
193      (chunk: Chunk) => {
194        this.handleChunk(this.writeTransfers, chunk);
195      },
196      () => {
197        // Do nothing.
198      },
199      this.onWriteError,
200    );
201  }
202
203  /**
204   * Callback for an RPC error in the read stream.
205   */
206  private onReadError = (status: Status) => {
207    if (status === Status.FAILED_PRECONDITION) {
208      // FAILED_PRECONDITION indicates that the stream packet was not
209      // recognized as the stream is not open. This could occur if the
210      // server resets during an active transfer. Re-open the stream to
211      // allow pending transfers to continue.
212      this.openReadStream();
213      return;
214    }
215
216    // Other errors are unrecoverable. Clear the stream and cancel any
217    // pending transfers with an INTERNAL status as this is a system
218    // error.
219    this.readStream = undefined;
220
221    for (const key in this.readTransfers) {
222      const transfer = this.readTransfers[key];
223      transfer.abort(Status.INTERNAL);
224    }
225    this.readTransfers = {};
226    console.error(`Read stream shut down ${Status[status]}`);
227  };
228
229  private onWriteError = (status: Status) => {
230    if (status === Status.FAILED_PRECONDITION) {
231      // FAILED_PRECONDITION indicates that the stream packet was not
232      // recognized as the stream is not open. This could occur if the
233      // server resets during an active transfer. Re-open the stream to
234      // allow pending transfers to continue.
235      this.openWriteStream();
236    } else {
237      // Other errors are unrecoverable. Clear the stream and cancel any
238      // pending transfers with an INTERNAL status as this is a system
239      // error.
240      this.writeStream = undefined;
241
242      for (const key in this.writeTransfers) {
243        const transfer = this.writeTransfers[key];
244        transfer.abort(Status.INTERNAL);
245      }
246      this.writeTransfers = {};
247      console.error(`Write stream shut down: ${Status[status]}`);
248    }
249  };
250
251  /**
252   * Processes an incoming chunk from a stream.
253   *
254   * The chunk is dispatched to an active transfer based on its ID. If the
255   * transfer indicates that it is complete, the provided completion callback
256   * is invoked.
257   */
258  private async handleChunk(transfers: TransferDict, chunk: Chunk) {
259    const transfer = transfers[chunk.getTransferId()];
260    if (transfer === undefined) {
261      console.error(
262        `TransferManager received chunk for unknown transfer ${chunk.getTransferId()}`,
263      );
264      return;
265    }
266    transfer.handleChunk(chunk);
267  }
268}
269
270/**
271 * Exception raised when a transfer fails.
272 *
273 * Stores the ID of the failed transfer and the error that occured.
274 */
275class TransferError extends Error {
276  id: number;
277  status: Status;
278
279  constructor(id: number, status: Status) {
280    super(`Transfer ${id} failed with status ${Status[status]}`);
281    this.status = status;
282    this.id = id;
283  }
284}
285