1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "osp/public/presentation/presentation_connection.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <ostream>
10
11 #include "absl/strings/string_view.h"
12 #include "osp/impl/presentation/presentation_common.h"
13 #include "osp/msgs/osp_messages.h"
14 #include "osp/public/network_service_manager.h"
15 #include "osp/public/presentation/presentation_controller.h"
16 #include "osp/public/presentation/presentation_receiver.h"
17 #include "osp/public/protocol_connection.h"
18 #include "util/osp_logging.h"
19 #include "util/std_util.h"
20
21 // TODO(crbug.com/openscreen/27): Address TODOs in this file
22
23 namespace openscreen {
24 namespace osp {
25
26 namespace {
27
28 // TODO(jophba): replace Write methods with a unified write message surface
WriteConnectionMessage(const msgs::PresentationConnectionMessage & message,ProtocolConnection * connection)29 Error WriteConnectionMessage(const msgs::PresentationConnectionMessage& message,
30 ProtocolConnection* connection) {
31 return connection->WriteMessage(message,
32 msgs::EncodePresentationConnectionMessage);
33 }
34 } // namespace
35
Connection(const PresentationInfo & info,Delegate * delegate,ParentDelegate * parent_delegate)36 Connection::Connection(const PresentationInfo& info,
37 Delegate* delegate,
38 ParentDelegate* parent_delegate)
39 : presentation_(info),
40 state_(State::kConnecting),
41 delegate_(delegate),
42 parent_delegate_(parent_delegate),
43 connection_id_(0),
44 protocol_connection_(nullptr) {}
45
~Connection()46 Connection::~Connection() {
47 if (state_ == State::kConnected) {
48 Close(CloseReason::kDiscarded);
49 delegate_->OnDiscarded();
50 }
51 parent_delegate_->OnConnectionDestroyed(this);
52 }
53
OnConnecting()54 void Connection::OnConnecting() {
55 OSP_DCHECK(!protocol_connection_);
56 state_ = State::kConnecting;
57 }
58
OnConnected(uint64_t connection_id,uint64_t endpoint_id,std::unique_ptr<ProtocolConnection> protocol_connection)59 void Connection::OnConnected(
60 uint64_t connection_id,
61 uint64_t endpoint_id,
62 std::unique_ptr<ProtocolConnection> protocol_connection) {
63 if (state_ != State::kConnecting) {
64 return;
65 }
66 connection_id_ = connection_id;
67 endpoint_id_ = endpoint_id;
68 protocol_connection_ = std::move(protocol_connection);
69 state_ = State::kConnected;
70 delegate_->OnConnected();
71 }
72
OnClosed()73 bool Connection::OnClosed() {
74 if (state_ != State::kConnecting && state_ != State::kConnected)
75 return false;
76
77 protocol_connection_.reset();
78 state_ = State::kClosed;
79
80 return true;
81 }
82
OnClosedByError(Error cause)83 void Connection::OnClosedByError(Error cause) {
84 if (OnClosed()) {
85 std::ostringstream stream;
86 stream << cause;
87 delegate_->OnError(stream.str());
88 }
89 }
90
OnClosedByRemote()91 void Connection::OnClosedByRemote() {
92 if (OnClosed())
93 delegate_->OnClosedByRemote();
94 }
95
OnTerminated()96 void Connection::OnTerminated() {
97 if (state_ == State::kTerminated)
98 return;
99 protocol_connection_.reset();
100 state_ = State::kTerminated;
101 delegate_->OnTerminated();
102 }
103
SendString(absl::string_view message)104 Error Connection::SendString(absl::string_view message) {
105 if (state_ != State::kConnected)
106 return Error::Code::kNoActiveConnection;
107
108 msgs::PresentationConnectionMessage cbor_message;
109 OSP_LOG_INFO << "sending '" << message << "' to (" << presentation_.id << ", "
110 << connection_id_.value() << ")";
111 cbor_message.connection_id = connection_id_.value();
112 cbor_message.message.which =
113 msgs::PresentationConnectionMessage::Message::Which::kString;
114
115 new (&cbor_message.message.str) std::string(message);
116
117 return WriteConnectionMessage(cbor_message, protocol_connection_.get());
118 }
119
SendBinary(std::vector<uint8_t> && data)120 Error Connection::SendBinary(std::vector<uint8_t>&& data) {
121 if (state_ != State::kConnected)
122 return Error::Code::kNoActiveConnection;
123
124 msgs::PresentationConnectionMessage cbor_message;
125 OSP_LOG_INFO << "sending " << data.size() << " bytes to (" << presentation_.id
126 << ", " << connection_id_.value() << ")";
127 cbor_message.connection_id = connection_id_.value();
128 cbor_message.message.which =
129 msgs::PresentationConnectionMessage::Message::Which::kBytes;
130
131 new (&cbor_message.message.bytes) std::vector<uint8_t>(std::move(data));
132
133 return WriteConnectionMessage(cbor_message, protocol_connection_.get());
134 }
135
Close(CloseReason reason)136 Error Connection::Close(CloseReason reason) {
137 if (state_ == State::kClosed || state_ == State::kTerminated)
138 return Error::Code::kAlreadyClosed;
139
140 state_ = State::kClosed;
141 protocol_connection_.reset();
142
143 return parent_delegate_->CloseConnection(this, reason);
144 }
145
Terminate(TerminationReason reason)146 void Connection::Terminate(TerminationReason reason) {
147 if (state_ == State::kTerminated)
148 return;
149 state_ = State::kTerminated;
150 protocol_connection_.reset();
151 parent_delegate_->OnPresentationTerminated(presentation_.id, reason);
152 }
153
ConnectionManager(MessageDemuxer * demuxer)154 ConnectionManager::ConnectionManager(MessageDemuxer* demuxer) {
155 message_watch_ = demuxer->SetDefaultMessageTypeWatch(
156 msgs::Type::kPresentationConnectionMessage, this);
157
158 close_request_watch_ = demuxer->SetDefaultMessageTypeWatch(
159 msgs::Type::kPresentationConnectionCloseRequest, this);
160
161 close_event_watch_ = demuxer->SetDefaultMessageTypeWatch(
162 msgs::Type::kPresentationConnectionCloseEvent, this);
163 }
164
AddConnection(Connection * connection)165 void ConnectionManager::AddConnection(Connection* connection) {
166 auto emplace_result =
167 connections_.emplace(connection->connection_id(), connection);
168
169 OSP_DCHECK(emplace_result.second);
170 }
171
RemoveConnection(Connection * connection)172 void ConnectionManager::RemoveConnection(Connection* connection) {
173 auto entry = connections_.find(connection->connection_id());
174 if (entry != connections_.end()) {
175 connections_.erase(entry);
176 }
177 }
178
179 // TODO(jophba): add a utility object to track requests/responses
180 // TODO(jophba): refine the RegisterWatch/OnStreamMessage API. We
181 // should add a layer between the message logic and the parse/dispatch
182 // logic, and remove the CBOR information from ConnectionManager.
OnStreamMessage(uint64_t endpoint_id,uint64_t connection_id,msgs::Type message_type,const uint8_t * buffer,size_t buffer_size,Clock::time_point now)183 ErrorOr<size_t> ConnectionManager::OnStreamMessage(uint64_t endpoint_id,
184 uint64_t connection_id,
185 msgs::Type message_type,
186 const uint8_t* buffer,
187 size_t buffer_size,
188 Clock::time_point now) {
189 switch (message_type) {
190 case msgs::Type::kPresentationConnectionMessage: {
191 msgs::PresentationConnectionMessage message;
192 ssize_t bytes_decoded = msgs::DecodePresentationConnectionMessage(
193 buffer, buffer_size, &message);
194 if (bytes_decoded < 0) {
195 OSP_LOG_WARN << "presentation-connection-message parse error";
196 return Error::Code::kParseError;
197 }
198
199 Connection* connection = GetConnection(message.connection_id);
200 if (!connection) {
201 return Error::Code::kItemNotFound;
202 }
203
204 switch (message.message.which) {
205 case decltype(message.message.which)::kString:
206 connection->get_delegate()->OnStringMessage(message.message.str);
207 break;
208 case decltype(message.message.which)::kBytes:
209 connection->get_delegate()->OnBinaryMessage(message.message.bytes);
210 break;
211 default:
212 OSP_LOG_WARN << "uninitialized message data in "
213 "presentation-connection-message";
214 break;
215 }
216 return bytes_decoded;
217 }
218
219 case msgs::Type::kPresentationConnectionCloseRequest: {
220 msgs::PresentationConnectionCloseRequest request;
221 ssize_t bytes_decoded = msgs::DecodePresentationConnectionCloseRequest(
222 buffer, buffer_size, &request);
223 if (bytes_decoded < 0) {
224 OSP_LOG_WARN << "decode presentation-connection-close-request error: "
225 << bytes_decoded;
226 return Error::Code::kCborInvalidMessage;
227 }
228
229 msgs::PresentationConnectionCloseResponse response;
230 response.request_id = request.request_id;
231
232 Connection* connection = GetConnection(request.connection_id);
233 if (connection) {
234 response.result =
235 msgs::PresentationConnectionCloseResponse_result::kSuccess;
236 connection->OnClosedByRemote();
237 } else {
238 response.result = msgs::PresentationConnectionCloseResponse_result::
239 kInvalidConnectionId;
240 }
241
242 std::unique_ptr<ProtocolConnection> protocol_connection =
243 NetworkServiceManager::Get()
244 ->GetProtocolConnectionServer()
245 ->CreateProtocolConnection(endpoint_id);
246 if (protocol_connection) {
247 protocol_connection->WriteMessage(
248 response, &msgs::EncodePresentationConnectionCloseResponse);
249 }
250
251 return (response.result ==
252 msgs::PresentationConnectionCloseResponse_result::kSuccess)
253 ? ErrorOr<size_t>(bytes_decoded)
254 : Error::Code::kNoActiveConnection;
255 }
256
257 case msgs::Type::kPresentationConnectionCloseEvent: {
258 msgs::PresentationConnectionCloseEvent event;
259 ssize_t bytes_decoded = msgs::DecodePresentationConnectionCloseEvent(
260 buffer, buffer_size, &event);
261 if (bytes_decoded < 0) {
262 OSP_LOG_WARN << "decode presentation-connection-close-event error: "
263 << bytes_decoded;
264 return Error::Code::kParseError;
265 }
266
267 Connection* connection = GetConnection(event.connection_id);
268 if (!connection) {
269 return Error::Code::kNoActiveConnection;
270 }
271
272 connection->OnClosedByRemote();
273 return bytes_decoded;
274 }
275
276 // TODO(jophba): The spec says to close the connection if we get a message
277 // we don't understand. Figure out how to honor the spec here.
278 default:
279 return Error::Code::kUnknownMessageType;
280 }
281 }
282
GetConnection(uint64_t connection_id)283 Connection* ConnectionManager::GetConnection(uint64_t connection_id) {
284 auto entry = connections_.find(connection_id);
285 if (entry != connections_.end()) {
286 return entry->second;
287 }
288
289 OSP_DVLOG << "unknown ID: " << connection_id;
290 return nullptr;
291 }
292
293 } // namespace osp
294 } // namespace openscreen
295