1*6dbdd20aSAndroid Build Coastguard Worker /*
2*6dbdd20aSAndroid Build Coastguard Worker * Copyright (C) 2021 The Android Open Source Project
3*6dbdd20aSAndroid Build Coastguard Worker *
4*6dbdd20aSAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
5*6dbdd20aSAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
6*6dbdd20aSAndroid Build Coastguard Worker * You may obtain a copy of the License at
7*6dbdd20aSAndroid Build Coastguard Worker *
8*6dbdd20aSAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0
9*6dbdd20aSAndroid Build Coastguard Worker *
10*6dbdd20aSAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
11*6dbdd20aSAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
12*6dbdd20aSAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*6dbdd20aSAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
14*6dbdd20aSAndroid Build Coastguard Worker * limitations under the License.
15*6dbdd20aSAndroid Build Coastguard Worker */
16*6dbdd20aSAndroid Build Coastguard Worker
17*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/ext/protozero/proto_ring_buffer.h"
18*6dbdd20aSAndroid Build Coastguard Worker
19*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/base/logging.h"
20*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/ext/base/paged_memory.h"
21*6dbdd20aSAndroid Build Coastguard Worker #include "perfetto/protozero/proto_utils.h"
22*6dbdd20aSAndroid Build Coastguard Worker
23*6dbdd20aSAndroid Build Coastguard Worker namespace protozero {
24*6dbdd20aSAndroid Build Coastguard Worker
25*6dbdd20aSAndroid Build Coastguard Worker namespace {
// Size, in bytes, of the buffer allocated at construction time, and also the
// step by which Append() enlarges the buffer when it runs out of space.
constexpr size_t kGrowBytes = 128 * 1024;
27*6dbdd20aSAndroid Build Coastguard Worker
FramingError()28*6dbdd20aSAndroid Build Coastguard Worker inline ProtoRingBuffer::Message FramingError() {
29*6dbdd20aSAndroid Build Coastguard Worker ProtoRingBuffer::Message msg{};
30*6dbdd20aSAndroid Build Coastguard Worker msg.fatal_framing_error = true;
31*6dbdd20aSAndroid Build Coastguard Worker return msg;
32*6dbdd20aSAndroid Build Coastguard Worker }
33*6dbdd20aSAndroid Build Coastguard Worker
34*6dbdd20aSAndroid Build Coastguard Worker // Tries to decode a length-delimited proto field from |start|.
35*6dbdd20aSAndroid Build Coastguard Worker // Returns a valid boundary if the preamble is valid and the length is within
36*6dbdd20aSAndroid Build Coastguard Worker // |end|, or an invalid message otherwise.
TryReadProtoMessage(const uint8_t * start,const uint8_t * end)37*6dbdd20aSAndroid Build Coastguard Worker ProtoRingBuffer::Message TryReadProtoMessage(const uint8_t* start,
38*6dbdd20aSAndroid Build Coastguard Worker const uint8_t* end) {
39*6dbdd20aSAndroid Build Coastguard Worker namespace proto_utils = protozero::proto_utils;
40*6dbdd20aSAndroid Build Coastguard Worker uint64_t field_tag = 0;
41*6dbdd20aSAndroid Build Coastguard Worker auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
42*6dbdd20aSAndroid Build Coastguard Worker if (start_of_len == start)
43*6dbdd20aSAndroid Build Coastguard Worker return ProtoRingBuffer::Message{}; // Not enough data.
44*6dbdd20aSAndroid Build Coastguard Worker
45*6dbdd20aSAndroid Build Coastguard Worker const uint32_t tag = field_tag & 0x07;
46*6dbdd20aSAndroid Build Coastguard Worker if (tag !=
47*6dbdd20aSAndroid Build Coastguard Worker static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
48*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
49*6dbdd20aSAndroid Build Coastguard Worker return FramingError();
50*6dbdd20aSAndroid Build Coastguard Worker }
51*6dbdd20aSAndroid Build Coastguard Worker
52*6dbdd20aSAndroid Build Coastguard Worker uint64_t msg_len = 0;
53*6dbdd20aSAndroid Build Coastguard Worker auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
54*6dbdd20aSAndroid Build Coastguard Worker if (start_of_msg == start_of_len)
55*6dbdd20aSAndroid Build Coastguard Worker return ProtoRingBuffer::Message{}; // Not enough data.
56*6dbdd20aSAndroid Build Coastguard Worker
57*6dbdd20aSAndroid Build Coastguard Worker if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
58*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
59*6dbdd20aSAndroid Build Coastguard Worker msg_len, ProtoRingBuffer::kMaxMsgSize);
60*6dbdd20aSAndroid Build Coastguard Worker return FramingError();
61*6dbdd20aSAndroid Build Coastguard Worker }
62*6dbdd20aSAndroid Build Coastguard Worker
63*6dbdd20aSAndroid Build Coastguard Worker if (start_of_msg + msg_len > end)
64*6dbdd20aSAndroid Build Coastguard Worker return ProtoRingBuffer::Message{}; // Not enough data.
65*6dbdd20aSAndroid Build Coastguard Worker
66*6dbdd20aSAndroid Build Coastguard Worker ProtoRingBuffer::Message msg{};
67*6dbdd20aSAndroid Build Coastguard Worker msg.start = start_of_msg;
68*6dbdd20aSAndroid Build Coastguard Worker msg.len = static_cast<uint32_t>(msg_len);
69*6dbdd20aSAndroid Build Coastguard Worker msg.field_id = static_cast<uint32_t>(field_tag >> 3);
70*6dbdd20aSAndroid Build Coastguard Worker return msg;
71*6dbdd20aSAndroid Build Coastguard Worker }
72*6dbdd20aSAndroid Build Coastguard Worker
73*6dbdd20aSAndroid Build Coastguard Worker } // namespace
74*6dbdd20aSAndroid Build Coastguard Worker
// Allocates the initial backing buffer (kGrowBytes bytes). Append() replaces
// it with a larger one when the incoming data does not fit.
RingBufferMessageReader::RingBufferMessageReader()
    : buf_(perfetto::base::PagedMemory::Allocate(kGrowBytes)) {}
