1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/transport/binder/end2end/fake_binder.h"
16
17 #include <string>
18 #include <utility>
19
20 #include <grpc/support/log.h>
21
22 #include "src/core/lib/gprpp/crash.h"
23
24 namespace grpc_binder {
25 namespace end2end_testing {
26
27 TransactionProcessor* g_transaction_processor = nullptr;
28
GetDataSize() const29 int32_t FakeWritableParcel::GetDataSize() const { return data_size_; }
30
WriteInt32(int32_t data)31 absl::Status FakeWritableParcel::WriteInt32(int32_t data) {
32 data_.push_back(data);
33 data_size_ += sizeof(int32_t);
34 return absl::OkStatus();
35 }
36
WriteInt64(int64_t data)37 absl::Status FakeWritableParcel::WriteInt64(int64_t data) {
38 data_.push_back(data);
39 data_size_ += sizeof(int64_t);
40 return absl::OkStatus();
41 }
42
WriteBinder(HasRawBinder * binder)43 absl::Status FakeWritableParcel::WriteBinder(HasRawBinder* binder) {
44 data_.push_back(binder->GetRawBinder());
45 data_size_ += sizeof(void*);
46 return absl::OkStatus();
47 }
48
WriteString(absl::string_view s)49 absl::Status FakeWritableParcel::WriteString(absl::string_view s) {
50 data_.push_back(std::string(s));
51 data_size_ += s.size();
52 return absl::OkStatus();
53 }
54
WriteByteArray(const int8_t * buffer,int32_t length)55 absl::Status FakeWritableParcel::WriteByteArray(const int8_t* buffer,
56 int32_t length) {
57 data_.push_back(std::vector<int8_t>(buffer, buffer + length));
58 data_size_ += length;
59 return absl::OkStatus();
60 }
61
GetDataSize() const62 int32_t FakeReadableParcel::GetDataSize() const { return data_size_; }
63
ReadInt32(int32_t * data)64 absl::Status FakeReadableParcel::ReadInt32(int32_t* data) {
65 if (data_position_ >= data_.size() ||
66 !absl::holds_alternative<int32_t>(data_[data_position_])) {
67 return absl::InternalError("ReadInt32 failed");
68 }
69 *data = absl::get<int32_t>(data_[data_position_++]);
70 return absl::OkStatus();
71 }
72
ReadInt64(int64_t * data)73 absl::Status FakeReadableParcel::ReadInt64(int64_t* data) {
74 if (data_position_ >= data_.size() ||
75 !absl::holds_alternative<int64_t>(data_[data_position_])) {
76 return absl::InternalError("ReadInt64 failed");
77 }
78 *data = absl::get<int64_t>(data_[data_position_++]);
79 return absl::OkStatus();
80 }
81
ReadBinder(std::unique_ptr<Binder> * data)82 absl::Status FakeReadableParcel::ReadBinder(std::unique_ptr<Binder>* data) {
83 if (data_position_ >= data_.size() ||
84 !absl::holds_alternative<void*>(data_[data_position_])) {
85 return absl::InternalError("ReadBinder failed");
86 }
87 void* endpoint = absl::get<void*>(data_[data_position_++]);
88 if (!endpoint) return absl::InternalError("ReadBinder failed");
89 *data = std::make_unique<FakeBinder>(static_cast<FakeEndpoint*>(endpoint));
90 return absl::OkStatus();
91 }
92
ReadString(std::string * str)93 absl::Status FakeReadableParcel::ReadString(std::string* str) {
94 if (data_position_ >= data_.size() ||
95 !absl::holds_alternative<std::string>(data_[data_position_])) {
96 return absl::InternalError("ReadString failed");
97 }
98 *str = absl::get<std::string>(data_[data_position_++]);
99 return absl::OkStatus();
100 }
101
ReadByteArray(std::string * data)102 absl::Status FakeReadableParcel::ReadByteArray(std::string* data) {
103 if (data_position_ >= data_.size() ||
104 !absl::holds_alternative<std::vector<int8_t>>(data_[data_position_])) {
105 return absl::InternalError("ReadByteArray failed");
106 }
107 const std::vector<int8_t>& byte_array =
108 absl::get<std::vector<int8_t>>(data_[data_position_++]);
109 data->resize(byte_array.size());
110 for (size_t i = 0; i < byte_array.size(); ++i) {
111 (*data)[i] = byte_array[i];
112 }
113 return absl::OkStatus();
114 }
115
Transact(BinderTransportTxCode tx_code)116 absl::Status FakeBinder::Transact(BinderTransportTxCode tx_code) {
117 endpoint_->tunnel->EnQueueTransaction(endpoint_->other_end, tx_code,
118 input_->MoveData());
119 return absl::OkStatus();
120 }
121
FakeTransactionReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,TransactionReceiver::OnTransactCb transact_cb)122 FakeTransactionReceiver::FakeTransactionReceiver(
123 grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
124 TransactionReceiver::OnTransactCb transact_cb) {
125 persistent_tx_receiver_ = &g_transaction_processor->NewPersistentTxReceiver(
126 std::move(wire_reader_ref), std::move(transact_cb),
127 std::make_unique<FakeBinderTunnel>());
128 }
129
ConstructTxReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,TransactionReceiver::OnTransactCb cb) const130 std::unique_ptr<TransactionReceiver> FakeBinder::ConstructTxReceiver(
131 grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
132 TransactionReceiver::OnTransactCb cb) const {
133 return std::make_unique<FakeTransactionReceiver>(wire_reader_ref, cb);
134 }
135
GetRawBinder()136 void* FakeTransactionReceiver::GetRawBinder() {
137 return persistent_tx_receiver_->tunnel_->GetSendEndpoint();
138 }
139
GetSender() const140 std::unique_ptr<Binder> FakeTransactionReceiver::GetSender() const {
141 return std::make_unique<FakeBinder>(
142 persistent_tx_receiver_->tunnel_->GetSendEndpoint());
143 }
144
PersistentFakeTransactionReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,TransactionReceiver::OnTransactCb cb,std::unique_ptr<FakeBinderTunnel> tunnel)145 PersistentFakeTransactionReceiver::PersistentFakeTransactionReceiver(
146 grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
147 TransactionReceiver::OnTransactCb cb,
148 std::unique_ptr<FakeBinderTunnel> tunnel)
149 : wire_reader_ref_(std::move(wire_reader_ref)),
150 callback_(std::move(cb)),
151 tunnel_(std::move(tunnel)) {
152 FakeEndpoint* recv_endpoint = tunnel_->GetRecvEndpoint();
153 recv_endpoint->owner = this;
154 }
155
TransactionProcessor(absl::Duration delay)156 TransactionProcessor::TransactionProcessor(absl::Duration delay)
157 : delay_nsec_(absl::ToInt64Nanoseconds(delay)),
158 tx_thread_(
159 "process-thread",
160 [](void* arg) {
161 grpc_core::ExecCtx exec_ctx;
162 auto* self = static_cast<TransactionProcessor*>(arg);
163 self->ProcessLoop();
164 },
165 this),
166 terminated_(false) {
167 tx_thread_.Start();
168 }
169
SetDelay(absl::Duration delay)170 void TransactionProcessor::SetDelay(absl::Duration delay) {
171 delay_nsec_ = absl::ToInt64Nanoseconds(delay);
172 }
173
Terminate()174 void TransactionProcessor::Terminate() {
175 if (!terminated_.load(std::memory_order_seq_cst)) {
176 gpr_log(GPR_INFO, "Terminating the processor");
177 terminated_.store(true, std::memory_order_seq_cst);
178 tx_thread_.Join();
179 gpr_log(GPR_INFO, "Processor terminated");
180 }
181 }
182
WaitForNextTransaction()183 void TransactionProcessor::WaitForNextTransaction() {
184 absl::Time now = absl::Now();
185 if (now < deliver_time_) {
186 absl::Duration diff = deliver_time_ - now;
187 // Release the lock before going to sleep.
188 mu_.Unlock();
189 absl::SleepFor(diff);
190 mu_.Lock();
191 }
192 }
193
Flush()194 void TransactionProcessor::Flush() {
195 while (true) {
196 FakeEndpoint* target = nullptr;
197 BinderTransportTxCode tx_code{};
198 FakeData data;
199 mu_.Lock();
200 if (tx_queue_.empty()) {
201 mu_.Unlock();
202 break;
203 }
204 WaitForNextTransaction();
205 std::tie(target, tx_code, data) = std::move(tx_queue_.front());
206 tx_queue_.pop();
207 if (!tx_queue_.empty()) {
208 deliver_time_ = absl::Now() + GetRandomDelay();
209 }
210 mu_.Unlock();
211 auto* tx_receiver =
212 static_cast<PersistentFakeTransactionReceiver*>(target->owner);
213 auto parcel = std::make_unique<FakeReadableParcel>(std::move(data));
214 tx_receiver->Receive(tx_code, parcel.get()).IgnoreError();
215 }
216 }
217
ProcessLoop()218 void TransactionProcessor::ProcessLoop() {
219 while (!terminated_.load(std::memory_order_seq_cst)) {
220 FakeEndpoint* target = nullptr;
221 BinderTransportTxCode tx_code{};
222 FakeData data;
223 mu_.Lock();
224 if (tx_queue_.empty()) {
225 mu_.Unlock();
226 continue;
227 }
228 WaitForNextTransaction();
229 std::tie(target, tx_code, data) = std::move(tx_queue_.front());
230 tx_queue_.pop();
231 if (!tx_queue_.empty()) {
232 deliver_time_ = absl::Now() + GetRandomDelay();
233 }
234 mu_.Unlock();
235 auto* tx_receiver =
236 static_cast<PersistentFakeTransactionReceiver*>(target->owner);
237 auto parcel = std::make_unique<FakeReadableParcel>(std::move(data));
238 tx_receiver->Receive(tx_code, parcel.get()).IgnoreError();
239 grpc_core::ExecCtx::Get()->Flush();
240 }
241 Flush();
242 }
243
GetRandomDelay()244 absl::Duration TransactionProcessor::GetRandomDelay() {
245 int64_t delay =
246 absl::Uniform<int64_t>(bit_gen_, delay_nsec_ / 2, delay_nsec_);
247 return absl::Nanoseconds(delay);
248 }
249
EnQueueTransaction(FakeEndpoint * target,BinderTransportTxCode tx_code,FakeData data)250 void TransactionProcessor::EnQueueTransaction(FakeEndpoint* target,
251 BinderTransportTxCode tx_code,
252 FakeData data) {
253 grpc_core::MutexLock lock(&mu_);
254 if (tx_queue_.empty()) {
255 // This is the first transaction in the queue. Compute its deliver time.
256 deliver_time_ = absl::Now() + GetRandomDelay();
257 }
258 tx_queue_.emplace(target, tx_code, std::move(data));
259 }
260
FakeBinderTunnel()261 FakeBinderTunnel::FakeBinderTunnel()
262 : send_endpoint_(std::make_unique<FakeEndpoint>(this)),
263 recv_endpoint_(std::make_unique<FakeEndpoint>(this)) {
264 send_endpoint_->other_end = recv_endpoint_.get();
265 recv_endpoint_->other_end = send_endpoint_.get();
266 }
267
268 std::pair<std::unique_ptr<Binder>, std::unique_ptr<TransactionReceiver>>
NewBinderPair(TransactionReceiver::OnTransactCb transact_cb)269 NewBinderPair(TransactionReceiver::OnTransactCb transact_cb) {
270 auto tx_receiver = std::make_unique<FakeTransactionReceiver>(
271 nullptr, std::move(transact_cb));
272 std::unique_ptr<Binder> sender = tx_receiver->GetSender();
273 return std::make_pair(std::move(sender), std::move(tx_receiver));
274 }
275
276 } // namespace end2end_testing
277 } // namespace grpc_binder
278