xref: /aosp_15_r20/external/grpc-grpc/test/core/transport/binder/end2end/fake_binder.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/transport/binder/end2end/fake_binder.h"
16 
17 #include <string>
18 #include <utility>
19 
20 #include <grpc/support/log.h>
21 
22 #include "src/core/lib/gprpp/crash.h"
23 
24 namespace grpc_binder {
25 namespace end2end_testing {
26 
27 TransactionProcessor* g_transaction_processor = nullptr;
28 
GetDataSize() const29 int32_t FakeWritableParcel::GetDataSize() const { return data_size_; }
30 
WriteInt32(int32_t data)31 absl::Status FakeWritableParcel::WriteInt32(int32_t data) {
32   data_.push_back(data);
33   data_size_ += sizeof(int32_t);
34   return absl::OkStatus();
35 }
36 
WriteInt64(int64_t data)37 absl::Status FakeWritableParcel::WriteInt64(int64_t data) {
38   data_.push_back(data);
39   data_size_ += sizeof(int64_t);
40   return absl::OkStatus();
41 }
42 
WriteBinder(HasRawBinder * binder)43 absl::Status FakeWritableParcel::WriteBinder(HasRawBinder* binder) {
44   data_.push_back(binder->GetRawBinder());
45   data_size_ += sizeof(void*);
46   return absl::OkStatus();
47 }
48 
WriteString(absl::string_view s)49 absl::Status FakeWritableParcel::WriteString(absl::string_view s) {
50   data_.push_back(std::string(s));
51   data_size_ += s.size();
52   return absl::OkStatus();
53 }
54 
WriteByteArray(const int8_t * buffer,int32_t length)55 absl::Status FakeWritableParcel::WriteByteArray(const int8_t* buffer,
56                                                 int32_t length) {
57   data_.push_back(std::vector<int8_t>(buffer, buffer + length));
58   data_size_ += length;
59   return absl::OkStatus();
60 }
61 
GetDataSize() const62 int32_t FakeReadableParcel::GetDataSize() const { return data_size_; }
63 
ReadInt32(int32_t * data)64 absl::Status FakeReadableParcel::ReadInt32(int32_t* data) {
65   if (data_position_ >= data_.size() ||
66       !absl::holds_alternative<int32_t>(data_[data_position_])) {
67     return absl::InternalError("ReadInt32 failed");
68   }
69   *data = absl::get<int32_t>(data_[data_position_++]);
70   return absl::OkStatus();
71 }
72 
ReadInt64(int64_t * data)73 absl::Status FakeReadableParcel::ReadInt64(int64_t* data) {
74   if (data_position_ >= data_.size() ||
75       !absl::holds_alternative<int64_t>(data_[data_position_])) {
76     return absl::InternalError("ReadInt64 failed");
77   }
78   *data = absl::get<int64_t>(data_[data_position_++]);
79   return absl::OkStatus();
80 }
81 
ReadBinder(std::unique_ptr<Binder> * data)82 absl::Status FakeReadableParcel::ReadBinder(std::unique_ptr<Binder>* data) {
83   if (data_position_ >= data_.size() ||
84       !absl::holds_alternative<void*>(data_[data_position_])) {
85     return absl::InternalError("ReadBinder failed");
86   }
87   void* endpoint = absl::get<void*>(data_[data_position_++]);
88   if (!endpoint) return absl::InternalError("ReadBinder failed");
89   *data = std::make_unique<FakeBinder>(static_cast<FakeEndpoint*>(endpoint));
90   return absl::OkStatus();
91 }
92 
ReadString(std::string * str)93 absl::Status FakeReadableParcel::ReadString(std::string* str) {
94   if (data_position_ >= data_.size() ||
95       !absl::holds_alternative<std::string>(data_[data_position_])) {
96     return absl::InternalError("ReadString failed");
97   }
98   *str = absl::get<std::string>(data_[data_position_++]);
99   return absl::OkStatus();
100 }
101 
ReadByteArray(std::string * data)102 absl::Status FakeReadableParcel::ReadByteArray(std::string* data) {
103   if (data_position_ >= data_.size() ||
104       !absl::holds_alternative<std::vector<int8_t>>(data_[data_position_])) {
105     return absl::InternalError("ReadByteArray failed");
106   }
107   const std::vector<int8_t>& byte_array =
108       absl::get<std::vector<int8_t>>(data_[data_position_++]);
109   data->resize(byte_array.size());
110   for (size_t i = 0; i < byte_array.size(); ++i) {
111     (*data)[i] = byte_array[i];
112   }
113   return absl::OkStatus();
114 }
115 
Transact(BinderTransportTxCode tx_code)116 absl::Status FakeBinder::Transact(BinderTransportTxCode tx_code) {
117   endpoint_->tunnel->EnQueueTransaction(endpoint_->other_end, tx_code,
118                                         input_->MoveData());
119   return absl::OkStatus();
120 }
121 
FakeTransactionReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,TransactionReceiver::OnTransactCb transact_cb)122 FakeTransactionReceiver::FakeTransactionReceiver(
123     grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
124     TransactionReceiver::OnTransactCb transact_cb) {
125   persistent_tx_receiver_ = &g_transaction_processor->NewPersistentTxReceiver(
126       std::move(wire_reader_ref), std::move(transact_cb),
127       std::make_unique<FakeBinderTunnel>());
128 }
129 
ConstructTxReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,TransactionReceiver::OnTransactCb cb) const130 std::unique_ptr<TransactionReceiver> FakeBinder::ConstructTxReceiver(
131     grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
132     TransactionReceiver::OnTransactCb cb) const {
133   return std::make_unique<FakeTransactionReceiver>(wire_reader_ref, cb);
134 }
135 
GetRawBinder()136 void* FakeTransactionReceiver::GetRawBinder() {
137   return persistent_tx_receiver_->tunnel_->GetSendEndpoint();
138 }
139 
GetSender() const140 std::unique_ptr<Binder> FakeTransactionReceiver::GetSender() const {
141   return std::make_unique<FakeBinder>(
142       persistent_tx_receiver_->tunnel_->GetSendEndpoint());
143 }
144 
PersistentFakeTransactionReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,TransactionReceiver::OnTransactCb cb,std::unique_ptr<FakeBinderTunnel> tunnel)145 PersistentFakeTransactionReceiver::PersistentFakeTransactionReceiver(
146     grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
147     TransactionReceiver::OnTransactCb cb,
148     std::unique_ptr<FakeBinderTunnel> tunnel)
149     : wire_reader_ref_(std::move(wire_reader_ref)),
150       callback_(std::move(cb)),
151       tunnel_(std::move(tunnel)) {
152   FakeEndpoint* recv_endpoint = tunnel_->GetRecvEndpoint();
153   recv_endpoint->owner = this;
154 }
155 
TransactionProcessor(absl::Duration delay)156 TransactionProcessor::TransactionProcessor(absl::Duration delay)
157     : delay_nsec_(absl::ToInt64Nanoseconds(delay)),
158       tx_thread_(
159           "process-thread",
160           [](void* arg) {
161             grpc_core::ExecCtx exec_ctx;
162             auto* self = static_cast<TransactionProcessor*>(arg);
163             self->ProcessLoop();
164           },
165           this),
166       terminated_(false) {
167   tx_thread_.Start();
168 }
169 
SetDelay(absl::Duration delay)170 void TransactionProcessor::SetDelay(absl::Duration delay) {
171   delay_nsec_ = absl::ToInt64Nanoseconds(delay);
172 }
173 
Terminate()174 void TransactionProcessor::Terminate() {
175   if (!terminated_.load(std::memory_order_seq_cst)) {
176     gpr_log(GPR_INFO, "Terminating the processor");
177     terminated_.store(true, std::memory_order_seq_cst);
178     tx_thread_.Join();
179     gpr_log(GPR_INFO, "Processor terminated");
180   }
181 }
182 
WaitForNextTransaction()183 void TransactionProcessor::WaitForNextTransaction() {
184   absl::Time now = absl::Now();
185   if (now < deliver_time_) {
186     absl::Duration diff = deliver_time_ - now;
187     // Release the lock before going to sleep.
188     mu_.Unlock();
189     absl::SleepFor(diff);
190     mu_.Lock();
191   }
192 }
193 
Flush()194 void TransactionProcessor::Flush() {
195   while (true) {
196     FakeEndpoint* target = nullptr;
197     BinderTransportTxCode tx_code{};
198     FakeData data;
199     mu_.Lock();
200     if (tx_queue_.empty()) {
201       mu_.Unlock();
202       break;
203     }
204     WaitForNextTransaction();
205     std::tie(target, tx_code, data) = std::move(tx_queue_.front());
206     tx_queue_.pop();
207     if (!tx_queue_.empty()) {
208       deliver_time_ = absl::Now() + GetRandomDelay();
209     }
210     mu_.Unlock();
211     auto* tx_receiver =
212         static_cast<PersistentFakeTransactionReceiver*>(target->owner);
213     auto parcel = std::make_unique<FakeReadableParcel>(std::move(data));
214     tx_receiver->Receive(tx_code, parcel.get()).IgnoreError();
215   }
216 }
217 
ProcessLoop()218 void TransactionProcessor::ProcessLoop() {
219   while (!terminated_.load(std::memory_order_seq_cst)) {
220     FakeEndpoint* target = nullptr;
221     BinderTransportTxCode tx_code{};
222     FakeData data;
223     mu_.Lock();
224     if (tx_queue_.empty()) {
225       mu_.Unlock();
226       continue;
227     }
228     WaitForNextTransaction();
229     std::tie(target, tx_code, data) = std::move(tx_queue_.front());
230     tx_queue_.pop();
231     if (!tx_queue_.empty()) {
232       deliver_time_ = absl::Now() + GetRandomDelay();
233     }
234     mu_.Unlock();
235     auto* tx_receiver =
236         static_cast<PersistentFakeTransactionReceiver*>(target->owner);
237     auto parcel = std::make_unique<FakeReadableParcel>(std::move(data));
238     tx_receiver->Receive(tx_code, parcel.get()).IgnoreError();
239     grpc_core::ExecCtx::Get()->Flush();
240   }
241   Flush();
242 }
243 
GetRandomDelay()244 absl::Duration TransactionProcessor::GetRandomDelay() {
245   int64_t delay =
246       absl::Uniform<int64_t>(bit_gen_, delay_nsec_ / 2, delay_nsec_);
247   return absl::Nanoseconds(delay);
248 }
249 
EnQueueTransaction(FakeEndpoint * target,BinderTransportTxCode tx_code,FakeData data)250 void TransactionProcessor::EnQueueTransaction(FakeEndpoint* target,
251                                               BinderTransportTxCode tx_code,
252                                               FakeData data) {
253   grpc_core::MutexLock lock(&mu_);
254   if (tx_queue_.empty()) {
255     // This is the first transaction in the queue. Compute its deliver time.
256     deliver_time_ = absl::Now() + GetRandomDelay();
257   }
258   tx_queue_.emplace(target, tx_code, std::move(data));
259 }
260 
FakeBinderTunnel()261 FakeBinderTunnel::FakeBinderTunnel()
262     : send_endpoint_(std::make_unique<FakeEndpoint>(this)),
263       recv_endpoint_(std::make_unique<FakeEndpoint>(this)) {
264   send_endpoint_->other_end = recv_endpoint_.get();
265   recv_endpoint_->other_end = send_endpoint_.get();
266 }
267 
268 std::pair<std::unique_ptr<Binder>, std::unique_ptr<TransactionReceiver>>
NewBinderPair(TransactionReceiver::OnTransactCb transact_cb)269 NewBinderPair(TransactionReceiver::OnTransactCb transact_cb) {
270   auto tx_receiver = std::make_unique<FakeTransactionReceiver>(
271       nullptr, std::move(transact_cb));
272   std::unique_ptr<Binder> sender = tx_receiver->GetSender();
273   return std::make_pair(std::move(sender), std::move(tx_receiver));
274 }
275 
276 }  // namespace end2end_testing
277 }  // namespace grpc_binder
278