1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/http/in_memory_request_response.h"
17
18 #include <cstdint>
19 #include <cstring>
20 #include <functional>
21 #include <memory>
22 #include <optional>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/ascii.h"
30 #include "absl/strings/cord.h"
31 #include "absl/strings/match.h"
32 #include "absl/strings/numbers.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/synchronization/mutex.h"
36 #include "fcp/base/monitoring.h"
37 #include "fcp/client/cache/resource_cache.h"
38 #include "fcp/client/http/http_client.h"
39 #include "fcp/client/http/http_client_util.h"
40 #include "fcp/client/http/http_resource_metadata.pb.h"
41 #include "fcp/client/interruptible_runner.h"
42 #include "google/protobuf/io/gzip_stream.h"
43 #include "google/protobuf/io/zero_copy_stream_impl_lite.h"
44
45 namespace fcp {
46 namespace client {
47 namespace http {
48 namespace {
49
50 // Returns the resource from the cache, or NOT_FOUND if it was not in the cache.
51 // If the resource was compressed, it will be decompressed.
TryGetResourceFromCache(absl::string_view client_cache_id,const std::optional<absl::Duration> & max_age,cache::ResourceCache & resource_cache)52 absl::StatusOr<absl::Cord> TryGetResourceFromCache(
53 absl::string_view client_cache_id,
54 const std::optional<absl::Duration>& max_age,
55 cache::ResourceCache& resource_cache) {
56 FCP_ASSIGN_OR_RETURN(
57 cache::ResourceCache::ResourceAndMetadata cached_resource_and_metadata,
58 resource_cache.Get(client_cache_id, max_age));
59 HttpResourceMetadata metadata;
60 if (!cached_resource_and_metadata.metadata.UnpackTo(&metadata)) {
61 return absl::InternalError("Failed to unpack metadata!");
62 }
63 absl::Cord cached_resource = cached_resource_and_metadata.resource;
64 if (metadata.compression_format() ==
65 ResourceCompressionFormat::RESOURCE_COMPRESSION_FORMAT_GZIP) {
66 FCP_ASSIGN_OR_RETURN(cached_resource, internal::UncompressWithGzip(
67 std::string(cached_resource)));
68 }
69 return cached_resource;
70 }
71
TryPutResourceInCache(absl::string_view client_cache_id,const absl::Cord & response_body,bool response_encoded_with_gzip,absl::Duration max_age,cache::ResourceCache & resource_cache)72 absl::Status TryPutResourceInCache(absl::string_view client_cache_id,
73 const absl::Cord& response_body,
74 bool response_encoded_with_gzip,
75 absl::Duration max_age,
76 cache::ResourceCache& resource_cache) {
77 // We fetched a resource that has a client_cache_id and was not
78 // loaded from the cache, put it in the cache.
79 HttpResourceMetadata metadata;
80 if (response_encoded_with_gzip) {
81 metadata.set_compression_format(
82 ResourceCompressionFormat::RESOURCE_COMPRESSION_FORMAT_GZIP);
83 } else {
84 metadata.set_compression_format(
85 ResourceCompressionFormat::RESOURCE_COMPRESSION_FORMAT_UNSPECIFIED);
86 }
87 google::protobuf::Any metadata_wrapper;
88 metadata_wrapper.PackFrom(metadata);
89 return resource_cache.Put(client_cache_id, response_body, metadata_wrapper,
90 max_age);
91 }
92
93 } // namespace
94
95 using ::google::protobuf::io::ArrayInputStream;
96 using ::google::protobuf::io::GzipInputStream;
97 using ::google::protobuf::io::GzipOutputStream;
98 using ::google::protobuf::io::StringOutputStream;
99
100 using CompressionFormat =
101 ::fcp::client::http::UriOrInlineData::InlineData::CompressionFormat;
102
103 static constexpr char kOctetStream[] = "application/octet-stream";
104 constexpr absl::string_view kClientDecodedGzipSuffix = "+gzip";
105
Create(absl::string_view uri,Method method,HeaderList extra_headers,std::string body,bool use_compression)106 absl::StatusOr<std::unique_ptr<HttpRequest>> InMemoryHttpRequest::Create(
107 absl::string_view uri, Method method, HeaderList extra_headers,
108 std::string body, bool use_compression) {
109 // Allow http://localhost:xxxx as an exception to the https-only policy,
110 // so that we can use a local http test server.
111 if (!absl::StartsWithIgnoreCase(uri, kHttpsScheme) &&
112 !absl::StartsWithIgnoreCase(uri, kLocalhostUri)) {
113 return absl::InvalidArgumentError(
114 absl::StrCat("Non-HTTPS URIs are not supported: ", uri));
115 }
116 if (use_compression) {
117 FCP_ASSIGN_OR_RETURN(body, internal::CompressWithGzip(body));
118 extra_headers.push_back({kContentEncodingHdr, kGzipEncodingHdrValue});
119 }
120 std::optional<std::string> content_length_hdr =
121 FindHeader(extra_headers, kContentLengthHdr);
122 if (content_length_hdr.has_value()) {
123 return absl::InvalidArgumentError(
124 "Content-Length header should not be provided!");
125 }
126
127 if (!body.empty()) {
128 switch (method) {
129 case HttpRequest::Method::kPost:
130 case HttpRequest::Method::kPatch:
131 case HttpRequest::Method::kPut:
132 case HttpRequest::Method::kDelete:
133 break;
134 default:
135 return absl::InvalidArgumentError(absl::StrCat(
136 "Request method does not allow request body: ", method));
137 }
138 // Add a Content-Length header, but only if there's a request body.
139 extra_headers.push_back({kContentLengthHdr, std::to_string(body.size())});
140 }
141
142 return absl::WrapUnique(new InMemoryHttpRequest(
143 uri, method, std::move(extra_headers), std::move(body)));
144 }
145
ReadBody(char * buffer,int64_t requested)146 absl::StatusOr<int64_t> InMemoryHttpRequest::ReadBody(char* buffer,
147 int64_t requested) {
148 // This method is called from the HttpClient's thread (we don't really care
149 // which one). Hence, we use a mutex to ensure that subsequent calls to this
150 // method see the modifications to cursor_.
151 absl::WriterMutexLock _(&mutex_);
152
153 // Check whether there's any bytes left to read, and indicate the end has been
154 // reached if not.
155 int64_t bytes_left = body_.size() - cursor_;
156 if (bytes_left == 0) {
157 return absl::OutOfRangeError("End of stream reached");
158 }
159 FCP_CHECK(buffer != nullptr);
160 FCP_CHECK(requested > 0);
161 // Calculate how much data we can return, based on the size of `buffer`.
162 int64_t actual_read = bytes_left <= requested ? bytes_left : requested;
163 std::memcpy(buffer, body_.data() + cursor_, actual_read);
164 cursor_ += actual_read;
165 return actual_read;
166 }
167
OnResponseStarted(const HttpRequest & request,const HttpResponse & response)168 absl::Status InMemoryHttpRequestCallback::OnResponseStarted(
169 const HttpRequest& request, const HttpResponse& response) {
170 absl::WriterMutexLock _(&mutex_);
171 response_code_ = response.code();
172
173 std::optional<std::string> content_encoding_header =
174 FindHeader(response.headers(), kContentEncodingHdr);
175 if (content_encoding_header.has_value()) {
176 // We don't expect the response body to be "Content-Encoding" encoded,
177 // because the `HttpClient` is supposed to transparently handle the decoding
178 // for us (unless we specified a "Accept-Encoding" header in the request,
179 // which would indicate that we wanted to handle the response decoding).
180 if (!FindHeader(request.extra_headers(), kAcceptEncodingHdr).has_value()) {
181 // Note: technically, we should only receive Content-Encoding values that
182 // match the Accept-Encoding values provided in the request headers. The
183 // check above isn't quite that strict, but that's probably fine (since
184 // such issues should be rare, and can be handled farther up the stack).
185 status_ = absl::InvalidArgumentError(
186 absl::StrCat("Unexpected header: ", kContentEncodingHdr));
187 return status_;
188 }
189 content_encoding_ = *content_encoding_header;
190 }
191
192 content_type_ = FindHeader(response.headers(), kContentTypeHdr).value_or("");
193
194 // Similarly, we should under no circumstances receive a non-identity
195 // Transfer-Encoding header, since the `HttpClient` is unconditionally
196 // required to undo any such encoding for us.
197 std::optional<std::string> transfer_encoding_header =
198 FindHeader(response.headers(), kTransferEncodingHdr);
199 if (transfer_encoding_header.has_value() &&
200 absl::AsciiStrToLower(*transfer_encoding_header) !=
201 kIdentityEncodingHdrValue) {
202 status_ = absl::InvalidArgumentError(
203 absl::StrCat("Unexpected header: ", kTransferEncodingHdr));
204 return status_;
205 }
206
207 // If no Content-Length header is provided, this means that the server either
208 // didn't provide one and is streaming the response, or that the HttpClient
209 // implementation transparently decompressed the data for us and stripped the
210 // Content-Length header (as per the HttpClient contract).
211 std::optional<std::string> content_length_hdr =
212 FindHeader(response.headers(), kContentLengthHdr);
213 if (!content_length_hdr.has_value()) {
214 return absl::OkStatus();
215 }
216
217 // A Content-Length header available. Let's parse it so that we know how much
218 // data to expect.
219 int64_t content_length;
220 // Note that SimpleAtoi safely handles non-ASCII data.
221 if (!absl::SimpleAtoi(*content_length_hdr, &content_length)) {
222 status_ = absl::InvalidArgumentError(
223 "Could not parse Content-Length response header");
224 return status_;
225 }
226 if (content_length < 0) {
227 status_ = absl::OutOfRangeError(absl::StrCat(
228 "Invalid Content-Length response header: ", content_length));
229 return status_;
230 }
231 expected_content_length_ = content_length;
232
233 return absl::OkStatus();
234 }
235
OnResponseError(const HttpRequest & request,const absl::Status & error)236 void InMemoryHttpRequestCallback::OnResponseError(const HttpRequest& request,
237 const absl::Status& error) {
238 absl::WriterMutexLock _(&mutex_);
239 status_ = absl::Status(
240 error.code(), absl::StrCat("Error receiving response headers (error: ",
241 error.message(), ")"));
242 }
243
OnResponseBody(const HttpRequest & request,const HttpResponse & response,absl::string_view data)244 absl::Status InMemoryHttpRequestCallback::OnResponseBody(
245 const HttpRequest& request, const HttpResponse& response,
246 absl::string_view data) {
247 // This runs on a thread chosen by the HttpClient implementation (i.e. it
248 // could be our original thread, or a different one). Ensure that if
249 // subsequent callbacks occur on different threads each thread sees the
250 // previous threads' updates to response_buffer_.
251 absl::WriterMutexLock _(&mutex_);
252
253 // Ensure we're not receiving more data than expected.
254 if (expected_content_length_.has_value() &&
255 response_buffer_.size() + data.size() > *expected_content_length_) {
256 status_ = absl::OutOfRangeError(absl::StrCat(
257 "Too much response body data received (rcvd: ", response_buffer_.size(),
258 ", new: ", data.size(), ", max: ", *expected_content_length_, ")"));
259 return status_;
260 }
261
262 // Copy the data into the target buffer. Note that this means we'll always
263 // store the response body as a number of memory fragments (rather than a
264 // contiguous buffer). However, because HttpClient implementations are
265 // encouraged to return response data in fairly large chunks, we don't expect
266 // this too cause much overhead.
267 response_buffer_.Append(data);
268
269 return absl::OkStatus();
270 }
271
OnResponseBodyError(const HttpRequest & request,const HttpResponse & response,const absl::Status & error)272 void InMemoryHttpRequestCallback::OnResponseBodyError(
273 const HttpRequest& request, const HttpResponse& response,
274 const absl::Status& error) {
275 absl::WriterMutexLock _(&mutex_);
276 status_ = absl::Status(
277 error.code(),
278 absl::StrCat("Error receiving response body (response code: ",
279 response.code(), ", error: ", error.message(), ")"));
280 }
281
OnResponseCompleted(const HttpRequest & request,const HttpResponse & response)282 void InMemoryHttpRequestCallback::OnResponseCompleted(
283 const HttpRequest& request, const HttpResponse& response) {
284 // Once the body has been received correctly, turn the response code into a
285 // canonical code.
286 absl::WriterMutexLock _(&mutex_);
287 // Note: the case when too *much* response data is unexpectedly received is
288 // handled in OnResponseBody (while this handles the case of too little data).
289 if (expected_content_length_.has_value() &&
290 response_buffer_.size() != *expected_content_length_) {
291 status_ = absl::InvalidArgumentError(
292 absl::StrCat("Too little response body data received (rcvd: ",
293 response_buffer_.size(),
294 ", expected: ", *expected_content_length_, ")"));
295 return;
296 }
297
298 status_ = ConvertHttpCodeToStatus(*response_code_);
299 }
300
Response() const301 absl::StatusOr<InMemoryHttpResponse> InMemoryHttpRequestCallback::Response()
302 const {
303 absl::ReaderMutexLock _(&mutex_);
304 FCP_RETURN_IF_ERROR(status_);
305 // If status_ is OK, then response_code_ and response_headers_ are guaranteed
306 // to have values.
307
308 return InMemoryHttpResponse{*response_code_, content_encoding_, content_type_,
309 response_buffer_};
310 }
311
PerformRequestInMemory(HttpClient & http_client,InterruptibleRunner & interruptible_runner,std::unique_ptr<http::HttpRequest> request,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)312 absl::StatusOr<InMemoryHttpResponse> PerformRequestInMemory(
313 HttpClient& http_client, InterruptibleRunner& interruptible_runner,
314 std::unique_ptr<http::HttpRequest> request, int64_t* bytes_received_acc,
315 int64_t* bytes_sent_acc) {
316 // Note: we must explicitly instantiate a vector here as opposed to passing an
317 // initializer list to PerformRequestsInMemory, because initializer lists do
318 // not support move-only values.
319 std::vector<std::unique_ptr<http::HttpRequest>> requests;
320 requests.push_back(std::move(request));
321 FCP_ASSIGN_OR_RETURN(
322 auto result, PerformMultipleRequestsInMemory(
323 http_client, interruptible_runner, std::move(requests),
324 bytes_received_acc, bytes_sent_acc));
325 return std::move(result[0]);
326 }
327
328 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
PerformMultipleRequestsInMemory(HttpClient & http_client,InterruptibleRunner & interruptible_runner,std::vector<std::unique_ptr<http::HttpRequest>> requests,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)329 PerformMultipleRequestsInMemory(
330 HttpClient& http_client, InterruptibleRunner& interruptible_runner,
331 std::vector<std::unique_ptr<http::HttpRequest>> requests,
332 int64_t* bytes_received_acc, int64_t* bytes_sent_acc) {
333 // A vector that will own the request handles and callbacks (and will
334 // determine their lifetimes).
335 std::vector<std::pair<std::unique_ptr<HttpRequestHandle>,
336 std::unique_ptr<InMemoryHttpRequestCallback>>>
337 handles_and_callbacks;
338 handles_and_callbacks.reserve(requests.size());
339
340 // An accompanying vector that contains just the raw pointers, for passing to
341 // `HttpClient::PerformRequests`.
342 std::vector<std::pair<HttpRequestHandle*, HttpRequestCallback*>>
343 handles_and_callbacks_ptrs;
344 handles_and_callbacks_ptrs.reserve(requests.size());
345
346 // Enqueue each request, and create a simple callback for each request which
347 // will simply buffer the response body in-memory and allow us to consume that
348 // buffer once all requests have finished.
349 for (std::unique_ptr<HttpRequest>& request : requests) {
350 std::unique_ptr<HttpRequestHandle> handle =
351 http_client.EnqueueRequest(std::move(request));
352 auto callback = std::make_unique<InMemoryHttpRequestCallback>();
353 handles_and_callbacks_ptrs.push_back({handle.get(), callback.get()});
354 handles_and_callbacks.push_back({std::move(handle), std::move(callback)});
355 }
356
357 // Issue the requests in one call (allowing the HttpClient to issue them
358 // concurrently), in an interruptible fashion.
359 absl::Status result = interruptible_runner.Run(
360 [&http_client, &handles_and_callbacks_ptrs]() {
361 return http_client.PerformRequests(handles_and_callbacks_ptrs);
362 },
363 [&handles_and_callbacks_ptrs] {
364 // If we get aborted then call HttpRequestHandle::Cancel on all handles.
365 // This should result in the PerformRequests call returning early and
366 // InterruptibleRunner::Run returning CANCELLED.
367 for (auto [handle, callback] : handles_and_callbacks_ptrs) {
368 handle->Cancel();
369 }
370 });
371 // Update the network stats *before* we return (just in case a failed
372 // `PerformRequests` call caused some network traffic to have been sent
373 // anyway).
374 for (auto& [handle, callback] : handles_and_callbacks) {
375 HttpRequestHandle::SentReceivedBytes sent_received_bytes =
376 handle->TotalSentReceivedBytes();
377 if (bytes_received_acc != nullptr) {
378 *bytes_received_acc += sent_received_bytes.received_bytes;
379 }
380 if (bytes_sent_acc != nullptr) {
381 *bytes_sent_acc += sent_received_bytes.sent_bytes;
382 }
383 }
384
385 FCP_RETURN_IF_ERROR(result);
386
387 // Gather and return the results.
388 std::vector<absl::StatusOr<InMemoryHttpResponse>> results;
389 results.reserve(handles_and_callbacks.size());
390 for (auto& [handle, callback] : handles_and_callbacks) {
391 results.push_back(callback->Response());
392 }
393 return results;
394 }
395
396 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
FetchResourcesInMemory(HttpClient & http_client,InterruptibleRunner & interruptible_runner,const std::vector<UriOrInlineData> & resources,int64_t * bytes_received_acc,int64_t * bytes_sent_acc,cache::ResourceCache * resource_cache)397 FetchResourcesInMemory(HttpClient& http_client,
398 InterruptibleRunner& interruptible_runner,
399 const std::vector<UriOrInlineData>& resources,
400 int64_t* bytes_received_acc, int64_t* bytes_sent_acc,
401 cache::ResourceCache* resource_cache) {
402 // Each resource may have the data already available (by having been included
403 // in a prior response inline), or may need to be fetched.
404
405 // We'll create an 'accessor' for each resource, providing access to that
406 // resource's data by the end of this function (either the fetched data, or
407 // the inline data). Additionally, this struct will contain the
408 // client_cache_id and max_age for the resource if the resource should be put
409 // in the cache. If the resource should not be put in the cache,
410 // client_cache_id will be an empty string.
411 struct AccessorAndCacheMetadata {
412 std::function<absl::StatusOr<InMemoryHttpResponse>()> accessor;
413 std::string client_cache_id;
414 absl::Duration max_age;
415 };
416 std::vector<AccessorAndCacheMetadata> response_accessors;
417
418 // We'll compile HttpRequest instances for those resources that do need to be
419 // fetched, then we'll fire them off all at once, and then we'll gather their
420 // responses once all requests have finished.
421 std::vector<std::unique_ptr<http::HttpRequest>> http_requests;
422 std::vector<absl::StatusOr<InMemoryHttpResponse>> http_responses;
423 bool caching_enabled = resource_cache != nullptr;
424
425 for (const UriOrInlineData& resource : resources) {
426 if (!resource.uri().uri.empty()) {
427 // If the resource has a cache_id, try getting it out of the cache. If any
428 // condition happens outside the happy path, fetch the resource normally.
429 if (caching_enabled && !resource.uri().client_cache_id.empty()) {
430 absl::StatusOr<absl::Cord> cached_resource =
431 TryGetResourceFromCache(resource.uri().client_cache_id,
432 resource.uri().max_age, *resource_cache);
433 if (cached_resource.ok()) {
434 // Resource was successfully fetched from the cache, so we do not set
435 // the client_cache_id or the max_age.
436 response_accessors.push_back({.accessor =
437 [cached_resource]() {
438 return InMemoryHttpResponse{
439 kHttpOk, "", kOctetStream,
440 *cached_resource};
441 },
442 .client_cache_id = "",
443 .max_age = absl::ZeroDuration()});
444 continue;
445 }
446 }
447 // If the resource URI is set, then create a request to fetch the data for
448 // it, and point the accessor at the slot in http_responses where that
449 // request's response will eventually live.
450 FCP_ASSIGN_OR_RETURN(std::unique_ptr<http::HttpRequest> request,
451 InMemoryHttpRequest::Create(
452 resource.uri().uri, HttpRequest::Method::kGet,
453 {}, "", /*use_compression=*/
454 false));
455 http_requests.push_back(std::move(request));
456 int64_t response_index = http_requests.end() - http_requests.begin() - 1;
457 auto response_accessing_fn = [&http_responses, response_index]() {
458 return std::move(http_responses.at(response_index));
459 };
460 if (caching_enabled) {
461 // We didn't load the resource from the cache, so set the
462 // client_cache_id and max_age in the response_accessor.
463 response_accessors.push_back(
464 {.accessor = response_accessing_fn,
465 .client_cache_id = std::string(resource.uri().client_cache_id),
466 .max_age = resource.uri().max_age});
467 } else {
468 response_accessors.push_back({.accessor = response_accessing_fn});
469 }
470 } else {
471 // The data is available inline. Make the accessor just return a "fake"
472 // successful HTTP response (that way the caller can have unified error
473 // handling logic and doesn't have to know whether a resource was truly
474 // fetched via HTTP or not). Because the inline_data field is an
475 // absl::Cord, making a copy of it should be very cheap.
476 response_accessors.push_back({.accessor = [resource]() {
477 std::string content_type(kOctetStream);
478 switch (resource.inline_data().compression_format) {
479 case UriOrInlineData::InlineData::CompressionFormat::kUncompressed:
480 break;
481 case UriOrInlineData::InlineData::CompressionFormat::kGzip:
482 absl::StrAppend(&content_type, kClientDecodedGzipSuffix);
483 break;
484 }
485 return InMemoryHttpResponse{kHttpOk, "", content_type,
486 resource.inline_data().data};
487 }});
488 }
489 }
490
491 // Perform the requests.
492 auto resource_fetch_result = PerformMultipleRequestsInMemory(
493 http_client, interruptible_runner, std::move(http_requests),
494 bytes_received_acc, bytes_sent_acc);
495 // Check whether issuing the requests failed as a whole (generally indicating
496 // a programming error).
497 FCP_RETURN_IF_ERROR(resource_fetch_result);
498 http_responses = std::move(*resource_fetch_result);
499
500 // Compile the result vector by getting each resource's response using the
501 // corresponding accessor.
502 // Note that the order of results returned corresponds to the order of
503 // resources in the vector we originally received.
504 std::vector<absl::StatusOr<InMemoryHttpResponse>> result;
505 result.reserve(response_accessors.size());
506 for (const auto& response_accessor : response_accessors) {
507 absl::StatusOr<InMemoryHttpResponse> response =
508 response_accessor.accessor();
509 if (response.ok()) {
510 bool encoded_with_gzip = absl::EndsWithIgnoreCase(
511 response->content_type, kClientDecodedGzipSuffix);
512 if (!response_accessor.client_cache_id.empty()) {
513 TryPutResourceInCache(response_accessor.client_cache_id,
514 response->body, encoded_with_gzip,
515 response_accessor.max_age, *resource_cache)
516 .IgnoreError();
517 }
518 if (encoded_with_gzip) {
519 std::string response_body_temp(response->body);
520 // We're going to overwrite the response body with the decoded
521 // contents shortly, no need to keep an extra copy of it in memory.
522 response->body.Clear();
523 absl::StatusOr<absl::Cord> decoded_response_body =
524 internal::UncompressWithGzip(response_body_temp);
525 if (!decoded_response_body.ok()) {
526 response = decoded_response_body.status();
527 } else {
528 response->body = *std::move(decoded_response_body);
529 }
530 }
531 }
532 result.push_back(response);
533 }
534 return result;
535 }
536
537 namespace internal {
CompressWithGzip(const std::string & uncompressed_data)538 absl::StatusOr<std::string> CompressWithGzip(
539 const std::string& uncompressed_data) {
540 int starting_pos = 0;
541 size_t str_size = uncompressed_data.length();
542 size_t in_size = str_size;
543 std::string output;
544 StringOutputStream string_output_stream(&output);
545 GzipOutputStream::Options options;
546 options.format = GzipOutputStream::GZIP;
547 GzipOutputStream compressed_stream(&string_output_stream, options);
548 void* out;
549 int out_size;
550 while (starting_pos < str_size) {
551 if (!compressed_stream.Next(&out, &out_size) || out_size <= 0) {
552 return absl::InternalError(
553 absl::StrCat("An error has occurred during compression: ",
554 compressed_stream.ZlibErrorMessage()));
555 }
556
557 if (in_size <= out_size) {
558 uncompressed_data.copy(static_cast<char*>(out), in_size, starting_pos);
559 // Ensure that the stream's output buffer is truncated to match the total
560 // amount of data.
561 compressed_stream.BackUp(out_size - static_cast<int>(in_size));
562 break;
563 }
564 uncompressed_data.copy(static_cast<char*>(out), out_size, starting_pos);
565 starting_pos += out_size;
566 in_size -= out_size;
567 }
568
569 if (!compressed_stream.Close()) {
570 return absl::InternalError(absl::StrCat(
571 "Failed to close the stream: ", compressed_stream.ZlibErrorMessage()));
572 }
573 return output;
574 }
575
UncompressWithGzip(const std::string & compressed_data)576 absl::StatusOr<absl::Cord> UncompressWithGzip(
577 const std::string& compressed_data) {
578 absl::Cord out;
579 const void* buffer;
580 int size;
581 ArrayInputStream sub_stream(compressed_data.data(),
582 static_cast<int>(compressed_data.size()));
583 GzipInputStream input_stream(&sub_stream, GzipInputStream::GZIP);
584
585 while (input_stream.Next(&buffer, &size)) {
586 if (size <= -1) {
587 return absl::InternalError(
588 "Uncompress failed: invalid input size returned by the "
589 "GzipInputStream.");
590 }
591 out.Append(absl::string_view(reinterpret_cast<const char*>(buffer), size));
592 }
593
594 if (input_stream.ZlibErrorMessage() != nullptr) {
595 // Some real error happened during decompression.
596 return absl::InternalError(
597 absl::StrCat("An error has occurred during decompression:",
598 input_stream.ZlibErrorMessage()));
599 }
600
601 return out;
602 }
603
604 } // namespace internal
605 } // namespace http
606 } // namespace client
607 } // namespace fcp
608