1 /*
2 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
11
12 #include <grpc/support/log.h>
13 #include <grpcpp/grpcpp.h>
14
15 #include <string>
16 #include <utility>
17
18 #include "api/jsep.h"
19 #include "api/jsep_ice_candidate.h"
20 #include "rtc_base/thread.h"
21 #include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h"
22
23 namespace webrtc {
24 namespace {
25
26 using GrpcSignaling::IceCandidate;
27 using GrpcSignaling::PeerConnectionSignaling;
28 using GrpcSignaling::SessionDescription;
29 using GrpcSignaling::SignalingMessage;
30
31 template <class T>
32 class SessionData : public webrtc::SignalingInterface {
33 public:
SessionData()34 SessionData() {}
SessionData(T * stream)35 explicit SessionData(T* stream) : stream_(stream) {}
SetStream(T * stream)36 void SetStream(T* stream) { stream_ = stream; }
37
SendIceCandidate(const IceCandidateInterface * candidate)38 void SendIceCandidate(const IceCandidateInterface* candidate) override {
39 RTC_LOG(LS_INFO) << "SendIceCandidate";
40 std::string serialized_candidate;
41 if (!candidate->ToString(&serialized_candidate)) {
42 RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate";
43 return;
44 }
45
46 SignalingMessage message;
47 IceCandidate* proto_candidate = message.mutable_candidate();
48 proto_candidate->set_description(serialized_candidate);
49 proto_candidate->set_mid(candidate->sdp_mid());
50 proto_candidate->set_mline_index(candidate->sdp_mline_index());
51
52 stream_->Write(message);
53 }
54
SendDescription(const SessionDescriptionInterface * sdp)55 void SendDescription(const SessionDescriptionInterface* sdp) override {
56 RTC_LOG(LS_INFO) << "SendDescription";
57
58 std::string serialized_sdp;
59 sdp->ToString(&serialized_sdp);
60
61 SignalingMessage message;
62 if (sdp->GetType() == SdpType::kOffer)
63 message.mutable_description()->set_type(SessionDescription::OFFER);
64 else if (sdp->GetType() == SdpType::kAnswer)
65 message.mutable_description()->set_type(SessionDescription::ANSWER);
66 message.mutable_description()->set_content(serialized_sdp);
67
68 stream_->Write(message);
69 }
70
OnRemoteDescription(std::function<void (std::unique_ptr<SessionDescriptionInterface> sdp)> callback)71 void OnRemoteDescription(
72 std::function<void(std::unique_ptr<SessionDescriptionInterface> sdp)>
73 callback) override {
74 RTC_LOG(LS_INFO) << "OnRemoteDescription";
75 remote_description_callback_ = callback;
76 }
77
OnIceCandidate(std::function<void (std::unique_ptr<IceCandidateInterface> candidate)> callback)78 void OnIceCandidate(
79 std::function<void(std::unique_ptr<IceCandidateInterface> candidate)>
80 callback) override {
81 RTC_LOG(LS_INFO) << "OnIceCandidate";
82 ice_candidate_callback_ = callback;
83 }
84
85 T* stream_;
86
87 std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)>
88 ice_candidate_callback_;
89 std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)>
90 remote_description_callback_;
91 };
92
// Concrete SessionData instantiations for the two ends of the signaling RPC:
// the server side uses grpc::ServerReaderWriter and the client side uses
// grpc::ClientReaderWriter. Both read and write SignalingMessage.
using ServerSessionData =
    SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>;
using ClientSessionData =
    SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>;
97
98 template <class MessageType, class StreamReader, class SessionData>
ProcessMessages(StreamReader * stream,SessionData * session)99 void ProcessMessages(StreamReader* stream, SessionData* session) {
100 MessageType message;
101
102 while (stream->Read(&message)) {
103 switch (message.Content_case()) {
104 case SignalingMessage::ContentCase::kCandidate: {
105 webrtc::SdpParseError error;
106 auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>(
107 message.candidate().mid(), message.candidate().mline_index());
108 if (!jsep_candidate->Initialize(message.candidate().description(),
109 &error)) {
110 RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '"
111 << message.candidate().description() << "'";
112 RTC_LOG(LS_ERROR)
113 << "Error at line " << error.line << ":" << error.description;
114 continue;
115 }
116
117 session->ice_candidate_callback_(std::move(jsep_candidate));
118 break;
119 }
120 case SignalingMessage::ContentCase::kDescription: {
121 auto& description = message.description();
122 auto content = description.content();
123
124 auto sdp = webrtc::CreateSessionDescription(
125 description.type() == SessionDescription::OFFER
126 ? webrtc::SdpType::kOffer
127 : webrtc::SdpType::kAnswer,
128 description.content());
129 session->remote_description_callback_(std::move(sdp));
130 break;
131 }
132 default:
133 RTC_DCHECK_NOTREACHED();
134 }
135 }
136 }
137
138 class GrpcNegotiationServer : public GrpcSignalingServerInterface,
139 public PeerConnectionSignaling::Service {
140 public:
GrpcNegotiationServer(std::function<void (webrtc::SignalingInterface *)> callback,int port,bool oneshot)141 GrpcNegotiationServer(
142 std::function<void(webrtc::SignalingInterface*)> callback,
143 int port,
144 bool oneshot)
145 : connect_callback_(std::move(callback)),
146 requested_port_(port),
147 oneshot_(oneshot) {}
~GrpcNegotiationServer()148 ~GrpcNegotiationServer() override {
149 Stop();
150 if (server_stop_thread_)
151 server_stop_thread_->Stop();
152 }
153
Start()154 void Start() override {
155 std::string server_address = "[::]";
156
157 grpc::ServerBuilder builder;
158 builder.AddListeningPort(
159 server_address + ":" + std::to_string(requested_port_),
160 grpc::InsecureServerCredentials(), &selected_port_);
161 builder.RegisterService(this);
162 server_ = builder.BuildAndStart();
163 }
164
Wait()165 void Wait() override { server_->Wait(); }
166
Stop()167 void Stop() override { server_->Shutdown(); }
168
SelectedPort()169 int SelectedPort() override { return selected_port_; }
170
Connect(grpc::ServerContext * context,grpc::ServerReaderWriter<SignalingMessage,SignalingMessage> * stream)171 grpc::Status Connect(
172 grpc::ServerContext* context,
173 grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream)
174 override {
175 if (oneshot_) {
176 // Request the termination of the server early so we don't serve another
177 // client in parallel.
178 server_stop_thread_ = rtc::Thread::Create();
179 server_stop_thread_->Start();
180 server_stop_thread_->PostTask([this] { Stop(); });
181 }
182
183 ServerSessionData session(stream);
184
185 auto reading_thread = rtc::Thread::Create();
186 reading_thread->Start();
187 reading_thread->PostTask([&session, &stream] {
188 ProcessMessages<SignalingMessage>(stream, &session);
189 });
190
191 connect_callback_(&session);
192
193 reading_thread->Stop();
194
195 return grpc::Status::OK;
196 }
197
198 private:
199 std::function<void(webrtc::SignalingInterface*)> connect_callback_;
200 int requested_port_;
201 int selected_port_;
202 bool oneshot_;
203
204 std::unique_ptr<grpc::Server> server_;
205 std::unique_ptr<rtc::Thread> server_stop_thread_;
206 };
207
208 class GrpcNegotiationClient : public GrpcSignalingClientInterface {
209 public:
GrpcNegotiationClient(const std::string & server)210 explicit GrpcNegotiationClient(const std::string& server) {
211 channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
212 stub_ = PeerConnectionSignaling::NewStub(channel_);
213 }
214
~GrpcNegotiationClient()215 ~GrpcNegotiationClient() override {
216 context_.TryCancel();
217 if (reading_thread_)
218 reading_thread_->Stop();
219 }
220
Start()221 bool Start() override {
222 if (!channel_->WaitForConnected(
223 absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) {
224 return false;
225 }
226
227 stream_ = stub_->Connect(&context_);
228 session_.SetStream(stream_.get());
229
230 reading_thread_ = rtc::Thread::Create();
231 reading_thread_->Start();
232 reading_thread_->PostTask([this] {
233 ProcessMessages<SignalingMessage>(stream_.get(), &session_);
234 });
235
236 return true;
237 }
238
signaling_client()239 webrtc::SignalingInterface* signaling_client() override { return &session_; }
240
241 private:
242 std::shared_ptr<grpc::Channel> channel_;
243 std::unique_ptr<PeerConnectionSignaling::Stub> stub_;
244 std::unique_ptr<rtc::Thread> reading_thread_;
245 grpc::ClientContext context_;
246 std::unique_ptr<
247 ::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>
248 stream_;
249 ClientSessionData session_;
250 };
251 } // namespace
252
253 std::unique_ptr<GrpcSignalingServerInterface>
Create(std::function<void (webrtc::SignalingInterface *)> callback,int port,bool oneshot)254 GrpcSignalingServerInterface::Create(
255 std::function<void(webrtc::SignalingInterface*)> callback,
256 int port,
257 bool oneshot) {
258 return std::make_unique<GrpcNegotiationServer>(std::move(callback), port,
259 oneshot);
260 }
261
262 std::unique_ptr<GrpcSignalingClientInterface>
Create(const std::string & server)263 GrpcSignalingClientInterface::Create(const std::string& server) {
264 return std::make_unique<GrpcNegotiationClient>(server);
265 }
266
267 } // namespace webrtc
268