xref: /aosp_15_r20/external/pigweed/pw_log_rpc/public/pw_log_rpc/rpc_log_drain_thread.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 
17 #include <cstddef>
18 #include <optional>
19 #include <utility>
20 
21 #include "pw_chrono/system_clock.h"
22 #include "pw_log_rpc/log_service.h"
23 #include "pw_log_rpc/rpc_log_drain_map.h"
24 #include "pw_multisink/multisink.h"
25 #include "pw_result/result.h"
26 #include "pw_rpc/raw/server_reader_writer.h"
27 #include "pw_span/span.h"
28 #include "pw_status/status.h"
29 #include "pw_status/try.h"
30 #include "pw_sync/timed_thread_notification.h"
31 #include "pw_thread/thread_core.h"
32 
33 namespace pw::log_rpc {
34 
35 // RpcLogDrainThread is a single thread and single MultiSink::Listener that
36 // manages multiple log streams. It is a suitable option when a minimal
37 // thread count is desired but comes with the cost of individual log streams
38 // blocking each other's flushing.
39 class RpcLogDrainThread : public thread::ThreadCore,
40                           public multisink::MultiSink::Listener {
41  public:
RpcLogDrainThread(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map,span<std::byte> encoding_buffer)42   RpcLogDrainThread(multisink::MultiSink& multisink,
43                     RpcLogDrainMap& drain_map,
44                     span<std::byte> encoding_buffer)
45       : drain_map_(drain_map),
46         multisink_(multisink),
47         encoding_buffer_(encoding_buffer) {}
48 
OnNewEntryAvailable()49   void OnNewEntryAvailable() override {
50     ready_to_flush_notification_.release();
51   }
52 
53   // Sequentially flushes each log stream.
Run()54   void Run() override {
55     for (auto& drain : drain_map_.drains()) {
56       multisink_.AttachDrain(drain);
57       drain.set_on_open_callback(
58           [this]() { this->ready_to_flush_notification_.release(); });
59     }
60     multisink_.AttachListener(*this);
61 
62     bool drains_pending = true;
63     std::optional<chrono::SystemClock::duration> min_delay =
64         chrono::SystemClock::duration::zero();
65     while (true) {
66       if (drains_pending && min_delay.has_value()) {
67         std::ignore =
68             ready_to_flush_notification_.try_acquire_for(min_delay.value());
69       } else {
70         ready_to_flush_notification_.acquire();
71       }
72       drains_pending = false;
73       min_delay = std::nullopt;
74       for (auto& drain : drain_map_.drains()) {
75         std::optional<chrono::SystemClock::duration> drain_ready_in =
76             drain.Trickle(encoding_buffer_);
77         if (drain_ready_in.has_value()) {
78           min_delay = std::min(drain_ready_in.value(),
79                                min_delay.value_or(drain_ready_in.value()));
80           drains_pending = true;
81         }
82       }
83     }
84   }
85 
86   // Opens a server writer to set up an unrequested log stream.
OpenUnrequestedLogStream(uint32_t channel_id,rpc::Server & rpc_server,LogService & log_service)87   Status OpenUnrequestedLogStream(uint32_t channel_id,
88                                   rpc::Server& rpc_server,
89                                   LogService& log_service) {
90     rpc::RawServerWriter writer =
91         rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
92             rpc_server, channel_id, log_service);
93     const Result<RpcLogDrain*> drain =
94         drain_map_.GetDrainFromChannelId(channel_id);
95     PW_TRY(drain.status());
96     return drain.value()->Open(writer);
97   }
98 
99  private:
100   sync::TimedThreadNotification ready_to_flush_notification_;
101   RpcLogDrainMap& drain_map_;
102   multisink::MultiSink& multisink_;
103   span<std::byte> encoding_buffer_;
104 };
105 
106 template <size_t kEncodingBufferSizeBytes>
107 class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread {
108  public:
RpcLogDrainThreadWithBuffer(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map)109   RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink,
110                               RpcLogDrainMap& drain_map)
111       : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {}
112 
113  private:
114   static_assert(kEncodingBufferSizeBytes >=
115                     RpcLogDrain::kLogEntriesEncodeFrameSize +
116                         RpcLogDrain::kMinEntryBufferSize,
117                 "RpcLogDrainThread's encoding buffer must be large enough for "
118                 "at least one entry");
119 
120   std::byte encoding_buffer_array_[kEncodingBufferSizeBytes];
121 };
122 
123 }  // namespace pw::log_rpc
124