xref: /aosp_15_r20/external/pigweed/pw_transfer/public/pw_transfer/client.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include "pw_function/function.h"
17 #include "pw_result/result.h"
18 #include "pw_rpc/client.h"
19 #include "pw_status/status.h"
20 #include "pw_stream/stream.h"
21 #include "pw_transfer/internal/config.h"
22 #include "pw_transfer/transfer.raw_rpc.pb.h"
23 #include "pw_transfer/transfer_thread.h"
24 
25 namespace pw::transfer {
26 
27 class Client {
28  public:
29   /// A handle to an active transfer. Used to manage the transfer during its
30   /// operation.
31   class Handle {
32    public:
Handle()33     constexpr Handle() : client_(nullptr), id_(kUnassignedHandleId) {}
34 
35     /// Terminates the transfer.
Cancel()36     void Cancel() {
37       if (client_ != nullptr) {
38         client_->CancelTransfer(*this);
39       }
40     }
41 
42     /// In a `Write()` transfer, updates the size of the resource being
43     /// transferred. This size will be indicated to the server.
SetTransferSize(size_t size_bytes)44     void SetTransferSize(size_t size_bytes) {
45       if (client_ != nullptr) {
46         client_->UpdateTransferSize(*this, size_bytes);
47       }
48     }
49 
50    private:
51     friend class Client;
52 
53     static constexpr uint32_t kUnassignedHandleId = 0;
54 
Handle(Client * client,uint32_t id)55     explicit constexpr Handle(Client* client, uint32_t id)
56         : client_(client), id_(id) {}
id()57     constexpr uint32_t id() const { return id_; }
is_unassigned()58     constexpr bool is_unassigned() const { return id_ == kUnassignedHandleId; }
59 
60     Client* client_;
61     uint32_t id_;
62   };
63 
64   using CompletionFunc = Function<void(Status)>;
65 
66   // Initializes a transfer client on a specified RPC client and channel.
67   // Transfer tasks are processed on the provided transfer thread, which may be
68   // shared between a transfer client and service.
69   //
70   // `max_window_size_bytes` is the maximum amount of data to ask for at a
71   // time during a read transfer, unless told a more restrictive amount by the
72   // transfer's stream. This size should span multiple chunks, and can be set
73   // quite large. The transfer protocol automatically adjusts its window size
74   // as a transfer progresses to attempt to find an optimal configuration for
75   // the connection over which it is running.
76   Client(rpc::Client& rpc_client,
77          uint32_t channel_id,
78          TransferThread& transfer_thread,
79          size_t max_window_size_bytes,
80          uint32_t extend_window_divisor = cfg::kDefaultExtendWindowDivisor)
default_protocol_version(ProtocolVersion::kLatest)81       : default_protocol_version(ProtocolVersion::kLatest),
82         client_(rpc_client, channel_id),
83         transfer_thread_(transfer_thread),
84         next_handle_id_(1),
85         max_parameters_(max_window_size_bytes,
86                         transfer_thread.max_chunk_size(),
87                         extend_window_divisor),
88         max_retries_(cfg::kDefaultMaxClientRetries),
89         max_lifetime_retries_(cfg::kDefaultMaxLifetimeRetries),
90         has_read_stream_(false),
91         has_write_stream_(false) {}
92 
93   [[deprecated("Explicitly provide a maximum window size")]]
Client(rpc::Client & rpc_client,uint32_t channel_id,TransferThread & transfer_thread)94   Client(rpc::Client& rpc_client,
95          uint32_t channel_id,
96          TransferThread& transfer_thread)
97       : Client(rpc_client,
98                channel_id,
99                transfer_thread,
100                transfer_thread.max_chunk_size()) {}
101 
102   // Begins a new read transfer for the given resource ID. The data read from
103   // the server is written to the provided writer. Returns OK if the transfer is
104   // successfully started. When the transfer finishes (successfully or not), the
105   // completion callback is invoked with the overall status.
106   Result<Handle> Read(
107       uint32_t resource_id,
108       stream::Writer& output,
109       CompletionFunc&& on_completion,
110       ProtocolVersion protocol_version,
111       chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
112       chrono::SystemClock::duration initial_chunk_timeout =
113           cfg::kDefaultInitialChunkTimeout,
114       uint32_t initial_offset = 0u);
115 
116   Result<Handle> Read(
117       uint32_t resource_id,
118       stream::Writer& output,
119       CompletionFunc&& on_completion,
120       chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
121       chrono::SystemClock::duration initial_chunk_timeout =
122           cfg::kDefaultInitialChunkTimeout,
123       uint32_t initial_offset = 0u) {
124     return Read(resource_id,
125                 output,
126                 std::move(on_completion),
127                 default_protocol_version,
128                 timeout,
129                 initial_chunk_timeout,
130                 initial_offset);
131   }
132 
133   // Begins a new write transfer for the given resource ID. Data from the
134   // provided reader is sent to the server. When the transfer finishes
135   // (successfully or not), the completion callback is invoked with the overall
136   // status.
137   Result<Handle> Write(
138       uint32_t resource_id,
139       stream::Reader& input,
140       CompletionFunc&& on_completion,
141       ProtocolVersion protocol_version,
142       chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
143       chrono::SystemClock::duration initial_chunk_timeout =
144           cfg::kDefaultInitialChunkTimeout,
145       uint32_t initial_offset = 0u);
146 
147   Result<Handle> Write(
148       uint32_t resource_id,
149       stream::Reader& input,
150       CompletionFunc&& on_completion,
151       chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout,
152       chrono::SystemClock::duration initial_chunk_timeout =
153           cfg::kDefaultInitialChunkTimeout,
154       uint32_t initial_offset = 0u) {
155     return Write(resource_id,
156                  input,
157                  std::move(on_completion),
158                  default_protocol_version,
159                  timeout,
160                  initial_chunk_timeout,
161                  initial_offset);
162   }
163 
set_extend_window_divisor(uint32_t extend_window_divisor)164   Status set_extend_window_divisor(uint32_t extend_window_divisor) {
165     if (extend_window_divisor <= 1) {
166       return Status::InvalidArgument();
167     }
168 
169     max_parameters_.set_extend_window_divisor(extend_window_divisor);
170     return OkStatus();
171   }
172 
set_max_retries(uint32_t max_retries)173   constexpr Status set_max_retries(uint32_t max_retries) {
174     if (max_retries < 1 || max_retries > max_lifetime_retries_) {
175       return Status::InvalidArgument();
176     }
177     max_retries_ = max_retries;
178     return OkStatus();
179   }
180 
set_max_lifetime_retries(uint32_t max_lifetime_retries)181   constexpr Status set_max_lifetime_retries(uint32_t max_lifetime_retries) {
182     if (max_lifetime_retries < max_retries_) {
183       return Status::InvalidArgument();
184     }
185     max_lifetime_retries_ = max_lifetime_retries;
186     return OkStatus();
187   }
188 
set_protocol_version(ProtocolVersion new_version)189   constexpr void set_protocol_version(ProtocolVersion new_version) {
190     default_protocol_version = new_version;
191   }
192 
193  private:
194   // Terminates an ongoing transfer.
CancelTransfer(Handle handle)195   void CancelTransfer(Handle handle) {
196     if (!handle.is_unassigned()) {
197       transfer_thread_.CancelClientTransfer(handle.id());
198     }
199   }
200 
UpdateTransferSize(Handle handle,size_t transfer_size_bytes)201   void UpdateTransferSize(Handle handle, size_t transfer_size_bytes) {
202     if (!handle.is_unassigned()) {
203       transfer_thread_.UpdateClientTransfer(handle.id(), transfer_size_bytes);
204     }
205   }
206 
207   ProtocolVersion default_protocol_version;
208 
209   using Transfer = pw_rpc::raw::Transfer;
210 
211   void OnRpcError(Status status, internal::TransferType type);
212 
213   Handle AssignHandle();
214 
215   Transfer::Client client_;
216   TransferThread& transfer_thread_;
217 
218   uint32_t next_handle_id_;
219 
220   internal::TransferParameters max_parameters_;
221   uint32_t max_retries_;
222   uint32_t max_lifetime_retries_;
223 
224   bool has_read_stream_;
225   bool has_write_stream_;
226 };
227 
228 }  // namespace pw::transfer
229