1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include "pw_function/function.h" 17 #include "pw_result/result.h" 18 #include "pw_rpc/client.h" 19 #include "pw_status/status.h" 20 #include "pw_stream/stream.h" 21 #include "pw_transfer/internal/config.h" 22 #include "pw_transfer/transfer.raw_rpc.pb.h" 23 #include "pw_transfer/transfer_thread.h" 24 25 namespace pw::transfer { 26 27 class Client { 28 public: 29 /// A handle to an active transfer. Used to manage the transfer during its 30 /// operation. 31 class Handle { 32 public: Handle()33 constexpr Handle() : client_(nullptr), id_(kUnassignedHandleId) {} 34 35 /// Terminates the transfer. Cancel()36 void Cancel() { 37 if (client_ != nullptr) { 38 client_->CancelTransfer(*this); 39 } 40 } 41 42 /// In a `Write()` transfer, updates the size of the resource being 43 /// transferred. This size will be indicated to the server. SetTransferSize(size_t size_bytes)44 void SetTransferSize(size_t size_bytes) { 45 if (client_ != nullptr) { 46 client_->UpdateTransferSize(*this, size_bytes); 47 } 48 } 49 50 private: 51 friend class Client; 52 53 static constexpr uint32_t kUnassignedHandleId = 0; 54 Handle(Client * client,uint32_t id)55 explicit constexpr Handle(Client* client, uint32_t id) 56 : client_(client), id_(id) {} id()57 constexpr uint32_t id() const { return id_; } is_unassigned()58 constexpr bool is_unassigned() const { return id_ == kUnassignedHandleId; } 59 60 Client* client_; 61 uint32_t id_; 62 }; 63 64 using CompletionFunc = Function<void(Status)>; 65 66 // Initializes a transfer client on a specified RPC client and channel. 67 // Transfer tasks are processed on the provided transfer thread, which may be 68 // shared between a transfer client and service. 69 // 70 // `max_window_size_bytes` is the maximum amount of data to ask for at a 71 // time during a read transfer, unless told a more restrictive amount by the 72 // transfer's stream. This size should span multiple chunks, and can be set 73 // quite large. The transfer protocol automatically adjusts its window size 74 // as a transfer progresses to attempt to find an optimal configuration for 75 // the connection over which it is running. 76 Client(rpc::Client& rpc_client, 77 uint32_t channel_id, 78 TransferThread& transfer_thread, 79 size_t max_window_size_bytes, 80 uint32_t extend_window_divisor = cfg::kDefaultExtendWindowDivisor) default_protocol_version(ProtocolVersion::kLatest)81 : default_protocol_version(ProtocolVersion::kLatest), 82 client_(rpc_client, channel_id), 83 transfer_thread_(transfer_thread), 84 next_handle_id_(1), 85 max_parameters_(max_window_size_bytes, 86 transfer_thread.max_chunk_size(), 87 extend_window_divisor), 88 max_retries_(cfg::kDefaultMaxClientRetries), 89 max_lifetime_retries_(cfg::kDefaultMaxLifetimeRetries), 90 has_read_stream_(false), 91 has_write_stream_(false) {} 92 93 [[deprecated("Explicitly provide a maximum window size")]] Client(rpc::Client & rpc_client,uint32_t channel_id,TransferThread & transfer_thread)94 Client(rpc::Client& rpc_client, 95 uint32_t channel_id, 96 TransferThread& transfer_thread) 97 : Client(rpc_client, 98 channel_id, 99 transfer_thread, 100 transfer_thread.max_chunk_size()) {} 101 102 // Begins a new read transfer for the given resource ID. The data read from 103 // the server is written to the provided writer. Returns OK if the transfer is 104 // successfully started. When the transfer finishes (successfully or not), the 105 // completion callback is invoked with the overall status. 106 Result<Handle> Read( 107 uint32_t resource_id, 108 stream::Writer& output, 109 CompletionFunc&& on_completion, 110 ProtocolVersion protocol_version, 111 chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout, 112 chrono::SystemClock::duration initial_chunk_timeout = 113 cfg::kDefaultInitialChunkTimeout, 114 uint32_t initial_offset = 0u); 115 116 Result<Handle> Read( 117 uint32_t resource_id, 118 stream::Writer& output, 119 CompletionFunc&& on_completion, 120 chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout, 121 chrono::SystemClock::duration initial_chunk_timeout = 122 cfg::kDefaultInitialChunkTimeout, 123 uint32_t initial_offset = 0u) { 124 return Read(resource_id, 125 output, 126 std::move(on_completion), 127 default_protocol_version, 128 timeout, 129 initial_chunk_timeout, 130 initial_offset); 131 } 132 133 // Begins a new write transfer for the given resource ID. Data from the 134 // provided reader is sent to the server. When the transfer finishes 135 // (successfully or not), the completion callback is invoked with the overall 136 // status. 137 Result<Handle> Write( 138 uint32_t resource_id, 139 stream::Reader& input, 140 CompletionFunc&& on_completion, 141 ProtocolVersion protocol_version, 142 chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout, 143 chrono::SystemClock::duration initial_chunk_timeout = 144 cfg::kDefaultInitialChunkTimeout, 145 uint32_t initial_offset = 0u); 146 147 Result<Handle> Write( 148 uint32_t resource_id, 149 stream::Reader& input, 150 CompletionFunc&& on_completion, 151 chrono::SystemClock::duration timeout = cfg::kDefaultClientTimeout, 152 chrono::SystemClock::duration initial_chunk_timeout = 153 cfg::kDefaultInitialChunkTimeout, 154 uint32_t initial_offset = 0u) { 155 return Write(resource_id, 156 input, 157 std::move(on_completion), 158 default_protocol_version, 159 timeout, 160 initial_chunk_timeout, 161 initial_offset); 162 } 163 set_extend_window_divisor(uint32_t extend_window_divisor)164 Status set_extend_window_divisor(uint32_t extend_window_divisor) { 165 if (extend_window_divisor <= 1) { 166 return Status::InvalidArgument(); 167 } 168 169 max_parameters_.set_extend_window_divisor(extend_window_divisor); 170 return OkStatus(); 171 } 172 set_max_retries(uint32_t max_retries)173 constexpr Status set_max_retries(uint32_t max_retries) { 174 if (max_retries < 1 || max_retries > max_lifetime_retries_) { 175 return Status::InvalidArgument(); 176 } 177 max_retries_ = max_retries; 178 return OkStatus(); 179 } 180 set_max_lifetime_retries(uint32_t max_lifetime_retries)181 constexpr Status set_max_lifetime_retries(uint32_t max_lifetime_retries) { 182 if (max_lifetime_retries < max_retries_) { 183 return Status::InvalidArgument(); 184 } 185 max_lifetime_retries_ = max_lifetime_retries; 186 return OkStatus(); 187 } 188 set_protocol_version(ProtocolVersion new_version)189 constexpr void set_protocol_version(ProtocolVersion new_version) { 190 default_protocol_version = new_version; 191 } 192 193 private: 194 // Terminates an ongoing transfer. CancelTransfer(Handle handle)195 void CancelTransfer(Handle handle) { 196 if (!handle.is_unassigned()) { 197 transfer_thread_.CancelClientTransfer(handle.id()); 198 } 199 } 200 UpdateTransferSize(Handle handle,size_t transfer_size_bytes)201 void UpdateTransferSize(Handle handle, size_t transfer_size_bytes) { 202 if (!handle.is_unassigned()) { 203 transfer_thread_.UpdateClientTransfer(handle.id(), transfer_size_bytes); 204 } 205 } 206 207 ProtocolVersion default_protocol_version; 208 209 using Transfer = pw_rpc::raw::Transfer; 210 211 void OnRpcError(Status status, internal::TransferType type); 212 213 Handle AssignHandle(); 214 215 Transfer::Client client_; 216 TransferThread& transfer_thread_; 217 218 uint32_t next_handle_id_; 219 220 internal::TransferParameters max_parameters_; 221 uint32_t max_retries_; 222 uint32_t max_lifetime_retries_; 223 224 bool has_read_stream_; 225 bool has_write_stream_; 226 }; 227 228 } // namespace pw::transfer 229