xref: /aosp_15_r20/external/webrtc/rtc_tools/data_channel_benchmark/grpc_signaling.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
11 
12 #include <grpc/support/log.h>
13 #include <grpcpp/grpcpp.h>
14 
15 #include <string>
16 #include <utility>
17 
18 #include "api/jsep.h"
19 #include "api/jsep_ice_candidate.h"
20 #include "rtc_base/thread.h"
21 #include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h"
22 
23 namespace webrtc {
24 namespace {
25 
26 using GrpcSignaling::IceCandidate;
27 using GrpcSignaling::PeerConnectionSignaling;
28 using GrpcSignaling::SessionDescription;
29 using GrpcSignaling::SignalingMessage;
30 
31 template <class T>
32 class SessionData : public webrtc::SignalingInterface {
33  public:
SessionData()34   SessionData() {}
SessionData(T * stream)35   explicit SessionData(T* stream) : stream_(stream) {}
SetStream(T * stream)36   void SetStream(T* stream) { stream_ = stream; }
37 
SendIceCandidate(const IceCandidateInterface * candidate)38   void SendIceCandidate(const IceCandidateInterface* candidate) override {
39     RTC_LOG(LS_INFO) << "SendIceCandidate";
40     std::string serialized_candidate;
41     if (!candidate->ToString(&serialized_candidate)) {
42       RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate";
43       return;
44     }
45 
46     SignalingMessage message;
47     IceCandidate* proto_candidate = message.mutable_candidate();
48     proto_candidate->set_description(serialized_candidate);
49     proto_candidate->set_mid(candidate->sdp_mid());
50     proto_candidate->set_mline_index(candidate->sdp_mline_index());
51 
52     stream_->Write(message);
53   }
54 
SendDescription(const SessionDescriptionInterface * sdp)55   void SendDescription(const SessionDescriptionInterface* sdp) override {
56     RTC_LOG(LS_INFO) << "SendDescription";
57 
58     std::string serialized_sdp;
59     sdp->ToString(&serialized_sdp);
60 
61     SignalingMessage message;
62     if (sdp->GetType() == SdpType::kOffer)
63       message.mutable_description()->set_type(SessionDescription::OFFER);
64     else if (sdp->GetType() == SdpType::kAnswer)
65       message.mutable_description()->set_type(SessionDescription::ANSWER);
66     message.mutable_description()->set_content(serialized_sdp);
67 
68     stream_->Write(message);
69   }
70 
OnRemoteDescription(std::function<void (std::unique_ptr<SessionDescriptionInterface> sdp)> callback)71   void OnRemoteDescription(
72       std::function<void(std::unique_ptr<SessionDescriptionInterface> sdp)>
73           callback) override {
74     RTC_LOG(LS_INFO) << "OnRemoteDescription";
75     remote_description_callback_ = callback;
76   }
77 
OnIceCandidate(std::function<void (std::unique_ptr<IceCandidateInterface> candidate)> callback)78   void OnIceCandidate(
79       std::function<void(std::unique_ptr<IceCandidateInterface> candidate)>
80           callback) override {
81     RTC_LOG(LS_INFO) << "OnIceCandidate";
82     ice_candidate_callback_ = callback;
83   }
84 
85   T* stream_;
86 
87   std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)>
88       ice_candidate_callback_;
89   std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)>
90       remote_description_callback_;
91 };
92 
93 using ServerSessionData =
94     SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>;
95 using ClientSessionData =
96     SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>;
97 
98 template <class MessageType, class StreamReader, class SessionData>
ProcessMessages(StreamReader * stream,SessionData * session)99 void ProcessMessages(StreamReader* stream, SessionData* session) {
100   MessageType message;
101 
102   while (stream->Read(&message)) {
103     switch (message.Content_case()) {
104       case SignalingMessage::ContentCase::kCandidate: {
105         webrtc::SdpParseError error;
106         auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>(
107             message.candidate().mid(), message.candidate().mline_index());
108         if (!jsep_candidate->Initialize(message.candidate().description(),
109                                         &error)) {
110           RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '"
111                             << message.candidate().description() << "'";
112           RTC_LOG(LS_ERROR)
113               << "Error at line " << error.line << ":" << error.description;
114           continue;
115         }
116 
117         session->ice_candidate_callback_(std::move(jsep_candidate));
118         break;
119       }
120       case SignalingMessage::ContentCase::kDescription: {
121         auto& description = message.description();
122         auto content = description.content();
123 
124         auto sdp = webrtc::CreateSessionDescription(
125             description.type() == SessionDescription::OFFER
126                 ? webrtc::SdpType::kOffer
127                 : webrtc::SdpType::kAnswer,
128             description.content());
129         session->remote_description_callback_(std::move(sdp));
130         break;
131       }
132       default:
133         RTC_DCHECK_NOTREACHED();
134     }
135   }
136 }
137 
138 class GrpcNegotiationServer : public GrpcSignalingServerInterface,
139                               public PeerConnectionSignaling::Service {
140  public:
GrpcNegotiationServer(std::function<void (webrtc::SignalingInterface *)> callback,int port,bool oneshot)141   GrpcNegotiationServer(
142       std::function<void(webrtc::SignalingInterface*)> callback,
143       int port,
144       bool oneshot)
145       : connect_callback_(std::move(callback)),
146         requested_port_(port),
147         oneshot_(oneshot) {}
~GrpcNegotiationServer()148   ~GrpcNegotiationServer() override {
149     Stop();
150     if (server_stop_thread_)
151       server_stop_thread_->Stop();
152   }
153 
Start()154   void Start() override {
155     std::string server_address = "[::]";
156 
157     grpc::ServerBuilder builder;
158     builder.AddListeningPort(
159         server_address + ":" + std::to_string(requested_port_),
160         grpc::InsecureServerCredentials(), &selected_port_);
161     builder.RegisterService(this);
162     server_ = builder.BuildAndStart();
163   }
164 
Wait()165   void Wait() override { server_->Wait(); }
166 
Stop()167   void Stop() override { server_->Shutdown(); }
168 
SelectedPort()169   int SelectedPort() override { return selected_port_; }
170 
Connect(grpc::ServerContext * context,grpc::ServerReaderWriter<SignalingMessage,SignalingMessage> * stream)171   grpc::Status Connect(
172       grpc::ServerContext* context,
173       grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream)
174       override {
175     if (oneshot_) {
176       // Request the termination of the server early so we don't serve another
177       // client in parallel.
178       server_stop_thread_ = rtc::Thread::Create();
179       server_stop_thread_->Start();
180       server_stop_thread_->PostTask([this] { Stop(); });
181     }
182 
183     ServerSessionData session(stream);
184 
185     auto reading_thread = rtc::Thread::Create();
186     reading_thread->Start();
187     reading_thread->PostTask([&session, &stream] {
188       ProcessMessages<SignalingMessage>(stream, &session);
189     });
190 
191     connect_callback_(&session);
192 
193     reading_thread->Stop();
194 
195     return grpc::Status::OK;
196   }
197 
198  private:
199   std::function<void(webrtc::SignalingInterface*)> connect_callback_;
200   int requested_port_;
201   int selected_port_;
202   bool oneshot_;
203 
204   std::unique_ptr<grpc::Server> server_;
205   std::unique_ptr<rtc::Thread> server_stop_thread_;
206 };
207 
208 class GrpcNegotiationClient : public GrpcSignalingClientInterface {
209  public:
GrpcNegotiationClient(const std::string & server)210   explicit GrpcNegotiationClient(const std::string& server) {
211     channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
212     stub_ = PeerConnectionSignaling::NewStub(channel_);
213   }
214 
~GrpcNegotiationClient()215   ~GrpcNegotiationClient() override {
216     context_.TryCancel();
217     if (reading_thread_)
218       reading_thread_->Stop();
219   }
220 
Start()221   bool Start() override {
222     if (!channel_->WaitForConnected(
223             absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) {
224       return false;
225     }
226 
227     stream_ = stub_->Connect(&context_);
228     session_.SetStream(stream_.get());
229 
230     reading_thread_ = rtc::Thread::Create();
231     reading_thread_->Start();
232     reading_thread_->PostTask([this] {
233       ProcessMessages<SignalingMessage>(stream_.get(), &session_);
234     });
235 
236     return true;
237   }
238 
signaling_client()239   webrtc::SignalingInterface* signaling_client() override { return &session_; }
240 
241  private:
242   std::shared_ptr<grpc::Channel> channel_;
243   std::unique_ptr<PeerConnectionSignaling::Stub> stub_;
244   std::unique_ptr<rtc::Thread> reading_thread_;
245   grpc::ClientContext context_;
246   std::unique_ptr<
247       ::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>
248       stream_;
249   ClientSessionData session_;
250 };
251 }  // namespace
252 
253 std::unique_ptr<GrpcSignalingServerInterface>
Create(std::function<void (webrtc::SignalingInterface *)> callback,int port,bool oneshot)254 GrpcSignalingServerInterface::Create(
255     std::function<void(webrtc::SignalingInterface*)> callback,
256     int port,
257     bool oneshot) {
258   return std::make_unique<GrpcNegotiationServer>(std::move(callback), port,
259                                                  oneshot);
260 }
261 
262 std::unique_ptr<GrpcSignalingClientInterface>
Create(const std::string & server)263 GrpcSignalingClientInterface::Create(const std::string& server) {
264   return std::make_unique<GrpcNegotiationClient>(server);
265 }
266 
267 }  // namespace webrtc
268