1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_ 18 #define SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_ 19 20 #include <poll.h> 21 22 #include <cstring> 23 #include <deque> 24 #include <future> 25 #include <mutex> 26 #include <optional> 27 #include <thread> 28 #include <tuple> 29 30 #include "perfetto/base/platform_handle.h" 31 #include "perfetto/ext/base/event_fd.h" 32 #include "perfetto/ext/base/flat_hash_map.h" 33 #include "perfetto/ext/base/thread_checker.h" 34 #include "perfetto/ext/base/unix_socket.h" 35 #include "perfetto/ext/ipc/basic_types.h" 36 37 namespace perfetto { 38 39 // FdPoller is a utility for waiting for IO events of a set of watched file 40 // descriptors. It's used for multiplexing non-blocking IO operations. 41 class FdPoller { 42 public: 43 // The interface class for observing IO events from the FdPoller class. 44 class Watcher { 45 public: 46 virtual ~Watcher(); 47 // Called when |fd| can be read from without blocking. For a socket 48 // connection, this indicates the socket read buffer has some data. 49 virtual void OnFdReadable(base::PlatformHandle fd) = 0; 50 // Called when |fd| can be written to without blocking. For a socket 51 // connection, this indicates that the socket write buffer has some capacity 52 // for writting data into. 53 virtual void OnFdWritable(base::PlatformHandle fd) = 0; 54 }; 55 56 using WatchEvents = decltype(pollfd::events); 57 58 explicit FdPoller(Watcher* watcher); 59 60 // Watch and unwatch IO event for a given file descriptor. WatchForRead(base::PlatformHandle fd)61 inline void WatchForRead(base::PlatformHandle fd) { WatchFd(fd, POLLIN); } WatchForWrite(base::PlatformHandle fd)62 inline void WatchForWrite(base::PlatformHandle fd) { WatchFd(fd, POLLOUT); } UnwatchForRead(base::PlatformHandle fd)63 inline void UnwatchForRead(base::PlatformHandle fd) { UnwatchFd(fd, POLLIN); } UnwatchForWrite(base::PlatformHandle fd)64 inline void UnwatchForWrite(base::PlatformHandle fd) { 65 UnwatchFd(fd, POLLOUT); 66 } 67 68 // Called when |fd| is no longer of interest (e.g. when |fd| is to be closed). 69 void RemoveWatch(base::PlatformHandle fd); 70 71 // Poll for all watched events previously added with WatchForRead() and 72 // WatchForWrite(). 73 // 74 // Must be called on poller thread. 75 void Poll(); 76 77 // Notifies the poller for pending updates. Calling Notify() will unblock the 78 // poller and make it return from Poll(). It is caller's responsibility to 79 // call Poll() again once the updates are complete. 80 // 81 // This can be (and typically is) called from any thread. 82 void Notify(); 83 84 private: 85 std::vector<pollfd>::iterator FindPollEvent(base::PlatformHandle fd); 86 void WatchFd(base::PlatformHandle fd, WatchEvents events); 87 void UnwatchFd(base::PlatformHandle fd, WatchEvents events); 88 89 base::ThreadChecker thread_checker_; 90 Watcher* const watcher_; 91 base::EventFd notify_fd_; 92 std::vector<pollfd> poll_fds_; 93 }; 94 95 // This class groups a UnixSocketRaw with an associated ring buffer. The ring 96 // buffer is used as a temporary storage for data *read* from the socket. 97 class SocketWithBuffer { 98 public: 99 constexpr static size_t kBuffSize = ipc::kIPCBufferSize; 100 101 base::UnixSocketRaw sock; 102 103 // Points to the beginning of buffered data. data()104 inline uint8_t* data() { return &buf_[0]; } 105 // Size of the buffered data. data_size()106 inline size_t data_size() { return data_size_; } 107 108 // Points to the beginning of the free space for buffering new data. buffer()109 inline uint8_t* buffer() { return &buf_[data_size_]; } 110 // Size of the free space. available_bytes()111 inline size_t available_bytes() { return buf_.size() - data_size_; } 112 113 // Called when |bytes| of data is enqueued to the buffer. EnqueueData(size_t bytes)114 void EnqueueData(size_t bytes) { 115 PERFETTO_CHECK(bytes <= available_bytes()); 116 data_size_ += bytes; 117 } 118 // Called when |bytes| of data is dequeued from the buffer. DequeueData(size_t bytes)119 void DequeueData(size_t bytes) { 120 PERFETTO_CHECK(bytes <= data_size()); 121 memmove(data(), data() + bytes, data_size() - bytes); 122 data_size_ -= bytes; 123 } 124 SocketWithBuffer()125 SocketWithBuffer() : buf_(kBuffSize) {} 126 127 // Movable only. 128 SocketWithBuffer(SocketWithBuffer&& other) = default; 129 SocketWithBuffer& operator=(SocketWithBuffer&& other) = default; 130 SocketWithBuffer(const SocketWithBuffer& other) = delete; 131 SocketWithBuffer& operator=(const SocketWithBuffer& other) = delete; 132 133 private: 134 std::vector<uint8_t> buf_; 135 size_t data_size_ = 0; 136 }; 137 138 using SocketPair = std::pair<SocketWithBuffer, SocketWithBuffer>; 139 140 // SocketRelayHandler bidirectionally forwards data between paired sockets. 141 // Internally it multiplexes IO operations of the sockets using a FdPoller on a 142 // dedicated thread. 143 class SocketRelayHandler : public FdPoller::Watcher { 144 public: 145 SocketRelayHandler(); 146 SocketRelayHandler(const SocketRelayHandler&) = delete; 147 SocketRelayHandler& operator=(const SocketRelayHandler&) = delete; 148 ~SocketRelayHandler() override; 149 150 // Transfer a pair of sockets to be relayed. Can be called from any thread. 151 void AddSocketPair(std::unique_ptr<SocketPair> socket_pair); 152 153 // The FdPoller::Watcher callbacks. 154 void OnFdReadable(base::PlatformHandle fd) override; 155 void OnFdWritable(base::PlatformHandle fd) override; 156 157 private: 158 void Run(); 159 void RemoveSocketPair(SocketWithBuffer&, SocketWithBuffer&); 160 161 // A helper for running a callable object on |io_thread_|. 162 template <typename Callable> RunOnIOThread(Callable && c)163 void RunOnIOThread(Callable&& c) { 164 std::lock_guard<std::mutex> lock(mutex_); 165 pending_tasks_.emplace_back(std::forward<Callable>(c)); 166 fd_poller_.Notify(); 167 } 168 169 std::optional<std::tuple<SocketWithBuffer&, SocketWithBuffer&>> GetSocketPair( 170 base::PlatformHandle fd); 171 172 base::FlatHashMap<base::PlatformHandle, SocketPair*> socket_pairs_by_fd_; 173 std::vector<std::unique_ptr<SocketPair>> socket_pairs_; 174 175 FdPoller fd_poller_; 176 177 // The thread that fd_poller_ polls for IO events. Most methods of this class 178 // asserts to be running on this thread. 179 std::thread io_thread_; 180 base::ThreadChecker io_thread_checker_; 181 182 bool exited_ = false; 183 184 //--------------- Member data with multi-thread access ------------------ 185 std::mutex mutex_; 186 std::deque<std::packaged_task<void()>> pending_tasks_; 187 }; 188 189 } // namespace perfetto 190 #endif // SRC_TRACED_RELAY_SOCKET_RELAY_HANDLER_H_ 191