xref: /aosp_15_r20/external/pigweed/pw_transfer/public/pw_transfer/internal/context.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <cinttypes>
17 #include <cstddef>
18 #include <limits>
19 #include <optional>
20 
21 #include "pw_assert/assert.h"
22 #include "pw_chrono/system_clock.h"
23 #include "pw_rpc/writer.h"
24 #include "pw_status/status.h"
25 #include "pw_stream/stream.h"
26 #include "pw_transfer/internal/chunk.h"
27 #include "pw_transfer/internal/config.h"
28 #include "pw_transfer/internal/event.h"
29 #include "pw_transfer/internal/protocol.h"
30 #include "pw_transfer/rate_estimate.h"
31 
32 namespace pw::transfer::internal {
33 
34 class TransferThread;
35 
36 class TransferParameters {
37  public:
TransferParameters(uint32_t max_window_size_bytes,uint32_t max_chunk_size_bytes,uint32_t extend_window_divisor)38   constexpr TransferParameters(uint32_t max_window_size_bytes,
39                                uint32_t max_chunk_size_bytes,
40                                uint32_t extend_window_divisor)
41       : max_window_size_bytes_(max_window_size_bytes),
42         max_chunk_size_bytes_(max_chunk_size_bytes),
43         extend_window_divisor_(extend_window_divisor) {
44     PW_ASSERT(max_window_size_bytes > 0);
45     PW_ASSERT(max_chunk_size_bytes > 0);
46     PW_ASSERT(extend_window_divisor > 1);
47   }
48 
max_window_size_bytes()49   constexpr uint32_t max_window_size_bytes() const {
50     return max_window_size_bytes_;
51   }
set_max_window_size_bytes(uint32_t max_window_size_bytes)52   constexpr void set_max_window_size_bytes(uint32_t max_window_size_bytes) {
53     max_window_size_bytes_ = max_window_size_bytes;
54   }
55 
max_chunk_size_bytes()56   constexpr uint32_t max_chunk_size_bytes() const {
57     return max_chunk_size_bytes_;
58   }
set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes)59   constexpr void set_max_chunk_size_bytes(uint32_t max_chunk_size_bytes) {
60     max_chunk_size_bytes_ = max_chunk_size_bytes;
61   }
62 
extend_window_divisor()63   constexpr uint32_t extend_window_divisor() const {
64     return extend_window_divisor_;
65   }
set_extend_window_divisor(uint32_t extend_window_divisor)66   constexpr void set_extend_window_divisor(uint32_t extend_window_divisor) {
67     PW_DASSERT(extend_window_divisor > 1);
68     extend_window_divisor_ = extend_window_divisor;
69   }
70 
71  private:
72   uint32_t max_window_size_bytes_;
73   uint32_t max_chunk_size_bytes_;
74   uint32_t extend_window_divisor_;
75 };
76 
77 // Information about a single transfer.
78 class Context {
79  public:
80   static constexpr uint32_t kUnassignedSessionId = 0;
81 
82   Context(const Context&) = delete;
83   Context(Context&&) = delete;
84   Context& operator=(const Context&) = delete;
85   Context& operator=(Context&&) = delete;
86 
session_id()87   constexpr uint32_t session_id() const { return session_id_; }
resource_id()88   constexpr uint32_t resource_id() const { return resource_id_; }
89 
90   // True if the context has been used for a transfer (it has an ID).
initialized()91   bool initialized() const {
92     return transfer_state_ != TransferState::kInactive;
93   }
94 
95   // True if the transfer is active.
active()96   bool active() const { return transfer_state_ >= TransferState::kInitiating; }
97 
type()98   constexpr TransferType type() const {
99     return static_cast<TransferType>(flags_ & kFlagsType);
100   }
101 
timeout()102   std::optional<chrono::SystemClock::time_point> timeout() const {
103     return active() && next_timeout_ != kNoTimeout
104                ? std::optional(next_timeout_)
105                : std::nullopt;
106   }
107 
108   // Returns true if the transfer's most recently set timeout has passed.
timed_out()109   bool timed_out() const {
110     std::optional<chrono::SystemClock::time_point> next_timeout = timeout();
111     return next_timeout.has_value() &&
112            chrono::SystemClock::now() >= next_timeout.value();
113   }
114 
115   // Processes an event for this transfer.
116   void HandleEvent(const Event& event);
117 
118  protected:
119   ~Context() = default;
120 
Context()121   constexpr Context()
122       : initial_offset_(0),
123         session_id_(kUnassignedSessionId),
124         resource_id_(0),
125         desired_protocol_version_(ProtocolVersion::kUnknown),
126         configured_protocol_version_(ProtocolVersion::kUnknown),
127         flags_(0),
128         transfer_state_(TransferState::kInactive),
129         retries_(0),
130         max_retries_(0),
131         lifetime_retries_(0),
132         max_lifetime_retries_(0),
133         stream_(nullptr),
134         rpc_writer_(nullptr),
135         offset_(0),
136         window_size_(0),
137         window_end_offset_(0),
138         max_chunk_size_bytes_(std::numeric_limits<uint32_t>::max()),
139         window_size_multiplier_(1),
140         transmit_phase_(TransmitPhase::kSlowStart),
141         max_parameters_(nullptr),
142         thread_(nullptr),
143         last_chunk_sent_(Chunk::Type::kData),
144         last_chunk_offset_(0),
145         chunk_timeout_(chrono::SystemClock::duration::zero()),
146         initial_chunk_timeout_(chrono::SystemClock::duration::zero()),
147         interchunk_delay_(chrono::SystemClock::for_at_least(
148             std::chrono::microseconds(kDefaultChunkDelayMicroseconds))),
149         next_timeout_(kNoTimeout),
150         log_rate_limit_cfg_(cfg::kLogDefaultRateLimit),
151         log_rate_limit_(kNoRateLimit),
152         log_chunks_before_rate_limit_cfg_(
153             cfg::kLogDefaultChunksBeforeRateLimit),
154         log_chunks_before_rate_limit_(0) {}
155 
reader()156   stream::Reader& reader() {
157     PW_DASSERT(active() && type() == TransferType::kTransmit);
158     return static_cast<stream::Reader&>(*stream_);
159   }
160 
161   uint32_t initial_offset_;
162 
163  private:
164   enum class TransferState : uint8_t {
165     // The context is available for use for a new transfer.
166     kInactive,
167 
168     // A transfer completed and the final status chunk was sent. The Context is
169     // available for use for a new transfer. A receive transfer uses this state
170     // to allow a transmitter to retry its last chunk if the final status chunk
171     // was dropped.
172     //
173     // Only used by the legacy protocol. Starting from version 2, transfer
174     // completions are acknowledged, for which the TERMINATING state is used.
175     kCompleted,
176 
177     // Transfer is starting. The server and client are performing an initial
178     // handshake and negotiating protocol and feature flags.
179     kInitiating,
180 
181     // Waiting for the other end to send a chunk.
182     kWaiting,
183 
184     // Transmitting a window of data to a receiver.
185     kTransmitting,
186 
187     // Recovering after one or more chunks was dropped in an active transfer.
188     kRecovery,
189 
190     // Transfer has completed locally and is waiting for the peer to acknowledge
191     // its final status. Only entered by the terminating side of the transfer.
192     //
193     // The context remains in a TERMINATING state until it receives an
194     // acknowledgement from the peer or times out. Either way, the context
195     // transitions to INACTIVE afterwards, fully cleaning it up for reuse.
196     //
197     // Used instead of COMPLETED starting from version 2. Unlike COMPLETED,
198     // contexts in a TERMINATING state cannot be used to start new transfers.
199     kTerminating,
200   };
201 
202   enum class TransmitAction {
203     // Start of a new transfer.
204     kBegin,
205     // First parameters chunk of a transfer.
206     kFirstParameters,
207     // Extend the current window length.
208     kExtend,
209     // Retransmit from a specified offset.
210     kRetransmit,
211   };
212 
213   // Slow start and congestion avoidance are analogues to the equally named
214   // phases in TCP congestion control.
215   enum class TransmitPhase : bool { kSlowStart, kCongestionAvoidance };
216 
set_transfer_state(TransferState state)217   void set_transfer_state(TransferState state) { transfer_state_ = state; }
218 
219   // The session ID as unsigned instead of uint32_t so it can be used with %u.
id_for_log()220   unsigned id_for_log() const {
221     static_assert(sizeof(unsigned) >= sizeof(session_id_));
222     return static_cast<unsigned>(session_id_);
223   }
224 
writer()225   stream::Writer& writer() {
226     PW_DASSERT(active() && type() == TransferType::kReceive);
227     return static_cast<stream::Writer&>(*stream_);
228   }
229 
DataTransferComplete()230   bool DataTransferComplete() const {
231     return transfer_state_ == TransferState::kTerminating ||
232            transfer_state_ == TransferState::kCompleted;
233   }
234 
ShouldSkipCompletionHandshake()235   bool ShouldSkipCompletionHandshake() const {
236     // Completion handshakes are not part of the legacy protocol. Additionally,
237     // transfers which have not yet fully established should not handshake and
238     // simply time out.
239     return configured_protocol_version_ <= ProtocolVersion::kLegacy ||
240            transfer_state_ == TransferState::kInitiating;
241   }
242 
243   // Calculates the maximum size of actual data that can be sent within a
244   // single client write transfer chunk, accounting for the overhead of the
245   // transfer protocol and RPC system.
246   //
247   // Note: This function relies on RPC protocol internals. This is generally a
248   // *bad* idea, but is necessary here due to limitations of the RPC system
249   // and its asymmetric ingress and egress paths.
250   //
251   // TODO(frolv): This should be investigated further and perhaps addressed
252   // within the RPC system, at the least through a helper function.
253   uint32_t MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
254                              uint32_t channel_id) const;
255 
256   // Initializes a new transfer using new_transfer. The provided stream
257   // argument is used in place of the NewTransferEvent's stream. Only
258   // initializes state; no packets are sent.
259   //
260   // Precondition: context is not active.
261   void Initialize(const NewTransferEvent& new_transfer);
262 
263   // Starts a new transfer from an initialized context by sending the initial
264   // transfer chunk. This is only used by transfer clients, as the transfer
265   // service cannot initiate transfers.
266   //
267   // Calls Finish(), which calls the on_completion callback, if initiating a
268   // transfer fails.
269   void InitiateTransferAsClient();
270 
271   // Starts a new transfer on the server after receiving a request from a
272   // client.
273   bool StartTransferAsServer(const NewTransferEvent& new_transfer);
274 
275   // Does final cleanup specific to the server or client. Returns whether the
276   // cleanup succeeded. An error in cleanup indicates that the transfer
277   // failed.
278   virtual Status FinalCleanup(Status status) = 0;
279 
280   // Returns the total size of the transfer resource, or
281   // `std::numeric_limits<size_t>::max()` if unbounded.
282   virtual size_t TransferSizeBytes() const = 0;
283 
284   // Seeks the reader source. Client may need to seek with reference to the
285   // initial offset, where the server shouldn't, so each context needs its own
286   // seek method.
287   virtual Status SeekReader(uint32_t offset) = 0;
288 
289   // Processes a chunk in either a transfer or receive transfer.
290   void HandleChunkEvent(const ChunkEvent& event);
291 
292   // Runs the initial three-way handshake when starting a new transfer.
293   void PerformInitialHandshake(const Chunk& chunk);
294 
295   void UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk);
296 
297   // Processes a chunk in a transmit transfer.
298   void HandleTransmitChunk(const Chunk& chunk);
299 
300   // Processes a transfer parameters update in a transmit transfer.
301   void HandleTransferParametersUpdate(const Chunk& chunk);
302 
303   // Sends the next chunk in a transmit transfer, if any.
304   void TransmitNextChunk(bool retransmit_requested);
305 
306   // Processes a chunk in a receive transfer.
307   void HandleReceiveChunk(const Chunk& chunk);
308 
309   // Processes a data chunk in a received while in the kWaiting state.
310   void HandleReceivedData(const Chunk& chunk);
311 
312   // Sends the first chunk in a legacy transmit transfer.
313   void SendInitialLegacyTransmitChunk();
314 
315   // Updates the current receive transfer parameters based on the context's
316   // configuration.
317   void UpdateTransferParameters(TransmitAction action);
318 
319   // Populates the transfer parameters fields on a chunk object.
320   void SetTransferParameters(Chunk& parameters);
321 
322   // In a receive transfer, sends a parameters chunk telling the transmitter
323   // how much data they can send.
324   void SendTransferParameters(TransmitAction action);
325 
326   // Updates the current receive transfer parameters, then sends them.
327   void UpdateAndSendTransferParameters(TransmitAction action);
328 
329   // Processes a chunk in a terminating state.
330   void HandleTerminatingChunk(const Chunk& chunk);
331 
332   // Ends the transfer with the specified status, sending a completion chunk to
333   // the peer.
334   void TerminateTransfer(Status status, bool with_resource_id = false);
335 
336   // Ends a transfer following notification of completion from the peer.
337   void HandleTermination(Status status);
338 
339   // Forcefully ends a transfer locally without contacting the peer.
Abort(Status status)340   void Abort(Status status) {
341     Finish(status);
342     set_transfer_state(TransferState::kCompleted);
343   }
344 
345   // Sends a final status chunk of a completed transfer without updating the
346   // transfer. Sends status_, which MUST have been set by a previous Finish()
347   // call.
348   void SendFinalStatusChunk(bool with_resource_id = false);
349 
350   // Marks the transfer as completed and calls FinalCleanup(). Sets status_ to
351   // the final status for this transfer. The transfer MUST be active when this
352   // is called.
353   void Finish(Status status);
354 
355   // Encodes the specified chunk to the encode buffer and sends it with the
356   // rpc_writer_. Calls Finish() with an error if the operation fails.
357   void EncodeAndSendChunk(const Chunk& chunk);
358 
359   void SetTimeout(chrono::SystemClock::duration timeout);
ClearTimeout()360   void ClearTimeout() { next_timeout_ = kNoTimeout; }
361 
362   // Called when the transfer's timeout expires.
363   void HandleTimeout();
364 
365   // Resends the last packet or aborts the transfer if the maximum retries has
366   // been exceeded.
367   void Retry();
368   void RetryHandshake();
369 
370   void LogTransferConfiguration();
371 
372   static constexpr uint8_t kFlagsType = 1 << 0;
373   static constexpr uint8_t kFlagsDataSent = 1 << 1;
374   static constexpr uint8_t kFlagsContactMade = 1 << 2;
375 
376   static constexpr uint32_t kDefaultChunkDelayMicroseconds = 2000;
377 
378   // How long to wait for the other side to ACK a final transfer chunk before
379   // resetting the context so that it can be reused. During this time, the
380   // status chunk will be re-sent for every non-ACK chunk received,
381   // continually notifying the other end that the transfer is over.
382   static constexpr chrono::SystemClock::duration kFinalChunkAckTimeout =
383       chrono::SystemClock::for_at_least(std::chrono::milliseconds(5000));
384 
385   static constexpr chrono::SystemClock::time_point kNoTimeout =
386       chrono::SystemClock::time_point(chrono::SystemClock::duration(0));
387 
388   static constexpr chrono::SystemClock::duration kNoRateLimit =
389       chrono::SystemClock::duration::zero();
390 
391   uint32_t session_id_;
392   uint32_t resource_id_;
393 
394   // The version of the transfer protocol that this node wants to run.
395   ProtocolVersion desired_protocol_version_;
396 
397   // The version of the transfer protocol that the context is actually running,
398   // following negotiation with the transfer peer.
399   ProtocolVersion configured_protocol_version_;
400 
401   uint8_t flags_;
402   TransferState transfer_state_;
403   uint8_t retries_;
404   uint8_t max_retries_;
405   uint32_t lifetime_retries_;
406   uint32_t max_lifetime_retries_;
407 
408   // The stream from which to read or to which to write data.
409   stream::Stream* stream_;
410   rpc::Writer* rpc_writer_;
411 
412   uint32_t offset_;
413   uint32_t window_size_;
414   uint32_t window_end_offset_;
415   uint32_t max_chunk_size_bytes_;
416 
417   uint32_t window_size_multiplier_;
418   TransmitPhase transmit_phase_;
419 
420   const TransferParameters* max_parameters_;
421   TransferThread* thread_;
422 
423   Chunk::Type last_chunk_sent_;
424 
425   union {
426     Status status_;               // Used when state is kCompleted.
427     uint32_t last_chunk_offset_;  // Used in states kWaiting and kRecovery.
428   };
429 
430   // How long to wait for a chunk from the other end.
431   chrono::SystemClock::duration chunk_timeout_;
432 
433   // How long for a client to wait for an initial server response.
434   chrono::SystemClock::duration initial_chunk_timeout_;
435 
436   // How long to delay between transmitting subsequent data chunks within a
437   // window.
438   chrono::SystemClock::duration interchunk_delay_;
439 
440   // Timestamp at which the transfer will next time out, or kNoTimeout.
441   chrono::SystemClock::time_point next_timeout_;
442 
443   // Rate limit for repetitive logs
444   chrono::SystemClock::duration log_rate_limit_cfg_;
445   chrono::SystemClock::duration log_rate_limit_;
446 
447   // chunk delay to start rate limiting
448   uint16_t log_chunks_before_rate_limit_cfg_;
449   uint16_t log_chunks_before_rate_limit_;
450 
451   RateEstimate transfer_rate_;
452 };
453 
454 }  // namespace pw::transfer::internal
455