xref: /aosp_15_r20/hardware/interfaces/automotive/vehicle/aidl/impl/3/grpc/GRPCVehicleHardware.cpp (revision 4d7e907c777eeecc4c5bd7cf640a754fac206ff7)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <GRPCVehicleHardware.h>
18 
19 #include "ProtoMessageConverter.h"
20 
21 #include <android-base/logging.h>
22 #include <grpc++/grpc++.h>
23 #include <utils/SystemClock.h>
24 
25 #include <cstdlib>
26 #include <mutex>
27 #include <shared_mutex>
28 #include <utility>
29 
30 namespace android::hardware::automotive::vehicle::virtualization {
31 
32 namespace {
33 
34 constexpr size_t MAX_RETRY_COUNT = 5;
35 
getChannelCredentials()36 std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
37     return ::grpc::InsecureChannelCredentials();
38 }
39 
40 }  // namespace
41 
GRPCVehicleHardware(std::string service_addr)42 GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
43     : mServiceAddr(std::move(service_addr)),
44       mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
45       mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)),
46       mValuePollingThread([this] { ValuePollingLoop(); }) {}
47 
48 // Only used for unit testing.
GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,bool startValuePollingLoop)49 GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub,
50                                          bool startValuePollingLoop)
51     : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) {
52     if (startValuePollingLoop) {
53         mValuePollingThread = std::thread([this] { ValuePollingLoop(); });
54     }
55 }
56 
~GRPCVehicleHardware()57 GRPCVehicleHardware::~GRPCVehicleHardware() {
58     {
59         std::lock_guard lck(mShutdownMutex);
60         mShuttingDownFlag.store(true);
61     }
62     mShutdownCV.notify_all();
63     if (mValuePollingThread.joinable()) {
64         mValuePollingThread.join();
65     }
66 }
67 
getAllPropertyConfigs() const68 std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
69     std::vector<aidlvhal::VehiclePropConfig> configs;
70     ::grpc::ClientContext context;
71     auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
72     proto::VehiclePropConfig protoConfig;
73     while (config_stream->Read(&protoConfig)) {
74         aidlvhal::VehiclePropConfig config;
75         proto_msg_converter::protoToAidl(protoConfig, &config);
76         configs.push_back(std::move(config));
77     }
78     auto grpc_status = config_stream->Finish();
79     if (!grpc_status.ok()) {
80         LOG(ERROR) << __func__
81                    << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
82     }
83     return configs;
84 }
85 
getPropertyConfig(int32_t propId) const86 std::optional<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getPropertyConfig(
87         int32_t propId) const {
88     // TODO(b/354055835): Use GRPC call to get one config instead of getting all the configs.
89     for (const auto& config : getAllPropertyConfigs()) {
90         if (config.prop == propId) {
91             return config;
92         }
93     }
94     return std::nullopt;
95 }
96 
setValues(std::shared_ptr<const SetValuesCallback> callback,const std::vector<aidlvhal::SetValueRequest> & requests)97 aidlvhal::StatusCode GRPCVehicleHardware::setValues(
98         std::shared_ptr<const SetValuesCallback> callback,
99         const std::vector<aidlvhal::SetValueRequest>& requests) {
100     ::grpc::ClientContext context;
101     proto::VehiclePropValueRequests protoRequests;
102     proto::SetValueResults protoResults;
103     for (const auto& request : requests) {
104         auto& protoRequest = *protoRequests.add_requests();
105         protoRequest.set_request_id(request.requestId);
106         proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value());
107     }
108     // TODO(chenhaosjtuacm): Make it Async.
109     auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults);
110     if (!grpc_status.ok()) {
111         LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message();
112         {
113             std::shared_lock lck(mCallbackMutex);
114             // TODO(chenhaosjtuacm): call on-set-error callback.
115         }
116         return aidlvhal::StatusCode::INTERNAL_ERROR;
117     }
118     std::vector<aidlvhal::SetValueResult> results;
119     for (const auto& protoResult : protoResults.results()) {
120         auto& result = results.emplace_back();
121         result.requestId = protoResult.request_id();
122         result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
123         // TODO(chenhaosjtuacm): call on-set-error callback.
124     }
125     (*callback)(std::move(results));
126 
127     return aidlvhal::StatusCode::OK;
128 }
129 
getValues(std::shared_ptr<const GetValuesCallback> callback,const std::vector<aidlvhal::GetValueRequest> & requests) const130 aidlvhal::StatusCode GRPCVehicleHardware::getValues(
131         std::shared_ptr<const GetValuesCallback> callback,
132         const std::vector<aidlvhal::GetValueRequest>& requests) const {
133     std::vector<aidlvhal::GetValueResult> results;
134     auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0);
135     if (status != aidlvhal::StatusCode::OK) {
136         return status;
137     }
138     if (!results.empty()) {
139         (*callback)(std::move(results));
140     }
141     return status;
142 }
143 
getValuesWithRetry(const std::vector<aidlvhal::GetValueRequest> & requests,std::vector<aidlvhal::GetValueResult> * results,size_t retryCount) const144 aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry(
145         const std::vector<aidlvhal::GetValueRequest>& requests,
146         std::vector<aidlvhal::GetValueResult>* results, size_t retryCount) const {
147     if (retryCount == MAX_RETRY_COUNT) {
148         LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after "
149                    << retryCount << " retries";
150         return aidlvhal::StatusCode::TRY_AGAIN;
151     }
152 
153     proto::VehiclePropValueRequests protoRequests;
154     std::unordered_map<int64_t, const aidlvhal::GetValueRequest*> requestById;
155     for (const auto& request : requests) {
156         auto& protoRequest = *protoRequests.add_requests();
157         protoRequest.set_request_id(request.requestId);
158         proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
159         requestById[request.requestId] = &request;
160     }
161 
162     // TODO(chenhaosjtuacm): Make it Async.
163     ::grpc::ClientContext context;
164     proto::GetValueResults protoResults;
165     auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
166     if (!grpc_status.ok()) {
167         LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
168         return aidlvhal::StatusCode::INTERNAL_ERROR;
169     }
170 
171     std::vector<aidlvhal::GetValueRequest> retryRequests;
172     for (const auto& protoResult : protoResults.results()) {
173         int64_t requestId = protoResult.request_id();
174         auto it = requestById.find(requestId);
175         if (it == requestById.end()) {
176             LOG(ERROR) << __func__
177                        << "Invalid getValue request with unknown request ID: " << requestId
178                        << ", ignore";
179             continue;
180         }
181 
182         if (!protoResult.has_value()) {
183             auto& result = results->emplace_back();
184             result.requestId = requestId;
185             result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
186             continue;
187         }
188 
189         aidlvhal::VehiclePropValue value;
190         proto_msg_converter::protoToAidl(protoResult.value(), &value);
191 
192         // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset
193         // the timestamp.
194         // TODO(b/350822044): Remove this once we use timestamp from proxy server.
195         if (!setAndroidTimestamp(&value)) {
196             // This is a rare case when we receive a property update event reflecting a new value
197             // for the property before we receive the get value result. This means that the result
198             // is already outdated, hence we should retry getting the latest value again.
199             LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop
200                          << " areaId: " << value.areaId << " is oudated, retry";
201             retryRequests.push_back(*(it->second));
202             continue;
203         }
204 
205         auto& result = results->emplace_back();
206         result.requestId = requestId;
207         result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
208         result.prop = std::move(value);
209     }
210 
211     if (retryRequests.size() != 0) {
212         return getValuesWithRetry(retryRequests, results, retryCount++);
213     }
214 
215     return aidlvhal::StatusCode::OK;
216 }
217 
setAndroidTimestamp(aidlvhal::VehiclePropValue * propValue) const218 bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const {
219     PropIdAreaId propIdAreaId = {
220             .propId = propValue->prop,
221             .areaId = propValue->areaId,
222     };
223     int64_t now = elapsedRealtimeNano();
224     int64_t externalTimestamp = propValue->timestamp;
225 
226     {
227         std::lock_guard lck(mLatestUpdateTimestampsMutex);
228         auto it = mLatestUpdateTimestamps.find(propIdAreaId);
229         if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) {
230             mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp;
231             mLatestUpdateTimestamps[propIdAreaId].second = now;
232             propValue->timestamp = now;
233             return true;
234         }
235         if (externalTimestamp == (it->second).first) {
236             propValue->timestamp = (it->second).second;
237             return true;
238         }
239     }
240     // externalTimestamp < (it->second).first, the value is outdated.
241     return false;
242 }
243 
registerOnPropertyChangeEvent(std::unique_ptr<const PropertyChangeCallback> callback)244 void GRPCVehicleHardware::registerOnPropertyChangeEvent(
245         std::unique_ptr<const PropertyChangeCallback> callback) {
246     std::lock_guard lck(mCallbackMutex);
247     if (mOnPropChange) {
248         LOG(ERROR) << __func__ << " must only be called once.";
249         return;
250     }
251     mOnPropChange = std::move(callback);
252 }
253 
registerOnPropertySetErrorEvent(std::unique_ptr<const PropertySetErrorCallback> callback)254 void GRPCVehicleHardware::registerOnPropertySetErrorEvent(
255         std::unique_ptr<const PropertySetErrorCallback> callback) {
256     std::lock_guard lck(mCallbackMutex);
257     if (mOnSetErr) {
258         LOG(ERROR) << __func__ << " must only be called once.";
259         return;
260     }
261     mOnSetErr = std::move(callback);
262 }
263 
dump(const std::vector<std::string> & options)264 DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) {
265     ::grpc::ClientContext context;
266     proto::DumpOptions protoDumpOptions;
267     proto::DumpResult protoDumpResult;
268     for (const auto& option : options) {
269         protoDumpOptions.add_options(option);
270     }
271     auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult);
272     if (!grpc_status.ok()) {
273         LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message();
274         return {};
275     }
276     return {
277             .callerShouldDumpState = protoDumpResult.caller_should_dump_state(),
278             .buffer = protoDumpResult.buffer(),
279             .refreshPropertyConfigs = protoDumpResult.refresh_property_configs(),
280     };
281 }
282 
checkHealth()283 aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() {
284     ::grpc::ClientContext context;
285     proto::VehicleHalCallStatus protoStatus;
286     auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus);
287     if (!grpc_status.ok()) {
288         LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message();
289         return aidlvhal::StatusCode::INTERNAL_ERROR;
290     }
291     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
292 }
293 
subscribe(aidlvhal::SubscribeOptions options)294 aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) {
295     proto::SubscribeRequest request;
296     ::grpc::ClientContext context;
297     proto::VehicleHalCallStatus protoStatus;
298     proto_msg_converter::aidlToProto(options, request.mutable_options());
299     auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus);
300     if (!grpc_status.ok()) {
301         if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
302             // This is a legacy sever. It should handle updateSampleRate.
303             LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server";
304             return aidlvhal::StatusCode::OK;
305         }
306         LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message();
307         return aidlvhal::StatusCode::INTERNAL_ERROR;
308     }
309     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
310 }
311 
unsubscribe(int32_t propId,int32_t areaId)312 aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) {
313     proto::UnsubscribeRequest request;
314     ::grpc::ClientContext context;
315     proto::VehicleHalCallStatus protoStatus;
316     request.set_prop_id(propId);
317     request.set_area_id(areaId);
318     auto grpc_status = mGrpcStub->Unsubscribe(&context, request, &protoStatus);
319     if (!grpc_status.ok()) {
320         if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
321             // This is a legacy sever. Ignore unsubscribe request.
322             LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server";
323             return aidlvhal::StatusCode::OK;
324         }
325         LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << grpc_status.error_message();
326         return aidlvhal::StatusCode::INTERNAL_ERROR;
327     }
328     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
329 }
330 
updateSampleRate(int32_t propId,int32_t areaId,float sampleRate)331 aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId,
332                                                            float sampleRate) {
333     ::grpc::ClientContext context;
334     proto::UpdateSampleRateRequest request;
335     proto::VehicleHalCallStatus protoStatus;
336     request.set_prop(propId);
337     request.set_area_id(areaId);
338     request.set_sample_rate(sampleRate);
339     auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus);
340     if (!grpc_status.ok()) {
341         LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message();
342         return aidlvhal::StatusCode::INTERNAL_ERROR;
343     }
344     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
345 }
346 
waitForConnected(std::chrono::milliseconds waitTime)347 bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {
348     return mGrpcChannel->WaitForConnected(gpr_time_add(
349             gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN)));
350 }
351 
ValuePollingLoop()352 void GRPCVehicleHardware::ValuePollingLoop() {
353     while (!mShuttingDownFlag.load()) {
354         pollValue();
355         // try to reconnect
356     }
357 }
358 
pollValue()359 void GRPCVehicleHardware::pollValue() {
360     ::grpc::ClientContext context;
361 
362     bool rpc_stopped{false};
363     std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
364         std::unique_lock<std::mutex> lck(mShutdownMutex);
365         mShutdownCV.wait(
366                 lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); });
367         context.TryCancel();
368     });
369 
370     auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
371     LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
372     proto::VehiclePropValues protoValues;
373     while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
374         std::vector<aidlvhal::VehiclePropValue> values;
375         for (const auto protoValue : protoValues.values()) {
376             aidlvhal::VehiclePropValue aidlValue = {};
377             proto_msg_converter::protoToAidl(protoValue, &aidlValue);
378 
379             // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to
380             // reset the timestamp.
381             // TODO(b/350822044): Remove this once we use timestamp from proxy server.
382             if (!setAndroidTimestamp(&aidlValue)) {
383                 LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop
384                              << " areaId: " << aidlValue.areaId << " is outdated, ignore";
385                 continue;
386             }
387 
388             values.push_back(std::move(aidlValue));
389         }
390         if (values.empty()) {
391             continue;
392         }
393         std::shared_lock lck(mCallbackMutex);
394         if (mOnPropChange) {
395             (*mOnPropChange)(values);
396         }
397     }
398 
399     {
400         std::lock_guard lck(mShutdownMutex);
401         rpc_stopped = true;
402     }
403     mShutdownCV.notify_all();
404     shuttingdown_watcher.join();
405 
406     auto grpc_status = value_stream->Finish();
407     // never reach here until connection lost
408     LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
409 }
410 
411 }  // namespace android::hardware::automotive::vehicle::virtualization
412