1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_datagram_queue.h"
6
7 #include "absl/types/span.h"
8 #include "quiche/quic/core/quic_constants.h"
9 #include "quiche/quic/core/quic_session.h"
10 #include "quiche/quic/core/quic_time.h"
11 #include "quiche/quic/core/quic_types.h"
12
13 namespace quic {
14
15 constexpr float kExpiryInMinRtts = 1.25;
16 constexpr float kMinPacingWindows = 4;
17
QuicDatagramQueue(QuicSession * session)18 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
19 : QuicDatagramQueue(session, nullptr) {}
20
QuicDatagramQueue(QuicSession * session,std::unique_ptr<Observer> observer)21 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session,
22 std::unique_ptr<Observer> observer)
23 : session_(session),
24 clock_(session->connection()->clock()),
25 observer_(std::move(observer)) {}
26
SendOrQueueDatagram(quiche::QuicheMemSlice datagram)27 MessageStatus QuicDatagramQueue::SendOrQueueDatagram(
28 quiche::QuicheMemSlice datagram) {
29 // If the queue is non-empty, always queue the daragram. This ensures that
30 // the datagrams are sent in the same order that they were sent by the
31 // application.
32 if (queue_.empty()) {
33 MessageResult result = session_->SendMessage(absl::MakeSpan(&datagram, 1),
34 /*flush=*/force_flush_);
35 if (result.status != MESSAGE_STATUS_BLOCKED) {
36 if (observer_) {
37 observer_->OnDatagramProcessed(result.status);
38 }
39 return result.status;
40 }
41 }
42
43 queue_.emplace_back(Datagram{std::move(datagram),
44 clock_->ApproximateNow() + GetMaxTimeInQueue()});
45 return MESSAGE_STATUS_BLOCKED;
46 }
47
TrySendingNextDatagram()48 std::optional<MessageStatus> QuicDatagramQueue::TrySendingNextDatagram() {
49 RemoveExpiredDatagrams();
50 if (queue_.empty()) {
51 return std::nullopt;
52 }
53
54 MessageResult result =
55 session_->SendMessage(absl::MakeSpan(&queue_.front().datagram, 1));
56 if (result.status != MESSAGE_STATUS_BLOCKED) {
57 queue_.pop_front();
58 if (observer_) {
59 observer_->OnDatagramProcessed(result.status);
60 }
61 }
62 return result.status;
63 }
64
SendDatagrams()65 size_t QuicDatagramQueue::SendDatagrams() {
66 size_t num_datagrams = 0;
67 for (;;) {
68 std::optional<MessageStatus> status = TrySendingNextDatagram();
69 if (!status.has_value()) {
70 break;
71 }
72 if (*status == MESSAGE_STATUS_BLOCKED) {
73 break;
74 }
75 num_datagrams++;
76 }
77 return num_datagrams;
78 }
79
GetMaxTimeInQueue() const80 QuicTime::Delta QuicDatagramQueue::GetMaxTimeInQueue() const {
81 if (!max_time_in_queue_.IsZero()) {
82 return max_time_in_queue_;
83 }
84
85 const QuicTime::Delta min_rtt =
86 session_->connection()->sent_packet_manager().GetRttStats()->min_rtt();
87 return std::max(kExpiryInMinRtts * min_rtt,
88 kMinPacingWindows * kAlarmGranularity);
89 }
90
RemoveExpiredDatagrams()91 void QuicDatagramQueue::RemoveExpiredDatagrams() {
92 QuicTime now = clock_->ApproximateNow();
93 while (!queue_.empty() && queue_.front().expiry <= now) {
94 ++expired_datagram_count_;
95 queue_.pop_front();
96 if (observer_) {
97 observer_->OnDatagramProcessed(std::nullopt);
98 }
99 }
100 }
101
102 } // namespace quic
103