xref: /aosp_15_r20/external/pigweed/pw_hdlc/router.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_hdlc/router.h"
16 
17 #include <inttypes.h>
18 
19 #include <algorithm>
20 
21 #include "pw_hdlc/encoder.h"
22 #include "pw_log/log.h"
23 #include "pw_multibuf/multibuf.h"
24 #include "pw_multibuf/stream.h"
25 #include "pw_result/result.h"
26 #include "pw_stream/null_stream.h"
27 
28 namespace pw::hdlc {
29 
30 using ::pw::async2::Context;
31 using ::pw::async2::Pending;
32 using ::pw::async2::Poll;
33 using ::pw::async2::Ready;
34 using ::pw::channel::ByteReaderWriter;
35 using ::pw::channel::DatagramReaderWriter;
36 using ::pw::multibuf::Chunk;
37 using ::pw::multibuf::MultiBuf;
38 using ::pw::stream::CountingNullStream;
39 
40 namespace {
41 
42 /// HDLC encodes the contents of ``payload`` to ``writer``.
WriteMultiBufUIFrame(uint64_t address,const MultiBuf & payload,stream::Writer & writer)43 Status WriteMultiBufUIFrame(uint64_t address,
44                             const MultiBuf& payload,
45                             stream::Writer& writer) {
46   Encoder encoder(writer);
47   if (Status status = encoder.StartUnnumberedFrame(address); !status.ok()) {
48     return status;
49   }
50   for (const Chunk& chunk : payload.Chunks()) {
51     if (Status status = encoder.WriteData(chunk); !status.ok()) {
52       return status;
53     }
54   }
55   return encoder.FinishFrame();
56 }
57 
58 /// Calculates the size of ``payload`` once HDLC-encoded.
CalculateSizeOnceEncoded(uint64_t address,const MultiBuf & payload)59 Result<size_t> CalculateSizeOnceEncoded(uint64_t address,
60                                         const MultiBuf& payload) {
61   CountingNullStream null_stream;
62   Status encode_status = WriteMultiBufUIFrame(address, payload, null_stream);
63   if (!encode_status.ok()) {
64     return encode_status;
65   }
66   return null_stream.bytes_written();
67 }
68 
69 /// Attempts to decode a frame from ``data``, advancing ``data`` forwards by
70 /// any bytes that are consumed.
DecodeFrame(Decoder & decoder,MultiBuf & data)71 std::optional<Frame> DecodeFrame(Decoder& decoder, MultiBuf& data) {
72   size_t processed = 0;
73   for (std::byte byte : data) {
74     Result<Frame> frame_result = decoder.Process(byte);
75     ++processed;
76     if (frame_result.status().IsUnavailable()) {
77       // No frame is yet available.
78     } else if (frame_result.ok()) {
79       data.DiscardPrefix(processed);
80       return std::move(*frame_result);
81     } else if (frame_result.status().IsDataLoss()) {
82       PW_LOG_ERROR("Discarding invalid incoming HDLC frame.");
83     } else if (frame_result.status().IsResourceExhausted()) {
84       PW_LOG_ERROR("Discarding incoming HDLC frame: too large for buffer.");
85     }
86   }
87   data.DiscardPrefix(processed);
88   return std::nullopt;
89 }
90 
91 }  // namespace
92 
AddChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)93 Status Router::AddChannel(DatagramReaderWriter& channel,
94                           uint64_t receive_address,
95                           uint64_t send_address) {
96   for (const ChannelData& data : channel_datas_) {
97     if ((data.channel == &channel) ||
98         (data.receive_address == receive_address) ||
99         (data.send_address == send_address)) {
100       return Status::AlreadyExists();
101     }
102   }
103   channel_datas_.emplace_back(channel, receive_address, send_address);
104   return OkStatus();
105 }
106 
RemoveChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)107 Status Router::RemoveChannel(DatagramReaderWriter& channel,
108                              uint64_t receive_address,
109                              uint64_t send_address) {
110   auto channel_entry = std::find_if(
111       channel_datas_.begin(),
112       channel_datas_.end(),
113       [&channel, receive_address, send_address](const ChannelData& data) {
114         return (data.channel == &channel) &&
115                (data.receive_address == receive_address) &&
116                (data.send_address == send_address);
117       });
118   if (channel_entry == channel_datas_.end()) {
119     return Status::NotFound();
120   }
121   if (channel_datas_.size() == 1) {
122     channel_datas_.clear();
123   } else {
124     // Put the ChannelData in the back of
125     // the list and pop it out to avoid shifting
126     // all elements.
127     std::swap(*channel_entry, channel_datas_.back());
128     channel_datas_.pop_back();
129   }
130   return OkStatus();
131 }
132 
FindChannelForReceiveAddress(uint64_t receive_address)133 Router::ChannelData* Router::FindChannelForReceiveAddress(
134     uint64_t receive_address) {
135   for (auto& channel : channel_datas_) {
136     if (channel.receive_address == receive_address) {
137       return &channel;
138     }
139   }
140   return nullptr;
141 }
142 
PollDeliverIncomingFrame(Context & cx,const Frame & frame)143 Poll<> Router::PollDeliverIncomingFrame(Context& cx, const Frame& frame) {
144   ConstByteSpan data = frame.data();
145   uint64_t address = frame.address();
146   ChannelData* channel = FindChannelForReceiveAddress(address);
147   if (channel == nullptr) {
148     PW_LOG_ERROR("Received incoming HDLC packet with address %" PRIu64
149                  ", but no channel with that incoming address is registered.",
150                  address);
151     return Ready();
152   }
153   Poll<Status> ready_to_write = channel->channel->PendReadyToWrite(cx);
154   if (ready_to_write.IsPending()) {
155     return Pending();
156   }
157   if (!ready_to_write->ok()) {
158     PW_LOG_ERROR("Channel at incoming HDLC address %" PRIu64
159                  " became unwriteable. Status: %d",
160                  channel->receive_address,
161                  ready_to_write->code());
162     return Ready();
163   }
164   Poll<std::optional<MultiBuf>> buffer =
165       channel->channel->PendAllocateWriteBuffer(cx, data.size());
166   if (buffer.IsPending()) {
167     return Pending();
168   }
169   if (!buffer->has_value()) {
170     PW_LOG_ERROR(
171         "Unable to allocate a buffer of size %zu destined for incoming "
172         "HDLC address %" PRIu64 ". Packet will be discarded.",
173         data.size(),
174         frame.address());
175     return Ready();
176   }
177   std::copy(frame.data().begin(), frame.data().end(), (**buffer).begin());
178   Status write_status = channel->channel->StageWrite(std::move(**buffer));
179   if (!write_status.ok()) {
180     PW_LOG_ERROR(
181         "Failed to write a buffer of size %zu destined for incoming HDLC "
182         "address %" PRIu64 ". Status: %d",
183         data.size(),
184         channel->receive_address,
185         write_status.code());
186   }
187   return Ready();
188 }
189 
DecodeAndWriteIncoming(Context & cx)190 void Router::DecodeAndWriteIncoming(Context& cx) {
191   while (true) {
192     if (decoded_frame_.has_value()) {
193       if (PollDeliverIncomingFrame(cx, *decoded_frame_).IsPending()) {
194         return;
195       }
196       // Zero out the frame delivery state.
197       decoded_frame_ = std::nullopt;
198     }
199 
200     while (incoming_data_.empty()) {
201       Poll<Result<MultiBuf>> incoming = io_channel_.PendRead(cx);
202       if (incoming.IsPending()) {
203         return;
204       }
205       if (!incoming->ok()) {
206         if (incoming->status().IsFailedPrecondition()) {
207           PW_LOG_WARN("HDLC io_channel has closed.");
208         } else {
209           PW_LOG_ERROR("Unable to read from HDLC io_channel. Status: %d",
210                        incoming->status().code());
211         }
212         return;
213       }
214       incoming_data_ = std::move(**incoming);
215     }
216 
217     decoded_frame_ = DecodeFrame(decoder_, incoming_data_);
218   }
219 }
220 
TryFillBufferToEncodeAndSend(Context & cx)221 void Router::TryFillBufferToEncodeAndSend(Context& cx) {
222   if (buffer_to_encode_and_send_.has_value()) {
223     return;
224   }
225   for (size_t i = 0; i < channel_datas_.size(); ++i) {
226     ChannelData& cd =
227         channel_datas_[(next_first_read_index_ + i) % channel_datas_.size()];
228     Poll<Result<MultiBuf>> buf_result = cd.channel->PendRead(cx);
229     if (buf_result.IsPending()) {
230       continue;
231     }
232     if (!buf_result->ok()) {
233       if (buf_result->status().IsUnimplemented()) {
234         PW_LOG_ERROR("Channel registered for outgoing HDLC address %" PRIu64
235                      " is not readable.",
236                      cd.send_address);
237       }
238       // We ignore FAILED_PRECONDITION (closed) because it will be handled
239       // elsewhere. OUT_OF_RANGE just means we have finished writing. No
240       // action is needed because the channel may still be receiving data.
241       continue;
242     }
243     MultiBuf& buf = **buf_result;
244     uint64_t target_address = cd.send_address;
245     Result<size_t> encoded_size = CalculateSizeOnceEncoded(target_address, buf);
246     if (!encoded_size.ok()) {
247       PW_LOG_ERROR(
248           "Unable to compute size of encoded packet for outgoing buffer of "
249           "size %zu destined for outgoing HDLC address %" PRIu64
250           ". Packet will be discarded.",
251           buf.size(),
252           target_address);
253       continue;
254     }
255     buffer_to_encode_and_send_ =
256         OutgoingBuffer{/*buffer=*/std::move(buf),
257                        /*hdlc_encoded_size=*/*encoded_size,
258                        /*target_address=*/target_address};
259     // We received data, so ensure that we start by reading from a different
260     // index next time.
261     next_first_read_index_ =
262         (next_first_read_index_ + 1) % channel_datas_.size();
263     return;
264   }
265 }
266 
WriteOutgoingMessages(Context & cx)267 void Router::WriteOutgoingMessages(Context& cx) {
268   while (io_channel_.is_write_open() &&
269          io_channel_.PendReadyToWrite(cx).IsReady()) {
270     TryFillBufferToEncodeAndSend(cx);
271     if (!buffer_to_encode_and_send_.has_value()) {
272       // No channels have new data to send.
273       return;
274     }
275     uint64_t target_address = buffer_to_encode_and_send_->target_address;
276     size_t hdlc_encoded_size = buffer_to_encode_and_send_->hdlc_encoded_size;
277     Poll<std::optional<MultiBuf>> maybe_write_buffer =
278         io_channel_.PendAllocateWriteBuffer(cx, hdlc_encoded_size);
279     if (maybe_write_buffer.IsPending()) {
280       // Channel cannot write any further messages until we can allocate.
281       return;
282     }
283     // We've gotten the allocation: discard the future.
284     if (!maybe_write_buffer->has_value()) {
285       // We can't allocate a write buffer large enough for our encoded frame.
286       // Sadly, we have to throw the frame away.
287       PW_LOG_ERROR(
288           "Unable to allocate a buffer of size %zu destined for outgoing "
289           "HDLC address %" PRIu64 ". Packet will be discarded.",
290           hdlc_encoded_size,
291           target_address);
292       buffer_to_encode_and_send_ = std::nullopt;
293       continue;
294     }
295     MultiBuf write_buffer = std::move(**maybe_write_buffer);
296     Status encode_status =
297         WriteMultiBufUIFrame(target_address,
298                              buffer_to_encode_and_send_->buffer,
299                              pw::multibuf::Stream(write_buffer));
300     buffer_to_encode_and_send_ = std::nullopt;
301     if (!encode_status.ok()) {
302       PW_LOG_ERROR(
303           "Failed to encode a buffer destined for outgoing HDLC address "
304           "%" PRIu64 ". Status: %d",
305           target_address,
306           encode_status.code());
307       continue;
308     }
309     Status write_status = io_channel_.StageWrite(std::move(write_buffer));
310     if (!write_status.ok()) {
311       PW_LOG_ERROR(
312           "Failed to write a buffer of size %zu destined for outgoing HDLC "
313           "address %" PRIu64 ". Status: %d",
314           hdlc_encoded_size,
315           target_address,
316           write_status.code());
317     }
318   }
319 }
320 
Pend(Context & cx)321 Poll<> Router::Pend(Context& cx) {
322   // We check for ability to read, but not write, because we may not always
323   // attempt a write, which would cause us to miss that the channel has closed
324   // for writes.
325   //
326   // Additionally, it is uncommon for a channel to remain readable but not
327   // writeable: the reverse is more common (still readable while no longer
328   // writeable).
329   if (!io_channel_.is_read_open()) {
330     return PendClose(cx);
331   }
332   DecodeAndWriteIncoming(cx);
333   WriteOutgoingMessages(cx);
334   RemoveClosedChannels();
335   if (!io_channel_.is_read_open()) {
336     return PendClose(cx);
337   }
338   return Pending();
339 }
340 
PendClose(Context & cx)341 Poll<> Router::PendClose(Context& cx) {
342   for (ChannelData& cd : channel_datas_) {
343     // We ignore the status value from close.
344     // If one or more channels are unable to close, they will remain after
345     // `RemoveClosedChannels` and `channel_datas_.size()` will be nonzero.
346     cd.channel->PendClose(cx).IgnorePoll();
347   }
348   RemoveClosedChannels();
349   if (io_channel_.PendClose(cx).IsPending()) {
350     return Pending();
351   }
352   if (channel_datas_.empty()) {
353     return Ready();
354   } else {
355     return Pending();
356   }
357 }
358 
RemoveClosedChannels()359 void Router::RemoveClosedChannels() {
360   auto first_to_remove = std::remove_if(
361       channel_datas_.begin(), channel_datas_.end(), [](const ChannelData& cd) {
362         return !cd.channel->is_read_or_write_open();
363       });
364   channel_datas_.erase(first_to_remove, channel_datas_.end());
365 }
366 
367 }  // namespace pw::hdlc
368