xref: /aosp_15_r20/hardware/interfaces/automotive/vehicle/aidl/impl/3/grpc/GRPCVehicleProxyServer.cpp (revision 4d7e907c777eeecc4c5bd7cf640a754fac206ff7)
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "GRPCVehicleProxyServer.h"
18 
19 #include "ProtoMessageConverter.h"
20 
21 #include <grpc++/grpc++.h>
22 
23 #include <android-base/logging.h>
24 
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31 
32 namespace android::hardware::automotive::vehicle::virtualization {
33 
34 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35 
getServerCredentials()36 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37     // TODO(chenhaosjtuacm): get secured credentials here
38     return ::grpc::InsecureServerCredentials();
39 }
40 
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)41 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42                                                std::unique_ptr<IVehicleHardware>&& hardware)
43     : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)) {};
44 
GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,std::unique_ptr<IVehicleHardware> && hardware)45 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
46                                                std::unique_ptr<IVehicleHardware>&& hardware)
47     : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
48     mHardware->registerOnPropertyChangeEvent(
49             std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
50                     [this](std::vector<aidlvhal::VehiclePropValue> values) {
51                         OnVehiclePropChange(values);
52                     }));
53 }
54 
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)55 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
56         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
57         ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
58     for (const auto& config : mHardware->getAllPropertyConfigs()) {
59         proto::VehiclePropConfig protoConfig;
60         proto_msg_converter::aidlToProto(config, &protoConfig);
61         if (!stream->Write(protoConfig)) {
62             return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
63         }
64     }
65     return ::grpc::Status::OK;
66 }
67 
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)68 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
69                                                  const proto::VehiclePropValueRequests* requests,
70                                                  proto::SetValueResults* results) {
71     std::vector<aidlvhal::SetValueRequest> aidlRequests;
72     std::unordered_set<int64_t> requestIds;
73     for (const auto& protoRequest : requests->requests()) {
74         auto& aidlRequest = aidlRequests.emplace_back();
75         int64_t requestId = protoRequest.request_id();
76         aidlRequest.requestId = requestId;
77         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
78         requestIds.insert(requestId);
79     }
80     auto waitMtx = std::make_shared<std::mutex>();
81     auto waitCV = std::make_shared<std::condition_variable>();
82     auto complete = std::make_shared<bool>(false);
83     auto tmpResults = std::make_shared<proto::SetValueResults>();
84     auto aidlStatus = mHardware->setValues(
85             std::make_shared<const IVehicleHardware::SetValuesCallback>(
86                     [waitMtx, waitCV, complete, tmpResults,
87                      &requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
88                         bool receivedAllResults = false;
89                         {
90                             std::lock_guard lck(*waitMtx);
91                             for (const auto& aidlResult : setValueResults) {
92                                 auto& protoResult = *tmpResults->add_results();
93                                 int64_t requestIdForResult = aidlResult.requestId;
94                                 protoResult.set_request_id(requestIdForResult);
95                                 protoResult.set_status(
96                                         static_cast<proto::StatusCode>(aidlResult.status));
97                                 requestIds.erase(requestIdForResult);
98                             }
99                             if (requestIds.empty()) {
100                                 receivedAllResults = true;
101                                 *complete = true;
102                             }
103                         }
104                         if (receivedAllResults) {
105                             waitCV->notify_all();
106                         }
107                     }),
108             aidlRequests);
109     if (aidlStatus != aidlvhal::StatusCode::OK) {
110         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
111                               "The underlying hardware fails to set values, VHAL status: " +
112                                       toString(aidlStatus));
113     }
114     std::unique_lock lck(*waitMtx);
115     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
116     if (!success) {
117         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
118                               "The underlying hardware set values timeout.");
119     }
120     *results = std::move(*tmpResults);
121     return ::grpc::Status::OK;
122 }
123 
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)124 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
125                                                  const proto::VehiclePropValueRequests* requests,
126                                                  proto::GetValueResults* results) {
127     std::vector<aidlvhal::GetValueRequest> aidlRequests;
128     std::unordered_set<int64_t> requestIds;
129     for (const auto& protoRequest : requests->requests()) {
130         auto& aidlRequest = aidlRequests.emplace_back();
131         int64_t requestId = protoRequest.request_id();
132         aidlRequest.requestId = requestId;
133         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
134         requestIds.insert(requestId);
135     }
136     auto waitMtx = std::make_shared<std::mutex>();
137     auto waitCV = std::make_shared<std::condition_variable>();
138     auto complete = std::make_shared<bool>(false);
139     auto tmpResults = std::make_shared<proto::GetValueResults>();
140     auto aidlStatus = mHardware->getValues(
141             std::make_shared<const IVehicleHardware::GetValuesCallback>(
142                     [waitMtx, waitCV, complete, tmpResults,
143                      &requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
144                         bool receivedAllResults = false;
145                         {
146                             std::lock_guard lck(*waitMtx);
147                             for (const auto& aidlResult : getValueResults) {
148                                 auto& protoResult = *tmpResults->add_results();
149                                 int64_t requestIdForResult = aidlResult.requestId;
150                                 protoResult.set_request_id(requestIdForResult);
151                                 protoResult.set_status(
152                                         static_cast<proto::StatusCode>(aidlResult.status));
153                                 if (aidlResult.prop) {
154                                     auto* valuePtr = protoResult.mutable_value();
155                                     proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
156                                 }
157                                 requestIds.erase(requestIdForResult);
158                             }
159                             if (requestIds.empty()) {
160                                 receivedAllResults = true;
161                                 *complete = true;
162                             }
163                         }
164                         if (receivedAllResults) {
165                             waitCV->notify_all();
166                         }
167                     }),
168             aidlRequests);
169     if (aidlStatus != aidlvhal::StatusCode::OK) {
170         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
171                               "The underlying hardware fails to get values, VHAL status: " +
172                                       toString(aidlStatus));
173     }
174     std::unique_lock lck(*waitMtx);
175     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
176     if (!success) {
177         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
178                               "The underlying hardware get values timeout.");
179     }
180     *results = std::move(*tmpResults);
181     return ::grpc::Status::OK;
182 }
183 
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)184 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
185         ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
186         proto::VehicleHalCallStatus* status) {
187     const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
188                                                          request->sample_rate());
189     status->set_status_code(static_cast<proto::StatusCode>(status_code));
190     return ::grpc::Status::OK;
191 }
192 
Subscribe(::grpc::ServerContext * context,const proto::SubscribeRequest * request,proto::VehicleHalCallStatus * status)193 ::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
194                                                  const proto::SubscribeRequest* request,
195                                                  proto::VehicleHalCallStatus* status) {
196     const auto& protoSubscribeOptions = request->options();
197     aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
198     proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
199     const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
200     status->set_status_code(static_cast<proto::StatusCode>(status_code));
201     return ::grpc::Status::OK;
202 }
203 
Unsubscribe(::grpc::ServerContext * context,const proto::UnsubscribeRequest * request,proto::VehicleHalCallStatus * status)204 ::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
205                                                    const proto::UnsubscribeRequest* request,
206                                                    proto::VehicleHalCallStatus* status) {
207     int32_t propId = request->prop_id();
208     int32_t areaId = request->area_id();
209     const auto status_code = mHardware->unsubscribe(propId, areaId);
210     status->set_status_code(static_cast<proto::StatusCode>(status_code));
211     return ::grpc::Status::OK;
212 }
213 
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)214 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
215                                                    const ::google::protobuf::Empty*,
216                                                    proto::VehicleHalCallStatus* status) {
217     status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
218     return ::grpc::Status::OK;
219 }
220 
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)221 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
222                                             const proto::DumpOptions* options,
223                                             proto::DumpResult* result) {
224     std::vector<std::string> dumpOptionStrings(options->options().begin(),
225                                                options->options().end());
226     auto dumpResult = mHardware->dump(dumpOptionStrings);
227     result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
228     result->set_buffer(dumpResult.buffer);
229     result->set_refresh_property_configs(dumpResult.refreshPropertyConfigs);
230     return ::grpc::Status::OK;
231 }
232 
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)233 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
234         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
235         ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
236     auto conn = std::make_shared<ConnectionDescriptor>(stream);
237     {
238         std::lock_guard lck(mConnectionMutex);
239         mValueStreamingConnections.push_back(conn);
240     }
241     conn->Wait();
242     LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
243     return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
244 }
245 
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)246 void GrpcVehicleProxyServer::OnVehiclePropChange(
247         const std::vector<aidlvhal::VehiclePropValue>& values) {
248     std::unordered_set<uint64_t> brokenConn;
249     proto::VehiclePropValues protoValues;
250     for (const auto& value : values) {
251         auto* protoValuePtr = protoValues.add_values();
252         proto_msg_converter::aidlToProto(value, protoValuePtr);
253     }
254     {
255         std::shared_lock read_lock(mConnectionMutex);
256         for (auto& connection : mValueStreamingConnections) {
257             auto writeOK = connection->Write(protoValues);
258             if (!writeOK) {
259                 LOG(ERROR) << __func__
260                            << ": Server Write failed, connection lost. ID: " << connection->ID();
261                 brokenConn.insert(connection->ID());
262             }
263         }
264     }
265     if (brokenConn.empty()) {
266         return;
267     }
268     std::unique_lock write_lock(mConnectionMutex);
269     mValueStreamingConnections.erase(
270             std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
271                            [&brokenConn](const auto& conn) {
272                                return brokenConn.find(conn->ID()) != brokenConn.end();
273                            }),
274             mValueStreamingConnections.end());
275 }
276 
Start()277 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
278     if (mServer) {
279         LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
280         return *this;
281     }
282     ::grpc::ServerBuilder builder;
283     builder.RegisterService(this);
284     for (const std::string& serviceAddr : mServiceAddrs) {
285         builder.AddListeningPort(serviceAddr, getServerCredentials());
286     }
287     mServer = builder.BuildAndStart();
288     CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
289                    << "please make sure the configuration and permissions are correct";
290     return *this;
291 }
292 
Shutdown()293 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
294     std::shared_lock read_lock(mConnectionMutex);
295     for (auto& conn : mValueStreamingConnections) {
296         conn->Shutdown();
297     }
298     if (mServer) {
299         mServer->Shutdown();
300     }
301     return *this;
302 }
303 
Wait()304 void GrpcVehicleProxyServer::Wait() {
305     if (mServer) {
306         mServer->Wait();
307     }
308     mServer.reset();
309 }
310 
~ConnectionDescriptor()311 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
312     Shutdown();
313 }
314 
Write(const proto::VehiclePropValues & values)315 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
316     if (!mStream) {
317         LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
318         Shutdown();
319         return false;
320     }
321     {
322         std::lock_guard lck(*mMtx);
323         if (!mShutdownFlag && mStream->Write(values)) {
324             return true;
325         } else {
326             LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
327         }
328     }
329     Shutdown();
330     return false;
331 }
332 
Wait()333 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
334     std::unique_lock lck(*mMtx);
335     mCV->wait(lck, [this] { return mShutdownFlag; });
336 }
337 
Shutdown()338 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
339     {
340         std::lock_guard lck(*mMtx);
341         mShutdownFlag = true;
342     }
343     mCV->notify_all();
344 }
345 
346 }  // namespace android::hardware::automotive::vehicle::virtualization
347