1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "GRPCVehicleProxyServer.h"
18
19 #include "ProtoMessageConverter.h"
20
21 #include <grpc++/grpc++.h>
22
23 #include <android-base/logging.h>
24
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31
32 namespace android::hardware::automotive::vehicle::virtualization {
33
34 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35
getServerCredentials()36 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37 // TODO(chenhaosjtuacm): get secured credentials here
38 return ::grpc::InsecureServerCredentials();
39 }
40
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)41 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42 std::unique_ptr<IVehicleHardware>&& hardware)
43 : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)) {};
44
GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,std::unique_ptr<IVehicleHardware> && hardware)45 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
46 std::unique_ptr<IVehicleHardware>&& hardware)
47 : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
48 mHardware->registerOnPropertyChangeEvent(
49 std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
50 [this](std::vector<aidlvhal::VehiclePropValue> values) {
51 OnVehiclePropChange(values);
52 }));
53 }
54
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)55 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
56 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
57 ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
58 for (const auto& config : mHardware->getAllPropertyConfigs()) {
59 proto::VehiclePropConfig protoConfig;
60 proto_msg_converter::aidlToProto(config, &protoConfig);
61 if (!stream->Write(protoConfig)) {
62 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
63 }
64 }
65 return ::grpc::Status::OK;
66 }
67
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)68 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
69 const proto::VehiclePropValueRequests* requests,
70 proto::SetValueResults* results) {
71 std::vector<aidlvhal::SetValueRequest> aidlRequests;
72 std::unordered_set<int64_t> requestIds;
73 for (const auto& protoRequest : requests->requests()) {
74 auto& aidlRequest = aidlRequests.emplace_back();
75 int64_t requestId = protoRequest.request_id();
76 aidlRequest.requestId = requestId;
77 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
78 requestIds.insert(requestId);
79 }
80 auto waitMtx = std::make_shared<std::mutex>();
81 auto waitCV = std::make_shared<std::condition_variable>();
82 auto complete = std::make_shared<bool>(false);
83 auto tmpResults = std::make_shared<proto::SetValueResults>();
84 auto aidlStatus = mHardware->setValues(
85 std::make_shared<const IVehicleHardware::SetValuesCallback>(
86 [waitMtx, waitCV, complete, tmpResults,
87 &requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
88 bool receivedAllResults = false;
89 {
90 std::lock_guard lck(*waitMtx);
91 for (const auto& aidlResult : setValueResults) {
92 auto& protoResult = *tmpResults->add_results();
93 int64_t requestIdForResult = aidlResult.requestId;
94 protoResult.set_request_id(requestIdForResult);
95 protoResult.set_status(
96 static_cast<proto::StatusCode>(aidlResult.status));
97 requestIds.erase(requestIdForResult);
98 }
99 if (requestIds.empty()) {
100 receivedAllResults = true;
101 *complete = true;
102 }
103 }
104 if (receivedAllResults) {
105 waitCV->notify_all();
106 }
107 }),
108 aidlRequests);
109 if (aidlStatus != aidlvhal::StatusCode::OK) {
110 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
111 "The underlying hardware fails to set values, VHAL status: " +
112 toString(aidlStatus));
113 }
114 std::unique_lock lck(*waitMtx);
115 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
116 if (!success) {
117 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
118 "The underlying hardware set values timeout.");
119 }
120 *results = std::move(*tmpResults);
121 return ::grpc::Status::OK;
122 }
123
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)124 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
125 const proto::VehiclePropValueRequests* requests,
126 proto::GetValueResults* results) {
127 std::vector<aidlvhal::GetValueRequest> aidlRequests;
128 std::unordered_set<int64_t> requestIds;
129 for (const auto& protoRequest : requests->requests()) {
130 auto& aidlRequest = aidlRequests.emplace_back();
131 int64_t requestId = protoRequest.request_id();
132 aidlRequest.requestId = requestId;
133 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
134 requestIds.insert(requestId);
135 }
136 auto waitMtx = std::make_shared<std::mutex>();
137 auto waitCV = std::make_shared<std::condition_variable>();
138 auto complete = std::make_shared<bool>(false);
139 auto tmpResults = std::make_shared<proto::GetValueResults>();
140 auto aidlStatus = mHardware->getValues(
141 std::make_shared<const IVehicleHardware::GetValuesCallback>(
142 [waitMtx, waitCV, complete, tmpResults,
143 &requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
144 bool receivedAllResults = false;
145 {
146 std::lock_guard lck(*waitMtx);
147 for (const auto& aidlResult : getValueResults) {
148 auto& protoResult = *tmpResults->add_results();
149 int64_t requestIdForResult = aidlResult.requestId;
150 protoResult.set_request_id(requestIdForResult);
151 protoResult.set_status(
152 static_cast<proto::StatusCode>(aidlResult.status));
153 if (aidlResult.prop) {
154 auto* valuePtr = protoResult.mutable_value();
155 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
156 }
157 requestIds.erase(requestIdForResult);
158 }
159 if (requestIds.empty()) {
160 receivedAllResults = true;
161 *complete = true;
162 }
163 }
164 if (receivedAllResults) {
165 waitCV->notify_all();
166 }
167 }),
168 aidlRequests);
169 if (aidlStatus != aidlvhal::StatusCode::OK) {
170 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
171 "The underlying hardware fails to get values, VHAL status: " +
172 toString(aidlStatus));
173 }
174 std::unique_lock lck(*waitMtx);
175 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
176 if (!success) {
177 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
178 "The underlying hardware get values timeout.");
179 }
180 *results = std::move(*tmpResults);
181 return ::grpc::Status::OK;
182 }
183
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)184 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
185 ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
186 proto::VehicleHalCallStatus* status) {
187 const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
188 request->sample_rate());
189 status->set_status_code(static_cast<proto::StatusCode>(status_code));
190 return ::grpc::Status::OK;
191 }
192
Subscribe(::grpc::ServerContext * context,const proto::SubscribeRequest * request,proto::VehicleHalCallStatus * status)193 ::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
194 const proto::SubscribeRequest* request,
195 proto::VehicleHalCallStatus* status) {
196 const auto& protoSubscribeOptions = request->options();
197 aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
198 proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
199 const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
200 status->set_status_code(static_cast<proto::StatusCode>(status_code));
201 return ::grpc::Status::OK;
202 }
203
Unsubscribe(::grpc::ServerContext * context,const proto::UnsubscribeRequest * request,proto::VehicleHalCallStatus * status)204 ::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
205 const proto::UnsubscribeRequest* request,
206 proto::VehicleHalCallStatus* status) {
207 int32_t propId = request->prop_id();
208 int32_t areaId = request->area_id();
209 const auto status_code = mHardware->unsubscribe(propId, areaId);
210 status->set_status_code(static_cast<proto::StatusCode>(status_code));
211 return ::grpc::Status::OK;
212 }
213
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)214 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
215 const ::google::protobuf::Empty*,
216 proto::VehicleHalCallStatus* status) {
217 status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
218 return ::grpc::Status::OK;
219 }
220
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)221 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
222 const proto::DumpOptions* options,
223 proto::DumpResult* result) {
224 std::vector<std::string> dumpOptionStrings(options->options().begin(),
225 options->options().end());
226 auto dumpResult = mHardware->dump(dumpOptionStrings);
227 result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
228 result->set_buffer(dumpResult.buffer);
229 result->set_refresh_property_configs(dumpResult.refreshPropertyConfigs);
230 return ::grpc::Status::OK;
231 }
232
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)233 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
234 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
235 ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
236 auto conn = std::make_shared<ConnectionDescriptor>(stream);
237 {
238 std::lock_guard lck(mConnectionMutex);
239 mValueStreamingConnections.push_back(conn);
240 }
241 conn->Wait();
242 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
243 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
244 }
245
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)246 void GrpcVehicleProxyServer::OnVehiclePropChange(
247 const std::vector<aidlvhal::VehiclePropValue>& values) {
248 std::unordered_set<uint64_t> brokenConn;
249 proto::VehiclePropValues protoValues;
250 for (const auto& value : values) {
251 auto* protoValuePtr = protoValues.add_values();
252 proto_msg_converter::aidlToProto(value, protoValuePtr);
253 }
254 {
255 std::shared_lock read_lock(mConnectionMutex);
256 for (auto& connection : mValueStreamingConnections) {
257 auto writeOK = connection->Write(protoValues);
258 if (!writeOK) {
259 LOG(ERROR) << __func__
260 << ": Server Write failed, connection lost. ID: " << connection->ID();
261 brokenConn.insert(connection->ID());
262 }
263 }
264 }
265 if (brokenConn.empty()) {
266 return;
267 }
268 std::unique_lock write_lock(mConnectionMutex);
269 mValueStreamingConnections.erase(
270 std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
271 [&brokenConn](const auto& conn) {
272 return brokenConn.find(conn->ID()) != brokenConn.end();
273 }),
274 mValueStreamingConnections.end());
275 }
276
Start()277 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
278 if (mServer) {
279 LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
280 return *this;
281 }
282 ::grpc::ServerBuilder builder;
283 builder.RegisterService(this);
284 for (const std::string& serviceAddr : mServiceAddrs) {
285 builder.AddListeningPort(serviceAddr, getServerCredentials());
286 }
287 mServer = builder.BuildAndStart();
288 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
289 << "please make sure the configuration and permissions are correct";
290 return *this;
291 }
292
Shutdown()293 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
294 std::shared_lock read_lock(mConnectionMutex);
295 for (auto& conn : mValueStreamingConnections) {
296 conn->Shutdown();
297 }
298 if (mServer) {
299 mServer->Shutdown();
300 }
301 return *this;
302 }
303
Wait()304 void GrpcVehicleProxyServer::Wait() {
305 if (mServer) {
306 mServer->Wait();
307 }
308 mServer.reset();
309 }
310
~ConnectionDescriptor()311 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
312 Shutdown();
313 }
314
Write(const proto::VehiclePropValues & values)315 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
316 if (!mStream) {
317 LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
318 Shutdown();
319 return false;
320 }
321 {
322 std::lock_guard lck(*mMtx);
323 if (!mShutdownFlag && mStream->Write(values)) {
324 return true;
325 } else {
326 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
327 }
328 }
329 Shutdown();
330 return false;
331 }
332
Wait()333 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
334 std::unique_lock lck(*mMtx);
335 mCV->wait(lck, [this] { return mShutdownFlag; });
336 }
337
Shutdown()338 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
339 {
340 std::lock_guard lck(*mMtx);
341 mShutdownFlag = true;
342 }
343 mCV->notify_all();
344 }
345
346 } // namespace android::hardware::automotive::vehicle::virtualization
347