xref: /aosp_15_r20/external/pigweed/pw_grpc/send_queue.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_grpc/send_queue.h"
16 
17 namespace pw::grpc {
18 
19 std::optional<std::reference_wrapper<SendQueue::SendRequest>>
NextSendRequest()20 SendQueue::NextSendRequest() {
21   std::lock_guard lock(send_mutex_);
22   if (send_requests_.empty()) {
23     return std::nullopt;
24   }
25   auto& front = send_requests_.front();
26   send_requests_.pop_front();
27   return front;
28 }
29 
ProcessSendQueue(async::Context &,Status status)30 void SendQueue::ProcessSendQueue(async::Context&, Status status) {
31   if (!status.ok()) {
32     return;
33   }
34 
35   auto request = NextSendRequest();
36   while (request.has_value()) {
37     for (auto message : request->get().messages) {
38       request->get().status.Update(socket_.Write(message));
39     }
40     request->get().notify.release();
41     request = NextSendRequest();
42   }
43 }
44 
QueueSendRequest(SendRequest & request)45 void SendQueue::QueueSendRequest(SendRequest& request) {
46   std::lock_guard lock(send_mutex_);
47   send_requests_.push_back(request);
48   send_dispatcher_.Cancel(send_task_);
49   send_dispatcher_.Post(send_task_);
50 }
51 
CancelSendRequest(SendRequest & request)52 void SendQueue::CancelSendRequest(SendRequest& request) {
53   std::lock_guard lock(send_mutex_);
54   send_requests_.remove(request);
55 }
56 
SendBytes(ConstByteSpan message)57 Status SendQueue::SendBytes(ConstByteSpan message) {
58   std::array<ConstByteSpan, 1> messages = {message};
59   return SendBytesVector(messages);
60 }
61 
SendBytesVector(span<ConstByteSpan> messages)62 Status SendQueue::SendBytesVector(span<ConstByteSpan> messages) {
63   SendRequest request(messages);
64   QueueSendRequest(request);
65   // TODO: b/345088816 - Add timeout error support to this blocking call.
66   request.notify.acquire();
67   return request.status;
68 }
69 
70 }  // namespace pw::grpc
71