1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cstdint> 17 #include <mutex> 18 #include <optional> 19 20 #include "pw_allocator/allocator.h" 21 #include "pw_async2/dispatcher.h" 22 #include "pw_channel/channel.h" 23 #include "pw_channel/forwarding_channel.h" 24 #include "pw_containers/inline_var_len_entry_queue.h" 25 #include "pw_hdlc/router.h" 26 #include "pw_multibuf/multibuf.h" 27 #include "pw_multibuf/simple_allocator.h" 28 #include "pw_rpc/server.h" 29 #include "pw_sync/lock_annotations.h" 30 #include "pw_sync/mutex.h" 31 #include "pw_sync/thread_notification.h" 32 #include "pw_system/config.h" 33 #include "pw_thread/thread.h" 34 #include "pw_thread/thread_core.h" 35 36 namespace pw::system::internal { 37 38 // pw::rpc::ChannelOutput with a queue for outgoing RPC packets. 39 class RpcChannelOutputQueue final : public rpc::ChannelOutput { 40 public: RpcChannelOutputQueue()41 RpcChannelOutputQueue() 42 : rpc::ChannelOutput("RPC output queue"), dropped_packets_(0) {} 43 44 // Read packets from the outbound queue. 45 async2::Poll<InlineVarLenEntryQueue<>::Entry> PendOutgoingDatagram( 46 async2::Context& cx); 47 48 // Pops the packet read from a Pend call. Pop()49 void Pop() { 50 std::lock_guard lock(mutex_); 51 queue_.pop(); 52 } 53 54 // Number of outgoing packets that have been dropped. dropped_packets()55 uint32_t dropped_packets() const { 56 std::lock_guard lock(mutex_); 57 return dropped_packets_; 58 } 59 60 private: 61 Status Send(ConstByteSpan datagram) override; 62 63 mutable sync::Mutex mutex_; 64 InlineVarLenEntryQueue<PW_SYSTEM_MAX_TRANSMISSION_UNIT> queue_ 65 PW_GUARDED_BY(mutex_); 66 async2::Waker packet_ready_ PW_GUARDED_BY(mutex_); 67 uint32_t dropped_packets_ PW_GUARDED_BY(mutex_); 68 }; 69 70 // Thread that receives inbound RPC packets and calls 71 // pw::rpc::Server::ProcessPacket() with them. 72 class RpcServerThread final { 73 public: 74 RpcServerThread(Allocator& allocator, rpc::Server& server); 75 PendOutgoingDatagram(async2::Context & cx)76 async2::Poll<InlineVarLenEntryQueue<>::Entry> PendOutgoingDatagram( 77 async2::Context& cx) { 78 return rpc_packet_queue_.PendOutgoingDatagram(cx); 79 } 80 PopOutboundPacket()81 void PopOutboundPacket() { return rpc_packet_queue_.Pop(); } 82 83 // This approach only works with a single producer. PendReadyForPacket(async2::Context & cx)84 async2::Poll<> PendReadyForPacket(async2::Context& cx) { 85 std::lock_guard lock(mutex_); 86 if (ready_for_packet_) { 87 return async2::Ready(); 88 } 89 PW_ASYNC_STORE_WAKER( 90 cx, ready_to_receive_packet_, "RpcServerThread waiting for RPC packet"); 91 return async2::Pending(); 92 } 93 94 void PushPacket(multibuf::MultiBuf&& packet); 95 96 void RunOnce(); 97 98 private: 99 Allocator& allocator_; 100 sync::Mutex mutex_; 101 bool ready_for_packet_ PW_GUARDED_BY(mutex_) = true; 102 async2::Waker ready_to_receive_packet_ PW_GUARDED_BY(mutex_); 103 multibuf::MultiBuf packet_multibuf_; 104 sync::ThreadNotification new_packet_available_; 105 RpcChannelOutputQueue rpc_packet_queue_; 106 rpc::Server& rpc_server_; 107 }; 108 109 class PacketIO { 110 public: 111 PacketIO(channel::ByteReaderWriter& io_channel, 112 ByteSpan buffer, 113 Allocator& allocator, 114 rpc::Server& rpc_server); 115 116 void Start(async2::Dispatcher& dispatcher, 117 const thread::Options& thread_options); 118 119 private: 120 class PacketReader : public async2::Task { 121 public: PacketReader(PacketIO & io)122 constexpr PacketReader(PacketIO& io) : io_(io) {} 123 124 private: 125 async2::Poll<> DoPend(async2::Context& cx) override; 126 127 PacketIO& io_; 128 }; 129 130 class PacketWriter : public async2::Task { 131 public: PacketWriter(PacketIO & io)132 constexpr PacketWriter(PacketIO& io) 133 : io_(io), outbound_packet_(async2::Pending()) {} 134 135 private: 136 async2::Poll<> DoPend(async2::Context& cx) override; 137 138 PacketIO& io_; 139 async2::Poll<InlineVarLenEntryQueue<>::Entry> outbound_packet_; 140 }; 141 channel()142 channel::DatagramReaderWriter& channel() { return channels_.first(); } 143 144 std::byte mb_allocator_buffer_1_[PW_SYSTEM_MAX_TRANSMISSION_UNIT]; 145 std::byte mb_allocator_buffer_2_[PW_SYSTEM_MAX_TRANSMISSION_UNIT]; 146 Allocator& allocator_; 147 multibuf::SimpleAllocator mb_allocator_1_; 148 multibuf::SimpleAllocator mb_allocator_2_; 149 channel::ForwardingDatagramChannelPair channels_; 150 hdlc::Router router_; 151 RpcServerThread rpc_server_thread_; 152 153 PacketReader packet_reader_; 154 PacketWriter packet_writer_; 155 }; 156 157 } // namespace pw::system::internal 158