xref: /aosp_15_r20/external/perfetto/src/traced_relay/socket_relay_handler.h (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_
18 #define SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_
19 
20 #include <poll.h>
21 
22 #include <cstring>
23 #include <deque>
24 #include <future>
25 #include <mutex>
26 #include <optional>
27 #include <thread>
28 #include <tuple>
29 
30 #include "perfetto/base/platform_handle.h"
31 #include "perfetto/ext/base/event_fd.h"
32 #include "perfetto/ext/base/flat_hash_map.h"
33 #include "perfetto/ext/base/thread_checker.h"
34 #include "perfetto/ext/base/unix_socket.h"
35 #include "perfetto/ext/ipc/basic_types.h"
36 
37 namespace perfetto {
38 
39 // FdPoller is a utility for waiting for IO events of a set of watched file
40 // descriptors. It's used for multiplexing non-blocking IO operations.
41 class FdPoller {
42  public:
43   // The interface class for observing IO events from the FdPoller class.
44   class Watcher {
45    public:
46     virtual ~Watcher();
47     // Called when |fd| can be read from without blocking. For a socket
48     // connection, this indicates the socket read buffer has some data.
49     virtual void OnFdReadable(base::PlatformHandle fd) = 0;
50     // Called when |fd| can be written to without blocking. For a socket
51     // connection, this indicates that the socket write buffer has some capacity
52     // for writting data into.
53     virtual void OnFdWritable(base::PlatformHandle fd) = 0;
54   };
55 
56   using WatchEvents = decltype(pollfd::events);
57 
58   explicit FdPoller(Watcher* watcher);
59 
60   // Watch and unwatch IO event for a given file descriptor.
WatchForRead(base::PlatformHandle fd)61   inline void WatchForRead(base::PlatformHandle fd) { WatchFd(fd, POLLIN); }
WatchForWrite(base::PlatformHandle fd)62   inline void WatchForWrite(base::PlatformHandle fd) { WatchFd(fd, POLLOUT); }
UnwatchForRead(base::PlatformHandle fd)63   inline void UnwatchForRead(base::PlatformHandle fd) { UnwatchFd(fd, POLLIN); }
UnwatchForWrite(base::PlatformHandle fd)64   inline void UnwatchForWrite(base::PlatformHandle fd) {
65     UnwatchFd(fd, POLLOUT);
66   }
67 
68   // Called when |fd| is no longer of interest (e.g. when |fd| is to be closed).
69   void RemoveWatch(base::PlatformHandle fd);
70 
71   // Poll for all watched events previously added with WatchForRead() and
72   // WatchForWrite().
73   //
74   // Must be called on poller thread.
75   void Poll();
76 
77   // Notifies the poller for pending updates. Calling Notify() will unblock the
78   // poller and make it return from Poll(). It is caller's responsibility to
79   // call Poll() again once the updates are complete.
80   //
81   // This can be (and typically is) called from any thread.
82   void Notify();
83 
84  private:
85   std::vector<pollfd>::iterator FindPollEvent(base::PlatformHandle fd);
86   void WatchFd(base::PlatformHandle fd, WatchEvents events);
87   void UnwatchFd(base::PlatformHandle fd, WatchEvents events);
88 
89   base::ThreadChecker thread_checker_;
90   Watcher* const watcher_;
91   base::EventFd notify_fd_;
92   std::vector<pollfd> poll_fds_;
93 };
94 
95 // This class groups a UnixSocketRaw with an associated ring buffer. The ring
96 // buffer is used as a temporary storage for data *read* from the socket.
97 class SocketWithBuffer {
98  public:
99   constexpr static size_t kBuffSize = ipc::kIPCBufferSize;
100 
101   base::UnixSocketRaw sock;
102 
103   // Points to the beginning of buffered data.
data()104   inline uint8_t* data() { return &buf_[0]; }
105   // Size of the buffered data.
data_size()106   inline size_t data_size() { return data_size_; }
107 
108   // Points to the beginning of the free space for buffering new data.
buffer()109   inline uint8_t* buffer() { return &buf_[data_size_]; }
110   // Size of the free space.
available_bytes()111   inline size_t available_bytes() { return buf_.size() - data_size_; }
112 
113   // Called when |bytes| of data is enqueued to the buffer.
EnqueueData(size_t bytes)114   void EnqueueData(size_t bytes) {
115     PERFETTO_CHECK(bytes <= available_bytes());
116     data_size_ += bytes;
117   }
118   // Called when |bytes| of data is dequeued from the buffer.
DequeueData(size_t bytes)119   void DequeueData(size_t bytes) {
120     PERFETTO_CHECK(bytes <= data_size());
121     memmove(data(), data() + bytes, data_size() - bytes);
122     data_size_ -= bytes;
123   }
124 
SocketWithBuffer()125   SocketWithBuffer() : buf_(kBuffSize) {}
126 
127   // Movable only.
128   SocketWithBuffer(SocketWithBuffer&& other) = default;
129   SocketWithBuffer& operator=(SocketWithBuffer&& other) = default;
130   SocketWithBuffer(const SocketWithBuffer& other) = delete;
131   SocketWithBuffer& operator=(const SocketWithBuffer& other) = delete;
132 
133  private:
134   std::vector<uint8_t> buf_;
135   size_t data_size_ = 0;
136 };
137 
138 using SocketPair = std::pair<SocketWithBuffer, SocketWithBuffer>;
139 
140 // SocketRelayHandler bidirectionally forwards data between paired sockets.
141 // Internally it multiplexes IO operations of the sockets using a FdPoller on a
142 // dedicated thread.
143 class SocketRelayHandler : public FdPoller::Watcher {
144  public:
145   SocketRelayHandler();
146   SocketRelayHandler(const SocketRelayHandler&) = delete;
147   SocketRelayHandler& operator=(const SocketRelayHandler&) = delete;
148   ~SocketRelayHandler() override;
149 
150   // Transfer a pair of sockets to be relayed. Can be called from any thread.
151   void AddSocketPair(std::unique_ptr<SocketPair> socket_pair);
152 
153   // The FdPoller::Watcher callbacks.
154   void OnFdReadable(base::PlatformHandle fd) override;
155   void OnFdWritable(base::PlatformHandle fd) override;
156 
157  private:
158   void Run();
159   void RemoveSocketPair(SocketWithBuffer&, SocketWithBuffer&);
160 
161   // A helper for running a callable object on |io_thread_|.
162   template <typename Callable>
RunOnIOThread(Callable && c)163   void RunOnIOThread(Callable&& c) {
164     std::lock_guard<std::mutex> lock(mutex_);
165     pending_tasks_.emplace_back(std::forward<Callable>(c));
166     fd_poller_.Notify();
167   }
168 
169   std::optional<std::tuple<SocketWithBuffer&, SocketWithBuffer&>> GetSocketPair(
170       base::PlatformHandle fd);
171 
172   base::FlatHashMap<base::PlatformHandle, SocketPair*> socket_pairs_by_fd_;
173   std::vector<std::unique_ptr<SocketPair>> socket_pairs_;
174 
175   FdPoller fd_poller_;
176 
177   // The thread that fd_poller_ polls for IO events. Most methods of this class
178   // asserts to be running on this thread.
179   std::thread io_thread_;
180   base::ThreadChecker io_thread_checker_;
181 
182   bool exited_ = false;
183 
184   //--------------- Member data with multi-thread access ------------------
185   std::mutex mutex_;
186   std::deque<std::packaged_task<void()>> pending_tasks_;
187 };
188 
189 }  // namespace perfetto
190 #endif  // SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_
191