xref: /aosp_15_r20/external/pigweed/pw_grpc/public/pw_grpc/send_queue.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <optional>
17 
18 #include "pw_async/dispatcher.h"
19 #include "pw_async_basic/dispatcher.h"
20 #include "pw_bytes/span.h"
21 #include "pw_containers/intrusive_list.h"
22 #include "pw_status/status.h"
23 #include "pw_stream/stream.h"
24 #include "pw_sync/lock_annotations.h"
25 #include "pw_sync/mutex.h"
26 #include "pw_sync/timed_thread_notification.h"
27 #include "pw_thread/thread_core.h"
28 
29 namespace pw::grpc {
30 
31 // SendQueue is a queue+thread that serializes sending lists of bytes to
32 // a stream.
33 class SendQueue : public thread::ThreadCore {
34  public:
SendQueue(stream::ReaderWriter & socket)35   SendQueue(stream::ReaderWriter& socket)
36       : socket_(socket),
37         send_task_(pw::bind_member<&SendQueue::ProcessSendQueue>(this)) {}
38 
39   // Thread safe. Blocks till send is complete. Returns Status from stream
40   // write.
41   Status SendBytes(ConstByteSpan message) PW_LOCKS_EXCLUDED(send_mutex_);
42 
43   // Thread safe. Blocks till send is complete. All messages are sent
44   // atomically. Returns union of Status's from stream writes.
45   Status SendBytesVector(span<ConstByteSpan> messages)
46       PW_LOCKS_EXCLUDED(send_mutex_);
47 
48   // ThreadCore impl.
Run()49   void Run() override { send_dispatcher_.Run(); }
50   // Call before attempting to join thread.
RequestStop()51   void RequestStop() { send_dispatcher_.RequestStop(); }
52 
53  private:
54   struct SendRequest : public IntrusiveList<SendRequest>::Item {
SendRequestSendRequest55     SendRequest(span<ConstByteSpan> m) : messages(m) {}
56     sync::TimedThreadNotification notify;
57     Status status = OkStatus();
58     span<ConstByteSpan> messages;
59   };
60 
61   std::optional<std::reference_wrapper<SendRequest>> NextSendRequest()
62       PW_LOCKS_EXCLUDED(send_mutex_);
63   void QueueSendRequest(SendRequest& request) PW_LOCKS_EXCLUDED(send_mutex_);
64   void CancelSendRequest(SendRequest& request) PW_LOCKS_EXCLUDED(send_mutex_);
65   void ProcessSendQueue(async::Context& context, Status status)
66       PW_LOCKS_EXCLUDED(send_mutex_);
67 
68   stream::ReaderWriter& socket_;
69   async::BasicDispatcher send_dispatcher_;
70   async::Task send_task_;
71   sync::Mutex send_mutex_;
72   IntrusiveList<SendRequest> send_requests_ PW_GUARDED_BY(send_mutex_);
73 };
74 
75 }  // namespace pw::grpc
76