1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "host/frontend/webrtc/libdevice/streamer.h"
18 
19 #include <android-base/logging.h>
20 #include <json/json.h>
21 
22 #include <api/audio_codecs/audio_decoder_factory.h>
23 #include <api/audio_codecs/audio_encoder_factory.h>
24 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
25 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
26 #include <api/create_peerconnection_factory.h>
27 #include <api/peer_connection_interface.h>
28 #include <api/video_codecs/builtin_video_decoder_factory.h>
29 #include <api/video_codecs/builtin_video_encoder_factory.h>
30 #include <api/video_codecs/video_decoder_factory.h>
31 #include <api/video_codecs/video_encoder_factory.h>
32 #include <media/base/video_broadcaster.h>
33 #include <pc/video_track_source.h>
34 
35 #include "common/libs/fs/shared_fd.h"
36 #include "host/frontend/webrtc/libcommon/audio_device.h"
37 #include "host/frontend/webrtc/libcommon/peer_connection_utils.h"
38 #include "host/frontend/webrtc/libcommon/port_range_socket_factory.h"
39 #include "host/frontend/webrtc/libcommon/utils.h"
40 #include "host/frontend/webrtc/libcommon/vp8only_encoder_factory.h"
41 #include "host/frontend/webrtc/libdevice/audio_track_source_impl.h"
42 #include "host/frontend/webrtc/libdevice/camera_streamer.h"
43 #include "host/frontend/webrtc/libdevice/client_handler.h"
44 #include "host/frontend/webrtc/libdevice/video_track_source_impl.h"
45 #include "host/frontend/webrtc_operator/constants/signaling_constants.h"
46 
47 namespace cuttlefish {
48 namespace webrtc_streaming {
49 namespace {
50 
51 constexpr auto kStreamIdField = "stream_id";
52 constexpr auto kLabelField = "label";
53 constexpr auto kXResField = "x_res";
54 constexpr auto kYResField = "y_res";
55 constexpr auto kDpiField = "dpi";
56 constexpr auto kIsTouchField = "is_touch";
57 constexpr auto kDisplaysField = "displays";
58 constexpr auto kTouchpadsField = "touchpads";
59 constexpr auto kAudioStreamsField = "audio_streams";
60 constexpr auto kHardwareField = "hardware";
61 constexpr auto kOpenwrtDeviceIdField = "openwrt_device_id";
62 constexpr auto kOpenwrtAddrField = "openwrt_addr";
63 constexpr auto kAdbPortField = "adb_port";
64 constexpr auto kControlEnvProxyServerPathField =
65     "control_env_proxy_server_path";
66 constexpr auto kControlPanelButtonCommand = "command";
67 constexpr auto kControlPanelButtonTitle = "title";
68 constexpr auto kControlPanelButtonIconName = "icon_name";
69 constexpr auto kControlPanelButtonShellCommand = "shell_command";
70 constexpr auto kControlPanelButtonDeviceStates = "device_states";
71 constexpr auto kControlPanelButtonLidSwitchOpen = "lid_switch_open";
72 constexpr auto kControlPanelButtonHingeAngleValue = "hinge_angle_value";
73 constexpr auto kCustomControlPanelButtonsField = "custom_control_panel_buttons";
74 constexpr auto kMouseEnabled = "mouse_enabled";
75 constexpr auto kGroupIdField = "group_id";
76 
77 constexpr int kRegistrationRetries = 3;
78 constexpr int kRetryFirstIntervalMs = 1000;
79 constexpr int kReconnectRetries = 100;
80 constexpr int kReconnectIntervalMs = 1000;
81 
ParseMessage(const uint8_t * data,size_t length,Json::Value * msg_out)82 bool ParseMessage(const uint8_t* data, size_t length, Json::Value* msg_out) {
83   auto str = reinterpret_cast<const char*>(data);
84   Json::CharReaderBuilder builder;
85   std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
86   std::string errorMessage;
87   return json_reader->parse(str, str + length, msg_out, &errorMessage);
88 }
89 
90 struct DisplayDescriptor {
91   int width;
92   int height;
93   int dpi;
94   bool touch_enabled;
95   rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source;
96 };
97 
98 struct TouchpadDescriptor {
99   int width;
100   int height;
101 };
102 
103 struct ControlPanelButtonDescriptor {
104   std::string command;
105   std::string title;
106   std::string icon_name;
107   std::optional<std::string> shell_command;
108   std::vector<DeviceState> device_states;
109 };
110 
111 // TODO (jemoreira): move to a place in common with the signaling server
112 struct OperatorServerConfig {
113   std::vector<webrtc::PeerConnectionInterface::IceServer> servers;
114 };
115 
116 // Wraps a scoped_refptr pointer to an audio device module
117 class AudioDeviceModuleWrapper : public AudioSource {
118  public:
AudioDeviceModuleWrapper(rtc::scoped_refptr<CfAudioDeviceModule> device_module)119   AudioDeviceModuleWrapper(
120       rtc::scoped_refptr<CfAudioDeviceModule> device_module)
121       : device_module_(device_module) {}
GetMoreAudioData(void * data,int bytes_per_sample,int samples_per_channel,int num_channels,int sample_rate,bool & muted)122   int GetMoreAudioData(void* data, int bytes_per_sample,
123                        int samples_per_channel, int num_channels,
124                        int sample_rate, bool& muted) override {
125     return device_module_->GetMoreAudioData(data, bytes_per_sample,
126                                             samples_per_channel, num_channels,
127                                             sample_rate, muted);
128   }
129 
device_module()130   rtc::scoped_refptr<CfAudioDeviceModule> device_module() {
131     return device_module_;
132   }
133 
134  private:
135   rtc::scoped_refptr<CfAudioDeviceModule> device_module_;
136 };
137 
138 }  // namespace
139 
140 
141 class Streamer::Impl : public ServerConnectionObserver,
142                        public PeerConnectionBuilder,
143                        public std::enable_shared_from_this<ServerConnectionObserver> {
144  public:
145   std::shared_ptr<ClientHandler> CreateClientHandler(int client_id);
146 
147   void Register(std::weak_ptr<OperatorObserver> observer);
148 
149   void SendMessageToClient(int client_id, const Json::Value& msg);
150   void DestroyClientHandler(int client_id);
151   void SetupCameraForClient(int client_id);
152 
153   // WsObserver
154   void OnOpen() override;
155   void OnClose() override;
156   void OnError(const std::string& error) override;
157   void OnReceive(const uint8_t* msg, size_t length, bool is_binary) override;
158 
159   void HandleConfigMessage(const Json::Value& msg);
160   void HandleClientMessage(const Json::Value& server_message);
161 
162   // PeerConnectionBuilder
163   Result<rtc::scoped_refptr<webrtc::PeerConnectionInterface>> Build(
164       webrtc::PeerConnectionObserver& observer,
165       const std::vector<webrtc::PeerConnectionInterface::IceServer>&
166           per_connection_servers) override;
167 
168   // All accesses to these variables happen from the signal_thread, so there is
169   // no need for extra synchronization mechanisms (mutex)
170   StreamerConfig config_;
171   OperatorServerConfig operator_config_;
172   std::unique_ptr<ServerConnection> server_connection_;
173   std::shared_ptr<ConnectionObserverFactory> connection_observer_factory_;
174   rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
175       peer_connection_factory_;
176   std::unique_ptr<rtc::Thread> network_thread_;
177   std::unique_ptr<rtc::Thread> worker_thread_;
178   std::unique_ptr<rtc::Thread> signal_thread_;
179   std::map<std::string, DisplayDescriptor> displays_;
180   std::map<std::string, TouchpadDescriptor> touchpads_;
181   std::map<std::string, rtc::scoped_refptr<AudioTrackSourceImpl>>
182       audio_sources_;
183   std::map<int, std::shared_ptr<ClientHandler>> clients_;
184   std::weak_ptr<OperatorObserver> operator_observer_;
185   std::map<std::string, std::string> hardware_;
186   std::vector<ControlPanelButtonDescriptor> custom_control_panel_buttons_;
187   std::shared_ptr<AudioDeviceModuleWrapper> audio_device_module_;
188   std::unique_ptr<CameraStreamer> camera_streamer_;
189   int registration_retries_left_ = kRegistrationRetries;
190   int retry_interval_ms_ = kRetryFirstIntervalMs;
191   RecordingManager* recording_manager_ = nullptr;
192 };
193 
Streamer(std::unique_ptr<Streamer::Impl> impl)194 Streamer::Streamer(std::unique_ptr<Streamer::Impl> impl)
195     : impl_(std::move(impl)) {}
196 
197 /* static */
Create(const StreamerConfig & cfg,RecordingManager & recording_manager,std::shared_ptr<ConnectionObserverFactory> connection_observer_factory)198 std::unique_ptr<Streamer> Streamer::Create(
199     const StreamerConfig& cfg, RecordingManager& recording_manager,
200     std::shared_ptr<ConnectionObserverFactory> connection_observer_factory) {
201   rtc::LogMessage::LogToDebug(rtc::LS_ERROR);
202 
203   std::unique_ptr<Streamer::Impl> impl(new Streamer::Impl());
204   impl->config_ = cfg;
205   impl->recording_manager_ = &recording_manager;
206   impl->connection_observer_factory_ = connection_observer_factory;
207 
208   auto network_thread_result = CreateAndStartThread("network-thread");
209   if (!network_thread_result.ok()) {
210     LOG(ERROR) << network_thread_result.error().FormatForEnv();
211     return nullptr;
212   }
213   impl->network_thread_ = std::move(*network_thread_result);
214 
215   auto worker_thread_result = CreateAndStartThread("worker-thread");
216   if (!worker_thread_result.ok()) {
217     LOG(ERROR) << worker_thread_result.error().FormatForEnv();
218     return nullptr;
219   }
220   impl->worker_thread_ = std::move(*worker_thread_result);
221 
222   auto signal_thread_result = CreateAndStartThread("signal-thread");
223   if (!signal_thread_result.ok()) {
224     LOG(ERROR) << signal_thread_result.error().FormatForEnv();
225     return nullptr;
226   }
227   impl->signal_thread_ = std::move(*signal_thread_result);
228 
229   impl->audio_device_module_ = std::make_shared<AudioDeviceModuleWrapper>(
230       rtc::scoped_refptr<CfAudioDeviceModule>(
231           new rtc::RefCountedObject<CfAudioDeviceModule>()));
232 
233   auto result = CreatePeerConnectionFactory(
234       impl->network_thread_.get(), impl->worker_thread_.get(),
235       impl->signal_thread_.get(), impl->audio_device_module_->device_module());
236 
237   if (!result.ok()) {
238     LOG(ERROR) << result.error().FormatForEnv();
239     return nullptr;
240   }
241   impl->peer_connection_factory_ = *result;
242 
243   return std::unique_ptr<Streamer>(new Streamer(std::move(impl)));
244 }
245 
AddDisplay(const std::string & label,int width,int height,int dpi,bool touch_enabled)246 std::shared_ptr<VideoSink> Streamer::AddDisplay(const std::string& label,
247                                                 int width, int height, int dpi,
248                                                 bool touch_enabled) {
249   // Usually called from an application thread
250   return impl_->signal_thread_->BlockingCall(
251       [this, &label, width, height, dpi,
252        touch_enabled]() -> std::shared_ptr<VideoSink> {
253         if (impl_->displays_.count(label)) {
254           LOG(ERROR) << "Display with same label already exists: " << label;
255           return nullptr;
256         }
257         rtc::scoped_refptr<VideoTrackSourceImpl> source(
258             new rtc::RefCountedObject<VideoTrackSourceImpl>(width, height));
259         impl_->displays_[label] = {width, height, dpi, touch_enabled, source};
260 
261         auto video_track = impl_->peer_connection_factory_->CreateVideoTrack(
262             label, source.get());
263 
264         for (auto& [_, client] : impl_->clients_) {
265           client->AddDisplay(video_track, label);
266         }
267 
268         if (impl_->recording_manager_) {
269           rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source2 =
270               source;
271           auto deleter = [](webrtc::VideoTrackSourceInterface* source) {
272             source->Release();
273           };
274           std::shared_ptr<webrtc::VideoTrackSourceInterface> source_shared(
275               source2.release(), deleter);
276           impl_->recording_manager_->AddSource(width, height, source_shared, label);
277         }
278 
279         return std::shared_ptr<VideoSink>(
280             new VideoTrackSourceImplSinkWrapper(source));
281       });
282 }
283 
RemoveDisplay(const std::string & label)284 bool Streamer::RemoveDisplay(const std::string& label) {
285   // Usually called from an application thread
286   return impl_->signal_thread_->BlockingCall(
287       [this, &label]() -> bool {
288         if (impl_->recording_manager_) {
289           impl_->recording_manager_->RemoveSource(label);
290         }
291 
292         for (auto& [_, client] : impl_->clients_) {
293           client->RemoveDisplay(label);
294         }
295 
296         impl_->displays_.erase(label);
297         return true;
298       });
299 }
300 
AddTouchpad(const std::string & label,int width,int height)301 bool Streamer::AddTouchpad(const std::string& label, int width, int height) {
302   // Usually called from an application thread
303   return impl_->signal_thread_->BlockingCall(
304       [this, &label, width, height]() -> bool {
305         if (impl_->touchpads_.count(label)) {
306           LOG(ERROR) << "Touchpad with same label already exists: " << label;
307           return false;
308         }
309         impl_->touchpads_[label] = {width, height};
310 
311         return true;
312       });
313 }
314 
AddAudioStream(const std::string & label)315 std::shared_ptr<AudioSink> Streamer::AddAudioStream(const std::string& label) {
316   // Usually called from an application thread
317   return impl_->signal_thread_->BlockingCall(
318       [this, &label]() -> std::shared_ptr<AudioSink> {
319         if (impl_->audio_sources_.count(label)) {
320           LOG(ERROR) << "Audio stream with same label already exists: "
321                      << label;
322           return nullptr;
323         }
324         rtc::scoped_refptr<AudioTrackSourceImpl> source(
325             new rtc::RefCountedObject<AudioTrackSourceImpl>());
326         impl_->audio_sources_[label] = source;
327         return std::shared_ptr<AudioSink>(
328             new AudioTrackSourceImplSinkWrapper(source));
329       });
330 }
331 
GetAudioSource()332 std::shared_ptr<AudioSource> Streamer::GetAudioSource() {
333   return impl_->audio_device_module_;
334 }
335 
AddCamera(unsigned int port,unsigned int cid,bool vhost_user)336 CameraController* Streamer::AddCamera(unsigned int port, unsigned int cid,
337                                       bool vhost_user) {
338   impl_->camera_streamer_ =
339       std::make_unique<CameraStreamer>(port, cid, vhost_user);
340   return impl_->camera_streamer_.get();
341 }
342 
SetHardwareSpec(std::string key,std::string value)343 void Streamer::SetHardwareSpec(std::string key, std::string value) {
344   impl_->hardware_.emplace(key, value);
345 }
346 
AddCustomControlPanelButton(const std::string & command,const std::string & title,const std::string & icon_name)347 void Streamer::AddCustomControlPanelButton(const std::string& command,
348                                            const std::string& title,
349                                            const std::string& icon_name) {
350   ControlPanelButtonDescriptor button = {
351       .command = command, .title = title, .icon_name = icon_name};
352   impl_->custom_control_panel_buttons_.push_back(button);
353 }
354 
AddCustomControlPanelButtonWithShellCommand(const std::string & command,const std::string & title,const std::string & icon_name,const std::string & shell_command)355 void Streamer::AddCustomControlPanelButtonWithShellCommand(
356     const std::string& command, const std::string& title,
357     const std::string& icon_name, const std::string& shell_command) {
358   ControlPanelButtonDescriptor button = {
359       .command = command, .title = title, .icon_name = icon_name};
360   button.shell_command = shell_command;
361   impl_->custom_control_panel_buttons_.push_back(button);
362 }
363 
AddCustomControlPanelButtonWithDeviceStates(const std::string & command,const std::string & title,const std::string & icon_name,const std::vector<DeviceState> & device_states)364 void Streamer::AddCustomControlPanelButtonWithDeviceStates(
365     const std::string& command, const std::string& title,
366     const std::string& icon_name,
367     const std::vector<DeviceState>& device_states) {
368   ControlPanelButtonDescriptor button = {
369       .command = command, .title = title, .icon_name = icon_name};
370   button.device_states = device_states;
371   impl_->custom_control_panel_buttons_.push_back(button);
372 }
373 
Register(std::weak_ptr<OperatorObserver> observer)374 void Streamer::Register(std::weak_ptr<OperatorObserver> observer) {
375   // Usually called from an application thread
376   // No need to block the calling thread on this, the observer will be notified
377   // when the connection is established.
378   impl_->signal_thread_->PostTask([this, observer]() {
379     impl_->Register(observer);
380   });
381 }
382 
Unregister()383 void Streamer::Unregister() {
384   // Usually called from an application thread.
385   impl_->signal_thread_->PostTask(
386       [this]() { impl_->server_connection_.reset(); });
387 }
388 
Register(std::weak_ptr<OperatorObserver> observer)389 void Streamer::Impl::Register(std::weak_ptr<OperatorObserver> observer) {
390   operator_observer_ = observer;
391   // When the connection is established the OnOpen function will be called where
392   // the registration will take place
393   if (!server_connection_) {
394     server_connection_ =
395         ServerConnection::Connect(config_.operator_server, weak_from_this());
396   } else {
397     // in case connection attempt is retried, just call Reconnect().
398     // Recreating server_connection_ object will destroy existing WSConnection
399     // object and task re-scheduling will fail
400     server_connection_->Reconnect();
401   }
402 }
403 
OnOpen()404 void Streamer::Impl::OnOpen() {
405   // Called from the websocket thread.
406   // Connected to operator.
407   signal_thread_->PostTask([this]() {
408     Json::Value register_obj;
409     register_obj[cuttlefish::webrtc_signaling::kTypeField] =
410         cuttlefish::webrtc_signaling::kRegisterType;
411     register_obj[cuttlefish::webrtc_signaling::kDeviceIdField] =
412         config_.device_id;
413     CHECK(config_.client_files_port >= 0) << "Invalid device port provided";
414     register_obj[cuttlefish::webrtc_signaling::kDevicePortField] =
415         config_.client_files_port;
416 
417     Json::Value device_info;
418     Json::Value displays(Json::ValueType::arrayValue);
419     // No need to synchronize with other accesses to display_ because all
420     // happens on signal_thread.
421     for (auto& entry : displays_) {
422       Json::Value display;
423       display[kStreamIdField] = entry.first;
424       display[kXResField] = entry.second.width;
425       display[kYResField] = entry.second.height;
426       display[kDpiField] = entry.second.dpi;
427       display[kIsTouchField] = true;
428       displays.append(display);
429     }
430 
431     device_info[kGroupIdField] = config_.group_id;
432     device_info[kDisplaysField] = displays;
433 
434     Json::Value touchpads(Json::ValueType::arrayValue);
435     for (const auto& [label, touchpad_desc] : touchpads_) {
436       Json::Value touchpad;
437       touchpad[kXResField] = touchpad_desc.width;
438       touchpad[kYResField] = touchpad_desc.height;
439       touchpad[kLabelField] = label;
440       touchpads.append(touchpad);
441     }
442     device_info[kTouchpadsField] = touchpads;
443     Json::Value audio_streams(Json::ValueType::arrayValue);
444     for (auto& entry : audio_sources_) {
445       Json::Value audio;
446       audio[kStreamIdField] = entry.first;
447       audio_streams.append(audio);
448     }
449     device_info[kAudioStreamsField] = audio_streams;
450     Json::Value hardware;
451     for (const auto& [k, v] : hardware_) {
452       hardware[k] = v;
453     }
454     device_info[kHardwareField] = hardware;
455     device_info[kOpenwrtDeviceIdField] = config_.openwrt_device_id;
456     device_info[kOpenwrtAddrField] = config_.openwrt_addr;
457     device_info[kAdbPortField] = config_.adb_port;
458     device_info[kControlEnvProxyServerPathField] =
459         config_.control_env_proxy_server_path;
460     Json::Value custom_control_panel_buttons(Json::arrayValue);
461     for (const auto& button : custom_control_panel_buttons_) {
462       Json::Value button_entry;
463       button_entry[kControlPanelButtonCommand] = button.command;
464       button_entry[kControlPanelButtonTitle] = button.title;
465       button_entry[kControlPanelButtonIconName] = button.icon_name;
466       if (button.shell_command) {
467         button_entry[kControlPanelButtonShellCommand] = *(button.shell_command);
468       } else if (!button.device_states.empty()) {
469         Json::Value device_states(Json::arrayValue);
470         for (const DeviceState& device_state : button.device_states) {
471           Json::Value device_state_entry;
472           if (device_state.lid_switch_open) {
473             device_state_entry[kControlPanelButtonLidSwitchOpen] =
474                 *device_state.lid_switch_open;
475           }
476           if (device_state.hinge_angle_value) {
477             device_state_entry[kControlPanelButtonHingeAngleValue] =
478                 *device_state.hinge_angle_value;
479           }
480           device_states.append(device_state_entry);
481         }
482         button_entry[kControlPanelButtonDeviceStates] = device_states;
483       }
484       custom_control_panel_buttons.append(button_entry);
485     }
486     // Add mouse button conditionally.
487     device_info[kMouseEnabled] = config_.enable_mouse;
488 
489     device_info[kCustomControlPanelButtonsField] = custom_control_panel_buttons;
490     register_obj[cuttlefish::webrtc_signaling::kDeviceInfoField] = device_info;
491     server_connection_->Send(register_obj);
492     // Do this last as OnRegistered() is user code and may take some time to
493     // complete (although it shouldn't...)
494     auto observer = operator_observer_.lock();
495     if (observer) {
496       observer->OnRegistered();
497     }
498   });
499 }
500 
OnClose()501 void Streamer::Impl::OnClose() {
502   // Called from websocket thread
503   // The operator shouldn't close the connection with the client, it's up to the
504   // device to decide when to disconnect.
505   LOG(WARNING) << "Connection with server closed unexpectedly";
506   signal_thread_->PostTask([this]() {
507     auto observer = operator_observer_.lock();
508     if (observer) {
509       observer->OnClose();
510     }
511   });
512   LOG(INFO) << "Trying to re-connect to operator..";
513   registration_retries_left_ = kReconnectRetries;
514   retry_interval_ms_ = kReconnectIntervalMs;
515   signal_thread_->PostDelayedTask(
516       [this]() { Register(operator_observer_); },
517       webrtc::TimeDelta::Millis(retry_interval_ms_));
518 }
519 
OnError(const std::string & error)520 void Streamer::Impl::OnError(const std::string& error) {
521   // Called from websocket thread.
522   if (registration_retries_left_) {
523     LOG(WARNING) << "Connection to operator failed (" << error << "), "
524                  << registration_retries_left_ << " retries left"
525                  << " (will retry in " << retry_interval_ms_ / 1000 << "s)";
526     --registration_retries_left_;
527     signal_thread_->PostDelayedTask(
528         [this]() {
529           // Need to reconnect and register again with operator
530           Register(operator_observer_);
531         },
532         webrtc::TimeDelta::Millis(retry_interval_ms_));
533     retry_interval_ms_ *= 2;
534   } else {
535     LOG(ERROR) << "Error on connection with the operator: " << error;
536     signal_thread_->PostTask([this]() {
537       auto observer = operator_observer_.lock();
538       if (observer) {
539         observer->OnError();
540       }
541     });
542   }
543 }
544 
HandleConfigMessage(const Json::Value & server_message)545 void Streamer::Impl::HandleConfigMessage(const Json::Value& server_message) {
546   CHECK(signal_thread_->IsCurrent())
547       << __FUNCTION__ << " called from the wrong thread";
548   auto result = ParseIceServersMessage(server_message);
549   if (!result.ok()) {
550     LOG(WARNING) << "Failed to parse ice servers message from server: "
551                  << result.error().FormatForEnv();
552   }
553   operator_config_.servers = *result;
554 }
555 
HandleClientMessage(const Json::Value & server_message)556 void Streamer::Impl::HandleClientMessage(const Json::Value& server_message) {
557   CHECK(signal_thread_->IsCurrent())
558       << __FUNCTION__ << " called from the wrong thread";
559   if (!server_message.isMember(cuttlefish::webrtc_signaling::kClientIdField) ||
560       !server_message[cuttlefish::webrtc_signaling::kClientIdField].isInt()) {
561     LOG(ERROR) << "Client message received without valid client id";
562     return;
563   }
564   auto client_id =
565       server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
566   if (!server_message.isMember(cuttlefish::webrtc_signaling::kPayloadField)) {
567     LOG(WARNING) << "Received empty client message";
568     return;
569   }
570   auto client_message =
571       server_message[cuttlefish::webrtc_signaling::kPayloadField];
572   if (clients_.count(client_id) == 0) {
573     auto client_handler = CreateClientHandler(client_id);
574     if (!client_handler) {
575       LOG(ERROR) << "Failed to create a new client handler";
576       return;
577     }
578     clients_.emplace(client_id, client_handler);
579   }
580   auto client_handler = clients_[client_id];
581 
582   client_handler->HandleMessage(client_message);
583 }
584 
OnReceive(const uint8_t * msg,size_t length,bool is_binary)585 void Streamer::Impl::OnReceive(const uint8_t* msg, size_t length,
586                                bool is_binary) {
587   // Usually called from websocket thread.
588   Json::Value server_message;
589   // Once OnReceive returns the buffer can be destroyed/recycled at any time, so
590   // parse the data into a JSON object while still on the websocket thread.
591   if (is_binary || !ParseMessage(msg, length, &server_message)) {
592     LOG(ERROR) << "Received invalid JSON from server: '"
593                << (is_binary ? std::string("(binary_data)")
594                              : std::string(msg, msg + length))
595                << "'";
596     return;
597   }
598   // Transition to the signal thread before member variables are accessed.
599   signal_thread_->PostTask([this, server_message]() {
600     if (!server_message.isMember(cuttlefish::webrtc_signaling::kTypeField) ||
601         !server_message[cuttlefish::webrtc_signaling::kTypeField].isString()) {
602       LOG(ERROR) << "No message_type field from server";
603       // Notify the caller
604       OnError(
605           "Invalid message received from operator: no message type field "
606           "present");
607       return;
608     }
609     auto type =
610         server_message[cuttlefish::webrtc_signaling::kTypeField].asString();
611     if (type == cuttlefish::webrtc_signaling::kConfigType) {
612       HandleConfigMessage(server_message);
613     } else if (type == cuttlefish::webrtc_signaling::kClientDisconnectType) {
614       if (!server_message.isMember(
615               cuttlefish::webrtc_signaling::kClientIdField) ||
616           !server_message.isMember(
617               cuttlefish::webrtc_signaling::kClientIdField)) {
618         LOG(ERROR) << "Invalid disconnect message received from server";
619         // Notify the caller
620         OnError("Invalid disconnect message: client_id is required");
621         return;
622       }
623       auto client_id =
624           server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
625       LOG(INFO) << "Client " << client_id << " has disconnected.";
626       DestroyClientHandler(client_id);
627     } else if (type == cuttlefish::webrtc_signaling::kClientMessageType) {
628       HandleClientMessage(server_message);
629     } else {
630       LOG(ERROR) << "Unknown message type: " << type;
631       // Notify the caller
632       OnError("Invalid message received from operator: unknown message type");
633       return;
634     }
635   });
636 }
637 
CreateClientHandler(int client_id)638 std::shared_ptr<ClientHandler> Streamer::Impl::CreateClientHandler(
639     int client_id) {
640   CHECK(signal_thread_->IsCurrent())
641       << __FUNCTION__ << " called from the wrong thread";
642   auto observer = connection_observer_factory_->CreateObserver();
643 
644   auto client_handler = ClientHandler::Create(
645       client_id, observer, *this,
646       [this, client_id](const Json::Value& msg) {
647         SendMessageToClient(client_id, msg);
648       },
649       [this, client_id](bool isOpen) {
650         if (isOpen) {
651           SetupCameraForClient(client_id);
652         } else {
653           DestroyClientHandler(client_id);
654         }
655       });
656 
657   for (auto& entry : displays_) {
658     auto& label = entry.first;
659     auto& video_source = entry.second.source;
660 
661     auto video_track =
662         peer_connection_factory_->CreateVideoTrack(label, video_source.get());
663     client_handler->AddDisplay(video_track, label);
664   }
665 
666   for (auto& entry : audio_sources_) {
667     auto& label = entry.first;
668     auto& audio_stream = entry.second;
669     auto audio_track =
670         peer_connection_factory_->CreateAudioTrack(label, audio_stream.get());
671     client_handler->AddAudio(audio_track, label);
672   }
673 
674   return client_handler;
675 }
676 
677 Result<rtc::scoped_refptr<webrtc::PeerConnectionInterface>>
Build(webrtc::PeerConnectionObserver & observer,const std::vector<webrtc::PeerConnectionInterface::IceServer> & per_connection_servers)678 Streamer::Impl::Build(
679     webrtc::PeerConnectionObserver& observer,
680     const std::vector<webrtc::PeerConnectionInterface::IceServer>&
681         per_connection_servers) {
682   webrtc::PeerConnectionDependencies dependencies(&observer);
683   auto servers = operator_config_.servers;
684   servers.insert(servers.end(), per_connection_servers.begin(),
685                  per_connection_servers.end());
686   if (config_.udp_port_range != config_.tcp_port_range) {
687     // libwebrtc removed the ability to provide a packet socket factory when
688     // creating a peer connection. They plan to provide that functionality with
689     // the peer connection factory, but that's currently incomplete (the packet
690     // socket factory is ignored by the peer connection factory). The only other
691     // choice to customize port ranges is through the port allocator config, but
692     // this is suboptimal as it only allows to specify a single port range that
693     // will be use for both tcp and udp ports.
694     LOG(WARNING) << "TCP and UDP port ranges differ, TCP connections may not "
695                     "work properly";
696   }
697   return CF_EXPECT(
698       CreatePeerConnection(peer_connection_factory_, std::move(dependencies),
699                            config_.udp_port_range.first,
700                            config_.udp_port_range.second, servers),
701       "Failed to build peer connection");
702 }
703 
SendMessageToClient(int client_id,const Json::Value & msg)704 void Streamer::Impl::SendMessageToClient(int client_id,
705                                          const Json::Value& msg) {
706   LOG(VERBOSE) << "Sending to client: " << msg.toStyledString();
707   CHECK(signal_thread_->IsCurrent())
708       << __FUNCTION__ << " called from the wrong thread";
709   Json::Value wrapper;
710   wrapper[cuttlefish::webrtc_signaling::kPayloadField] = msg;
711   wrapper[cuttlefish::webrtc_signaling::kTypeField] =
712       cuttlefish::webrtc_signaling::kForwardType;
713   wrapper[cuttlefish::webrtc_signaling::kClientIdField] = client_id;
714   // This is safe to call from the webrtc threads because
715   // ServerConnection(s) are thread safe
716   server_connection_->Send(wrapper);
717 }
718 
DestroyClientHandler(int client_id)719 void Streamer::Impl::DestroyClientHandler(int client_id) {
720   // Usually called from signal thread, could be called from websocket thread or
721   // an application thread.
722   signal_thread_->PostTask([this, client_id]() {
723     // This needs to be 'posted' to the thread instead of 'invoked'
724     // immediately for two reasons:
725     // * The client handler is destroyed by this code, it's generally a
726     // bad idea (though not necessarily wrong) to return to a member
727     // function of a destroyed object.
728     // * The client handler may call this from within a peer connection
729     // observer callback, destroying the client handler there leads to a
730     // deadlock.
731     clients_.erase(client_id);
732   });
733 }
734 
SetupCameraForClient(int client_id)735 void Streamer::Impl::SetupCameraForClient(int client_id) {
736   if (!camera_streamer_) {
737     return;
738   }
739   auto client_handler = clients_[client_id];
740   if (client_handler) {
741     auto camera_track = client_handler->GetCameraStream();
742     if (camera_track) {
743       camera_track->AddOrUpdateSink(camera_streamer_.get(),
744                                     rtc::VideoSinkWants());
745     }
746   }
747 }
748 
749 }  // namespace webrtc_streaming
750 }  // namespace cuttlefish
751