//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H
#define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H

#include <climits>
#include <type_traits>

#include "absl/strings/cord.h"

#include <grpc/byte_buffer.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config_protobuf.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/status.h>

/// This header provides an object that writes bytes directly into a
/// grpc::ByteBuffer, via the ZeroCopyOutputStream interface

namespace grpc {

// Forward declaration for testing use only
namespace internal {
class ProtoBufferWriterPeer;
}  // namespace internal

const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;

/// This is a specialization of the protobuf class ZeroCopyOutputStream.
/// The principle is to give the proto layer one buffer of bytes at a time
/// that it can use to serialize the next portion of the message, with the
/// option to "backup" if more buffer is given than required at the last buffer.
52 /// 53 /// Read more about ZeroCopyOutputStream interface here: 54 /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream 55 class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream { 56 public: 57 /// Constructor for this derived class 58 /// 59 /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created 60 /// \param block_size How big are the chunks to allocate at a time 61 /// \param total_size How many total bytes are required for this proto ProtoBufferWriter(ByteBuffer * byte_buffer,int block_size,int total_size)62 ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size) 63 : block_size_(block_size), 64 total_size_(total_size), 65 byte_count_(0), 66 have_backup_(false) { 67 GPR_ASSERT(!byte_buffer->Valid()); 68 /// Create an empty raw byte buffer and look at its underlying slice buffer 69 grpc_byte_buffer* bp = grpc_raw_byte_buffer_create(nullptr, 0); 70 byte_buffer->set_buffer(bp); 71 slice_buffer_ = &bp->data.raw.slice_buffer; 72 } 73 ~ProtoBufferWriter()74 ~ProtoBufferWriter() override { 75 if (have_backup_) { 76 grpc_slice_unref(backup_slice_); 77 } 78 } 79 80 /// Give the proto library the next buffer of bytes and its size. It is 81 /// safe for the caller to write from data[0, size - 1]. Next(void ** data,int * size)82 bool Next(void** data, int* size) override { 83 // Protobuf should not ask for more memory than total_size_. 84 GPR_ASSERT(byte_count_ < total_size_); 85 // 1. Use the remaining backup slice if we have one 86 // 2. Otherwise allocate a slice, up to the remaining length needed 87 // or our maximum allocation size 88 // 3. Provide the slice start and size available 89 // 4. 
Add the slice being returned to the slice buffer 90 size_t remain = static_cast<size_t>(total_size_ - byte_count_); 91 if (have_backup_) { 92 /// If we have a backup slice, we should use it first 93 slice_ = backup_slice_; 94 have_backup_ = false; 95 if (GRPC_SLICE_LENGTH(slice_) > remain) { 96 GRPC_SLICE_SET_LENGTH(slice_, remain); 97 } 98 } else { 99 // When less than a whole block is needed, only allocate that much. 100 // But make sure the allocated slice is not inlined. 101 size_t allocate_length = 102 remain > static_cast<size_t>(block_size_) ? block_size_ : remain; 103 slice_ = grpc_slice_malloc(allocate_length > GRPC_SLICE_INLINED_SIZE 104 ? allocate_length 105 : GRPC_SLICE_INLINED_SIZE + 1); 106 } 107 *data = GRPC_SLICE_START_PTR(slice_); 108 // On win x64, int is only 32bit 109 GPR_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); 110 byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_)); 111 // Using grpc_slice_buffer_add could modify slice_ and merge it with the 112 // previous slice. Therefore, use grpc_slice_buffer_add_indexed method to 113 // ensure the slice gets added at a separate index. It can then be kept 114 // around and popped later in the BackUp function. 115 grpc_slice_buffer_add_indexed(slice_buffer_, slice_); 116 return true; 117 } 118 119 /// Backup by \a count bytes because Next returned more bytes than needed 120 /// (only used in the last buffer). \a count must be less than or equal too 121 /// the last buffer returned from next. BackUp(int count)122 void BackUp(int count) override { 123 // count == 0 is invoked by ZeroCopyOutputStream users indicating that any 124 // potential buffer obtained through a previous call to Next() is final. 125 // ZeroCopyOutputStream implementations such as streaming output can use 126 // these calls to flush any temporary buffer and flush the output. The logic 127 // below is not robust against count == 0 invocations, so directly return. 128 if (count == 0) return; 129 130 /// 1. 
Remove the partially-used last slice from the slice buffer 131 /// 2. Split it into the needed (if any) and unneeded part 132 /// 3. Add the needed part back to the slice buffer 133 /// 4. Mark that we still have the remaining part (for later use/unref) 134 GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_))); 135 grpc_slice_buffer_pop(slice_buffer_); 136 if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) { 137 backup_slice_ = slice_; 138 } else { 139 backup_slice_ = 140 grpc_slice_split_tail(&slice_, GRPC_SLICE_LENGTH(slice_) - count); 141 grpc_slice_buffer_add(slice_buffer_, slice_); 142 } 143 // It's dangerous to keep an inlined grpc_slice as the backup slice, since 144 // on a following Next() call, a reference will be returned to this slice 145 // via GRPC_SLICE_START_PTR, which will not be an address held by 146 // slice_buffer_. 147 have_backup_ = backup_slice_.refcount != nullptr; 148 byte_count_ -= count; 149 } 150 151 /// Returns the total number of bytes written since this object was created. ByteCount()152 int64_t ByteCount() const override { return byte_count_; } 153 154 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED 155 /// Writes cord to the backing byte_buffer, sharing the memory between the 156 /// blocks of the cord, and the slices of the byte_buffer. 157 // (override is conditionally omitted here to support old Protobuf which 158 // doesn't have ReadCord method) 159 // NOLINTBEGIN(modernize-use-override, 160 // clang-diagnostic-inconsistent-missing-override) WriteCord(const absl::Cord & cord)161 virtual bool WriteCord(const absl::Cord& cord) 162 #if GOOGLE_PROTOBUF_VERSION >= 4022000 163 override 164 #endif 165 // NOLINTEND(modernize-use-override, 166 // clang-diagnostic-inconsistent-missing-override) 167 { 168 grpc_slice_buffer* buffer = slice_buffer(); 169 size_t cur = 0; 170 for (absl::string_view chunk : cord.Chunks()) { 171 // TODO(veblush): Revisit this 512 threadhold which could be smaller. 
172 if (chunk.size() < 512) { 173 // If chunk is small enough, just copy it. 174 grpc_slice slice = 175 grpc_slice_from_copied_buffer(chunk.data(), chunk.size()); 176 grpc_slice_buffer_add(buffer, slice); 177 } else { 178 // If chunk is large, just use the pointer instead of copying. 179 // To make sure it's alive while being used, a subcord for chunk is 180 // created and attached to a grpc_slice instance. 181 absl::Cord* subcord = new absl::Cord(cord.Subcord(cur, chunk.size())); 182 grpc_slice slice = grpc_slice_new_with_user_data( 183 const_cast<uint8_t*>( 184 reinterpret_cast<const uint8_t*>(chunk.data())), 185 chunk.size(), [](void* p) { delete static_cast<absl::Cord*>(p); }, 186 subcord); 187 grpc_slice_buffer_add(buffer, slice); 188 } 189 cur += chunk.size(); 190 } 191 set_byte_count(ByteCount() + cur); 192 return true; 193 } 194 #endif // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED 195 196 // These protected members are needed to support internal optimizations. 197 // they expose internal bits of grpc core that are NOT stable. If you have 198 // a use case needs to use one of these functions, please send an email to 199 // https://groups.google.com/forum/#!forum/grpc-io. 
200 protected: slice_buffer()201 grpc_slice_buffer* slice_buffer() { return slice_buffer_; } set_byte_count(int64_t byte_count)202 void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; } 203 204 private: 205 // friend for testing purposes only 206 friend class internal::ProtoBufferWriterPeer; 207 const int block_size_; ///< size to alloc for each new \a grpc_slice needed 208 const int total_size_; ///< byte size of proto being serialized 209 int64_t byte_count_; ///< bytes written since this object was created 210 grpc_slice_buffer* 211 slice_buffer_; ///< internal buffer of slices holding the serialized data 212 bool have_backup_; ///< if we are holding a backup slice or not 213 grpc_slice backup_slice_; ///< holds space we can still write to, if the 214 ///< caller has called BackUp 215 grpc_slice slice_; ///< current slice passed back to the caller 216 }; 217 218 } // namespace grpc 219 220 #endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H 221