1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_hdlc/router.h"
16
17 #include <inttypes.h>
18
19 #include <algorithm>
20
21 #include "pw_hdlc/encoder.h"
22 #include "pw_log/log.h"
23 #include "pw_multibuf/multibuf.h"
24 #include "pw_multibuf/stream.h"
25 #include "pw_result/result.h"
26 #include "pw_stream/null_stream.h"
27
28 namespace pw::hdlc {
29
30 using ::pw::async2::Context;
31 using ::pw::async2::Pending;
32 using ::pw::async2::Poll;
33 using ::pw::async2::Ready;
34 using ::pw::channel::ByteReaderWriter;
35 using ::pw::channel::DatagramReaderWriter;
36 using ::pw::multibuf::Chunk;
37 using ::pw::multibuf::MultiBuf;
38 using ::pw::stream::CountingNullStream;
39
40 namespace {
41
42 /// HDLC encodes the contents of ``payload`` to ``writer``.
WriteMultiBufUIFrame(uint64_t address,const MultiBuf & payload,stream::Writer & writer)43 Status WriteMultiBufUIFrame(uint64_t address,
44 const MultiBuf& payload,
45 stream::Writer& writer) {
46 Encoder encoder(writer);
47 if (Status status = encoder.StartUnnumberedFrame(address); !status.ok()) {
48 return status;
49 }
50 for (const Chunk& chunk : payload.Chunks()) {
51 if (Status status = encoder.WriteData(chunk); !status.ok()) {
52 return status;
53 }
54 }
55 return encoder.FinishFrame();
56 }
57
58 /// Calculates the size of ``payload`` once HDLC-encoded.
CalculateSizeOnceEncoded(uint64_t address,const MultiBuf & payload)59 Result<size_t> CalculateSizeOnceEncoded(uint64_t address,
60 const MultiBuf& payload) {
61 CountingNullStream null_stream;
62 Status encode_status = WriteMultiBufUIFrame(address, payload, null_stream);
63 if (!encode_status.ok()) {
64 return encode_status;
65 }
66 return null_stream.bytes_written();
67 }
68
69 /// Attempts to decode a frame from ``data``, advancing ``data`` forwards by
70 /// any bytes that are consumed.
DecodeFrame(Decoder & decoder,MultiBuf & data)71 std::optional<Frame> DecodeFrame(Decoder& decoder, MultiBuf& data) {
72 size_t processed = 0;
73 for (std::byte byte : data) {
74 Result<Frame> frame_result = decoder.Process(byte);
75 ++processed;
76 if (frame_result.status().IsUnavailable()) {
77 // No frame is yet available.
78 } else if (frame_result.ok()) {
79 data.DiscardPrefix(processed);
80 return std::move(*frame_result);
81 } else if (frame_result.status().IsDataLoss()) {
82 PW_LOG_ERROR("Discarding invalid incoming HDLC frame.");
83 } else if (frame_result.status().IsResourceExhausted()) {
84 PW_LOG_ERROR("Discarding incoming HDLC frame: too large for buffer.");
85 }
86 }
87 data.DiscardPrefix(processed);
88 return std::nullopt;
89 }
90
91 } // namespace
92
AddChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)93 Status Router::AddChannel(DatagramReaderWriter& channel,
94 uint64_t receive_address,
95 uint64_t send_address) {
96 for (const ChannelData& data : channel_datas_) {
97 if ((data.channel == &channel) ||
98 (data.receive_address == receive_address) ||
99 (data.send_address == send_address)) {
100 return Status::AlreadyExists();
101 }
102 }
103 channel_datas_.emplace_back(channel, receive_address, send_address);
104 return OkStatus();
105 }
106
RemoveChannel(DatagramReaderWriter & channel,uint64_t receive_address,uint64_t send_address)107 Status Router::RemoveChannel(DatagramReaderWriter& channel,
108 uint64_t receive_address,
109 uint64_t send_address) {
110 auto channel_entry = std::find_if(
111 channel_datas_.begin(),
112 channel_datas_.end(),
113 [&channel, receive_address, send_address](const ChannelData& data) {
114 return (data.channel == &channel) &&
115 (data.receive_address == receive_address) &&
116 (data.send_address == send_address);
117 });
118 if (channel_entry == channel_datas_.end()) {
119 return Status::NotFound();
120 }
121 if (channel_datas_.size() == 1) {
122 channel_datas_.clear();
123 } else {
124 // Put the ChannelData in the back of
125 // the list and pop it out to avoid shifting
126 // all elements.
127 std::swap(*channel_entry, channel_datas_.back());
128 channel_datas_.pop_back();
129 }
130 return OkStatus();
131 }
132
FindChannelForReceiveAddress(uint64_t receive_address)133 Router::ChannelData* Router::FindChannelForReceiveAddress(
134 uint64_t receive_address) {
135 for (auto& channel : channel_datas_) {
136 if (channel.receive_address == receive_address) {
137 return &channel;
138 }
139 }
140 return nullptr;
141 }
142
PollDeliverIncomingFrame(Context & cx,const Frame & frame)143 Poll<> Router::PollDeliverIncomingFrame(Context& cx, const Frame& frame) {
144 ConstByteSpan data = frame.data();
145 uint64_t address = frame.address();
146 ChannelData* channel = FindChannelForReceiveAddress(address);
147 if (channel == nullptr) {
148 PW_LOG_ERROR("Received incoming HDLC packet with address %" PRIu64
149 ", but no channel with that incoming address is registered.",
150 address);
151 return Ready();
152 }
153 Poll<Status> ready_to_write = channel->channel->PendReadyToWrite(cx);
154 if (ready_to_write.IsPending()) {
155 return Pending();
156 }
157 if (!ready_to_write->ok()) {
158 PW_LOG_ERROR("Channel at incoming HDLC address %" PRIu64
159 " became unwriteable. Status: %d",
160 channel->receive_address,
161 ready_to_write->code());
162 return Ready();
163 }
164 Poll<std::optional<MultiBuf>> buffer =
165 channel->channel->PendAllocateWriteBuffer(cx, data.size());
166 if (buffer.IsPending()) {
167 return Pending();
168 }
169 if (!buffer->has_value()) {
170 PW_LOG_ERROR(
171 "Unable to allocate a buffer of size %zu destined for incoming "
172 "HDLC address %" PRIu64 ". Packet will be discarded.",
173 data.size(),
174 frame.address());
175 return Ready();
176 }
177 std::copy(frame.data().begin(), frame.data().end(), (**buffer).begin());
178 Status write_status = channel->channel->StageWrite(std::move(**buffer));
179 if (!write_status.ok()) {
180 PW_LOG_ERROR(
181 "Failed to write a buffer of size %zu destined for incoming HDLC "
182 "address %" PRIu64 ". Status: %d",
183 data.size(),
184 channel->receive_address,
185 write_status.code());
186 }
187 return Ready();
188 }
189
DecodeAndWriteIncoming(Context & cx)190 void Router::DecodeAndWriteIncoming(Context& cx) {
191 while (true) {
192 if (decoded_frame_.has_value()) {
193 if (PollDeliverIncomingFrame(cx, *decoded_frame_).IsPending()) {
194 return;
195 }
196 // Zero out the frame delivery state.
197 decoded_frame_ = std::nullopt;
198 }
199
200 while (incoming_data_.empty()) {
201 Poll<Result<MultiBuf>> incoming = io_channel_.PendRead(cx);
202 if (incoming.IsPending()) {
203 return;
204 }
205 if (!incoming->ok()) {
206 if (incoming->status().IsFailedPrecondition()) {
207 PW_LOG_WARN("HDLC io_channel has closed.");
208 } else {
209 PW_LOG_ERROR("Unable to read from HDLC io_channel. Status: %d",
210 incoming->status().code());
211 }
212 return;
213 }
214 incoming_data_ = std::move(**incoming);
215 }
216
217 decoded_frame_ = DecodeFrame(decoder_, incoming_data_);
218 }
219 }
220
TryFillBufferToEncodeAndSend(Context & cx)221 void Router::TryFillBufferToEncodeAndSend(Context& cx) {
222 if (buffer_to_encode_and_send_.has_value()) {
223 return;
224 }
225 for (size_t i = 0; i < channel_datas_.size(); ++i) {
226 ChannelData& cd =
227 channel_datas_[(next_first_read_index_ + i) % channel_datas_.size()];
228 Poll<Result<MultiBuf>> buf_result = cd.channel->PendRead(cx);
229 if (buf_result.IsPending()) {
230 continue;
231 }
232 if (!buf_result->ok()) {
233 if (buf_result->status().IsUnimplemented()) {
234 PW_LOG_ERROR("Channel registered for outgoing HDLC address %" PRIu64
235 " is not readable.",
236 cd.send_address);
237 }
238 // We ignore FAILED_PRECONDITION (closed) because it will be handled
239 // elsewhere. OUT_OF_RANGE just means we have finished writing. No
240 // action is needed because the channel may still be receiving data.
241 continue;
242 }
243 MultiBuf& buf = **buf_result;
244 uint64_t target_address = cd.send_address;
245 Result<size_t> encoded_size = CalculateSizeOnceEncoded(target_address, buf);
246 if (!encoded_size.ok()) {
247 PW_LOG_ERROR(
248 "Unable to compute size of encoded packet for outgoing buffer of "
249 "size %zu destined for outgoing HDLC address %" PRIu64
250 ". Packet will be discarded.",
251 buf.size(),
252 target_address);
253 continue;
254 }
255 buffer_to_encode_and_send_ =
256 OutgoingBuffer{/*buffer=*/std::move(buf),
257 /*hdlc_encoded_size=*/*encoded_size,
258 /*target_address=*/target_address};
259 // We received data, so ensure that we start by reading from a different
260 // index next time.
261 next_first_read_index_ =
262 (next_first_read_index_ + 1) % channel_datas_.size();
263 return;
264 }
265 }
266
WriteOutgoingMessages(Context & cx)267 void Router::WriteOutgoingMessages(Context& cx) {
268 while (io_channel_.is_write_open() &&
269 io_channel_.PendReadyToWrite(cx).IsReady()) {
270 TryFillBufferToEncodeAndSend(cx);
271 if (!buffer_to_encode_and_send_.has_value()) {
272 // No channels have new data to send.
273 return;
274 }
275 uint64_t target_address = buffer_to_encode_and_send_->target_address;
276 size_t hdlc_encoded_size = buffer_to_encode_and_send_->hdlc_encoded_size;
277 Poll<std::optional<MultiBuf>> maybe_write_buffer =
278 io_channel_.PendAllocateWriteBuffer(cx, hdlc_encoded_size);
279 if (maybe_write_buffer.IsPending()) {
280 // Channel cannot write any further messages until we can allocate.
281 return;
282 }
283 // We've gotten the allocation: discard the future.
284 if (!maybe_write_buffer->has_value()) {
285 // We can't allocate a write buffer large enough for our encoded frame.
286 // Sadly, we have to throw the frame away.
287 PW_LOG_ERROR(
288 "Unable to allocate a buffer of size %zu destined for outgoing "
289 "HDLC address %" PRIu64 ". Packet will be discarded.",
290 hdlc_encoded_size,
291 target_address);
292 buffer_to_encode_and_send_ = std::nullopt;
293 continue;
294 }
295 MultiBuf write_buffer = std::move(**maybe_write_buffer);
296 Status encode_status =
297 WriteMultiBufUIFrame(target_address,
298 buffer_to_encode_and_send_->buffer,
299 pw::multibuf::Stream(write_buffer));
300 buffer_to_encode_and_send_ = std::nullopt;
301 if (!encode_status.ok()) {
302 PW_LOG_ERROR(
303 "Failed to encode a buffer destined for outgoing HDLC address "
304 "%" PRIu64 ". Status: %d",
305 target_address,
306 encode_status.code());
307 continue;
308 }
309 Status write_status = io_channel_.StageWrite(std::move(write_buffer));
310 if (!write_status.ok()) {
311 PW_LOG_ERROR(
312 "Failed to write a buffer of size %zu destined for outgoing HDLC "
313 "address %" PRIu64 ". Status: %d",
314 hdlc_encoded_size,
315 target_address,
316 write_status.code());
317 }
318 }
319 }
320
Pend(Context & cx)321 Poll<> Router::Pend(Context& cx) {
322 // We check for ability to read, but not write, because we may not always
323 // attempt a write, which would cause us to miss that the channel has closed
324 // for writes.
325 //
326 // Additionally, it is uncommon for a channel to remain readable but not
327 // writeable: the reverse is more common (still readable while no longer
328 // writeable).
329 if (!io_channel_.is_read_open()) {
330 return PendClose(cx);
331 }
332 DecodeAndWriteIncoming(cx);
333 WriteOutgoingMessages(cx);
334 RemoveClosedChannels();
335 if (!io_channel_.is_read_open()) {
336 return PendClose(cx);
337 }
338 return Pending();
339 }
340
PendClose(Context & cx)341 Poll<> Router::PendClose(Context& cx) {
342 for (ChannelData& cd : channel_datas_) {
343 // We ignore the status value from close.
344 // If one or more channels are unable to close, they will remain after
345 // `RemoveClosedChannels` and `channel_datas_.size()` will be nonzero.
346 cd.channel->PendClose(cx).IgnorePoll();
347 }
348 RemoveClosedChannels();
349 if (io_channel_.PendClose(cx).IsPending()) {
350 return Pending();
351 }
352 if (channel_datas_.empty()) {
353 return Ready();
354 } else {
355 return Pending();
356 }
357 }
358
RemoveClosedChannels()359 void Router::RemoveClosedChannels() {
360 auto first_to_remove = std::remove_if(
361 channel_datas_.begin(), channel_datas_.end(), [](const ChannelData& cd) {
362 return !cd.channel->is_read_or_write_open();
363 });
364 channel_datas_.erase(first_to_remove, channel_datas_.end());
365 }
366
367 } // namespace pw::hdlc
368