1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/profiling/memory/shared_ring_buffer.h"
18
19 #include <array>
20 #include <mutex>
21 #include <optional>
22 #include <random>
23 #include <thread>
24 #include <unordered_map>
25
26 #include "test/gtest_and_gmock.h"
27
28 namespace perfetto {
29 namespace profiling {
30 namespace {
31
ToString(const SharedRingBuffer::Buffer & buf_and_size)32 std::string ToString(const SharedRingBuffer::Buffer& buf_and_size) {
33 return std::string(reinterpret_cast<const char*>(&buf_and_size.data[0]),
34 buf_and_size.size);
35 }
36
TryWrite(SharedRingBuffer * wr,const char * src,size_t size)37 bool TryWrite(SharedRingBuffer* wr, const char* src, size_t size) {
38 SharedRingBuffer::Buffer buf;
39 {
40 auto lock = wr->AcquireLock(ScopedSpinlock::Mode::Try);
41 if (!lock.locked())
42 return false;
43 buf = wr->BeginWrite(lock, size);
44 }
45 if (!buf)
46 return false;
47 memcpy(buf.data, src, size);
48 wr->EndWrite(std::move(buf));
49 return true;
50 }
51
StructuredTest(SharedRingBuffer * wr,SharedRingBuffer * rd)52 void StructuredTest(SharedRingBuffer* wr, SharedRingBuffer* rd) {
53 ASSERT_TRUE(wr);
54 ASSERT_TRUE(wr->is_valid());
55 ASSERT_TRUE(wr->size() == rd->size());
56 const size_t buf_size = wr->size();
57
58 // Test small writes.
59 ASSERT_TRUE(TryWrite(wr, "foo", 4));
60 ASSERT_TRUE(TryWrite(wr, "bar", 4));
61
62 {
63 auto buf_and_size = rd->BeginRead();
64 ASSERT_EQ(buf_and_size.size, 4u);
65 ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "foo");
66 rd->EndRead(std::move(buf_and_size));
67 }
68 {
69 auto buf_and_size = rd->BeginRead();
70 ASSERT_EQ(buf_and_size.size, 4u);
71 ASSERT_STREQ(reinterpret_cast<const char*>(&buf_and_size.data[0]), "bar");
72 rd->EndRead(std::move(buf_and_size));
73 }
74
75 for (int i = 0; i < 3; i++) {
76 auto buf_and_size = rd->BeginRead();
77 ASSERT_EQ(buf_and_size.data, nullptr);
78 ASSERT_EQ(buf_and_size.size, 0u);
79 }
80
81 // Test extremely large writes (fill the buffer)
82 for (int i = 0; i < 3; i++) {
83 // TryWrite precisely |buf_size| bytes (minus the size header itself).
84 std::string data(buf_size - sizeof(uint64_t), '.' + static_cast<char>(i));
85 ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
86 ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
87 ASSERT_FALSE(TryWrite(wr, "?", 1));
88
89 // And read it back
90 auto buf_and_size = rd->BeginRead();
91 ASSERT_EQ(ToString(buf_and_size), data);
92 rd->EndRead(std::move(buf_and_size));
93 }
94
95 // Test large writes that wrap.
96 std::string data(buf_size / 4 * 3 - sizeof(uint64_t), '!');
97 ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
98 ASSERT_FALSE(TryWrite(wr, data.data(), data.size()));
99 {
100 auto buf_and_size = rd->BeginRead();
101 ASSERT_EQ(ToString(buf_and_size), data);
102 rd->EndRead(std::move(buf_and_size));
103 }
104 data = std::string(base::GetSysPageSize() - sizeof(uint64_t), '#');
105 for (int i = 0; i < 4; i++)
106 ASSERT_TRUE(TryWrite(wr, data.data(), data.size()));
107
108 for (int i = 0; i < 4; i++) {
109 auto buf_and_size = rd->BeginRead();
110 ASSERT_EQ(buf_and_size.size, data.size());
111 ASSERT_EQ(ToString(buf_and_size), data);
112 rd->EndRead(std::move(buf_and_size));
113 }
114
115 // Test misaligned writes.
116 ASSERT_TRUE(TryWrite(wr, "1", 1));
117 ASSERT_TRUE(TryWrite(wr, "22", 2));
118 ASSERT_TRUE(TryWrite(wr, "333", 3));
119 ASSERT_TRUE(TryWrite(wr, "55555", 5));
120 ASSERT_TRUE(TryWrite(wr, "7777777", 7));
121 {
122 auto buf_and_size = rd->BeginRead();
123 ASSERT_EQ(ToString(buf_and_size), "1");
124 rd->EndRead(std::move(buf_and_size));
125 }
126 {
127 auto buf_and_size = rd->BeginRead();
128 ASSERT_EQ(ToString(buf_and_size), "22");
129 rd->EndRead(std::move(buf_and_size));
130 }
131 {
132 auto buf_and_size = rd->BeginRead();
133 ASSERT_EQ(ToString(buf_and_size), "333");
134 rd->EndRead(std::move(buf_and_size));
135 }
136 {
137 auto buf_and_size = rd->BeginRead();
138 ASSERT_EQ(ToString(buf_and_size), "55555");
139 rd->EndRead(std::move(buf_and_size));
140 }
141 {
142 auto buf_and_size = rd->BeginRead();
143 ASSERT_EQ(ToString(buf_and_size), "7777777");
144 rd->EndRead(std::move(buf_and_size));
145 }
146 }
147
// The reader must survive the writer endpoint being destroyed while a read
// is outstanding.
TEST(SharedRingBufferTest, ReadShutdown) {
  const size_t kBufSize = base::GetSysPageSize() * 4;
  std::optional<SharedRingBuffer> writer = SharedRingBuffer::Create(kBufSize);
  ASSERT_TRUE(writer);
  SharedRingBuffer reader =
      *SharedRingBuffer::Attach(base::ScopedFile(dup(writer->fd())));
  SharedRingBuffer::Buffer read_buf = reader.BeginRead();
  writer.reset();  // Tear down the writer mid-read.
  reader.EndRead(std::move(read_buf));
}
158
// The writer must survive the reader endpoint being destroyed while a write
// is outstanding.
TEST(SharedRingBufferTest, WriteShutdown) {
  const size_t kBufSize = base::GetSysPageSize() * 4;
  std::optional<SharedRingBuffer> reader = SharedRingBuffer::Create(kBufSize);
  ASSERT_TRUE(reader);
  SharedRingBuffer writer =
      *SharedRingBuffer::Attach(base::ScopedFile(dup(reader->fd())));
  SharedRingBuffer::Buffer write_buf;
  {
    auto spinlock = writer.AcquireLock(ScopedSpinlock::Mode::Blocking);
    write_buf = writer.BeginWrite(spinlock, 10);
  }
  reader.reset();  // Tear down the reader mid-write.
  memset(write_buf.data, 0, write_buf.size);
  writer.EndWrite(std::move(write_buf));
}
174
// Runs the structured test with one instance acting as both endpoints.
TEST(SharedRingBufferTest, SingleThreadSameInstance) {
  const size_t kBufSize = base::GetSysPageSize() * 4;
  std::optional<SharedRingBuffer> ring = SharedRingBuffer::Create(kBufSize);
  StructuredTest(&*ring, &*ring);
}
180
// Runs the structured test across two instances attached (via a dup'd fd)
// to the same underlying memory.
TEST(SharedRingBufferTest, SingleThreadAttach) {
  const size_t kBufSize = base::GetSysPageSize() * 4;
  std::optional<SharedRingBuffer> primary = SharedRingBuffer::Create(kBufSize);
  std::optional<SharedRingBuffer> secondary =
      SharedRingBuffer::Attach(base::ScopedFile(dup(primary->fd())));
  StructuredTest(&*primary, &*secondary);
}
188
// Stress test: four writer threads race a single reader over the same ring
// buffer. Writers commit random-sized random payloads; the reader drains
// records until the writers are done and the buffer is empty.
TEST(SharedRingBufferTest, MultiThreadingTest) {
  const size_t kBufSize = base::GetSysPageSize() * 1024;  // 4 MB
  SharedRingBuffer rd = *SharedRingBuffer::Create(kBufSize);
  SharedRingBuffer wr =
      *SharedRingBuffer::Attach(base::ScopedFile(dup(rd.fd())));

  std::mutex mutex;
  // Multiset of in-flight payloads, guarded by |mutex|: writers increment
  // the count for each payload they commit, the reader decrements it.
  std::unordered_map<std::string, int64_t> expected_contents;
  std::atomic<bool> writers_enabled{false};

  auto writer_thread_fn = [&wr, &expected_contents, &mutex,
                           &writers_enabled](size_t thread_id) {
    // Busy-wait on the start flag so all writers begin at the same time.
    while (!writers_enabled.load()) {
    }
    // Deterministic per-thread RNG, seeded with the thread id.
    std::minstd_rand0 rnd_engine(static_cast<uint32_t>(thread_id));
    std::uniform_int_distribution<size_t> dist(1, base::GetSysPageSize() * 8);
    for (int i = 0; i < 1000; i++) {
      size_t size = dist(rnd_engine);
      ASSERT_GT(size, 0u);
      std::string data;
      data.resize(size);
      std::generate(data.begin(), data.end(), rnd_engine);
      if (TryWrite(&wr, data.data(), data.size())) {
        std::lock_guard<std::mutex> lock(mutex);
        expected_contents[std::move(data)]++;
      } else {
        // Write failed (buffer full or spinlock contended): the payload is
        // dropped; yield to give the reader a chance to drain.
        std::this_thread::yield();
      }
    }
  };

  auto reader_thread_fn = [&rd, &expected_contents, &mutex, &writers_enabled] {
    for (;;) {
      auto buf_and_size = rd.BeginRead();
      if (!buf_and_size) {
        if (!writers_enabled.load()) {
          // Failing to read after the writers are done means that there is no
          // data left in the ring buffer.
          return;
        }
        std::this_thread::yield();
        continue;
      }
      ASSERT_GT(buf_and_size.size, 0u);
      std::string data = ToString(buf_and_size);
      std::lock_guard<std::mutex> lock(mutex);
      expected_contents[std::move(data)]--;
      rd.EndRead(std::move(buf_and_size));
    }
  };

  constexpr size_t kNumWriterThreads = 4;
  std::array<std::thread, kNumWriterThreads> writer_threads;
  for (size_t i = 0; i < kNumWriterThreads; i++)
    writer_threads[i] = std::thread(writer_thread_fn, i);

  // Release all writers at once.
  writers_enabled.store(true);

  std::thread reader_thread(reader_thread_fn);

  for (size_t i = 0; i < kNumWriterThreads; i++)
    writer_threads[i].join();

  // All writers have joined; clearing the flag tells the reader to exit as
  // soon as the buffer is fully drained.
  writers_enabled.store(false);

  reader_thread.join();
}
256
// Create() must fail for a size that is not a multiple of the page size.
TEST(SharedRingBufferTest, InvalidSize) {
  const size_t kOddSize = base::GetSysPageSize() * 4 + 1;
  std::optional<SharedRingBuffer> ring = SharedRingBuffer::Create(kOddSize);
  EXPECT_EQ(ring, std::nullopt);
}
262
// A zero-byte write is legal: BeginWrite must hand out a valid (empty)
// buffer that can be committed with EndWrite.
TEST(SharedRingBufferTest, EmptyWrite) {
  const size_t kBufSize = base::GetSysPageSize() * 4;
  std::optional<SharedRingBuffer> ring = SharedRingBuffer::Create(kBufSize);
  ASSERT_TRUE(ring);
  SharedRingBuffer::Buffer write_buf;
  {
    auto spinlock = ring->AcquireLock(ScopedSpinlock::Mode::Try);
    ASSERT_TRUE(spinlock.locked());
    write_buf = ring->BeginWrite(spinlock, 0);
  }
  EXPECT_TRUE(write_buf);
  ring->EndWrite(std::move(write_buf));
}
276
277 } // namespace
278 } // namespace profiling
279 } // namespace perfetto
280