xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/proto_buffer_reader.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
21 
#include <algorithm>
#include <climits>
#include <cstdint>
#include <type_traits>

#include "absl/strings/cord.h"

#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config_protobuf.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/status.h>
35 
36 /// This header provides an object that reads bytes directly from a
37 /// grpc::ByteBuffer, via the ZeroCopyInputStream interface
38 
39 namespace grpc {
40 
/// This is an implementation of the protobuf ZeroCopyInputStream interface.
/// The principle is that the proto layer pulls one chunk of data at a time,
/// with options to back up (re-see some bytes) or skip (forward past some
/// bytes).
44 ///
45 /// Read more about ZeroCopyInputStream interface here:
46 /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyInputStream
47 class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
48  public:
49   /// Constructs buffer reader from \a buffer. Will set \a status() to non ok
50   /// if \a buffer is invalid (the internal buffer has not been initialized).
ProtoBufferReader(ByteBuffer * buffer)51   explicit ProtoBufferReader(ByteBuffer* buffer)
52       : byte_count_(0), backup_count_(0), status_() {
53     /// Implemented through a grpc_byte_buffer_reader which iterates
54     /// over the slices that make up a byte buffer
55     if (!buffer->Valid() ||
56         !grpc_byte_buffer_reader_init(&reader_, buffer->c_buffer())) {
57       status_ = Status(StatusCode::INTERNAL,
58                        "Couldn't initialize byte buffer reader");
59     }
60   }
61 
~ProtoBufferReader()62   ~ProtoBufferReader() override {
63     if (status_.ok()) {
64       grpc_byte_buffer_reader_destroy(&reader_);
65     }
66   }
67 
68   /// Give the proto library a chunk of data from the stream. The caller
69   /// may safely read from data[0, size - 1].
Next(const void ** data,int * size)70   bool Next(const void** data, int* size) override {
71     if (!status_.ok()) {
72       return false;
73     }
74     /// If we have backed up previously, we need to return the backed-up slice
75     if (backup_count_ > 0) {
76       *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) -
77               backup_count_;
78       GPR_ASSERT(backup_count_ <= INT_MAX);
79       *size = static_cast<int>(backup_count_);
80       backup_count_ = 0;
81       return true;
82     }
83     /// Otherwise get the next slice from the byte buffer reader
84     if (!grpc_byte_buffer_reader_peek(&reader_, &slice_)) {
85       return false;
86     }
87     *data = GRPC_SLICE_START_PTR(*slice_);
88     // On win x64, int is only 32bit
89     GPR_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX);
90     byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(*slice_));
91     return true;
92   }
93 
94   /// Returns the status of the buffer reader.
status()95   Status status() const { return status_; }
96 
97   /// The proto library calls this to indicate that we should back up \a count
98   /// bytes that have already been returned by the last call of Next.
99   /// So do the backup and have that ready for a later Next.
BackUp(int count)100   void BackUp(int count) override {
101     GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(*slice_)));
102     backup_count_ = count;
103   }
104 
105   /// The proto library calls this to skip over \a count bytes. Implement this
106   /// using Next and BackUp combined.
Skip(int count)107   bool Skip(int count) override {
108     const void* data;
109     int size;
110     while (Next(&data, &size)) {
111       if (size >= count) {
112         BackUp(size - count);
113         return true;
114       }
115       // size < count;
116       count -= size;
117     }
118     // error or we have too large count;
119     return false;
120   }
121 
122   /// Returns the total number of bytes read since this object was created.
ByteCount()123   int64_t ByteCount() const override { return byte_count_ - backup_count_; }
124 
125 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
126   /// Read the next `count` bytes and append it to the given Cord.
127   // (override is conditionally omitted here to support old Protobuf which
128   //  doesn't have ReadCord method)
129   // NOLINTBEGIN(modernize-use-override,
130   // clang-diagnostic-inconsistent-missing-override)
ReadCord(absl::Cord * cord,int count)131   virtual bool ReadCord(absl::Cord* cord, int count)
132 #if GOOGLE_PROTOBUF_VERSION >= 4022000
133       override
134 #endif
135   // NOLINTEND(modernize-use-override,
136   // clang-diagnostic-inconsistent-missing-override)
137   {
138     if (!status().ok()) {
139       return false;
140     }
141     // check for backed up data
142     if (backup_count() > 0) {
143       if (backup_count() <= count) {
144         cord->Append(MakeCordFromSlice(grpc_slice_split_tail(
145             slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count())));
146       } else {
147         cord->Append(MakeCordFromSlice(grpc_slice_sub(
148             *slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count(),
149             GRPC_SLICE_LENGTH(*slice()) - backup_count() + count)));
150       }
151       int64_t take = (std::min)(backup_count(), static_cast<int64_t>(count));
152       set_backup_count(backup_count() - take);
153       // This cast is safe as the size of a serialized protobuf message
154       // should be smaller than 2GiB.
155       // (https://protobuf.dev/programming-guides/encoding/#size-limit)
156       count -= static_cast<int>(take);
157       if (count == 0) {
158         return true;
159       }
160     }
161     while (count > 0) {
162       if (!grpc_byte_buffer_reader_peek(reader(), mutable_slice_ptr())) {
163         return false;
164       }
165       uint64_t slice_length = GRPC_SLICE_LENGTH(*slice());
166       set_byte_count(ByteCount() + slice_length);
167       if (slice_length <= static_cast<uint64_t>(count)) {
168         cord->Append(MakeCordFromSlice(grpc_slice_ref(*slice())));
169         // This cast is safe as above.
170         count -= static_cast<int>(slice_length);
171       } else {
172         cord->Append(MakeCordFromSlice(grpc_slice_split_head(slice(), count)));
173         set_backup_count(slice_length - count);
174         return true;
175       }
176     }
177     GPR_ASSERT(count == 0);
178     return true;
179   }
180 #endif  // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
181 
182   // These protected members are needed to support internal optimizations.
183   // they expose internal bits of grpc core that are NOT stable. If you have
184   // a use case needs to use one of these functions, please send an email to
185   // https://groups.google.com/forum/#!forum/grpc-io.
186  protected:
set_byte_count(int64_t byte_count)187   void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
backup_count()188   int64_t backup_count() { return backup_count_; }
set_backup_count(int64_t backup_count)189   void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; }
reader()190   grpc_byte_buffer_reader* reader() { return &reader_; }
slice()191   grpc_slice* slice() { return slice_; }
mutable_slice_ptr()192   grpc_slice** mutable_slice_ptr() { return &slice_; }
193 
194  private:
195 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
196   // This function takes ownership of slice and return a newly created Cord off
197   // of it.
MakeCordFromSlice(grpc_slice slice)198   static absl::Cord MakeCordFromSlice(grpc_slice slice) {
199     // slice_for_cord is created to keep inlined data of the given slice
200     grpc_slice* slice_for_cord = new grpc_slice;
201     *slice_for_cord = slice;
202     return absl::MakeCordFromExternal(
203         absl::string_view(
204             reinterpret_cast<char*>(GRPC_SLICE_START_PTR(*slice_for_cord)),
205             GRPC_SLICE_LENGTH(*slice_for_cord)),
206         [slice_for_cord](absl::string_view /* view */) {
207           grpc_slice_unref(*slice_for_cord);
208           delete slice_for_cord;
209         });
210   }
211 #endif  // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
212 
213   int64_t byte_count_;              ///< total bytes read since object creation
214   int64_t backup_count_;            ///< how far backed up in the stream we are
215   grpc_byte_buffer_reader reader_;  ///< internal object to read \a grpc_slice
216                                     ///< from the \a grpc_byte_buffer
217   grpc_slice* slice_;               ///< current slice passed back to the caller
218   Status status_;                   ///< status of the entire object
219 };
220 
221 }  // namespace grpc
222 
223 #endif  // GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
224