xref: /aosp_15_r20/external/pigweed/pw_transfer/context.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 #define PW_LOG_LEVEL PW_TRANSFER_CONFIG_LOG_LEVEL
17 
18 #include "pw_transfer/internal/context.h"
19 
20 #include <chrono>
21 #include <limits>
22 
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/log.h"
26 #include "pw_log/rate_limited.h"
27 #include "pw_preprocessor/compiler.h"
28 #include "pw_protobuf/serialized_size.h"
29 #include "pw_transfer/internal/config.h"
30 #include "pw_transfer/transfer.pwpb.h"
31 #include "pw_transfer/transfer_thread.h"
32 #include "pw_varint/varint.h"
33 
34 namespace pw::transfer::internal {
35 
HandleEvent(const Event & event)36 void Context::HandleEvent(const Event& event) {
37   switch (event.type) {
38     case EventType::kNewClientTransfer:
39     case EventType::kNewServerTransfer: {
40       if (active()) {
41         if (event.type == EventType::kNewServerTransfer &&
42             event.new_transfer.session_id == session_id_ &&
43             last_chunk_sent_ == Chunk::Type::kStartAck) {
44           // The client is retrying its initial chunk as the response may not
45           // have made it back. Re-send the handshake response without going
46           // through handler reinitialization.
47           RetryHandshake();
48           return;
49         }
50         Abort(Status::Aborted());
51       }
52 
53       Initialize(event.new_transfer);
54 
55       if (event.type == EventType::kNewClientTransfer) {
56         InitiateTransferAsClient();
57       } else {
58         if (StartTransferAsServer(event.new_transfer)) {
59           // TODO(frolv): This should probably be restructured.
60           HandleChunkEvent({.context_identifier = event.new_transfer.session_id,
61                             .match_resource_id = false,  // Unused.
62                             .data = event.new_transfer.raw_chunk_data,
63                             .size = event.new_transfer.raw_chunk_size});
64         }
65       }
66       return;
67     }
68 
69     case EventType::kClientChunk:
70     case EventType::kServerChunk:
71       PW_CHECK(initialized());
72       HandleChunkEvent(event.chunk);
73       return;
74 
75     case EventType::kClientTimeout:
76     case EventType::kServerTimeout:
77       HandleTimeout();
78       return;
79 
80     case EventType::kClientEndTransfer:
81     case EventType::kServerEndTransfer:
82       if (active()) {
83         if (event.end_transfer.send_status_chunk) {
84           TerminateTransfer(event.end_transfer.status);
85         } else {
86           Abort(event.end_transfer.status);
87         }
88       }
89       return;
90 
91     case EventType::kSendStatusChunk:
92     case EventType::kAddTransferHandler:
93     case EventType::kRemoveTransferHandler:
94     case EventType::kSetStream:
95     case EventType::kTerminate:
96     case EventType::kUpdateClientTransfer:
97     case EventType::kGetResourceStatus:
98       // These events are intended for the transfer thread and should never be
99       // forwarded through to a context.
100       PW_CRASH("Transfer context received a transfer thread event");
101   }
102 }
103 
InitiateTransferAsClient()104 void Context::InitiateTransferAsClient() {
105   PW_DCHECK(active());
106 
107   SetTimeout(initial_chunk_timeout_);
108 
109   PW_LOG_INFO("Starting transfer for resource %u",
110               static_cast<unsigned>(resource_id_));
111 
112   // Receive transfers should prepare their initial parameters to be send in the
113   // initial chunk.
114   if (type() == TransferType::kReceive) {
115     UpdateTransferParameters(TransmitAction::kBegin);
116   }
117 
118   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
119     // Legacy transfers go straight into the data transfer phase without a
120     // handshake.
121     if (type() == TransferType::kReceive) {
122       SendTransferParameters(TransmitAction::kBegin);
123     } else {
124       SendInitialLegacyTransmitChunk();
125     }
126 
127     LogTransferConfiguration();
128     return;
129   }
130 
131   // In newer protocol versions, begin the initial transfer handshake.
132   Chunk start_chunk(desired_protocol_version_, Chunk::Type::kStart);
133   start_chunk.set_desired_session_id(session_id_);
134   start_chunk.set_resource_id(resource_id_);
135   start_chunk.set_initial_offset(offset_);
136 
137   if (type() == TransferType::kReceive) {
138     // Parameters should still be set on the initial chunk for backwards
139     // compatibility if the server only supports the legacy protocol.
140     SetTransferParameters(start_chunk);
141   }
142 
143   EncodeAndSendChunk(start_chunk);
144 }
145 
StartTransferAsServer(const NewTransferEvent & new_transfer)146 bool Context::StartTransferAsServer(const NewTransferEvent& new_transfer) {
147   PW_LOG_INFO("Starting %s transfer %u for resource %u with offset %u",
148               new_transfer.type == TransferType::kTransmit ? "read" : "write",
149               static_cast<unsigned>(new_transfer.session_id),
150               static_cast<unsigned>(new_transfer.resource_id),
151               static_cast<unsigned>(new_transfer.initial_offset));
152   LogTransferConfiguration();
153 
154   flags_ |= kFlagsContactMade;
155 
156   if (Status status = new_transfer.handler->Prepare(
157           new_transfer.type, new_transfer.initial_offset);
158       !status.ok()) {
159     PW_LOG_WARN("Transfer handler %u prepare failed with status %u",
160                 static_cast<unsigned>(new_transfer.handler->id()),
161                 status.code());
162 
163     // As this failure occurs at the start of a transfer, no protocol version is
164     // yet negotiated and one must be set to send a response. It is okay to use
165     // the desired version here, as that comes from the client.
166     configured_protocol_version_ = desired_protocol_version_;
167 
168     status = (status.IsPermissionDenied() || status.IsUnimplemented() ||
169               status.IsResourceExhausted())
170                  ? status
171                  : Status::DataLoss();
172     TerminateTransfer(status, /*with_resource_id=*/true);
173     return false;
174   }
175 
176   // Initialize doesn't set the handler since it's specific to server transfers.
177   static_cast<ServerContext&>(*this).set_handler(*new_transfer.handler);
178 
179   // Server transfers use the stream provided by the handler rather than the
180   // stream included in the NewTransferEvent.
181   stream_ = &new_transfer.handler->stream();
182 
183   return true;
184 }
185 
SendInitialLegacyTransmitChunk()186 void Context::SendInitialLegacyTransmitChunk() {
187   // A transmitter begins a transfer by sending the ID of the resource to which
188   // it wishes to write.
189   Chunk chunk(ProtocolVersion::kLegacy, Chunk::Type::kStart);
190   chunk.set_session_id(resource_id_);
191 
192   EncodeAndSendChunk(chunk);
193 }
194 
UpdateTransferParameters(TransmitAction action)195 void Context::UpdateTransferParameters(TransmitAction action) {
196   max_chunk_size_bytes_ = MaxWriteChunkSize(
197       max_parameters_->max_chunk_size_bytes(), rpc_writer_->channel_id());
198   uint32_t window_size = 0;
199 
200   if (max_chunk_size_bytes_ > max_parameters_->max_window_size_bytes()) {
201     window_size =
202         std::min(max_parameters_->max_window_size_bytes(),
203                  static_cast<uint32_t>(writer().ConservativeWriteLimit()));
204   } else {
205     // Adjust the window size based on the latest event in the transfer.
206     switch (action) {
207       case TransmitAction::kBegin:
208       case TransmitAction::kFirstParameters:
209         // A transfer always begins with a window size of one chunk, set during
210         // initialization. No further handling is required.
211         break;
212 
213       case TransmitAction::kExtend:
214         // Window was received successfully without packet loss and should grow.
215         // Double the window size during slow start, or increase it by a single
216         // chunk in congestion avoidance.
217         if (transmit_phase_ == TransmitPhase::kCongestionAvoidance) {
218           window_size_multiplier_ += 1;
219         } else {
220           window_size_multiplier_ *= 2;
221         }
222 
223         // The window size can never exceed the user-specified maximum bytes. If
224         // it does, reduce the multiplier to the largest size that fits.
225         if (window_size_multiplier_ * max_chunk_size_bytes_ >
226             max_parameters_->max_window_size_bytes()) {
227           window_size_multiplier_ =
228               max_parameters_->max_window_size_bytes() / max_chunk_size_bytes_;
229         }
230         break;
231 
232       case TransmitAction::kRetransmit:
233         // A packet was lost: shrink the window size. Additionally, after the
234         // first packet loss, transition from the slow start to the congestion
235         // avoidance phase of the transfer.
236         if (transmit_phase_ == TransmitPhase::kSlowStart) {
237           transmit_phase_ = TransmitPhase::kCongestionAvoidance;
238         }
239         window_size_multiplier_ =
240             std::max(window_size_multiplier_ / static_cast<uint32_t>(2),
241                      static_cast<uint32_t>(1));
242         break;
243     }
244 
245     window_size =
246         std::min({window_size_multiplier_ * max_chunk_size_bytes_,
247                   max_parameters_->max_window_size_bytes(),
248                   static_cast<uint32_t>(writer().ConservativeWriteLimit())});
249   }
250 
251   window_size_ = window_size;
252   window_end_offset_ = offset_ + window_size;
253 }
254 
SetTransferParameters(Chunk & parameters)255 void Context::SetTransferParameters(Chunk& parameters) {
256   parameters.set_window_end_offset(window_end_offset_)
257       .set_max_chunk_size_bytes(max_chunk_size_bytes_)
258       .set_min_delay_microseconds(kDefaultChunkDelayMicroseconds)
259       .set_offset(offset_);
260 }
261 
UpdateAndSendTransferParameters(TransmitAction action)262 void Context::UpdateAndSendTransferParameters(TransmitAction action) {
263   UpdateTransferParameters(action);
264 
265   return SendTransferParameters(action);
266 }
267 
SendTransferParameters(TransmitAction action)268 void Context::SendTransferParameters(TransmitAction action) {
269   Chunk::Type type = Chunk::Type::kParametersRetransmit;
270 
271   switch (action) {
272     case TransmitAction::kBegin:
273       type = Chunk::Type::kStart;
274       break;
275     case TransmitAction::kFirstParameters:
276     case TransmitAction::kRetransmit:
277       type = Chunk::Type::kParametersRetransmit;
278       break;
279     case TransmitAction::kExtend:
280       type = Chunk::Type::kParametersContinue;
281       break;
282   }
283 
284   Chunk parameters(configured_protocol_version_, type);
285   parameters.set_session_id(session_id_);
286   SetTransferParameters(parameters);
287 
288   PW_LOG_EVERY_N_DURATION(
289       PW_LOG_LEVEL_INFO,
290       log_rate_limit_,
291       "Transfer rate: %u B/s",
292       static_cast<unsigned>(transfer_rate_.GetRateBytesPerSecond()));
293 
294   PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_INFO,
295                           log_rate_limit_,
296                           "Transfer %u sending transfer parameters: "
297                           "offset=%u, window_end_offset=%u, max_chunk_size=%u",
298                           static_cast<unsigned>(session_id_),
299                           static_cast<unsigned>(offset_),
300                           static_cast<unsigned>(window_end_offset_),
301                           static_cast<unsigned>(max_chunk_size_bytes_));
302 
303   if (log_chunks_before_rate_limit_ > 0) {
304     log_chunks_before_rate_limit_--;
305 
306     if (log_chunks_before_rate_limit_ == 0) {
307       log_rate_limit_ = log_rate_limit_cfg_;
308     }
309   }
310 
311   EncodeAndSendChunk(parameters);
312 }
313 
EncodeAndSendChunk(const Chunk & chunk)314 void Context::EncodeAndSendChunk(const Chunk& chunk) {
315   last_chunk_sent_ = chunk.type();
316 
317 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
318   if ((chunk.remaining_bytes().has_value() &&
319        chunk.remaining_bytes().value() == 0) ||
320       (chunk.type() != Chunk::Type::kData &&
321        chunk.type() != Chunk::Type::kParametersContinue)) {
322     chunk.LogChunk(false, pw::chrono::SystemClock::duration::zero());
323   }
324 #endif
325 
326 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
327   if (chunk.type() == Chunk::Type::kData ||
328       chunk.type() == Chunk::Type::kParametersContinue) {
329     chunk.LogChunk(false, log_rate_limit_);
330   }
331 #endif
332 
333   Result<ConstByteSpan> data = chunk.Encode(thread_->encode_buffer());
334   if (!data.ok()) {
335     PW_LOG_ERROR("Failed to encode chunk for transfer %u: %d",
336                  static_cast<unsigned>(chunk.session_id()),
337                  data.status().code());
338     if (active()) {
339       TerminateTransfer(Status::Internal());
340     }
341     return;
342   }
343 
344   if (const Status status = rpc_writer_->Write(*data); !status.ok()) {
345     PW_LOG_ERROR("Failed to write chunk for transfer %u: %d",
346                  static_cast<unsigned>(chunk.session_id()),
347                  status.code());
348     if (active()) {
349       TerminateTransfer(Status::Internal());
350     }
351     return;
352   }
353 }
354 
Initialize(const NewTransferEvent & new_transfer)355 void Context::Initialize(const NewTransferEvent& new_transfer) {
356   PW_DCHECK(!active());
357 
358   PW_DCHECK_INT_NE(new_transfer.protocol_version,
359                    ProtocolVersion::kUnknown,
360                    "Cannot start a transfer with an unknown protocol");
361 
362   session_id_ = new_transfer.session_id;
363   resource_id_ = new_transfer.resource_id;
364   desired_protocol_version_ = new_transfer.protocol_version;
365   configured_protocol_version_ = ProtocolVersion::kUnknown;
366 
367   flags_ = static_cast<uint8_t>(new_transfer.type);
368   transfer_state_ = TransferState::kWaiting;
369   retries_ = 0;
370   max_retries_ = new_transfer.max_retries;
371   lifetime_retries_ = 0;
372   max_lifetime_retries_ = new_transfer.max_lifetime_retries;
373 
374   if (desired_protocol_version_ == ProtocolVersion::kLegacy) {
375     // In a legacy transfer, there is no protocol negotiation stage.
376     // Automatically configure the context to run the legacy protocol and
377     // proceed to waiting for a chunk.
378     configured_protocol_version_ = ProtocolVersion::kLegacy;
379     transfer_state_ = TransferState::kWaiting;
380   } else {
381     transfer_state_ = TransferState::kInitiating;
382   }
383 
384   rpc_writer_ = new_transfer.rpc_writer;
385   stream_ = new_transfer.stream;
386 
387   offset_ = new_transfer.initial_offset;
388   initial_offset_ = new_transfer.initial_offset;
389   window_size_ = 0;
390   window_end_offset_ = 0;
391   max_chunk_size_bytes_ = new_transfer.max_parameters->max_chunk_size_bytes();
392 
393   window_size_multiplier_ = 1;
394   transmit_phase_ = TransmitPhase::kSlowStart;
395 
396   max_parameters_ = new_transfer.max_parameters;
397   thread_ = new_transfer.transfer_thread;
398 
399   last_chunk_sent_ = Chunk::Type::kStart;
400   last_chunk_offset_ = 0;
401   chunk_timeout_ = new_transfer.timeout;
402   initial_chunk_timeout_ = new_transfer.initial_timeout;
403   interchunk_delay_ = chrono::SystemClock::for_at_least(
404       std::chrono::microseconds(kDefaultChunkDelayMicroseconds));
405   next_timeout_ = kNoTimeout;
406   log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
407 
408   transfer_rate_.Reset();
409 }
410 
HandleChunkEvent(const ChunkEvent & event)411 void Context::HandleChunkEvent(const ChunkEvent& event) {
412   Result<Chunk> maybe_chunk =
413       Chunk::Parse(ConstByteSpan(event.data, event.size));
414   if (!maybe_chunk.ok()) {
415     return;
416   }
417 
418   Chunk chunk = *maybe_chunk;
419 
420   // Received some data. Reset the retry counter.
421   retries_ = 0;
422   flags_ |= kFlagsContactMade;
423 
424 #if PW_TRANSFER_CONFIG_DEBUG_CHUNKS
425   if (chunk.type() != Chunk::Type::kData &&
426       chunk.type() != Chunk::Type::kParametersContinue) {
427     chunk.LogChunk(true, pw::chrono::SystemClock::duration::zero());
428   }
429 #endif
430 #if PW_TRANSFER_CONFIG_DEBUG_DATA_CHUNKS
431   if (chunk.type() == Chunk::Type::kData ||
432       chunk.type() == Chunk::Type::kParametersContinue) {
433     chunk.LogChunk(true, log_rate_limit_);
434   }
435 #endif
436 
437   if (chunk.IsTerminatingChunk()) {
438     if (active()) {
439       HandleTermination(chunk.status().value());
440     } else {
441       PW_LOG_INFO("Got final status %d for completed transfer %d",
442                   static_cast<int>(chunk.status().value().code()),
443                   static_cast<int>(session_id_));
444     }
445     return;
446   }
447 
448   if (type() == TransferType::kTransmit) {
449     HandleTransmitChunk(chunk);
450   } else {
451     HandleReceiveChunk(chunk);
452   }
453 }
454 
PerformInitialHandshake(const Chunk & chunk)455 void Context::PerformInitialHandshake(const Chunk& chunk) {
456   switch (chunk.type()) {
457     // Initial packet sent from a client to a server.
458     case Chunk::Type::kStart: {
459       UpdateLocalProtocolConfigurationFromPeer(chunk);
460 
461       if (type() == TransferType::kReceive) {
462         // Update window end offset so it is valid.
463         window_end_offset_ = offset_;
464       }
465 
466       // This cast is safe as we know we're running in a transfer server.
467       uint32_t resource_id = static_cast<ServerContext&>(*this).handler()->id();
468 
469       Chunk start_ack(configured_protocol_version_, Chunk::Type::kStartAck);
470       start_ack.set_session_id(session_id_);
471       start_ack.set_resource_id(resource_id);
472       start_ack.set_initial_offset(offset_);
473 
474       EncodeAndSendChunk(start_ack);
475       break;
476     }
477 
478     // Response packet sent from a server to a client, confirming the protocol
479     // version and session_id of the transfer.
480     case Chunk::Type::kStartAck: {
481       UpdateLocalProtocolConfigurationFromPeer(chunk);
482 
483       // This should confirm the offset we're starting at
484       if (offset_ != chunk.initial_offset()) {
485         TerminateTransfer(Status::Unimplemented());
486         break;
487       }
488 
489       Chunk start_ack_confirmation(configured_protocol_version_,
490                                    Chunk::Type::kStartAckConfirmation);
491       start_ack_confirmation.set_session_id(session_id_);
492 
493       if (type() == TransferType::kReceive) {
494         // In a receive transfer, tag the initial transfer parameters onto the
495         // confirmation chunk so that the server can immediately begin sending
496         // data.
497         UpdateTransferParameters(TransmitAction::kFirstParameters);
498         SetTransferParameters(start_ack_confirmation);
499       }
500 
501       set_transfer_state(TransferState::kWaiting);
502       EncodeAndSendChunk(start_ack_confirmation);
503       // we received a response, so we can re-up the timeout while waiting for
504       // parameters.
505       SetTimeout(chunk_timeout_);
506       break;
507     }
508 
509     // Confirmation sent by a client to a server of the configured transfer
510     // version and session ID. Completes the handshake and begins the actual
511     // data transfer.
512     case Chunk::Type::kStartAckConfirmation: {
513       set_transfer_state(TransferState::kWaiting);
514 
515       if (type() == TransferType::kTransmit) {
516         HandleTransmitChunk(chunk);
517       } else {
518         HandleReceiveChunk(chunk);
519       }
520       break;
521     }
522 
523     // If a non-handshake chunk is received during an INITIATING state, the
524     // transfer peer is running a legacy protocol version, which does not
525     // perform a handshake. End the handshake, revert to the legacy protocol,
526     // and process the chunk appropriately.
527     case Chunk::Type::kData:
528     case Chunk::Type::kParametersRetransmit:
529     case Chunk::Type::kParametersContinue:
530 
531       // Update the local session_id, which will map to the transfer_id of the
532       // legacy chunk.
533       session_id_ = chunk.session_id();
534 
535       configured_protocol_version_ = ProtocolVersion::kLegacy;
536       // Cancel if we are not using at least version 2, and we tried to start a
537       // non-zero offset transfer
538       if (chunk.initial_offset() != 0) {
539         PW_LOG_ERROR("Legacy transfer does not support offset transfers!");
540         TerminateTransfer(Status::Internal());
541         break;
542       }
543 
544       set_transfer_state(TransferState::kWaiting);
545 
546       PW_LOG_DEBUG(
547           "Transfer %u tried to start on protocol version %d, but peer only "
548           "supports legacy",
549           id_for_log(),
550           static_cast<int>(desired_protocol_version_));
551 
552       if (type() == TransferType::kTransmit) {
553         HandleTransmitChunk(chunk);
554       } else {
555         HandleReceiveChunk(chunk);
556       }
557       break;
558 
559     case Chunk::Type::kCompletion:
560     case Chunk::Type::kCompletionAck:
561       PW_CRASH(
562           "Transfer completion packets should be processed by "
563           "HandleChunkEvent()");
564       break;
565   }
566 }
567 
UpdateLocalProtocolConfigurationFromPeer(const Chunk & chunk)568 void Context::UpdateLocalProtocolConfigurationFromPeer(const Chunk& chunk) {
569   PW_LOG_DEBUG("Negotiating protocol version: ours=%d, theirs=%d",
570                static_cast<int>(desired_protocol_version_),
571                static_cast<int>(chunk.protocol_version()));
572 
573   configured_protocol_version_ =
574       std::min(desired_protocol_version_, chunk.protocol_version());
575 
576   PW_LOG_INFO("Transfer %u: using protocol version %d",
577               id_for_log(),
578               static_cast<int>(configured_protocol_version_));
579 }
580 
HandleTransmitChunk(const Chunk & chunk)581 void Context::HandleTransmitChunk(const Chunk& chunk) {
582   switch (transfer_state_) {
583     case TransferState::kInactive:
584     case TransferState::kRecovery:
585       PW_CRASH("Never should handle chunk while inactive");
586 
587     case TransferState::kCompleted:
588       // In a legacy transfer, if the transfer has already completed and another
589       // chunk is received, tell the other end that the transfer is over.
590       if (!chunk.IsInitialChunk() && status_.ok()) {
591         status_ = Status::FailedPrecondition();
592       }
593 
594       SendFinalStatusChunk();
595       return;
596 
597     case TransferState::kInitiating:
598       PerformInitialHandshake(chunk);
599       return;
600 
601     case TransferState::kWaiting:
602     case TransferState::kTransmitting:
603       if (chunk.protocol_version() == configured_protocol_version_) {
604         HandleTransferParametersUpdate(chunk);
605       } else {
606         PW_LOG_ERROR(
607             "Transmit transfer %u was configured to use protocol version %d "
608             "but received a chunk with version %d",
609             id_for_log(),
610             static_cast<int>(configured_protocol_version_),
611             static_cast<int>(chunk.protocol_version()));
612         TerminateTransfer(Status::Internal());
613       }
614       return;
615 
616     case TransferState::kTerminating:
617       HandleTerminatingChunk(chunk);
618       return;
619   }
620 }
621 
HandleTransferParametersUpdate(const Chunk & chunk)622 void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
623   bool retransmit = chunk.RequestsTransmissionFromOffset();
624 
625   if (retransmit) {
626     // If the offsets don't match, attempt to seek on the reader. Not all
627     // readers support seeking; abort with UNIMPLEMENTED if this handler
628     // doesn't.
629     if (offset_ != chunk.offset()) {
630       if (Status seek_status = SeekReader(chunk.offset()); !seek_status.ok()) {
631         PW_LOG_WARN("Transfer %u seek to %u failed with status %u",
632                     static_cast<unsigned>(session_id_),
633                     static_cast<unsigned>(chunk.offset()),
634                     seek_status.code());
635 
636         // Remap status codes to return one of the following:
637         //
638         //   INTERNAL: invalid seek, never should happen
639         //   DATA_LOSS: the reader is in a bad state
640         //   UNIMPLEMENTED: seeking is not supported
641         //
642         if (seek_status.IsOutOfRange()) {
643           seek_status = Status::Internal();
644         } else if (!seek_status.IsUnimplemented()) {
645           seek_status = Status::DataLoss();
646         }
647 
648         TerminateTransfer(seek_status);
649         return;
650       }
651     }
652 
653     offset_ = chunk.offset();
654   }
655 
656   window_end_offset_ = chunk.window_end_offset();
657 
658   if (chunk.max_chunk_size_bytes().has_value()) {
659     max_chunk_size_bytes_ = std::min(chunk.max_chunk_size_bytes().value(),
660                                      max_parameters_->max_chunk_size_bytes());
661   }
662 
663   if (chunk.min_delay_microseconds().has_value()) {
664     interchunk_delay_ = chrono::SystemClock::for_at_least(
665         std::chrono::microseconds(chunk.min_delay_microseconds().value()));
666   }
667 
668   if (retransmit) {
669     PW_LOG_INFO(
670         "Transfer %u received parameters type=RETRANSMIT offset=%u "
671         "window_end_offset=%u",
672         static_cast<unsigned>(session_id_),
673         static_cast<unsigned>(chunk.offset()),
674         static_cast<unsigned>(window_end_offset_));
675   } else {
676     PW_LOG_EVERY_N_DURATION(
677         PW_LOG_LEVEL_INFO,
678         std::chrono::seconds(3),
679         "Transfer %u received parameters type=CONTINUE offset=%u "
680         "window_end_offset=%u",
681         static_cast<unsigned>(session_id_),
682         static_cast<unsigned>(chunk.offset()),
683         static_cast<unsigned>(window_end_offset_));
684   }
685 
686   // Parsed all of the parameters; start sending the window.
687   set_transfer_state(TransferState::kTransmitting);
688 
689   TransmitNextChunk(retransmit);
690 }
691 
TransmitNextChunk(bool retransmit_requested)692 void Context::TransmitNextChunk(bool retransmit_requested) {
693   Chunk chunk(configured_protocol_version_, Chunk::Type::kData);
694   chunk.set_session_id(session_id_);
695   chunk.set_offset(offset_);
696 
697   // Reserve space for the data proto field overhead and use the remainder of
698   // the buffer for the chunk data.
699   size_t reserved_size =
700       chunk.EncodedSize() + 1 /* data key */ + 5 /* data size */;
701 
702   size_t total_size = TransferSizeBytes();
703   if (total_size != std::numeric_limits<size_t>::max()) {
704     reserved_size += protobuf::SizeOfVarintField(
705         pwpb::Chunk::Fields::kRemainingBytes, total_size);
706   }
707 
708   ByteSpan buffer = thread_->encode_buffer();
709   Result<ByteSpan> data;
710 
711   if (offset_ < total_size) {
712     // Read the next chunk of data into the encode buffer.
713     ByteSpan data_buffer = buffer.subspan(reserved_size);
714     size_t max_bytes_to_send =
715         std::min(window_end_offset_ - offset_, max_chunk_size_bytes_);
716 
717     if (max_bytes_to_send < data_buffer.size()) {
718       data_buffer = data_buffer.first(max_bytes_to_send);
719     }
720 
721     data = reader().Read(data_buffer);
722   } else {
723     // The user-specified resource size has been reached: respect it.
724     data = Status::OutOfRange();
725   }
726 
727   if (data.status().IsOutOfRange()) {
728     // No more data to read.
729     chunk.set_remaining_bytes(0);
730     window_end_offset_ = offset_;
731 
732     PW_LOG_INFO("Transfer %u sending final chunk with remaining_bytes=0",
733                 static_cast<unsigned>(session_id_));
734   } else if (data.ok()) {
735     if (offset_ == window_end_offset_) {
736       if (retransmit_requested) {
737         PW_LOG_ERROR(
738             "Transfer %u: received an empty retransmit request, but there is "
739             "still data to send; aborting with RESOURCE_EXHAUSTED",
740             id_for_log());
741         TerminateTransfer(Status::ResourceExhausted());
742       } else {
743         PW_LOG_DEBUG(
744             "Transfer %u: ignoring continuation packet for transfer window "
745             "that has already been sent",
746             id_for_log());
747         SetTimeout(chunk_timeout_);
748       }
749       return;  // No data was requested, so there is nothing else to do.
750     }
751 
752     PW_LOG_EVERY_N_DURATION(PW_LOG_LEVEL_DEBUG,
753                             std::chrono::seconds(3),
754                             "Transfer %u sending chunk offset=%u size=%u",
755                             static_cast<unsigned>(session_id_),
756                             static_cast<unsigned>(offset_),
757                             static_cast<unsigned>(data.value().size()));
758 
759     chunk.set_payload(data.value());
760     last_chunk_offset_ = offset_;
761     offset_ += data.value().size();
762 
763     if (total_size != std::numeric_limits<size_t>::max()) {
764       chunk.set_remaining_bytes(total_size - offset_);
765     }
766   } else {
767     PW_LOG_ERROR("Transfer %u Read() failed with status %u",
768                  static_cast<unsigned>(session_id_),
769                  data.status().code());
770     TerminateTransfer(Status::DataLoss());
771     return;
772   }
773 
774   Result<ConstByteSpan> encoded_chunk = chunk.Encode(buffer);
775   if (!encoded_chunk.ok()) {
776     PW_LOG_ERROR("Transfer %u failed to encode transmit chunk",
777                  static_cast<unsigned>(session_id_));
778     TerminateTransfer(Status::Internal());
779     return;
780   }
781 
782   if (const Status status = rpc_writer_->Write(*encoded_chunk); !status.ok()) {
783     PW_LOG_ERROR("Transfer %u failed to send transmit chunk, status %u",
784                  static_cast<unsigned>(session_id_),
785                  status.code());
786     TerminateTransfer(Status::DataLoss());
787     return;
788   }
789 
790   last_chunk_sent_ = chunk.type();
791   flags_ |= kFlagsDataSent;
792 
793   if (offset_ == window_end_offset_ || offset_ == total_size) {
794     // Sent all requested data. Must now wait for next parameters from the
795     // receiver.
796     set_transfer_state(TransferState::kWaiting);
797     SetTimeout(chunk_timeout_);
798   } else {
799     // More data is to be sent. Set a timeout to send the next chunk following
800     // the chunk delay.
801     SetTimeout(chrono::SystemClock::for_at_least(interchunk_delay_));
802   }
803 }
804 
HandleReceiveChunk(const Chunk & chunk)805 void Context::HandleReceiveChunk(const Chunk& chunk) {
806   if (transfer_state_ == TransferState::kInitiating) {
807     PerformInitialHandshake(chunk);
808     return;
809   }
810 
811   if (transfer_state_ == TransferState::kCompleted) {
812     // If the transfer has already completed and another chunk is received,
813     // re-send the final status chunk.
814     SendFinalStatusChunk();
815     return;
816   }
817 
818   if (chunk.protocol_version() != configured_protocol_version_) {
819     PW_LOG_ERROR(
820         "Receive transfer %u was configured to use protocol version %d "
821         "but received a chunk with version %d",
822         id_for_log(),
823         static_cast<int>(configured_protocol_version_),
824         static_cast<int>(chunk.protocol_version()));
825     TerminateTransfer(Status::Internal());
826     return;
827   }
828 
829   switch (transfer_state_) {
830     case TransferState::kInactive:
831     case TransferState::kTransmitting:
832     case TransferState::kInitiating:
833       PW_CRASH("HandleReceiveChunk() called in bad transfer state %d",
834                static_cast<int>(transfer_state_));
835 
836     case TransferState::kCompleted:
837       // Handled earlier.
838       PW_UNREACHABLE;
839 
840     case TransferState::kRecovery:
841       if (chunk.offset() != offset_) {
842         if (last_chunk_offset_ == chunk.offset()) {
843           PW_LOG_DEBUG(
844               "Transfer %u received repeated offset %u; retry detected, "
845               "resending transfer parameters",
846               static_cast<unsigned>(session_id_),
847               static_cast<unsigned>(chunk.offset()));
848 
849           log_chunks_before_rate_limit_ = log_chunks_before_rate_limit_cfg_;
850           log_rate_limit_ = kNoRateLimit;
851 
852           UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
853           if (DataTransferComplete()) {
854             return;
855           }
856           PW_LOG_DEBUG("Transfer %u waiting for offset %u, ignoring %u",
857                        static_cast<unsigned>(session_id_),
858                        static_cast<unsigned>(offset_),
859                        static_cast<unsigned>(chunk.offset()));
860         }
861 
862         last_chunk_offset_ = chunk.offset();
863         SetTimeout(chunk_timeout_);
864         return;
865       }
866 
867       PW_LOG_DEBUG("Transfer %u received expected offset %u, resuming transfer",
868                    static_cast<unsigned>(session_id_),
869                    static_cast<unsigned>(offset_));
870       set_transfer_state(TransferState::kWaiting);
871 
872       // The correct chunk was received; process it normally.
873       [[fallthrough]];
874     case TransferState::kWaiting:
875       HandleReceivedData(chunk);
876       return;
877 
878     case TransferState::kTerminating:
879       HandleTerminatingChunk(chunk);
880       return;
881   }
882 }
883 
HandleReceivedData(const Chunk & chunk)884 void Context::HandleReceivedData(const Chunk& chunk) {
885   if (chunk.offset() != offset_) {
886     if (chunk.offset() + chunk.payload().size() <= offset_ &&
887         chunk.type() != Chunk::Type::kStartAckConfirmation) {
888       // If the chunk's data has already been received, don't go through a full
889       // recovery cycle to avoid shrinking the window size and potentially
890       // thrashing. The expected data may already be in-flight, so just allow
891       // the transmitter to keep going with a CONTINUE parameters chunk.
892       //
893       // However, as a retried chunk indicates a potential issue with the
894       // underlying connection, shrink the transfer window.
895       //
896       // Start ack confs do not come with an offset set, so it can get stuck
897       // here if we are doing an offset transfer.
898       PW_LOG_DEBUG("Transfer %u received duplicate chunk with offset %u",
899                    id_for_log(),
900                    static_cast<unsigned>(chunk.offset()));
901       UpdateTransferParameters(TransmitAction::kRetransmit);
902       SendTransferParameters(TransmitAction::kExtend);
903     } else {
904       // Bad offset; reset window size to send another parameters chunk.
905       PW_LOG_WARN(
906           "Transfer %u expected offset %u, received %u; entering recovery "
907           "state",
908           static_cast<unsigned>(session_id_),
909           static_cast<unsigned>(offset_),
910           static_cast<unsigned>(chunk.offset()));
911 
912       set_transfer_state(TransferState::kRecovery);
913       UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
914     }
915 
916     SetTimeout(chunk_timeout_);
917     return;
918   }
919 
920   if (chunk.offset() + chunk.payload().size() > window_end_offset_) {
921     PW_LOG_WARN(
922         "Transfer %u received more data than what was requested (%u received "
923         "for %u pending); attempting to recover.",
924         id_for_log(),
925         static_cast<unsigned>(chunk.payload().size()),
926         static_cast<unsigned>(window_end_offset_ - offset_));
927 
928     // To prevent an improperly implemented client which doesn't respect
929     // window_end_offset from entering an infinite retry loop, limit recovery
930     // attempts to the lifetime retry count.
931     lifetime_retries_++;
932     if (lifetime_retries_ <= max_lifetime_retries_) {
933       set_transfer_state(TransferState::kRecovery);
934       SetTimeout(chunk_timeout_);
935 
936       UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
937     } else {
938       TerminateTransfer(Status::Internal());
939     }
940     return;
941   }
942 
943   // Update the last offset seen so that retries can be detected.
944   last_chunk_offset_ = chunk.offset();
945 
946   // Write staged data from the buffer to the stream.
947   if (chunk.has_payload()) {
948     if (Status status = writer().Write(chunk.payload()); !status.ok()) {
949       PW_LOG_ERROR(
950           "Transfer %u write of %u B chunk failed with status %u; aborting "
951           "with DATA_LOSS",
952           static_cast<unsigned>(session_id_),
953           static_cast<unsigned>(chunk.payload().size()),
954           status.code());
955       TerminateTransfer(Status::DataLoss());
956       return;
957     }
958 
959     transfer_rate_.Update(chunk.payload().size());
960   }
961 
962   // Update the transfer state.
963   offset_ += chunk.payload().size();
964 
965   // When the client sets remaining_bytes to 0, it indicates completion of the
966   // transfer. Acknowledge the completion through a status chunk and clean up.
967   if (chunk.IsFinalTransmitChunk()) {
968     TerminateTransfer(OkStatus());
969     return;
970   }
971 
972   if (chunk.window_end_offset() != 0) {
973     if (chunk.window_end_offset() < offset_) {
974       PW_LOG_ERROR(
975           "Transfer %u got invalid end offset of %u (current offset %u)",
976           id_for_log(),
977           static_cast<unsigned>(chunk.window_end_offset()),
978           static_cast<unsigned>(offset_));
979       TerminateTransfer(Status::Internal());
980       return;
981     }
982 
983     if (chunk.window_end_offset() > window_end_offset_) {
984       // A transmitter should never send a larger end offset than what the
985       // receiver has advertised. If this occurs, there is a bug in the
986       // transmitter implementation. Terminate the transfer.
987       PW_LOG_ERROR(
988           "Transfer %u transmitter sent invalid end offset of %u, "
989           "greater than receiver offset %u",
990           id_for_log(),
991           static_cast<unsigned>(chunk.window_end_offset()),
992           static_cast<unsigned>(window_end_offset_));
993       TerminateTransfer(Status::Internal());
994       return;
995     }
996 
997     window_end_offset_ = chunk.window_end_offset();
998   }
999 
1000   SetTimeout(chunk_timeout_);
1001 
1002   if (chunk.type() == Chunk::Type::kStartAckConfirmation) {
1003     // Send the first parameters in the receive transfer.
1004     UpdateAndSendTransferParameters(TransmitAction::kFirstParameters);
1005     return;
1006   }
1007 
1008   if (offset_ == window_end_offset_) {
1009     // Received all pending data. Advance the transfer parameters.
1010     UpdateAndSendTransferParameters(TransmitAction::kExtend);
1011     return;
1012   }
1013 
1014   // Once the transmitter has sent a sufficient amount of data, try to extend
1015   // the window to allow it to continue sending data without blocking.
1016   uint32_t remaining_window_size = window_end_offset_ - offset_;
1017   bool extend_window = remaining_window_size <=
1018                        window_size_ / max_parameters_->extend_window_divisor();
1019 
1020   if (extend_window) {
1021     UpdateAndSendTransferParameters(TransmitAction::kExtend);
1022   }
1023 }
1024 
HandleTerminatingChunk(const Chunk & chunk)1025 void Context::HandleTerminatingChunk(const Chunk& chunk) {
1026   switch (chunk.type()) {
1027     case Chunk::Type::kCompletion:
1028       PW_CRASH("Completion chunks should be processed by HandleChunkEvent()");
1029 
1030     case Chunk::Type::kCompletionAck:
1031       PW_LOG_INFO(
1032           "Transfer %u completed with status %u", id_for_log(), status_.code());
1033       set_transfer_state(TransferState::kInactive);
1034       break;
1035 
1036     case Chunk::Type::kData:
1037     case Chunk::Type::kStart:
1038     case Chunk::Type::kParametersRetransmit:
1039     case Chunk::Type::kParametersContinue:
1040     case Chunk::Type::kStartAck:
1041     case Chunk::Type::kStartAckConfirmation:
1042       // If a non-completion chunk is received in a TERMINATING state, re-send
1043       // the transfer's completion chunk to the peer.
1044       EncodeAndSendChunk(
1045           Chunk::Final(configured_protocol_version_, session_id_, status_));
1046       break;
1047   }
1048 }
1049 
TerminateTransfer(Status status,bool with_resource_id)1050 void Context::TerminateTransfer(Status status, bool with_resource_id) {
1051   if (transfer_state_ == TransferState::kTerminating ||
1052       transfer_state_ == TransferState::kCompleted) {
1053     // Transfer has already been terminated; no need to do it again.
1054     return;
1055   }
1056 
1057   Finish(status);
1058 
1059   PW_LOG_INFO("Transfer %u terminating with status: %u, offset: %u",
1060               static_cast<unsigned>(session_id_),
1061               status.code(),
1062               static_cast<unsigned>(offset_));
1063 
1064   if (ShouldSkipCompletionHandshake()) {
1065     set_transfer_state(TransferState::kCompleted);
1066   } else {
1067     set_transfer_state(TransferState::kTerminating);
1068     SetTimeout(chunk_timeout_);
1069   }
1070 
1071   // Don't send a final chunk if the other end of the transfer has not yet
1072   // made contact, as there is no one to notify.
1073   if ((flags_ & kFlagsContactMade) == kFlagsContactMade) {
1074     SendFinalStatusChunk(with_resource_id);
1075   }
1076 }
1077 
HandleTermination(Status status)1078 void Context::HandleTermination(Status status) {
1079   Finish(status);
1080 
1081   PW_LOG_INFO("Transfer %u completed with status %u",
1082               static_cast<unsigned>(session_id_),
1083               status.code());
1084 
1085   if (ShouldSkipCompletionHandshake()) {
1086     set_transfer_state(TransferState::kCompleted);
1087   } else {
1088     EncodeAndSendChunk(
1089         Chunk(configured_protocol_version_, Chunk::Type::kCompletionAck)
1090             .set_session_id(session_id_));
1091 
1092     set_transfer_state(TransferState::kInactive);
1093   }
1094 }
1095 
SendFinalStatusChunk(bool with_resource_id)1096 void Context::SendFinalStatusChunk(bool with_resource_id) {
1097   PW_DCHECK(transfer_state_ == TransferState::kCompleted ||
1098             transfer_state_ == TransferState::kTerminating);
1099 
1100   if (configured_protocol_version_ == ProtocolVersion::kUnknown) {
1101     // If the transfer is ended before contact is made with the peer,
1102     // the protocol version may not yet be configured. Use the desired
1103     // version for the status chunk.
1104     configured_protocol_version_ = desired_protocol_version_;
1105     PW_LOG_WARN(
1106         "Transfer %u ending before protocol version was confirmed; using "
1107         "version %u",
1108         id_for_log(),
1109         static_cast<unsigned>(desired_protocol_version_));
1110   }
1111 
1112   PW_LOG_INFO("Sending final chunk for transfer %u with status %u",
1113               static_cast<unsigned>(session_id_),
1114               status_.code());
1115 
1116   Chunk chunk =
1117       Chunk::Final(configured_protocol_version_, session_id_, status_);
1118   if (with_resource_id) {
1119     chunk.set_resource_id(resource_id_);
1120   }
1121   EncodeAndSendChunk(chunk);
1122 }
1123 
Finish(Status status)1124 void Context::Finish(Status status) {
1125   PW_DCHECK(active());
1126 
1127   status.Update(FinalCleanup(status));
1128   status_ = status;
1129 
1130   SetTimeout(kFinalChunkAckTimeout);
1131 }
1132 
SetTimeout(chrono::SystemClock::duration timeout)1133 void Context::SetTimeout(chrono::SystemClock::duration timeout) {
1134   next_timeout_ = chrono::SystemClock::TimePointAfterAtLeast(timeout);
1135 }
1136 
HandleTimeout()1137 void Context::HandleTimeout() {
1138   ClearTimeout();
1139 
1140   switch (transfer_state_) {
1141     case TransferState::kCompleted:
1142       // A timeout occurring in a completed state indicates that the other side
1143       // never ACKed the final status packet. Reset the context to inactive.
1144       set_transfer_state(TransferState::kInactive);
1145       return;
1146 
1147     case TransferState::kTransmitting:
1148       // A timeout occurring in a TRANSMITTING state indicates that the transfer
1149       // has waited for its inter-chunk delay and should transmit its next
1150       // chunk.
1151       TransmitNextChunk(/*retransmit_requested=*/false);
1152       break;
1153 
1154     case TransferState::kInitiating:
1155     case TransferState::kWaiting:
1156     case TransferState::kRecovery:
1157     case TransferState::kTerminating:
1158       // A timeout occurring in a transfer or handshake state indicates that no
1159       // chunk has been received from the other side. The transfer should retry
1160       // its previous operation.
1161       //
1162       // The timeout is set immediately. Retry() will clear it if it fails.
1163       if (transfer_state_ == TransferState::kInitiating &&
1164           last_chunk_sent_ == Chunk::Type::kStart) {
1165         SetTimeout(initial_chunk_timeout_);
1166       } else {
1167         SetTimeout(chunk_timeout_);
1168       }
1169       Retry();
1170       break;
1171 
1172     case TransferState::kInactive:
1173       PW_LOG_ERROR("Timeout occurred in INACTIVE state");
1174       return;
1175   }
1176 }
1177 
Retry()1178 void Context::Retry() {
1179   if (retries_ == max_retries_ || lifetime_retries_ == max_lifetime_retries_) {
1180     PW_LOG_ERROR(
1181         "Transfer %u failed to receive a chunk after %u retries (lifetime %u).",
1182         id_for_log(),
1183         static_cast<unsigned>(retries_),
1184         static_cast<unsigned>(lifetime_retries_));
1185     PW_LOG_ERROR("Canceling transfer.");
1186 
1187     if (transfer_state_ == TransferState::kTerminating) {
1188       // Timeouts occurring in a TERMINATING state indicate that the completion
1189       // chunk was never ACKed. Simply clean up the transfer context.
1190       set_transfer_state(TransferState::kInactive);
1191     } else {
1192       TerminateTransfer(Status::DeadlineExceeded());
1193     }
1194     return;
1195   }
1196 
1197   ++retries_;
1198   ++lifetime_retries_;
1199 
1200   if (transfer_state_ == TransferState::kInitiating ||
1201       last_chunk_sent_ == Chunk::Type::kStartAckConfirmation) {
1202     RetryHandshake();
1203     return;
1204   }
1205 
1206   if (transfer_state_ == TransferState::kTerminating) {
1207     EncodeAndSendChunk(
1208         Chunk::Final(configured_protocol_version_, session_id_, status_));
1209     return;
1210   }
1211 
1212   if (type() == TransferType::kReceive) {
1213     // Resend the most recent transfer parameters.
1214     PW_LOG_DEBUG(
1215         "Receive transfer %u timed out waiting for chunk; resending parameters",
1216         static_cast<unsigned>(session_id_));
1217 
1218     UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
1219     return;
1220   }
1221 
1222   // In a transmit, if a data chunk has not yet been sent, the initial transfer
1223   // parameters did not arrive from the receiver. Resend the initial chunk.
1224   if ((flags_ & kFlagsDataSent) != kFlagsDataSent) {
1225     PW_LOG_DEBUG(
1226         "Transmit transfer %u timed out waiting for initial parameters",
1227         static_cast<unsigned>(session_id_));
1228     SendInitialLegacyTransmitChunk();
1229     return;
1230   }
1231 
1232   // Otherwise, resend the most recent chunk. If the reader doesn't support
1233   // seeking, this isn't possible, so just terminate the transfer immediately.
1234   if (!SeekReader(last_chunk_offset_).ok()) {
1235     PW_LOG_ERROR("Transmit transfer %u timed out waiting for new parameters.",
1236                  id_for_log());
1237     PW_LOG_ERROR("Retrying requires a seekable reader. Alas, ours is not.");
1238     TerminateTransfer(Status::DeadlineExceeded());
1239     return;
1240   }
1241 
1242   // Rewind the transfer position and resend the chunk.
1243   offset_ = last_chunk_offset_;
1244 
1245   TransmitNextChunk(/*retransmit_requested=*/false);
1246 }
1247 
RetryHandshake()1248 void Context::RetryHandshake() {
1249   Chunk retry_chunk(configured_protocol_version_, last_chunk_sent_);
1250 
1251   switch (last_chunk_sent_) {
1252     case Chunk::Type::kStart:
1253       // No protocol version is yet configured at the time of sending the start
1254       // chunk, so we use the client's desired version instead.
1255       retry_chunk.set_protocol_version(desired_protocol_version_)
1256           .set_desired_session_id(session_id_)
1257           .set_resource_id(resource_id_)
1258           .set_initial_offset(offset_);
1259       if (type() == TransferType::kReceive) {
1260         SetTransferParameters(retry_chunk);
1261       }
1262       break;
1263 
1264     case Chunk::Type::kStartAck:
1265       retry_chunk.set_session_id(session_id_)
1266           .set_resource_id(static_cast<ServerContext&>(*this).handler()->id());
1267       break;
1268 
1269     case Chunk::Type::kStartAckConfirmation:
1270       retry_chunk.set_session_id(session_id_);
1271       if (type() == TransferType::kReceive) {
1272         SetTransferParameters(retry_chunk);
1273       }
1274       break;
1275 
1276     case Chunk::Type::kData:
1277     case Chunk::Type::kParametersRetransmit:
1278     case Chunk::Type::kParametersContinue:
1279     case Chunk::Type::kCompletion:
1280     case Chunk::Type::kCompletionAck:
1281       PW_CRASH("Should not RetryHandshake() when not in handshake phase");
1282   }
1283 
1284   EncodeAndSendChunk(retry_chunk);
1285 }
1286 
MaxWriteChunkSize(uint32_t max_chunk_size_bytes,uint32_t channel_id) const1287 uint32_t Context::MaxWriteChunkSize(uint32_t max_chunk_size_bytes,
1288                                     uint32_t channel_id) const {
1289   // Start with the user-provided maximum chunk size, which should be the usable
1290   // payload length on the RPC ingress path after any transport overhead.
1291   ptrdiff_t max_size = max_chunk_size_bytes;
1292 
1293   // Subtract the RPC overhead (pw_rpc/internal/packet.proto).
1294   //
1295   //   type:       1 byte key, 1 byte value (CLIENT_STREAM)
1296   //   channel_id: 1 byte key, varint value (calculate from stream)
1297   //   service_id: 1 byte key, 4 byte value
1298   //   method_id:  1 byte key, 4 byte value
1299   //   payload:    1 byte key, varint length (remaining space)
1300   //   status:     0 bytes (not set in stream packets)
1301   //
1302   //   TOTAL: 14 bytes + encoded channel_id size + encoded payload length
1303   //
1304   max_size -= 14;
1305   max_size -= varint::EncodedSize(channel_id);
1306   max_size -= varint::EncodedSize(max_size);
1307 
1308   // TODO(frolv): Temporarily add 5 bytes for the new call_id change. The RPC
1309   // overhead calculation will be moved into an RPC helper to avoid having
1310   // pw_transfer depend on RPC internals.
1311   max_size -= 5;
1312 
1313   // Subtract the transfer service overhead for a client write chunk
1314   // (pw_transfer/transfer.proto).
1315   //
1316   //   session_id: 1 byte key, varint value (calculate)
1317   //   offset:     1 byte key, varint value (calculate)
1318   //   data:       1 byte key, varint length (remaining space)
1319   //
1320   //   TOTAL: 3 + encoded session_id + encoded offset + encoded data length
1321   //
1322   // Use a lower bound of a single chunk for the window end offset, as it will
1323   // always be at least in that range.
1324   size_t window_end_offset = std::max(window_end_offset_, max_chunk_size_bytes);
1325   max_size -= 3;
1326   max_size -= varint::EncodedSize(session_id_);
1327   max_size -= varint::EncodedSize(window_end_offset);
1328   max_size -= varint::EncodedSize(max_size);
1329 
1330   // A resulting value of zero (or less) renders write transfers unusable, as
1331   // there is no space to send any payload. This should be considered a
1332   // programmer error in the transfer service setup.
1333   PW_CHECK_INT_GT(
1334       max_size,
1335       0,
1336       "Transfer service maximum chunk size is too small to fit a payload. "
1337       "Increase max_chunk_size_bytes to support write transfers.");
1338 
1339   return max_size;
1340 }
1341 
LogTransferConfiguration()1342 void Context::LogTransferConfiguration() {
1343   PW_LOG_DEBUG(
1344       "Local transfer timing configuration: "
1345       "chunk_timeout=%ums, max_retries=%u, interchunk_delay=%uus",
1346       static_cast<unsigned>(
1347           std::chrono::ceil<std::chrono::milliseconds>(chunk_timeout_).count()),
1348       static_cast<unsigned>(max_retries_),
1349       static_cast<unsigned>(
1350           std::chrono::ceil<std::chrono::microseconds>(interchunk_delay_)
1351               .count()));
1352 
1353   PW_LOG_DEBUG(
1354       "Local transfer windowing configuration: max_window_size_bytes=%u, "
1355       "extend_window_divisor=%u, max_chunk_size_bytes=%u",
1356       static_cast<unsigned>(max_parameters_->max_window_size_bytes()),
1357       static_cast<unsigned>(max_parameters_->extend_window_divisor()),
1358       static_cast<unsigned>(max_parameters_->max_chunk_size_bytes()));
1359 }
1360 
1361 }  // namespace pw::transfer::internal
1362