1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "src/traced_relay/socket_relay_handler.h"

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "perfetto/ext/base/threading/thread_pool.h"
#include "perfetto/ext/base/unix_socket.h"

#include "test/gtest_and_gmock.h"
31
32 using testing::Values;
33
34 namespace perfetto {
35 namespace {
36
37 using RawSocketPair = std::pair<base::UnixSocketRaw, base::UnixSocketRaw>;
38 using RngValueType = std::minstd_rand0::result_type;
39
40 struct TestClient {
41 RawSocketPair endpoint_sockets;
42 std::minstd_rand0 data_prng;
43 std::thread client_thread;
44 };
45
46 class SocketRelayHandlerTest : public ::testing::TestWithParam<uint32_t> {
47 protected:
SetUp()48 void SetUp() override {
49 socket_relay_handler_ = std::make_unique<SocketRelayHandler>();
50
51 for (uint32_t i = 0; i < GetParam(); i++) {
52 TestClient client{SetUpEndToEndSockets(), std::minstd_rand0(i), {}};
53 test_clients_.push_back(std::move(client));
54 }
55 }
TearDown()56 void TearDown() override { socket_relay_handler_ = nullptr; }
57
SetUpEndToEndSockets()58 RawSocketPair SetUpEndToEndSockets() {
59 // Creates 2 SocketPairs:
60 // sock1 <-> sock2 <-> SocketRelayHandler <-> sock3 <-> sock4.
61 // sock2 and sock3 are transferred to the SocketRelayHandler.
62 // We test by reading and writing bidirectionally using sock1 and sock4.
63 auto [sock1, sock2] = base::UnixSocketRaw::CreatePairPosix(
64 base::SockFamily::kUnix, base::SockType::kStream);
65 sock2.SetBlocking(false);
66
67 auto [sock3, sock4] = base::UnixSocketRaw::CreatePairPosix(
68 base::SockFamily::kUnix, base::SockType::kStream);
69 sock3.SetBlocking(false);
70
71 auto socket_pair = std::make_unique<SocketPair>();
72 socket_pair->first.sock = std::move(sock2);
73 socket_pair->second.sock = std::move(sock3);
74
75 socket_relay_handler_->AddSocketPair(std::move(socket_pair));
76
77 RawSocketPair endpoint_sockets;
78 endpoint_sockets.first = std::move(sock1);
79 endpoint_sockets.second = std::move(sock4);
80
81 return endpoint_sockets;
82 }
83
84 std::unique_ptr<SocketRelayHandler> socket_relay_handler_;
85 std::vector<TestClient> test_clients_;
86 // Use fewer receiver threads than sender threads.
87 base::ThreadPool receiver_thread_pool_{1 + GetParam() / 10};
88 };
89
// Checks SocketWithBuffer's byte accounting: the buffer starts empty,
// EnqueueData() grows the readable region by the number of bytes written, and
// DequeueData() drops bytes from the front of that region.
TEST(SocketWithBufferTest, EnqueueDequeue) {
  SocketWithBuffer socket_with_buffer;
  // No data initially.
  EXPECT_EQ(0u, socket_with_buffer.data_size());

  // Has room for writing some bytes into. This has to be a fatal assertion:
  // continuing after a failure would let the memcpy() below write past the
  // space the buffer reports as available.
  const std::string data = "12345678901234567890";
  ASSERT_GT(socket_with_buffer.available_bytes(), data.size());

  std::memcpy(socket_with_buffer.buffer(), data.data(), data.size());
  socket_with_buffer.EnqueueData(data.size());
  EXPECT_EQ(data.size(), socket_with_buffer.data_size());

  // Dequeue some bytes: the first 5 characters must be gone and the rest must
  // be readable, in order, from data().
  socket_with_buffer.DequeueData(5);
  EXPECT_EQ(socket_with_buffer.data_size(), data.size() - 5);
  std::string buffered_data(reinterpret_cast<char*>(socket_with_buffer.data()),
                            socket_with_buffer.data_size());
  EXPECT_EQ(buffered_data, "678901234567890");
}
110
111 // Test the SocketRelayHander with randomized request and response data.
TEST_P(SocketRelayHandlerTest,RandomizedRequestResponse)112 TEST_P(SocketRelayHandlerTest, RandomizedRequestResponse) {
113 #if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || \
114 defined(MEMORY_SANITIZER) || defined(LEAK_SANITIZER)
115 // Reduce the test strength for sanitizer builds.
116 constexpr size_t kMaxMsgSizeRng = 1 << 16;
117 constexpr size_t kMaxNumRequests = 10;
118 #else
119 // The max message size in the number of RNG calls.
120 constexpr size_t kMaxMsgSizeRng = 1 << 18;
121 // The max number of requests.
122 constexpr size_t kMaxNumRequests = 25;
123 #endif
124
125 // Create the threads for sending and receiving data through the
126 // SocketRelayHandler.
127 for (auto& client : test_clients_) {
128 auto* thread_pool = &receiver_thread_pool_;
129
130 auto thread_func = [&client, thread_pool]() {
131 auto& rng = client.data_prng;
132
133 // The max number of requests.
134 const size_t num_requests = rng() % kMaxNumRequests;
135
136 for (size_t j = 0; j < num_requests; j++) {
137 auto& send_endpoint = client.endpoint_sockets.first;
138 auto& receive_endpoint = client.endpoint_sockets.second;
139
140 auto req_size = rng() % kMaxMsgSizeRng;
141
142 // Generate the random request.
143 std::vector<RngValueType> request;
144 request.reserve(req_size);
145 for (size_t r = 0; r < req_size; r++) {
146 request.emplace_back(rng());
147 }
148
149 // Create a buffer for receiving the request.
150 std::vector<RngValueType> received_request(request.size());
151
152 std::mutex mutex;
153 std::condition_variable cv;
154 std::unique_lock<std::mutex> lock(mutex);
155 bool done = false;
156
157 // Blocking receive on the thread pool.
158 thread_pool->PostTask([&]() {
159 const size_t bytes_to_receive =
160 received_request.size() * sizeof(RngValueType);
161 uint8_t* receive_buffer =
162 reinterpret_cast<uint8_t*>(received_request.data());
163 size_t bytes_received = 0;
164
165 // Perform a blocking read until we received the expected bytes.
166 while (bytes_received < bytes_to_receive) {
167 ssize_t rsize = PERFETTO_EINTR(
168 receive_endpoint.Receive(receive_buffer + bytes_received,
169 bytes_to_receive - bytes_received));
170 if (rsize <= 0)
171 break;
172 bytes_received += static_cast<size_t>(rsize);
173
174 std::this_thread::yield(); // Adds some scheduling randomness.
175 }
176
177 std::lock_guard<std::mutex> inner_lock(mutex);
178 done = true;
179 cv.notify_one();
180 });
181
182 // Perform a blocking send of the request data.
183 PERFETTO_EINTR(send_endpoint.Send(
184 request.data(), request.size() * sizeof(RngValueType)));
185
186 // Wait until the request is fully received.
187 cv.wait(lock, [&done] { return done; });
188
189 // Check data integrity.
190 EXPECT_EQ(request, received_request);
191
192 // Add some randomness to timing.
193 std::this_thread::sleep_for(std::chrono::microseconds(rng() % 1000));
194
195 // Emulate the response by reversing the data flow direction.
196 std::swap(send_endpoint, receive_endpoint);
197 }
198 };
199
200 client.client_thread = std::thread(std::move(thread_func));
201 }
202
203 for (auto& client : test_clients_) {
204 client.client_thread.join();
205 }
206 }
207
208 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
209 INSTANTIATE_TEST_SUITE_P(ByConnections, SocketRelayHandlerTest, Values(1, 5));
210 #else
211 INSTANTIATE_TEST_SUITE_P(ByConnections,
212 SocketRelayHandlerTest,
213 Values(1, 5, 25));
214 #endif
215 } // namespace
216 } // namespace perfetto
217