1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 17 #include <cstddef> 18 #include <optional> 19 #include <utility> 20 21 #include "pw_chrono/system_clock.h" 22 #include "pw_log_rpc/log_service.h" 23 #include "pw_log_rpc/rpc_log_drain_map.h" 24 #include "pw_multisink/multisink.h" 25 #include "pw_result/result.h" 26 #include "pw_rpc/raw/server_reader_writer.h" 27 #include "pw_span/span.h" 28 #include "pw_status/status.h" 29 #include "pw_status/try.h" 30 #include "pw_sync/timed_thread_notification.h" 31 #include "pw_thread/thread_core.h" 32 33 namespace pw::log_rpc { 34 35 // RpcLogDrainThread is a single thread and single MultiSink::Listener that 36 // manages multiple log streams. It is a suitable option when a minimal 37 // thread count is desired but comes with the cost of individual log streams 38 // blocking each other's flushing. 39 class RpcLogDrainThread : public thread::ThreadCore, 40 public multisink::MultiSink::Listener { 41 public: RpcLogDrainThread(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map,span<std::byte> encoding_buffer)42 RpcLogDrainThread(multisink::MultiSink& multisink, 43 RpcLogDrainMap& drain_map, 44 span<std::byte> encoding_buffer) 45 : drain_map_(drain_map), 46 multisink_(multisink), 47 encoding_buffer_(encoding_buffer) {} 48 OnNewEntryAvailable()49 void OnNewEntryAvailable() override { 50 ready_to_flush_notification_.release(); 51 } 52 53 // Sequentially flushes each log stream. Run()54 void Run() override { 55 for (auto& drain : drain_map_.drains()) { 56 multisink_.AttachDrain(drain); 57 drain.set_on_open_callback( 58 [this]() { this->ready_to_flush_notification_.release(); }); 59 } 60 multisink_.AttachListener(*this); 61 62 bool drains_pending = true; 63 std::optional<chrono::SystemClock::duration> min_delay = 64 chrono::SystemClock::duration::zero(); 65 while (true) { 66 if (drains_pending && min_delay.has_value()) { 67 std::ignore = 68 ready_to_flush_notification_.try_acquire_for(min_delay.value()); 69 } else { 70 ready_to_flush_notification_.acquire(); 71 } 72 drains_pending = false; 73 min_delay = std::nullopt; 74 for (auto& drain : drain_map_.drains()) { 75 std::optional<chrono::SystemClock::duration> drain_ready_in = 76 drain.Trickle(encoding_buffer_); 77 if (drain_ready_in.has_value()) { 78 min_delay = std::min(drain_ready_in.value(), 79 min_delay.value_or(drain_ready_in.value())); 80 drains_pending = true; 81 } 82 } 83 } 84 } 85 86 // Opens a server writer to set up an unrequested log stream. OpenUnrequestedLogStream(uint32_t channel_id,rpc::Server & rpc_server,LogService & log_service)87 Status OpenUnrequestedLogStream(uint32_t channel_id, 88 rpc::Server& rpc_server, 89 LogService& log_service) { 90 rpc::RawServerWriter writer = 91 rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( 92 rpc_server, channel_id, log_service); 93 const Result<RpcLogDrain*> drain = 94 drain_map_.GetDrainFromChannelId(channel_id); 95 PW_TRY(drain.status()); 96 return drain.value()->Open(writer); 97 } 98 99 private: 100 sync::TimedThreadNotification ready_to_flush_notification_; 101 RpcLogDrainMap& drain_map_; 102 multisink::MultiSink& multisink_; 103 span<std::byte> encoding_buffer_; 104 }; 105 106 template <size_t kEncodingBufferSizeBytes> 107 class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread { 108 public: RpcLogDrainThreadWithBuffer(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map)109 RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink, 110 RpcLogDrainMap& drain_map) 111 : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {} 112 113 private: 114 static_assert(kEncodingBufferSizeBytes >= 115 RpcLogDrain::kLogEntriesEncodeFrameSize + 116 RpcLogDrain::kMinEntryBufferSize, 117 "RpcLogDrainThread's encoding buffer must be large enough for " 118 "at least one entry"); 119 120 std::byte encoding_buffer_array_[kEncodingBufferSizeBytes]; 121 }; 122 123 } // namespace pw::log_rpc 124