// Defaulted: no explicit cleanup is performed here.
RingBufferMessageReader::~RingBufferMessageReader() = default;
78*6dbdd20aSAndroid Build Coastguard Worker
Append(const void * data_void,size_t data_len)79*6dbdd20aSAndroid Build Coastguard Worker void RingBufferMessageReader::Append(const void* data_void, size_t data_len) {
80*6dbdd20aSAndroid Build Coastguard Worker if (failed_)
81*6dbdd20aSAndroid Build Coastguard Worker return;
82*6dbdd20aSAndroid Build Coastguard Worker const uint8_t* data = static_cast<const uint8_t*>(data_void);
83*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_DCHECK(wr_ <= buf_.size());
84*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_DCHECK(wr_ >= rd_);
85*6dbdd20aSAndroid Build Coastguard Worker
86*6dbdd20aSAndroid Build Coastguard Worker // If the last call to ReadMessage() consumed all the data in the buffer and
87*6dbdd20aSAndroid Build Coastguard Worker // there are no incomplete messages pending, restart from the beginning rather
88*6dbdd20aSAndroid Build Coastguard Worker // than keep ringing. This is the most common case.
89*6dbdd20aSAndroid Build Coastguard Worker if (rd_ == wr_)
90*6dbdd20aSAndroid Build Coastguard Worker rd_ = wr_ = 0;
91*6dbdd20aSAndroid Build Coastguard Worker
92*6dbdd20aSAndroid Build Coastguard Worker // The caller is expected to always issue a ReadMessage() after each Append().
93*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_CHECK(!fastpath_.valid());
94*6dbdd20aSAndroid Build Coastguard Worker if (rd_ == wr_) {
95*6dbdd20aSAndroid Build Coastguard Worker auto msg = TryReadMessage(data, data + data_len);
96*6dbdd20aSAndroid Build Coastguard Worker if (msg.valid() && msg.end() == (data + data_len)) {
97*6dbdd20aSAndroid Build Coastguard Worker // Fastpath: in many cases, the underlying stream will effectively
98*6dbdd20aSAndroid Build Coastguard Worker // preserve the atomicity of messages for most small messages.
99*6dbdd20aSAndroid Build Coastguard Worker // In this case we can avoid the extra buf_ roundtrip and just pass a
100*6dbdd20aSAndroid Build Coastguard Worker // pointer to |data| + (proto preamble len).
101*6dbdd20aSAndroid Build Coastguard Worker // The next call to ReadMessage)= will return |fastpath_|.
102*6dbdd20aSAndroid Build Coastguard Worker fastpath_ = std::move(msg);
103*6dbdd20aSAndroid Build Coastguard Worker return;
104*6dbdd20aSAndroid Build Coastguard Worker }
105*6dbdd20aSAndroid Build Coastguard Worker }
106*6dbdd20aSAndroid Build Coastguard Worker
107*6dbdd20aSAndroid Build Coastguard Worker size_t avail = buf_.size() - wr_;
108*6dbdd20aSAndroid Build Coastguard Worker if (data_len > avail) {
109*6dbdd20aSAndroid Build Coastguard Worker // This whole section should be hit extremely rarely.
110*6dbdd20aSAndroid Build Coastguard Worker
111*6dbdd20aSAndroid Build Coastguard Worker // Try first just recompacting the buffer by moving everything to the left.
112*6dbdd20aSAndroid Build Coastguard Worker // This can happen if we received "a message and a bit" on each Append call
113*6dbdd20aSAndroid Build Coastguard Worker // so we ended pup in a situation like:
114*6dbdd20aSAndroid Build Coastguard Worker // buf_: [unused space] [msg1 incomplete]
115*6dbdd20aSAndroid Build Coastguard Worker // ^rd_ ^wr_
116*6dbdd20aSAndroid Build Coastguard Worker //
117*6dbdd20aSAndroid Build Coastguard Worker // After recompaction:
118*6dbdd20aSAndroid Build Coastguard Worker // buf_: [msg1 incomplete]
119*6dbdd20aSAndroid Build Coastguard Worker // ^rd_ ^wr_
120*6dbdd20aSAndroid Build Coastguard Worker uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
121*6dbdd20aSAndroid Build Coastguard Worker memmove(&buf[0], &buf[rd_], wr_ - rd_);
122*6dbdd20aSAndroid Build Coastguard Worker avail += rd_;
123*6dbdd20aSAndroid Build Coastguard Worker wr_ -= rd_;
124*6dbdd20aSAndroid Build Coastguard Worker rd_ = 0;
125*6dbdd20aSAndroid Build Coastguard Worker if (data_len > avail) {
126*6dbdd20aSAndroid Build Coastguard Worker // The compaction didn't free up enough space and we need to expand the
127*6dbdd20aSAndroid Build Coastguard Worker // ring buffer. Yes, we could have detected this earlier and split the
128*6dbdd20aSAndroid Build Coastguard Worker // code paths, rather than first compacting and then realizing it wasn't
129*6dbdd20aSAndroid Build Coastguard Worker // sufficient. However, that would make the code harder to reason about,
130*6dbdd20aSAndroid Build Coastguard Worker // creating code paths that are nearly never hit, hence making it more
131*6dbdd20aSAndroid Build Coastguard Worker // likely to accumulate bugs in future. All this is very rare.
132*6dbdd20aSAndroid Build Coastguard Worker size_t new_size = buf_.size();
133*6dbdd20aSAndroid Build Coastguard Worker while (data_len > new_size - wr_)
134*6dbdd20aSAndroid Build Coastguard Worker new_size += kGrowBytes;
135*6dbdd20aSAndroid Build Coastguard Worker if (new_size > kMaxMsgSize * 2) {
136*6dbdd20aSAndroid Build Coastguard Worker failed_ = true;
137*6dbdd20aSAndroid Build Coastguard Worker return;
138*6dbdd20aSAndroid Build Coastguard Worker }
139*6dbdd20aSAndroid Build Coastguard Worker auto new_buf = perfetto::base::PagedMemory::Allocate(new_size);
140*6dbdd20aSAndroid Build Coastguard Worker memcpy(new_buf.Get(), buf_.Get(), buf_.size());
141*6dbdd20aSAndroid Build Coastguard Worker buf_ = std::move(new_buf);
142*6dbdd20aSAndroid Build Coastguard Worker avail = new_size - wr_;
143*6dbdd20aSAndroid Build Coastguard Worker // No need to touch rd_ / wr_ cursors.
144*6dbdd20aSAndroid Build Coastguard Worker }
145*6dbdd20aSAndroid Build Coastguard Worker }
146*6dbdd20aSAndroid Build Coastguard Worker
147*6dbdd20aSAndroid Build Coastguard Worker // Append the received data at the end of the ring buffer.
148*6dbdd20aSAndroid Build Coastguard Worker uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
149*6dbdd20aSAndroid Build Coastguard Worker memcpy(&buf[wr_], data, data_len);
150*6dbdd20aSAndroid Build Coastguard Worker wr_ += data_len;
151*6dbdd20aSAndroid Build Coastguard Worker }
152*6dbdd20aSAndroid Build Coastguard Worker
ReadMessage()153*6dbdd20aSAndroid Build Coastguard Worker RingBufferMessageReader::Message RingBufferMessageReader::ReadMessage() {
154*6dbdd20aSAndroid Build Coastguard Worker if (failed_)
155*6dbdd20aSAndroid Build Coastguard Worker return FramingError();
156*6dbdd20aSAndroid Build Coastguard Worker
157*6dbdd20aSAndroid Build Coastguard Worker if (fastpath_.valid()) {
158*6dbdd20aSAndroid Build Coastguard Worker // The fastpath can only be hit when the buffer is empty.
159*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_CHECK(rd_ == wr_);
160*6dbdd20aSAndroid Build Coastguard Worker auto msg = std::move(fastpath_);
161*6dbdd20aSAndroid Build Coastguard Worker fastpath_ = Message{};
162*6dbdd20aSAndroid Build Coastguard Worker return msg;
163*6dbdd20aSAndroid Build Coastguard Worker }
164*6dbdd20aSAndroid Build Coastguard Worker
165*6dbdd20aSAndroid Build Coastguard Worker uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
166*6dbdd20aSAndroid Build Coastguard Worker
167*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_DCHECK(rd_ <= wr_);
168*6dbdd20aSAndroid Build Coastguard Worker if (rd_ >= wr_)
169*6dbdd20aSAndroid Build Coastguard Worker return Message{}; // Completely empty.
170*6dbdd20aSAndroid Build Coastguard Worker
171*6dbdd20aSAndroid Build Coastguard Worker auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
172*6dbdd20aSAndroid Build Coastguard Worker if (!msg.valid()) {
173*6dbdd20aSAndroid Build Coastguard Worker failed_ = failed_ || msg.fatal_framing_error;
174*6dbdd20aSAndroid Build Coastguard Worker return msg; // Return |msg| because it could be a framing error.
175*6dbdd20aSAndroid Build Coastguard Worker }
176*6dbdd20aSAndroid Build Coastguard Worker
177*6dbdd20aSAndroid Build Coastguard Worker const uint8_t* msg_end = msg.start + msg.len;
178*6dbdd20aSAndroid Build Coastguard Worker PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
179*6dbdd20aSAndroid Build Coastguard Worker auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
180*6dbdd20aSAndroid Build Coastguard Worker rd_ += msg_outer_len;
181*6dbdd20aSAndroid Build Coastguard Worker return msg;
182*6dbdd20aSAndroid Build Coastguard Worker }
183*6dbdd20aSAndroid Build Coastguard Worker
// Both special members are defaulted; ProtoRingBuffer adds no setup or
// teardown logic of its own in this file.
ProtoRingBuffer::ProtoRingBuffer() = default;
ProtoRingBuffer::~ProtoRingBuffer() = default;
186*6dbdd20aSAndroid Build Coastguard Worker
// Attempts to parse one message from [start, end) by forwarding to the
// file-local TryReadProtoMessage(), i.e. the input is interpreted as a
// length-delimited proto field (tag varint, length varint, payload).
ProtoRingBuffer::Message ProtoRingBuffer::TryReadMessage(const uint8_t* start,
                                                         const uint8_t* end) {
  return TryReadProtoMessage(start, end);
}
191*6dbdd20aSAndroid Build Coastguard Worker
192*6dbdd20aSAndroid Build Coastguard Worker } // namespace protozero
193