1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #include "pw_grpc/send_queue.h" 16 17 namespace pw::grpc { 18 19 std::optional<std::reference_wrapper<SendQueue::SendRequest>> NextSendRequest()20SendQueue::NextSendRequest() { 21 std::lock_guard lock(send_mutex_); 22 if (send_requests_.empty()) { 23 return std::nullopt; 24 } 25 auto& front = send_requests_.front(); 26 send_requests_.pop_front(); 27 return front; 28 } 29 ProcessSendQueue(async::Context &,Status status)30void SendQueue::ProcessSendQueue(async::Context&, Status status) { 31 if (!status.ok()) { 32 return; 33 } 34 35 auto request = NextSendRequest(); 36 while (request.has_value()) { 37 for (auto message : request->get().messages) { 38 request->get().status.Update(socket_.Write(message)); 39 } 40 request->get().notify.release(); 41 request = NextSendRequest(); 42 } 43 } 44 QueueSendRequest(SendRequest & request)45void SendQueue::QueueSendRequest(SendRequest& request) { 46 std::lock_guard lock(send_mutex_); 47 send_requests_.push_back(request); 48 send_dispatcher_.Cancel(send_task_); 49 send_dispatcher_.Post(send_task_); 50 } 51 CancelSendRequest(SendRequest & request)52void SendQueue::CancelSendRequest(SendRequest& request) { 53 std::lock_guard lock(send_mutex_); 54 send_requests_.remove(request); 55 } 56 SendBytes(ConstByteSpan message)57Status SendQueue::SendBytes(ConstByteSpan message) { 58 std::array<ConstByteSpan, 1> messages = {message}; 59 return SendBytesVector(messages); 60 } 61 SendBytesVector(span<ConstByteSpan> messages)62Status SendQueue::SendBytesVector(span<ConstByteSpan> messages) { 63 SendRequest request(messages); 64 QueueSendRequest(request); 65 // TODO: b/345088816 - Add timeout error support to this blocking call. 66 request.notify.acquire(); 67 return request.status; 68 } 69 70 } // namespace pw::grpc 71