1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cinttypes> 17 #include <cstddef> 18 #include <limits> 19 #include <optional> 20 21 #include "pw_assert/assert.h" 22 #include "pw_chrono/system_clock.h" 23 #include "pw_rpc/writer.h" 24 #include "pw_status/status.h" 25 #include "pw_stream/stream.h" 26 #include "pw_transfer/internal/chunk.h" 27 #include "pw_transfer/internal/config.h" 28 #include "pw_transfer/internal/event.h" 29 #include "pw_transfer/internal/protocol.h" 30 #include "pw_transfer/rate_estimate.h" 31 32 namespace pw::transfer::internal { 33 34 class TransferThread; 35 36 class TransferParameters { 37 public: TransferParameters(uint32_t max_window_size_bytes,uint32_t max_chunk_size_bytes,uint32_t extend_window_divisor)38 constexpr TransferParameters(uint32_t max_window_size_bytes, 39 uint32_t max_chunk_size_bytes, 40 uint32_t extend_window_divisor) 41 : max_window_size_bytes_(max_window_size_bytes), 42 max_chunk_size_bytes_(max_chunk_size_bytes), 43 extend_window_divisor_(extend_window_divisor) { 44 PW_ASSERT(max_window_size_bytes > 0); 45 PW_ASSERT(max_chunk_size_bytes > 0); 46 PW_ASSERT(extend_window_divisor > 1); 47 } 48 max_window_size_bytes()49 constexpr uint32_t max_window_size_bytes() const { 50 return max_window_size_bytes_; 51 } set_max_window_size_bytes(uint32_t max_window_size_bytes)52 constexpr void set_max_window_size_bytes(uint32_t max_window_size_bytes) { 53 max_window_size_bytes_ = max_window_size_bytes; 54 } 55 max_chunk_size_bytes()56 constexpr uint32_t max_chunk_size_bytes() const { 57 return max_chunk_size_bytes_; 58 } set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes)59 constexpr void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) { 60 max_chunk_size_bytes_ = max_chunk_size_bytes; 61 } 62 extend_window_divisor()63 constexpr uint32_t extend_window_divisor() const { 64 return extend_window_divisor_; 65 } set_extend_window_divisor(uint32_t extend_window_divisor)66 constexpr void set_extend_window_divisor(uint32_t extend_window_divisor) { 67 PW_DASSERT(extend_window_divisor > 1); 68 extend_window_divisor_ = extend_window_divisor; 69 } 70 71 private: 72 uint32_t max_window_size_bytes_; 73 uint32_t max_chunk_size_bytes_; 74 uint32_t extend_window_divisor_; 75 }; 76 77 // Information about a single transfer. 78 class Context { 79 public: 80 static constexpr uint32_t kUnassignedSessionId = 0; 81 82 Context(const Context&) = delete; 83 Context(Context&&) = delete; 84 Context& operator=(const Context&) = delete; 85 Context& operator=(Context&&) = delete; 86 session_id()87 constexpr uint32_t session_id() const { return session_id_; } resource_id()88 constexpr uint32_t resource_id() const { return resource_id_; } 89 90 // True if the context has been used for a transfer (it has an ID). initialized()91 bool initialized() const { 92 return transfer_state_ != TransferState::kInactive; 93 } 94 95 // True if the transfer is active. active()96 bool active() const { return transfer_state_ >= TransferState::kInitiating; } 97 type()98 constexpr TransferType type() const { 99 return static_cast<TransferType>(flags_ & kFlagsType); 100 } 101 timeout()102 std::optional<chrono::SystemClock::time_point> timeout() const { 103 return active() && next_timeout_ != kNoTimeout 104 ? std::optional(next_timeout_) 105 : std::nullopt; 106 } 107 108 // Returns true if the transfer's most recently set timeout has passed. timed_out()109 bool timed_out() const { 110 std::optional<chrono::SystemClock::time_point> next_timeout = timeout(); 111 return next_timeout.has_value() && 112 chrono::SystemClock::now() >= next_timeout.value(); 113 } 114 115 // Processes an event for this transfer. 116 void HandleEvent(const Event& event); 117 118 protected: 119 ~Context() = default; 120 Context()121 constexpr Context() 122 : initial_offset_(0), 123 session_id_(kUnassignedSessionId), 124 resource_id_(0), 125 desired_protocol_version_(ProtocolVersion::kUnknown), 126 configured_protocol_version_(ProtocolVersion::kUnknown), 127 flags_(0), 128 transfer_state_(TransferState::kInactive), 129 retries_(0), 130 max_retries_(0), 131 lifetime_retries_(0), 132 max_lifetime_retries_(0), 133 stream_(nullptr), 134 rpc_writer_(nullptr), 135 offset_(0), 136 window_size_(0), 137 window_end_offset_(0), 138 max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()), 139 window_size_multiplier_(1), 140 transmit_phase_(TransmitPhase::kSlowStart), 141 max_parameters_(nullptr), 142 thread_(nullptr), 143 last_chunk_sent_(Chunk::Type::kData), 144 last_chunk_offset_(0), 145 chunk_timeout_(chrono::SystemClock::duration::zero()), 146 initial_chunk_timeout_(chrono::SystemClock::duration::zero()), 147 interchunk_delay_(chrono::SystemClock::for_at_least( 148 std::chrono::microseconds(kDefaultChunkDelayMicroseconds))), 149 next_timeout_(kNoTimeout), 150 log_rate_limit_cfg_(cfg::kLogDefaultRateLimit), 151 log_rate_limit_(kNoRateLimit), 152 log_chunks_before_rate_limit_cfg_( 153 cfg::kLogDefaultChunksBeforeRateLimit), 154 log_chunks_before_rate_limit_(0) {} 155 reader()156 stream::Reader& reader() { 157 PW_DASSERT(active() && type() == TransferType::kTransmit); 158 return static_cast<stream::Reader&>(*stream_); 159 } 160 161 uint32_t initial_offset_; 162 163 private: 164 enum class TransferState : uint8_t { 165 // The context is available for use for a new transfer. 166 kInactive, 167 168 // A transfer completed and the final status chunk was sent. The Context is 169 // available for use for a new transfer. A receive transfer uses this state 170 // to allow a transmitter to retry its last chunk if the final status chunk 171 // was dropped. 172 // 173 // Only used by the legacy protocol. Starting from version 2, transfer 174 // completions are acknowledged, for which the TERMINATING state is used. 175 kCompleted, 176 177 // Transfer is starting. The server and client are performing an initial 178 // handshake and negotiating protocol and feature flags. 179 kInitiating, 180 181 // Waiting for the other end to send a chunk. 182 kWaiting, 183 184 // Transmitting a window of data to a receiver. 185 kTransmitting, 186 187 // Recovering after one or more chunks was dropped in an active transfer. 188 kRecovery, 189 190 // Transfer has completed locally and is waiting for the peer to acknowledge 191 // its final status. Only entered by the terminating side of the transfer. 192 // 193 // The context remains in a TERMINATING state until it receives an 194 // acknowledgement from the peer or times out. Either way, the context 195 // transitions to INACTIVE afterwards, fully cleaning it up for reuse. 196 // 197 // Used instead of COMPLETED starting from version 2. Unlike COMPLETED, 198 // contexts in a TERMINATING state cannot be used to start new transfers. 199 kTerminating, 200 }; 201 202 enum class TransmitAction { 203 // Start of a new transfer. 204 kBegin, 205 // First parameters chunk of a transfer. 206 kFirstParameters, 207 // Extend the current window length. 208 kExtend, 209 // Retransmit from a specified offset. 210 kRetransmit, 211 }; 212 213 // Slow start and congestion avoidance are analogues to the equally named 214 // phases in TCP congestion control. 215 enum class TransmitPhase : bool { kSlowStart, kCongestionAvoidance }; 216 set_transfer_state(TransferState state)217 void set_transfer_state(TransferState state) { transfer_state_ = state; } 218 219 // The session ID as unsigned instead of uint32_t so it can be used with %u. id_for_log()220 unsigned id_for_log() const { 221 static_assert(sizeof(unsigned) >= sizeof(session_id_)); 222 return static_cast<unsigned>(session_id_); 223 } 224 writer()225 stream::Writer& writer() { 226 PW_DASSERT(active() && type() == TransferType::kReceive); 227 return static_cast<stream::Writer&>(*stream_); 228 } 229 DataTransferComplete()230 bool DataTransferComplete() const { 231 return transfer_state_ == TransferState::kTerminating || 232 transfer_state_ == TransferState::kCompleted; 233 } 234 ShouldSkipCompletionHandshake()235 bool ShouldSkipCompletionHandshake() const { 236 // Completion handshakes are not part of the legacy protocol. Additionally, 237 // transfers which have not yet fully established should not handshake and 238 // simply time out. 239 return configured_protocol_version_ <= ProtocolVersion::kLegacy || 240 transfer_state_ == TransferState::kInitiating; 241 } 242 243 // Calculates the maximum size of actual data that can be sent within a 244 // single client write transfer chunk, accounting for the overhead of the 245 // transfer protocol and RPC system. 246 // 247 // Note: This function relies on RPC protocol internals. This is generally a 248 // *bad* idea, but is necessary here due to limitations of the RPC system 249 // and its asymmetric ingress and egress paths. 250 // 251 // TODO(frolv): This should be investigated further and perhaps addressed 252 // within the RPC system, at the least through a helper function. 253 uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes, 254 uint32_t channel_id) const; 255 256 // Initializes a new transfer using new_transfer. The provided stream 257 // argument is used in place of the NewTransferEvent's stream. Only 258 // initializes state; no packets are sent. 259 // 260 // Precondition: context is not active. 261 void Initialize(const NewTransferEvent& new_transfer); 262 263 // Starts a new transfer from an initialized context by sending the initial 264 // transfer chunk. This is only used by transfer clients, as the transfer 265 // service cannot initiate transfers. 266 // 267 // Calls Finish(), which calls the on_completion callback, if initiating a 268 // transfer fails. 269 void InitiateTransferAsClient(); 270 271 // Starts a new transfer on the server after receiving a request from a 272 // client. 273 bool StartTransferAsServer(const NewTransferEvent& new_transfer); 274 275 // Does final cleanup specific to the server or client. Returns whether the 276 // cleanup succeeded. An error in cleanup indicates that the transfer 277 // failed. 278 virtual Status FinalCleanup(Status status) = 0; 279 280 // Returns the total size of the transfer resource, or 281 // `std::numeric_limits<size_t>::max()` if unbounded. 282 virtual size_t TransferSizeBytes() const = 0; 283 284 // Seeks the reader source. Client may need to seek with reference to the 285 // initial offset, where the server shouldn't, so each context needs its own 286 // seek method. 287 virtual Status SeekReader(uint32_t offset) = 0; 288 289 // Processes a chunk in either a transfer or receive transfer. 290 void HandleChunkEvent(const ChunkEvent& event); 291 292 // Runs the initial three-way handshake when starting a new transfer. 293 void PerformInitialHandshake(const Chunk& chunk); 294 295 void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk); 296 297 // Processes a chunk in a transmit transfer. 298 void HandleTransmitChunk(const Chunk& chunk); 299 300 // Processes a transfer parameters update in a transmit transfer. 301 void HandleTransferParametersUpdate(const Chunk& chunk); 302 303 // Sends the next chunk in a transmit transfer, if any. 304 void TransmitNextChunk(bool retransmit_requested); 305 306 // Processes a chunk in a receive transfer. 307 void HandleReceiveChunk(const Chunk& chunk); 308 309 // Processes a data chunk in a received while in the kWaiting state. 310 void HandleReceivedData(const Chunk& chunk); 311 312 // Sends the first chunk in a legacy transmit transfer. 313 void SendInitialLegacyTransmitChunk(); 314 315 // Updates the current receive transfer parameters based on the context's 316 // configuration. 317 void UpdateTransferParameters(TransmitAction action); 318 319 // Populates the transfer parameters fields on a chunk object. 320 void SetTransferParameters(Chunk& parameters); 321 322 // In a receive transfer, sends a parameters chunk telling the transmitter 323 // how much data they can send. 324 void SendTransferParameters(TransmitAction action); 325 326 // Updates the current receive transfer parameters, then sends them. 327 void UpdateAndSendTransferParameters(TransmitAction action); 328 329 // Processes a chunk in a terminating state. 330 void HandleTerminatingChunk(const Chunk& chunk); 331 332 // Ends the transfer with the specified status, sending a completion chunk to 333 // the peer. 334 void TerminateTransfer(Status status, bool with_resource_id = false); 335 336 // Ends a transfer following notification of completion from the peer. 337 void HandleTermination(Status status); 338 339 // Forcefully ends a transfer locally without contacting the peer. Abort(Status status)340 void Abort(Status status) { 341 Finish(status); 342 set_transfer_state(TransferState::kCompleted); 343 } 344 345 // Sends a final status chunk of a completed transfer without updating the 346 // transfer. Sends status_, which MUST have been set by a previous Finish() 347 // call. 348 void SendFinalStatusChunk(bool with_resource_id = false); 349 350 // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to 351 // the final status for this transfer. The transfer MUST be active when this 352 // is called. 353 void Finish(Status status); 354 355 // Encodes the specified chunk to the encode buffer and sends it with the 356 // rpc_writer_. Calls Finish() with an error if the operation fails. 357 void EncodeAndSendChunk(const Chunk& chunk); 358 359 void SetTimeout(chrono::SystemClock::duration timeout); ClearTimeout()360 void ClearTimeout() { next_timeout_ = kNoTimeout; } 361 362 // Called when the transfer's timeout expires. 363 void HandleTimeout(); 364 365 // Resends the last packet or aborts the transfer if the maximum retries has 366 // been exceeded. 367 void Retry(); 368 void RetryHandshake(); 369 370 void LogTransferConfiguration(); 371 372 static constexpr uint8_t kFlagsType = 1 << 0; 373 static constexpr uint8_t kFlagsDataSent = 1 << 1; 374 static constexpr uint8_t kFlagsContactMade = 1 << 2; 375 376 static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000; 377 378 // How long to wait for the other side to ACK a final transfer chunk before 379 // resetting the context so that it can be reused. During this time, the 380 // status chunk will be re-sent for every non-ACK chunk received, 381 // continually notifying the other end that the transfer is over. 382 static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout = 383 chrono::SystemClock::for_at_least(std::chrono::milliseconds(5000)); 384 385 static constexpr chrono::SystemClock::time_point kNoTimeout = 386 chrono::SystemClock::time_point(chrono::SystemClock::duration(0)); 387 388 static constexpr chrono::SystemClock::duration kNoRateLimit = 389 chrono::SystemClock::duration::zero(); 390 391 uint32_t session_id_; 392 uint32_t resource_id_; 393 394 // The version of the transfer protocol that this node wants to run. 395 ProtocolVersion desired_protocol_version_; 396 397 // The version of the transfer protocol that the context is actually running, 398 // following negotiation with the transfer peer. 399 ProtocolVersion configured_protocol_version_; 400 401 uint8_t flags_; 402 TransferState transfer_state_; 403 uint8_t retries_; 404 uint8_t max_retries_; 405 uint32_t lifetime_retries_; 406 uint32_t max_lifetime_retries_; 407 408 // The stream from which to read or to which to write data. 409 stream::Stream* stream_; 410 rpc::Writer* rpc_writer_; 411 412 uint32_t offset_; 413 uint32_t window_size_; 414 uint32_t window_end_offset_; 415 uint32_t max_chunk_size_bytes_; 416 417 uint32_t window_size_multiplier_; 418 TransmitPhase transmit_phase_; 419 420 const TransferParameters* max_parameters_; 421 TransferThread* thread_; 422 423 Chunk::Type last_chunk_sent_; 424 425 union { 426 Status status_; // Used when state is kCompleted. 427 uint32_t last_chunk_offset_; // Used in states kWaiting and kRecovery. 428 }; 429 430 // How long to wait for a chunk from the other end. 431 chrono::SystemClock::duration chunk_timeout_; 432 433 // How long for a client to wait for an initial server response. 434 chrono::SystemClock::duration initial_chunk_timeout_; 435 436 // How long to delay between transmitting subsequent data chunks within a 437 // window. 438 chrono::SystemClock::duration interchunk_delay_; 439 440 // Timestamp at which the transfer will next time out, or kNoTimeout. 441 chrono::SystemClock::time_point next_timeout_; 442 443 // Rate limit for repetitive logs 444 chrono::SystemClock::duration log_rate_limit_cfg_; 445 chrono::SystemClock::duration log_rate_limit_; 446 447 // chunk delay to start rate limiting 448 uint16_t log_chunks_before_rate_limit_cfg_; 449 uint16_t log_chunks_before_rate_limit_; 450 451 RateEstimate transfer_rate_; 452 }; 453 454 } // namespace pw::transfer::internal 455