1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <optional> 17 18 #include "pw_async/dispatcher.h" 19 #include "pw_async_basic/dispatcher.h" 20 #include "pw_bytes/span.h" 21 #include "pw_containers/intrusive_list.h" 22 #include "pw_status/status.h" 23 #include "pw_stream/stream.h" 24 #include "pw_sync/lock_annotations.h" 25 #include "pw_sync/mutex.h" 26 #include "pw_sync/timed_thread_notification.h" 27 #include "pw_thread/thread_core.h" 28 29 namespace pw::grpc { 30 31 // SendQueue is a queue+thread that serializes sending lists of bytes to 32 // a stream. 33 class SendQueue : public thread::ThreadCore { 34 public: SendQueue(stream::ReaderWriter & socket)35 SendQueue(stream::ReaderWriter& socket) 36 : socket_(socket), 37 send_task_(pw::bind_member<&SendQueue::ProcessSendQueue>(this)) {} 38 39 // Thread safe. Blocks till send is complete. Returns Status from stream 40 // write. 41 Status SendBytes(ConstByteSpan message) PW_LOCKS_EXCLUDED(send_mutex_); 42 43 // Thread safe. Blocks till send is complete. All messages are sent 44 // atomically. Returns union of Status's from stream writes. 45 Status SendBytesVector(span<ConstByteSpan> messages) 46 PW_LOCKS_EXCLUDED(send_mutex_); 47 48 // ThreadCore impl. Run()49 void Run() override { send_dispatcher_.Run(); } 50 // Call before attempting to join thread. RequestStop()51 void RequestStop() { send_dispatcher_.RequestStop(); } 52 53 private: 54 struct SendRequest : public IntrusiveList<SendRequest>::Item { SendRequestSendRequest55 SendRequest(span<ConstByteSpan> m) : messages(m) {} 56 sync::TimedThreadNotification notify; 57 Status status = OkStatus(); 58 span<ConstByteSpan> messages; 59 }; 60 61 std::optional<std::reference_wrapper<SendRequest>> NextSendRequest() 62 PW_LOCKS_EXCLUDED(send_mutex_); 63 void QueueSendRequest(SendRequest& request) PW_LOCKS_EXCLUDED(send_mutex_); 64 void CancelSendRequest(SendRequest& request) PW_LOCKS_EXCLUDED(send_mutex_); 65 void ProcessSendQueue(async::Context& context, Status status) 66 PW_LOCKS_EXCLUDED(send_mutex_); 67 68 stream::ReaderWriter& socket_; 69 async::BasicDispatcher send_dispatcher_; 70 async::Task send_task_; 71 sync::Mutex send_mutex_; 72 IntrusiveList<SendRequest> send_requests_ PW_GUARDED_BY(send_mutex_); 73 }; 74 75 } // namespace pw::grpc 76