1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <GRPCVehicleHardware.h>
18
19 #include "ProtoMessageConverter.h"
20
21 #include <android-base/logging.h>
22 #include <grpc++/grpc++.h>
23 #include <utils/SystemClock.h>
24
25 #include <cstdlib>
26 #include <mutex>
27 #include <shared_mutex>
28 #include <utility>
29
30 namespace android::hardware::automotive::vehicle::virtualization {
31
32 namespace {
33
34 constexpr size_t MAX_RETRY_COUNT = 5;
35
getChannelCredentials()36 std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
37 return ::grpc::InsecureChannelCredentials();
38 }
39
40 } // namespace
41
GRPCVehicleHardware(std::string service_addr)42 GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
43 : mServiceAddr(std::move(service_addr)),
44 mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
45 mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)),
46 mValuePollingThread([this] { ValuePollingLoop(); }) {}
47
48 // Only used for unit testing.
GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,bool startValuePollingLoop)49 GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
50 bool startValuePollingLoop)
51 : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
52 if (startValuePollingLoop) {
53 mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
54 }
55 }
56
~GRPCVehicleHardware()57 GRPCVehicleHardware::~GRPCVehicleHardware() {
58 {
59 std::lock_guard lck(mShutdownMutex);
60 mShuttingDownFlag.store(true);
61 }
62 mShutdownCV.notify_all();
63 if (mValuePollingThread.joinable()) {
64 mValuePollingThread.join();
65 }
66 }
67
getAllPropertyConfigs() const68 std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
69 std::vector<aidlvhal::VehiclePropConfig> configs;
70 ::grpc::ClientContext context;
71 auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
72 proto::VehiclePropConfig protoConfig;
73 while (config_stream->Read(&protoConfig)) {
74 aidlvhal::VehiclePropConfig config;
75 proto_msg_converter::protoToAidl(protoConfig, &config);
76 configs.push_back(std::move(config));
77 }
78 auto grpc_status = config_stream->Finish();
79 if (!grpc_status.ok()) {
80 LOG(ERROR) << __func__
81 << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
82 }
83 return configs;
84 }
85
getPropertyConfig(int32_t propId) const86 std::optional<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getPropertyConfig(
87 int32_t propId) const {
88 // TODO(b/354055835): Use GRPC call to get one config instead of getting all the configs.
89 for (const auto& config : getAllPropertyConfigs()) {
90 if (config.prop == propId) {
91 return config;
92 }
93 }
94 return std::nullopt;
95 }
96
setValues(std::shared_ptr<const SetValuesCallback> callback,const std::vector<aidlvhal::SetValueRequest> & requests)97 aidlvhal::StatusCode GRPCVehicleHardware::setValues(
98 std::shared_ptr<const SetValuesCallback> callback,
99 const std::vector<aidlvhal::SetValueRequest>& requests) {
100 ::grpc::ClientContext context;
101 proto::VehiclePropValueRequests protoRequests;
102 proto::SetValueResults protoResults;
103 for (const auto& request : requests) {
104 auto& protoRequest = *protoRequests.add_requests();
105 protoRequest.set_request_id(request.requestId);
106 proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value());
107 }
108 // TODO(chenhaosjtuacm): Make it Async.
109 auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults);
110 if (!grpc_status.ok()) {
111 LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message();
112 {
113 std::shared_lock lck(mCallbackMutex);
114 // TODO(chenhaosjtuacm): call on-set-error callback.
115 }
116 return aidlvhal::StatusCode::INTERNAL_ERROR;
117 }
118 std::vector<aidlvhal::SetValueResult> results;
119 for (const auto& protoResult : protoResults.results()) {
120 auto& result = results.emplace_back();
121 result.requestId = protoResult.request_id();
122 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
123 // TODO(chenhaosjtuacm): call on-set-error callback.
124 }
125 (*callback)(std::move(results));
126
127 return aidlvhal::StatusCode::OK;
128 }
129
getValues(std::shared_ptr<const GetValuesCallback> callback,const std::vector<aidlvhal::GetValueRequest> & requests) const130 aidlvhal::StatusCode GRPCVehicleHardware::getValues(
131 std::shared_ptr<const GetValuesCallback> callback,
132 const std::vector<aidlvhal::GetValueRequest>& requests) const {
133 std::vector<aidlvhal::GetValueResult> results;
134 auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0);
135 if (status != aidlvhal::StatusCode::OK) {
136 return status;
137 }
138 if (!results.empty()) {
139 (*callback)(std::move(results));
140 }
141 return status;
142 }
143
getValuesWithRetry(const std::vector<aidlvhal::GetValueRequest> & requests,std::vector<aidlvhal::GetValueResult> * results,size_t retryCount) const144 aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
145 const std::vector<aidlvhal::GetValueRequest>& requests,
146 std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
147 if (retryCount == MAX_RETRY_COUNT) {
148 LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
149 << retryCount << " retries";
150 return aidlvhal::StatusCode::TRY_AGAIN;
151 }
152
153 proto::VehiclePropValueRequests protoRequests;
154 std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
155 for (const auto& request : requests) {
156 auto& protoRequest = *protoRequests.add_requests();
157 protoRequest.set_request_id(request.requestId);
158 proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
159 requestById[request.requestId] = &request;
160 }
161
162 // TODO(chenhaosjtuacm): Make it Async.
163 ::grpc::ClientContext context;
164 proto::GetValueResults protoResults;
165 auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
166 if (!grpc_status.ok()) {
167 LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
168 return aidlvhal::StatusCode::INTERNAL_ERROR;
169 }
170
171 std::vector<aidlvhal::GetValueRequest> retryRequests;
172 for (const auto& protoResult : protoResults.results()) {
173 int64_t requestId = protoResult.request_id();
174 auto it = requestById.find(requestId);
175 if (it == requestById.end()) {
176 LOG(ERROR) << __func__
177 << "Invalid getValue request with unknown request ID: " << requestId
178 << ", ignore";
179 continue;
180 }
181
182 if (!protoResult.has_value()) {
183 auto& result = results->emplace_back();
184 result.requestId = requestId;
185 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
186 continue;
187 }
188
189 aidlvhal::VehiclePropValue value;
190 proto_msg_converter::protoToAidl(protoResult.value(), &value);
191
192 // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset
193 // the timestamp.
194 // TODO(b/350822044): Remove this once we use timestamp from proxy server.
195 if (!setAndroidTimestamp(&value)) {
196 // This is a rare case when we receive a property update event reflecting a new value
197 // for the property before we receive the get value result. This means that the result
198 // is already outdated, hence we should retry getting the latest value again.
199 LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop
200 << " areaId: " << value.areaId << " is oudated, retry";
201 retryRequests.push_back(*(it->second));
202 continue;
203 }
204
205 auto& result = results->emplace_back();
206 result.requestId = requestId;
207 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
208 result.prop = std::move(value);
209 }
210
211 if (retryRequests.size() != 0) {
212 return getValuesWithRetry(retryRequests, results, retryCount++);
213 }
214
215 return aidlvhal::StatusCode::OK;
216 }
217
setAndroidTimestamp(aidlvhal::VehiclePropValue * propValue) const218 bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
219 PropIdAreaId propIdAreaId = {
220 .propId = propValue->prop,
221 .areaId = propValue->areaId,
222 };
223 int64_t now = elapsedRealtimeNano();
224 int64_t externalTimestamp = propValue->timestamp;
225
226 {
227 std::lock_guard lck(mLatestUpdateTimestampsMutex);
228 auto it = mLatestUpdateTimestamps.find(propIdAreaId);
229 if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) {
230 mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp;
231 mLatestUpdateTimestamps[propIdAreaId].second = now;
232 propValue->timestamp = now;
233 return true;
234 }
235 if (externalTimestamp == (it->second).first) {
236 propValue->timestamp = (it->second).second;
237 return true;
238 }
239 }
240 // externalTimestamp < (it->second).first, the value is outdated.
241 return false;
242 }
243
registerOnPropertyChangeEvent(std::unique_ptr<const PropertyChangeCallback> callback)244 void GRPCVehicleHardware::registerOnPropertyChangeEvent(
245 std::unique_ptr<const PropertyChangeCallback> callback) {
246 std::lock_guard lck(mCallbackMutex);
247 if (mOnPropChange) {
248 LOG(ERROR) << __func__ << " must only be called once.";
249 return;
250 }
251 mOnPropChange = std::move(callback);
252 }
253
registerOnPropertySetErrorEvent(std::unique_ptr<const PropertySetErrorCallback> callback)254 void GRPCVehicleHardware::registerOnPropertySetErrorEvent(
255 std::unique_ptr<const PropertySetErrorCallback> callback) {
256 std::lock_guard lck(mCallbackMutex);
257 if (mOnSetErr) {
258 LOG(ERROR) << __func__ << " must only be called once.";
259 return;
260 }
261 mOnSetErr = std::move(callback);
262 }
263
dump(const std::vector<std::string> & options)264 DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) {
265 ::grpc::ClientContext context;
266 proto::DumpOptions protoDumpOptions;
267 proto::DumpResult protoDumpResult;
268 for (const auto& option : options) {
269 protoDumpOptions.add_options(option);
270 }
271 auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult);
272 if (!grpc_status.ok()) {
273 LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message();
274 return {};
275 }
276 return {
277 .callerShouldDumpState = protoDumpResult.caller_should_dump_state(),
278 .buffer = protoDumpResult.buffer(),
279 .refreshPropertyConfigs = protoDumpResult.refresh_property_configs(),
280 };
281 }
282
checkHealth()283 aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() {
284 ::grpc::ClientContext context;
285 proto::VehicleHalCallStatus protoStatus;
286 auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus);
287 if (!grpc_status.ok()) {
288 LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message();
289 return aidlvhal::StatusCode::INTERNAL_ERROR;
290 }
291 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
292 }
293
subscribe(aidlvhal::SubscribeOptions options)294 aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) {
295 proto::SubscribeRequest request;
296 ::grpc::ClientContext context;
297 proto::VehicleHalCallStatus protoStatus;
298 proto_msg_converter::aidlToProto(options, request.mutable_options());
299 auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus);
300 if (!grpc_status.ok()) {
301 if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
302 // This is a legacy sever. It should handle updateSampleRate.
303 LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server";
304 return aidlvhal::StatusCode::OK;
305 }
306 LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message();
307 return aidlvhal::StatusCode::INTERNAL_ERROR;
308 }
309 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
310 }
311
unsubscribe(int32_t propId,int32_t areaId)312 aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) {
313 proto::UnsubscribeRequest request;
314 ::grpc::ClientContext context;
315 proto::VehicleHalCallStatus protoStatus;
316 request.set_prop_id(propId);
317 request.set_area_id(areaId);
318 auto grpc_status = mGrpcStub->Unsubscribe(&context, request, &protoStatus);
319 if (!grpc_status.ok()) {
320 if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
321 // This is a legacy sever. Ignore unsubscribe request.
322 LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server";
323 return aidlvhal::StatusCode::OK;
324 }
325 LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << grpc_status.error_message();
326 return aidlvhal::StatusCode::INTERNAL_ERROR;
327 }
328 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
329 }
330
updateSampleRate(int32_t propId,int32_t areaId,float sampleRate)331 aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId,
332 float sampleRate) {
333 ::grpc::ClientContext context;
334 proto::UpdateSampleRateRequest request;
335 proto::VehicleHalCallStatus protoStatus;
336 request.set_prop(propId);
337 request.set_area_id(areaId);
338 request.set_sample_rate(sampleRate);
339 auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus);
340 if (!grpc_status.ok()) {
341 LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message();
342 return aidlvhal::StatusCode::INTERNAL_ERROR;
343 }
344 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
345 }
346
waitForConnected(std::chrono::milliseconds waitTime)347 bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {
348 return mGrpcChannel->WaitForConnected(gpr_time_add(
349 gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN)));
350 }
351
ValuePollingLoop()352 void GRPCVehicleHardware::ValuePollingLoop() {
353 while (!mShuttingDownFlag.load()) {
354 pollValue();
355 // try to reconnect
356 }
357 }
358
pollValue()359 void GRPCVehicleHardware::pollValue() {
360 ::grpc::ClientContext context;
361
362 bool rpc_stopped{false};
363 std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
364 std::unique_lock<std::mutex> lck(mShutdownMutex);
365 mShutdownCV.wait(
366 lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
367 context.TryCancel();
368 });
369
370 auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
371 LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
372 proto::VehiclePropValues protoValues;
373 while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
374 std::vector<aidlvhal::VehiclePropValue> values;
375 for (const auto protoValue : protoValues.values()) {
376 aidlvhal::VehiclePropValue aidlValue = {};
377 proto_msg_converter::protoToAidl(protoValue, &aidlValue);
378
379 // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to
380 // reset the timestamp.
381 // TODO(b/350822044): Remove this once we use timestamp from proxy server.
382 if (!setAndroidTimestamp(&aidlValue)) {
383 LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
384 << " areaId: " << aidlValue.areaId << " is outdated, ignore";
385 continue;
386 }
387
388 values.push_back(std::move(aidlValue));
389 }
390 if (values.empty()) {
391 continue;
392 }
393 std::shared_lock lck(mCallbackMutex);
394 if (mOnPropChange) {
395 (*mOnPropChange)(values);
396 }
397 }
398
399 {
400 std::lock_guard lck(mShutdownMutex);
401 rpc_stopped = true;
402 }
403 mShutdownCV.notify_all();
404 shuttingdown_watcher.join();
405
406 auto grpc_status = value_stream->Finish();
407 // never reach here until connection lost
408 LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
409 }
410
411 } // namespace android::hardware::automotive::vehicle::virtualization
412