1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_PROTOZERO_PROTO_RING_BUFFER_H_
18 #define INCLUDE_PERFETTO_EXT_PROTOZERO_PROTO_RING_BUFFER_H_
19 
20 #include <stdint.h>
21 
22 #include "perfetto/ext/base/paged_memory.h"
23 
24 namespace protozero {
25 
26 // This class buffers and tokenizes proto messages.
27 //
28 // From a logical level, it works with a sequence of protos like this.
29 // [ header 1 ] [ payload 1   ]
30 // [ header 2 ] [ payload 2  ]
31 // [ header 3 ] [ payload 3     ]
32 // Where [ header ] is a variable-length sequence of:
33 // [ Field ID = 1, type = length-delimited] [ length (varint) ].
34 //
35 // The input to this class is byte-oriented, not message-oriented (like a TCP
36 // stream or a pipe). The caller is not required to respect the boundaries of
37 // each message; only guarantee that data is not lost or duplicated. The
38 // following sequence of inbound events is possible:
39 // 1. [ hdr 1 (incomplete) ... ]
// 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payload 2 ] [ hdr 3 ] [ pay... ]
41 // 3. [ ...load 3 ]
42 //
43 // This class maintains inbound requests in a ring buffer.
44 // The expected usage is:
45 // ring_buf.Append(data, len);
46 // for (;;) {
47 //   auto msg = ring_buf.ReadMessage();
48 //   if (!msg.valid())
49 //     break;
50 //   Decode(msg);
51 // }
52 //
53 // After each call to Append, the caller is expected to call ReadMessage() until
54 // it returns an invalid message (signalling no more messages could be decoded).
55 // Note that a single Append can "unblock" > 1 messages, which is why the caller
56 // needs to keep calling ReadMessage in a loop.
57 //
58 // Internal architecture
59 // ---------------------
60 // Internally this is similar to a ring-buffer, with the caveat that it never
61 // wraps, it only expands. Expansions are rare. The deal is that in most cases
62 // the read cursor follows very closely the write cursor. For instance, if the
63 // underlying transport behaves as a dgram socket, after each Append, the read
64 // cursor will chase completely the write cursor. Even if the underlying stream
65 // is not always atomic, the expectation is that the read cursor will eventually
66 // reach the write one within few messages.
67 // A visual example, imagine we have four messages: 2it 4will 2be 4fine
68 // Visually:
69 //
70 // Append("2it4wi"): A message and a bit:
71 // [ 2it 4wi                     ]
72 // ^R       ^W
73 //
74 // After the ReadMessage(), the 1st message will be read, but not the 2nd.
75 // [ 2it 4wi                     ]
76 //      ^R ^W
77 //
78 // Append("ll2be4f")
79 // [ 2it 4will 2be 4f            ]
80 //      ^R           ^W
81 //
82 // After the ReadMessage() loop:
83 // [ 2it 4will 2be 4f            ]
84 //                ^R ^W
85 // Append("ine")
86 // [ 2it 4will 2be 4fine         ]
87 //                ^R    ^W
88 //
89 // In the next ReadMessage() the R cursor will chase the W cursor. When this
90 // happens (very frequent) we can just reset both cursors to 0 and restart.
91 // If we are unlucky and get to the end of the buffer, two things happen:
92 // 1. We try first to recompact the buffer, moving everything left by R.
93 // 2. If still there isn't enough space, we expand the buffer.
// Given that each message is expected to be at most kMaxMsgSize (64 MB), the
// expansion is bounded at 2 * kMaxMsgSize.
96 
97 class RingBufferMessageReader {
98  public:
99   static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024;
100   struct Message {
101     const uint8_t* start = nullptr;
102     uint32_t len = 0;
103     uint32_t field_id = 0;
104     bool fatal_framing_error = false;
endMessage105     const uint8_t* end() const { return start + len; }
validMessage106     inline bool valid() const { return !!start; }
107   };
108 
109   RingBufferMessageReader();
110   virtual ~RingBufferMessageReader();
111   RingBufferMessageReader(const RingBufferMessageReader&) = delete;
112   RingBufferMessageReader& operator=(const RingBufferMessageReader&) = delete;
113 
114   // Appends data into the ring buffer, recompacting or resizing it if needed.
115   // Will invaildate the pointers previously handed out.
116   void Append(const void* data, size_t len);
117 
118   // If a message can be read, it returns the boundaries of the message
119   // (without including the preamble) and advances the read cursor.
120   // If no message is available, returns a null range.
121   // The returned pointer is only valid until the next call to Append(), as
122   // that can recompact or resize the underlying buffer.
123   Message ReadMessage();
124 
125   // Exposed for testing.
capacity()126   size_t capacity() const { return buf_.size(); }
avail()127   size_t avail() const { return buf_.size() - (wr_ - rd_); }
128 
129  protected:
130   // Subclasses must implement the header parsing.
131   virtual Message TryReadMessage(const uint8_t* start, const uint8_t* end) = 0;
132 
133  private:
134   perfetto::base::PagedMemory buf_;
135   Message fastpath_{};
136   bool failed_ = false;  // Set in case of an unrecoverable framing faiulre.
137   size_t rd_ = 0;        // Offset of the read cursor in |buf_|.
138   size_t wr_ = 0;        // Offset of the write cursor in |buf_|.
139 };
140 
141 class ProtoRingBuffer final : public RingBufferMessageReader {
142  public:
143   ProtoRingBuffer();
144   ~ProtoRingBuffer() override final;
145 
146  protected:
147   Message TryReadMessage(const uint8_t* start,
148                          const uint8_t* end) override final;
149 };
150 
151 }  // namespace protozero
152 
153 #endif  // INCLUDE_PERFETTO_EXT_PROTOZERO_PROTO_RING_BUFFER_H_
154