1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "camera_streamer.h"
17 
18 #include <android-base/logging.h>
19 #include <chrono>
20 #include "common/libs/utils/vsock_connection.h"
21 
22 namespace cuttlefish {
23 namespace webrtc_streaming {
24 
CameraStreamer(unsigned int port,unsigned int cid,bool vhost_user)25 CameraStreamer::CameraStreamer(unsigned int port, unsigned int cid,
26                                bool vhost_user)
27     : cid_(cid),
28       port_(port),
29       vhost_user_(vhost_user),
30       camera_session_active_(false) {}
31 
~CameraStreamer()32 CameraStreamer::~CameraStreamer() { Disconnect(); }
33 
34 // We are getting frames from the client so try forwarding those to the CVD
OnFrame(const webrtc::VideoFrame & client_frame)35 void CameraStreamer::OnFrame(const webrtc::VideoFrame& client_frame) {
36   std::lock_guard<std::mutex> lock(onframe_mutex_);
37   if (!cvd_connection_.IsConnected_Unguarded() &&
38       !pending_connection_.valid()) {
39     // Start new connection
40     pending_connection_ =
41         cvd_connection_.ConnectAsync(port_, cid_, vhost_user_);
42     return;
43   } else if (pending_connection_.valid()) {
44     if (!IsConnectionReady()) {
45       return;
46     }
47     std::lock_guard<std::mutex> lock(settings_mutex_);
48     if (!cvd_connection_.WriteMessage(settings_buffer_)) {
49       LOG(ERROR) << "Failed writing camera settings:";
50       return;
51     }
52     StartReadLoop();
53     LOG(INFO) << "Connected!";
54   }
55   auto resolution = resolution_.load();
56   if (resolution.height <= 0 || resolution.width <= 0 ||
57       !camera_session_active_.load()) {
58     // Nobody is receiving frames or we don't have a valid resolution that is
59     // necessary for potential frame scaling
60     return;
61   }
62   auto frame = client_frame.video_frame_buffer()->ToI420().get();
63   if (frame->width() != resolution.width ||
64       frame->height() != resolution.height) {
65     // incoming resolution does not match with the resolution we
66     // have communicated to the CVD - scaling required
67     if (!scaled_frame_ || resolution.width != scaled_frame_->width() ||
68         resolution.height != scaled_frame_->height()) {
69       scaled_frame_ =
70           webrtc::I420Buffer::Create(resolution.width, resolution.height);
71     }
72     scaled_frame_->CropAndScaleFrom(*frame);
73     frame = scaled_frame_.get();
74   }
75   if (!VsockSendYUVFrame(frame)) {
76     LOG(ERROR) << "Sending frame over vsock failed";
77   }
78 }
79 
80 // Handle message json coming from client
HandleMessage(const Json::Value & message)81 void CameraStreamer::HandleMessage(const Json::Value& message) {
82   auto command = message["command"].asString();
83   if (command == "camera_settings") {
84     // save local copy of resolution that is required for frame scaling
85     resolution_ = GetResolutionFromSettings(message);
86     Json::StreamWriterBuilder factory;
87     std::string new_settings = Json::writeString(factory, message);
88     if (!settings_buffer_.empty() && new_settings != settings_buffer_) {
89       // Settings have changed - disconnect
90       // Next incoming frames will trigger re-connection
91       Disconnect();
92     }
93     std::lock_guard<std::mutex> lock(settings_mutex_);
94     settings_buffer_ = new_settings;
95     LOG(INFO) << "New camera settings received:" << new_settings;
96   }
97 }
98 
99 // Handle binary blobs coming from client
HandleMessage(const std::vector<char> & message)100 void CameraStreamer::HandleMessage(const std::vector<char>& message) {
101   LOG(INFO) << "Pass through " << message.size() << "bytes";
102   std::lock_guard<std::mutex> lock(frame_mutex_);
103   cvd_connection_.WriteMessage(message);
104 }
105 
GetResolutionFromSettings(const Json::Value & settings)106 CameraStreamer::Resolution CameraStreamer::GetResolutionFromSettings(
107     const Json::Value& settings) {
108   return {.width = settings["width"].asInt(),
109           .height = settings["height"].asInt()};
110 }
111 
VsockSendYUVFrame(const webrtc::I420BufferInterface * frame)112 bool CameraStreamer::VsockSendYUVFrame(
113     const webrtc::I420BufferInterface* frame) {
114   int32_t size = frame->width() * frame->height() +
115                  2 * frame->ChromaWidth() * frame->ChromaHeight();
116   const char* y = reinterpret_cast<const char*>(frame->DataY());
117   const char* u = reinterpret_cast<const char*>(frame->DataU());
118   const char* v = reinterpret_cast<const char*>(frame->DataV());
119   auto chroma_width = frame->ChromaWidth();
120   auto chroma_height = frame->ChromaHeight();
121   std::lock_guard<std::mutex> lock(frame_mutex_);
122   return cvd_connection_.Write(size) &&
123          cvd_connection_.WriteStrides(y, frame->width(), frame->height(),
124                                       frame->StrideY()) &&
125          cvd_connection_.WriteStrides(u, chroma_width, chroma_height,
126                                       frame->StrideU()) &&
127          cvd_connection_.WriteStrides(v, chroma_width, chroma_height,
128                                       frame->StrideV());
129 }
130 
IsConnectionReady()131 bool CameraStreamer::IsConnectionReady() {
132   if (!pending_connection_.valid()) {
133     return cvd_connection_.IsConnected();
134   } else if (pending_connection_.wait_for(std::chrono::seconds(0)) !=
135              std::future_status::ready) {
136     // Still waiting for connection
137     return false;
138   } else if (settings_buffer_.empty()) {
139     // connection is ready but we have not yet received client
140     // camera settings
141     return false;
142   }
143   return pending_connection_.get();
144 }
145 
StartReadLoop()146 void CameraStreamer::StartReadLoop() {
147   if (reader_thread_.joinable()) {
148     reader_thread_.join();
149   }
150   reader_thread_ = std::thread([this] {
151     while (cvd_connection_.IsConnected()) {
152       static constexpr auto kEventKey = "event";
153       static constexpr auto kMessageStart =
154           "VIRTUAL_DEVICE_START_CAMERA_SESSION";
155       static constexpr auto kMessageStop = "VIRTUAL_DEVICE_STOP_CAMERA_SESSION";
156       auto json_value = cvd_connection_.ReadJsonMessage();
157       if (json_value[kEventKey] == kMessageStart) {
158         camera_session_active_ = true;
159       } else if (json_value[kEventKey] == kMessageStop) {
160         camera_session_active_ = false;
161       }
162       if (!json_value.empty()) {
163         SendMessage(json_value);
164       }
165     }
166     LOG(INFO) << "Exit reader thread";
167   });
168 }
169 
Disconnect()170 void CameraStreamer::Disconnect() {
171   cvd_connection_.Disconnect();
172   if (reader_thread_.joinable()) {
173     reader_thread_.join();
174   }
175 }
176 
177 }  // namespace webrtc_streaming
178 }  // namespace cuttlefish
179