xref: /aosp_15_r20/external/pigweed/pw_system/public/pw_system/internal/async_packet_io.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <cstdint>
17 #include <mutex>
18 #include <optional>
19 
20 #include "pw_allocator/allocator.h"
21 #include "pw_async2/dispatcher.h"
22 #include "pw_channel/channel.h"
23 #include "pw_channel/forwarding_channel.h"
24 #include "pw_containers/inline_var_len_entry_queue.h"
25 #include "pw_hdlc/router.h"
26 #include "pw_multibuf/multibuf.h"
27 #include "pw_multibuf/simple_allocator.h"
28 #include "pw_rpc/server.h"
29 #include "pw_sync/lock_annotations.h"
30 #include "pw_sync/mutex.h"
31 #include "pw_sync/thread_notification.h"
32 #include "pw_system/config.h"
33 #include "pw_thread/thread.h"
34 #include "pw_thread/thread_core.h"
35 
36 namespace pw::system::internal {
37 
38 // pw::rpc::ChannelOutput with a queue for outgoing RPC packets.
39 class RpcChannelOutputQueue final : public rpc::ChannelOutput {
40  public:
RpcChannelOutputQueue()41   RpcChannelOutputQueue()
42       : rpc::ChannelOutput("RPC output queue"), dropped_packets_(0) {}
43 
44   // Read packets from the outbound queue.
45   async2::Poll<InlineVarLenEntryQueue<>::Entry> PendOutgoingDatagram(
46       async2::Context& cx);
47 
48   // Pops the packet read from a Pend call.
Pop()49   void Pop() {
50     std::lock_guard lock(mutex_);
51     queue_.pop();
52   }
53 
54   // Number of outgoing packets that have been dropped.
dropped_packets()55   uint32_t dropped_packets() const {
56     std::lock_guard lock(mutex_);
57     return dropped_packets_;
58   }
59 
60  private:
61   Status Send(ConstByteSpan datagram) override;
62 
63   mutable sync::Mutex mutex_;
64   InlineVarLenEntryQueue<PW_SYSTEM_MAX_TRANSMISSION_UNIT> queue_
65       PW_GUARDED_BY(mutex_);
66   async2::Waker packet_ready_ PW_GUARDED_BY(mutex_);
67   uint32_t dropped_packets_ PW_GUARDED_BY(mutex_);
68 };
69 
70 // Thread that receives inbound RPC packets and calls
71 // pw::rpc::Server::ProcessPacket() with them.
72 class RpcServerThread final {
73  public:
74   RpcServerThread(Allocator& allocator, rpc::Server& server);
75 
PendOutgoingDatagram(async2::Context & cx)76   async2::Poll<InlineVarLenEntryQueue<>::Entry> PendOutgoingDatagram(
77       async2::Context& cx) {
78     return rpc_packet_queue_.PendOutgoingDatagram(cx);
79   }
80 
PopOutboundPacket()81   void PopOutboundPacket() { return rpc_packet_queue_.Pop(); }
82 
83   // This approach only works with a single producer.
PendReadyForPacket(async2::Context & cx)84   async2::Poll<> PendReadyForPacket(async2::Context& cx) {
85     std::lock_guard lock(mutex_);
86     if (ready_for_packet_) {
87       return async2::Ready();
88     }
89     PW_ASYNC_STORE_WAKER(
90         cx, ready_to_receive_packet_, "RpcServerThread waiting for RPC packet");
91     return async2::Pending();
92   }
93 
94   void PushPacket(multibuf::MultiBuf&& packet);
95 
96   void RunOnce();
97 
98  private:
99   Allocator& allocator_;
100   sync::Mutex mutex_;
101   bool ready_for_packet_ PW_GUARDED_BY(mutex_) = true;
102   async2::Waker ready_to_receive_packet_ PW_GUARDED_BY(mutex_);
103   multibuf::MultiBuf packet_multibuf_;
104   sync::ThreadNotification new_packet_available_;
105   RpcChannelOutputQueue rpc_packet_queue_;
106   rpc::Server& rpc_server_;
107 };
108 
109 class PacketIO {
110  public:
111   PacketIO(channel::ByteReaderWriter& io_channel,
112            ByteSpan buffer,
113            Allocator& allocator,
114            rpc::Server& rpc_server);
115 
116   void Start(async2::Dispatcher& dispatcher,
117              const thread::Options& thread_options);
118 
119  private:
120   class PacketReader : public async2::Task {
121    public:
PacketReader(PacketIO & io)122     constexpr PacketReader(PacketIO& io) : io_(io) {}
123 
124    private:
125     async2::Poll<> DoPend(async2::Context& cx) override;
126 
127     PacketIO& io_;
128   };
129 
130   class PacketWriter : public async2::Task {
131    public:
PacketWriter(PacketIO & io)132     constexpr PacketWriter(PacketIO& io)
133         : io_(io), outbound_packet_(async2::Pending()) {}
134 
135    private:
136     async2::Poll<> DoPend(async2::Context& cx) override;
137 
138     PacketIO& io_;
139     async2::Poll<InlineVarLenEntryQueue<>::Entry> outbound_packet_;
140   };
141 
channel()142   channel::DatagramReaderWriter& channel() { return channels_.first(); }
143 
144   std::byte mb_allocator_buffer_1_[PW_SYSTEM_MAX_TRANSMISSION_UNIT];
145   std::byte mb_allocator_buffer_2_[PW_SYSTEM_MAX_TRANSMISSION_UNIT];
146   Allocator& allocator_;
147   multibuf::SimpleAllocator mb_allocator_1_;
148   multibuf::SimpleAllocator mb_allocator_2_;
149   channel::ForwardingDatagramChannelPair channels_;
150   hdlc::Router router_;
151   RpcServerThread rpc_server_thread_;
152 
153   PacketReader packet_reader_;
154   PacketWriter packet_writer_;
155 };
156 
157 }  // namespace pw::system::internal
158