xref: /aosp_15_r20/external/grpc-grpc/include/grpcpp/support/proto_buffer_writer.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
21 
22 #include <type_traits>
23 
24 #include "absl/strings/cord.h"
25 
26 #include <grpc/byte_buffer.h>
27 #include <grpc/impl/grpc_types.h>
28 #include <grpc/slice.h>
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/impl/codegen/config_protobuf.h>
32 #include <grpcpp/impl/serialization_traits.h>
33 #include <grpcpp/support/byte_buffer.h>
34 #include <grpcpp/support/status.h>
35 
36 /// This header provides an object that writes bytes directly into a
37 /// grpc::ByteBuffer, via the ZeroCopyOutputStream interface
38 
39 namespace grpc {
40 
// Test-only peer type. It is declared ahead of ProtoBufferWriter so that the
// writer can name it in a friend declaration.
namespace internal {
class ProtoBufferWriterPeer;
}  // namespace internal

/// 1 MiB expressed in bytes. Nothing in this header reads the constant;
/// presumably callers use it as a cap on the block size they request --
/// TODO confirm against the call sites.
constexpr int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
47 
48 /// This is a specialization of the protobuf class ZeroCopyOutputStream.
49 /// The principle is to give the proto layer one buffer of bytes at a time
50 /// that it can use to serialize the next portion of the message, with the
51 /// option to "backup" if more buffer is given than required at the last buffer.
52 ///
53 /// Read more about ZeroCopyOutputStream interface here:
54 /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream
55 class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream {
56  public:
57   /// Constructor for this derived class
58   ///
59   /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created
60   /// \param block_size How big are the chunks to allocate at a time
61   /// \param total_size How many total bytes are required for this proto
ProtoBufferWriter(ByteBuffer * byte_buffer,int block_size,int total_size)62   ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size)
63       : block_size_(block_size),
64         total_size_(total_size),
65         byte_count_(0),
66         have_backup_(false) {
67     GPR_ASSERT(!byte_buffer->Valid());
68     /// Create an empty raw byte buffer and look at its underlying slice buffer
69     grpc_byte_buffer* bp = grpc_raw_byte_buffer_create(nullptr, 0);
70     byte_buffer->set_buffer(bp);
71     slice_buffer_ = &bp->data.raw.slice_buffer;
72   }
73 
~ProtoBufferWriter()74   ~ProtoBufferWriter() override {
75     if (have_backup_) {
76       grpc_slice_unref(backup_slice_);
77     }
78   }
79 
80   /// Give the proto library the next buffer of bytes and its size. It is
81   /// safe for the caller to write from data[0, size - 1].
Next(void ** data,int * size)82   bool Next(void** data, int* size) override {
83     // Protobuf should not ask for more memory than total_size_.
84     GPR_ASSERT(byte_count_ < total_size_);
85     // 1. Use the remaining backup slice if we have one
86     // 2. Otherwise allocate a slice, up to the remaining length needed
87     //    or our maximum allocation size
88     // 3. Provide the slice start and size available
89     // 4. Add the slice being returned to the slice buffer
90     size_t remain = static_cast<size_t>(total_size_ - byte_count_);
91     if (have_backup_) {
92       /// If we have a backup slice, we should use it first
93       slice_ = backup_slice_;
94       have_backup_ = false;
95       if (GRPC_SLICE_LENGTH(slice_) > remain) {
96         GRPC_SLICE_SET_LENGTH(slice_, remain);
97       }
98     } else {
99       // When less than a whole block is needed, only allocate that much.
100       // But make sure the allocated slice is not inlined.
101       size_t allocate_length =
102           remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
103       slice_ = grpc_slice_malloc(allocate_length > GRPC_SLICE_INLINED_SIZE
104                                      ? allocate_length
105                                      : GRPC_SLICE_INLINED_SIZE + 1);
106     }
107     *data = GRPC_SLICE_START_PTR(slice_);
108     // On win x64, int is only 32bit
109     GPR_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
110     byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_));
111     // Using grpc_slice_buffer_add could modify slice_ and merge it with the
112     // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to
113     // ensure the slice gets added at a separate index. It can then be kept
114     // around and popped later in the BackUp function.
115     grpc_slice_buffer_add_indexed(slice_buffer_, slice_);
116     return true;
117   }
118 
119   /// Backup by \a count bytes because Next returned more bytes than needed
120   /// (only used in the last buffer). \a count must be less than or equal too
121   /// the last buffer returned from next.
BackUp(int count)122   void BackUp(int count) override {
123     // count == 0 is invoked by ZeroCopyOutputStream users indicating that any
124     // potential buffer obtained through a previous call to Next() is final.
125     // ZeroCopyOutputStream implementations such as streaming output can use
126     // these calls to flush any temporary buffer and flush the output. The logic
127     // below is not robust against count == 0 invocations, so directly return.
128     if (count == 0) return;
129 
130     /// 1. Remove the partially-used last slice from the slice buffer
131     /// 2. Split it into the needed (if any) and unneeded part
132     /// 3. Add the needed part back to the slice buffer
133     /// 4. Mark that we still have the remaining part (for later use/unref)
134     GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
135     grpc_slice_buffer_pop(slice_buffer_);
136     if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) {
137       backup_slice_ = slice_;
138     } else {
139       backup_slice_ =
140           grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count);
141       grpc_slice_buffer_add(slice_buffer_, slice_);
142     }
143     // It's dangerous to keep an inlined grpc_slice as the backup slice, since
144     // on a following Next() call, a reference will be returned to this slice
145     // via GRPC_SLICE_START_PTR, which will not be an address held by
146     // slice_buffer_.
147     have_backup_ = backup_slice_.refcount != nullptr;
148     byte_count_ -= count;
149   }
150 
151   /// Returns the total number of bytes written since this object was created.
ByteCount()152   int64_t ByteCount() const override { return byte_count_; }
153 
154 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
155   /// Writes cord to the backing byte_buffer, sharing the memory between the
156   /// blocks of the cord, and the slices of the byte_buffer.
157   // (override is conditionally omitted here to support old Protobuf which
158   //  doesn't have ReadCord method)
159   // NOLINTBEGIN(modernize-use-override,
160   // clang-diagnostic-inconsistent-missing-override)
WriteCord(const absl::Cord & cord)161   virtual bool WriteCord(const absl::Cord& cord)
162 #if GOOGLE_PROTOBUF_VERSION >= 4022000
163       override
164 #endif
165   // NOLINTEND(modernize-use-override,
166   // clang-diagnostic-inconsistent-missing-override)
167   {
168     grpc_slice_buffer* buffer = slice_buffer();
169     size_t cur = 0;
170     for (absl::string_view chunk : cord.Chunks()) {
171       // TODO(veblush): Revisit this 512 threadhold which could be smaller.
172       if (chunk.size() < 512) {
173         // If chunk is small enough, just copy it.
174         grpc_slice slice =
175             grpc_slice_from_copied_buffer(chunk.data(), chunk.size());
176         grpc_slice_buffer_add(buffer, slice);
177       } else {
178         // If chunk is large, just use the pointer instead of copying.
179         // To make sure it's alive while being used, a subcord for chunk is
180         // created and attached to a grpc_slice instance.
181         absl::Cord* subcord = new absl::Cord(cord.Subcord(cur, chunk.size()));
182         grpc_slice slice = grpc_slice_new_with_user_data(
183             const_cast<uint8_t*>(
184                 reinterpret_cast<const uint8_t*>(chunk.data())),
185             chunk.size(), [](void* p) { delete static_cast<absl::Cord*>(p); },
186             subcord);
187         grpc_slice_buffer_add(buffer, slice);
188       }
189       cur += chunk.size();
190     }
191     set_byte_count(ByteCount() + cur);
192     return true;
193   }
194 #endif  // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
195 
196   // These protected members are needed to support internal optimizations.
197   // they expose internal bits of grpc core that are NOT stable. If you have
198   // a use case needs to use one of these functions, please send an email to
199   // https://groups.google.com/forum/#!forum/grpc-io.
200  protected:
slice_buffer()201   grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
set_byte_count(int64_t byte_count)202   void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
203 
204  private:
205   // friend for testing purposes only
206   friend class internal::ProtoBufferWriterPeer;
207   const int block_size_;  ///< size to alloc for each new \a grpc_slice needed
208   const int total_size_;  ///< byte size of proto being serialized
209   int64_t byte_count_;    ///< bytes written since this object was created
210   grpc_slice_buffer*
211       slice_buffer_;  ///< internal buffer of slices holding the serialized data
212   bool have_backup_;  ///< if we are holding a backup slice or not
213   grpc_slice backup_slice_;  ///< holds space we can still write to, if the
214                              ///< caller has called BackUp
215   grpc_slice slice_;         ///< current slice passed back to the caller
216 };
217 
218 }  // namespace grpc
219 
220 #endif  // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
221