xref: /aosp_15_r20/external/federated-compute/fcp/client/http/in_memory_request_response.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/http/in_memory_request_response.h"
17 
18 #include <cstdint>
19 #include <cstring>
20 #include <functional>
21 #include <memory>
22 #include <optional>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/ascii.h"
30 #include "absl/strings/cord.h"
31 #include "absl/strings/match.h"
32 #include "absl/strings/numbers.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/synchronization/mutex.h"
36 #include "fcp/base/monitoring.h"
37 #include "fcp/client/cache/resource_cache.h"
38 #include "fcp/client/http/http_client.h"
39 #include "fcp/client/http/http_client_util.h"
40 #include "fcp/client/http/http_resource_metadata.pb.h"
41 #include "fcp/client/interruptible_runner.h"
42 #include "google/protobuf/io/gzip_stream.h"
43 #include "google/protobuf/io/zero_copy_stream_impl_lite.h"
44 
45 namespace fcp {
46 namespace client {
47 namespace http {
48 namespace {
49 
50 // Returns the resource from the cache, or NOT_FOUND if it was not in the cache.
51 // If the resource was compressed, it will be decompressed.
TryGetResourceFromCache(absl::string_view client_cache_id,const std::optional<absl::Duration> & max_age,cache::ResourceCache & resource_cache)52 absl::StatusOr<absl::Cord> TryGetResourceFromCache(
53     absl::string_view client_cache_id,
54     const std::optional<absl::Duration>& max_age,
55     cache::ResourceCache& resource_cache) {
56   FCP_ASSIGN_OR_RETURN(
57       cache::ResourceCache::ResourceAndMetadata cached_resource_and_metadata,
58       resource_cache.Get(client_cache_id, max_age));
59   HttpResourceMetadata metadata;
60   if (!cached_resource_and_metadata.metadata.UnpackTo(&metadata)) {
61     return absl::InternalError("Failed to unpack metadata!");
62   }
63   absl::Cord cached_resource = cached_resource_and_metadata.resource;
64   if (metadata.compression_format() ==
65       ResourceCompressionFormat::RESOURCE_COMPRESSION_FORMAT_GZIP) {
66     FCP_ASSIGN_OR_RETURN(cached_resource, internal::UncompressWithGzip(
67                                               std::string(cached_resource)));
68   }
69   return cached_resource;
70 }
71 
TryPutResourceInCache(absl::string_view client_cache_id,const absl::Cord & response_body,bool response_encoded_with_gzip,absl::Duration max_age,cache::ResourceCache & resource_cache)72 absl::Status TryPutResourceInCache(absl::string_view client_cache_id,
73                                    const absl::Cord& response_body,
74                                    bool response_encoded_with_gzip,
75                                    absl::Duration max_age,
76                                    cache::ResourceCache& resource_cache) {
77   // We fetched a resource that has a client_cache_id and was not
78   // loaded from the cache, put it in the cache.
79   HttpResourceMetadata metadata;
80   if (response_encoded_with_gzip) {
81     metadata.set_compression_format(
82         ResourceCompressionFormat::RESOURCE_COMPRESSION_FORMAT_GZIP);
83   } else {
84     metadata.set_compression_format(
85         ResourceCompressionFormat::RESOURCE_COMPRESSION_FORMAT_UNSPECIFIED);
86   }
87   google::protobuf::Any metadata_wrapper;
88   metadata_wrapper.PackFrom(metadata);
89   return resource_cache.Put(client_cache_id, response_body, metadata_wrapper,
90                             max_age);
91 }
92 
93 }  // namespace
94 
95 using ::google::protobuf::io::ArrayInputStream;
96 using ::google::protobuf::io::GzipInputStream;
97 using ::google::protobuf::io::GzipOutputStream;
98 using ::google::protobuf::io::StringOutputStream;
99 
100 using CompressionFormat =
101     ::fcp::client::http::UriOrInlineData::InlineData::CompressionFormat;
102 
103 static constexpr char kOctetStream[] = "application/octet-stream";
104 constexpr absl::string_view kClientDecodedGzipSuffix = "+gzip";
105 
Create(absl::string_view uri,Method method,HeaderList extra_headers,std::string body,bool use_compression)106 absl::StatusOr<std::unique_ptr<HttpRequest>> InMemoryHttpRequest::Create(
107     absl::string_view uri, Method method, HeaderList extra_headers,
108     std::string body, bool use_compression) {
109   // Allow http://localhost:xxxx as an exception to the https-only policy,
110   // so that we can use a local http test server.
111   if (!absl::StartsWithIgnoreCase(uri, kHttpsScheme) &&
112       !absl::StartsWithIgnoreCase(uri, kLocalhostUri)) {
113     return absl::InvalidArgumentError(
114         absl::StrCat("Non-HTTPS URIs are not supported: ", uri));
115   }
116   if (use_compression) {
117     FCP_ASSIGN_OR_RETURN(body, internal::CompressWithGzip(body));
118     extra_headers.push_back({kContentEncodingHdr, kGzipEncodingHdrValue});
119   }
120   std::optional<std::string> content_length_hdr =
121       FindHeader(extra_headers, kContentLengthHdr);
122   if (content_length_hdr.has_value()) {
123     return absl::InvalidArgumentError(
124         "Content-Length header should not be provided!");
125   }
126 
127   if (!body.empty()) {
128     switch (method) {
129       case HttpRequest::Method::kPost:
130       case HttpRequest::Method::kPatch:
131       case HttpRequest::Method::kPut:
132       case HttpRequest::Method::kDelete:
133         break;
134       default:
135         return absl::InvalidArgumentError(absl::StrCat(
136             "Request method does not allow request body: ", method));
137     }
138     // Add a Content-Length header, but only if there's a request body.
139     extra_headers.push_back({kContentLengthHdr, std::to_string(body.size())});
140   }
141 
142   return absl::WrapUnique(new InMemoryHttpRequest(
143       uri, method, std::move(extra_headers), std::move(body)));
144 }
145 
ReadBody(char * buffer,int64_t requested)146 absl::StatusOr<int64_t> InMemoryHttpRequest::ReadBody(char* buffer,
147                                                       int64_t requested) {
148   // This method is called from the HttpClient's thread (we don't really care
149   // which one). Hence, we use a mutex to ensure that subsequent calls to this
150   // method see the modifications to cursor_.
151   absl::WriterMutexLock _(&mutex_);
152 
153   // Check whether there's any bytes left to read, and indicate the end has been
154   // reached if not.
155   int64_t bytes_left = body_.size() - cursor_;
156   if (bytes_left == 0) {
157     return absl::OutOfRangeError("End of stream reached");
158   }
159   FCP_CHECK(buffer != nullptr);
160   FCP_CHECK(requested > 0);
161   // Calculate how much data we can return, based on the size of `buffer`.
162   int64_t actual_read = bytes_left <= requested ? bytes_left : requested;
163   std::memcpy(buffer, body_.data() + cursor_, actual_read);
164   cursor_ += actual_read;
165   return actual_read;
166 }
167 
OnResponseStarted(const HttpRequest & request,const HttpResponse & response)168 absl::Status InMemoryHttpRequestCallback::OnResponseStarted(
169     const HttpRequest& request, const HttpResponse& response) {
170   absl::WriterMutexLock _(&mutex_);
171   response_code_ = response.code();
172 
173   std::optional<std::string> content_encoding_header =
174       FindHeader(response.headers(), kContentEncodingHdr);
175   if (content_encoding_header.has_value()) {
176     // We don't expect the response body to be "Content-Encoding" encoded,
177     // because the `HttpClient` is supposed to transparently handle the decoding
178     // for us (unless we specified a "Accept-Encoding" header in the request,
179     // which would indicate that we wanted to handle the response decoding).
180     if (!FindHeader(request.extra_headers(), kAcceptEncodingHdr).has_value()) {
181       // Note: technically, we should only receive Content-Encoding values that
182       // match the Accept-Encoding values provided in the request headers. The
183       // check above isn't quite that strict, but that's probably fine (since
184       // such issues should be rare, and can be handled farther up the stack).
185       status_ = absl::InvalidArgumentError(
186           absl::StrCat("Unexpected header: ", kContentEncodingHdr));
187       return status_;
188     }
189     content_encoding_ = *content_encoding_header;
190   }
191 
192   content_type_ = FindHeader(response.headers(), kContentTypeHdr).value_or("");
193 
194   // Similarly, we should under no circumstances receive a non-identity
195   // Transfer-Encoding header, since the `HttpClient` is unconditionally
196   // required to undo any such encoding for us.
197   std::optional<std::string> transfer_encoding_header =
198       FindHeader(response.headers(), kTransferEncodingHdr);
199   if (transfer_encoding_header.has_value() &&
200       absl::AsciiStrToLower(*transfer_encoding_header) !=
201           kIdentityEncodingHdrValue) {
202     status_ = absl::InvalidArgumentError(
203         absl::StrCat("Unexpected header: ", kTransferEncodingHdr));
204     return status_;
205   }
206 
207   // If no Content-Length header is provided, this means that the server either
208   // didn't provide one and is streaming the response, or that the HttpClient
209   // implementation transparently decompressed the data for us and stripped the
210   // Content-Length header (as per the HttpClient contract).
211   std::optional<std::string> content_length_hdr =
212       FindHeader(response.headers(), kContentLengthHdr);
213   if (!content_length_hdr.has_value()) {
214     return absl::OkStatus();
215   }
216 
217   // A Content-Length header available. Let's parse it so that we know how much
218   // data to expect.
219   int64_t content_length;
220   // Note that SimpleAtoi safely handles non-ASCII data.
221   if (!absl::SimpleAtoi(*content_length_hdr, &content_length)) {
222     status_ = absl::InvalidArgumentError(
223         "Could not parse Content-Length response header");
224     return status_;
225   }
226   if (content_length < 0) {
227     status_ = absl::OutOfRangeError(absl::StrCat(
228         "Invalid Content-Length response header: ", content_length));
229     return status_;
230   }
231   expected_content_length_ = content_length;
232 
233   return absl::OkStatus();
234 }
235 
OnResponseError(const HttpRequest & request,const absl::Status & error)236 void InMemoryHttpRequestCallback::OnResponseError(const HttpRequest& request,
237                                                   const absl::Status& error) {
238   absl::WriterMutexLock _(&mutex_);
239   status_ = absl::Status(
240       error.code(), absl::StrCat("Error receiving response headers (error: ",
241                                  error.message(), ")"));
242 }
243 
OnResponseBody(const HttpRequest & request,const HttpResponse & response,absl::string_view data)244 absl::Status InMemoryHttpRequestCallback::OnResponseBody(
245     const HttpRequest& request, const HttpResponse& response,
246     absl::string_view data) {
247   // This runs on a thread chosen by the HttpClient implementation (i.e. it
248   // could be our original thread, or a different one). Ensure that if
249   // subsequent callbacks occur on different threads each thread sees the
250   // previous threads' updates to response_buffer_.
251   absl::WriterMutexLock _(&mutex_);
252 
253   // Ensure we're not receiving more data than expected.
254   if (expected_content_length_.has_value() &&
255       response_buffer_.size() + data.size() > *expected_content_length_) {
256     status_ = absl::OutOfRangeError(absl::StrCat(
257         "Too much response body data received (rcvd: ", response_buffer_.size(),
258         ", new: ", data.size(), ", max: ", *expected_content_length_, ")"));
259     return status_;
260   }
261 
262   // Copy the data into the target buffer. Note that this means we'll always
263   // store the response body as a number of memory fragments (rather than a
264   // contiguous buffer). However, because HttpClient implementations are
265   // encouraged to return response data in fairly large chunks, we don't expect
266   // this too cause much overhead.
267   response_buffer_.Append(data);
268 
269   return absl::OkStatus();
270 }
271 
OnResponseBodyError(const HttpRequest & request,const HttpResponse & response,const absl::Status & error)272 void InMemoryHttpRequestCallback::OnResponseBodyError(
273     const HttpRequest& request, const HttpResponse& response,
274     const absl::Status& error) {
275   absl::WriterMutexLock _(&mutex_);
276   status_ = absl::Status(
277       error.code(),
278       absl::StrCat("Error receiving response body (response code: ",
279                    response.code(), ", error: ", error.message(), ")"));
280 }
281 
OnResponseCompleted(const HttpRequest & request,const HttpResponse & response)282 void InMemoryHttpRequestCallback::OnResponseCompleted(
283     const HttpRequest& request, const HttpResponse& response) {
284   // Once the body has been received correctly, turn the response code into a
285   // canonical code.
286   absl::WriterMutexLock _(&mutex_);
287   // Note: the case when too *much* response data is unexpectedly received is
288   // handled in OnResponseBody (while this handles the case of too little data).
289   if (expected_content_length_.has_value() &&
290       response_buffer_.size() != *expected_content_length_) {
291     status_ = absl::InvalidArgumentError(
292         absl::StrCat("Too little response body data received (rcvd: ",
293                      response_buffer_.size(),
294                      ", expected: ", *expected_content_length_, ")"));
295     return;
296   }
297 
298   status_ = ConvertHttpCodeToStatus(*response_code_);
299 }
300 
Response() const301 absl::StatusOr<InMemoryHttpResponse> InMemoryHttpRequestCallback::Response()
302     const {
303   absl::ReaderMutexLock _(&mutex_);
304   FCP_RETURN_IF_ERROR(status_);
305   // If status_ is OK, then response_code_ and response_headers_ are guaranteed
306   // to have values.
307 
308   return InMemoryHttpResponse{*response_code_, content_encoding_, content_type_,
309                               response_buffer_};
310 }
311 
PerformRequestInMemory(HttpClient & http_client,InterruptibleRunner & interruptible_runner,std::unique_ptr<http::HttpRequest> request,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)312 absl::StatusOr<InMemoryHttpResponse> PerformRequestInMemory(
313     HttpClient& http_client, InterruptibleRunner& interruptible_runner,
314     std::unique_ptr<http::HttpRequest> request, int64_t* bytes_received_acc,
315     int64_t* bytes_sent_acc) {
316   // Note: we must explicitly instantiate a vector here as opposed to passing an
317   // initializer list to PerformRequestsInMemory, because initializer lists do
318   // not support move-only values.
319   std::vector<std::unique_ptr<http::HttpRequest>> requests;
320   requests.push_back(std::move(request));
321   FCP_ASSIGN_OR_RETURN(
322       auto result, PerformMultipleRequestsInMemory(
323                        http_client, interruptible_runner, std::move(requests),
324                        bytes_received_acc, bytes_sent_acc));
325   return std::move(result[0]);
326 }
327 
328 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
PerformMultipleRequestsInMemory(HttpClient & http_client,InterruptibleRunner & interruptible_runner,std::vector<std::unique_ptr<http::HttpRequest>> requests,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)329 PerformMultipleRequestsInMemory(
330     HttpClient& http_client, InterruptibleRunner& interruptible_runner,
331     std::vector<std::unique_ptr<http::HttpRequest>> requests,
332     int64_t* bytes_received_acc, int64_t* bytes_sent_acc) {
333   // A vector that will own the request handles and callbacks (and will
334   // determine their lifetimes).
335   std::vector<std::pair<std::unique_ptr<HttpRequestHandle>,
336                         std::unique_ptr<InMemoryHttpRequestCallback>>>
337       handles_and_callbacks;
338   handles_and_callbacks.reserve(requests.size());
339 
340   // An accompanying vector that contains just the raw pointers, for passing to
341   // `HttpClient::PerformRequests`.
342   std::vector<std::pair<HttpRequestHandle*, HttpRequestCallback*>>
343       handles_and_callbacks_ptrs;
344   handles_and_callbacks_ptrs.reserve(requests.size());
345 
346   // Enqueue each request, and create a simple callback for each request which
347   // will simply buffer the response body in-memory and allow us to consume that
348   // buffer once all requests have finished.
349   for (std::unique_ptr<HttpRequest>& request : requests) {
350     std::unique_ptr<HttpRequestHandle> handle =
351         http_client.EnqueueRequest(std::move(request));
352     auto callback = std::make_unique<InMemoryHttpRequestCallback>();
353     handles_and_callbacks_ptrs.push_back({handle.get(), callback.get()});
354     handles_and_callbacks.push_back({std::move(handle), std::move(callback)});
355   }
356 
357   // Issue the requests in one call (allowing the HttpClient to issue them
358   // concurrently), in an interruptible fashion.
359   absl::Status result = interruptible_runner.Run(
360       [&http_client, &handles_and_callbacks_ptrs]() {
361         return http_client.PerformRequests(handles_and_callbacks_ptrs);
362       },
363       [&handles_and_callbacks_ptrs] {
364         // If we get aborted then call HttpRequestHandle::Cancel on all handles.
365         // This should result in the PerformRequests call returning early and
366         // InterruptibleRunner::Run returning CANCELLED.
367         for (auto [handle, callback] : handles_and_callbacks_ptrs) {
368           handle->Cancel();
369         }
370       });
371   // Update the network stats *before* we return (just in case a failed
372   // `PerformRequests` call caused some network traffic to have been sent
373   // anyway).
374   for (auto& [handle, callback] : handles_and_callbacks) {
375     HttpRequestHandle::SentReceivedBytes sent_received_bytes =
376         handle->TotalSentReceivedBytes();
377     if (bytes_received_acc != nullptr) {
378       *bytes_received_acc += sent_received_bytes.received_bytes;
379     }
380     if (bytes_sent_acc != nullptr) {
381       *bytes_sent_acc += sent_received_bytes.sent_bytes;
382     }
383   }
384 
385   FCP_RETURN_IF_ERROR(result);
386 
387   // Gather and return the results.
388   std::vector<absl::StatusOr<InMemoryHttpResponse>> results;
389   results.reserve(handles_and_callbacks.size());
390   for (auto& [handle, callback] : handles_and_callbacks) {
391     results.push_back(callback->Response());
392   }
393   return results;
394 }
395 
396 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
FetchResourcesInMemory(HttpClient & http_client,InterruptibleRunner & interruptible_runner,const std::vector<UriOrInlineData> & resources,int64_t * bytes_received_acc,int64_t * bytes_sent_acc,cache::ResourceCache * resource_cache)397 FetchResourcesInMemory(HttpClient& http_client,
398                        InterruptibleRunner& interruptible_runner,
399                        const std::vector<UriOrInlineData>& resources,
400                        int64_t* bytes_received_acc, int64_t* bytes_sent_acc,
401                        cache::ResourceCache* resource_cache) {
402   // Each resource may have the data already available (by having been included
403   // in a prior response inline), or may need to be fetched.
404 
405   // We'll create an 'accessor' for each resource, providing access to that
406   // resource's data by the end of this function (either the fetched data, or
407   // the inline data). Additionally, this struct will contain the
408   // client_cache_id and max_age for the resource if the resource should be put
409   // in the cache. If the resource should not be put in the cache,
410   // client_cache_id will be an empty string.
411   struct AccessorAndCacheMetadata {
412     std::function<absl::StatusOr<InMemoryHttpResponse>()> accessor;
413     std::string client_cache_id;
414     absl::Duration max_age;
415   };
416   std::vector<AccessorAndCacheMetadata> response_accessors;
417 
418   // We'll compile HttpRequest instances for those resources that do need to be
419   // fetched, then we'll fire them off all at once, and then we'll gather their
420   // responses once all requests have finished.
421   std::vector<std::unique_ptr<http::HttpRequest>> http_requests;
422   std::vector<absl::StatusOr<InMemoryHttpResponse>> http_responses;
423   bool caching_enabled = resource_cache != nullptr;
424 
425   for (const UriOrInlineData& resource : resources) {
426     if (!resource.uri().uri.empty()) {
427       // If the resource has a cache_id, try getting it out of the cache. If any
428       // condition happens outside the happy path, fetch the resource normally.
429       if (caching_enabled && !resource.uri().client_cache_id.empty()) {
430         absl::StatusOr<absl::Cord> cached_resource =
431             TryGetResourceFromCache(resource.uri().client_cache_id,
432                                     resource.uri().max_age, *resource_cache);
433         if (cached_resource.ok()) {
434           // Resource was successfully fetched from the cache, so we do not set
435           // the client_cache_id or the max_age.
436           response_accessors.push_back({.accessor =
437                                             [cached_resource]() {
438                                               return InMemoryHttpResponse{
439                                                   kHttpOk, "", kOctetStream,
440                                                   *cached_resource};
441                                             },
442                                         .client_cache_id = "",
443                                         .max_age = absl::ZeroDuration()});
444           continue;
445         }
446       }
447       // If the resource URI is set, then create a request to fetch the data for
448       // it, and point the accessor at the slot in http_responses where that
449       // request's response will eventually live.
450       FCP_ASSIGN_OR_RETURN(std::unique_ptr<http::HttpRequest> request,
451                            InMemoryHttpRequest::Create(
452                                resource.uri().uri, HttpRequest::Method::kGet,
453                                {}, "", /*use_compression=*/
454                                false));
455       http_requests.push_back(std::move(request));
456       int64_t response_index = http_requests.end() - http_requests.begin() - 1;
457       auto response_accessing_fn = [&http_responses, response_index]() {
458         return std::move(http_responses.at(response_index));
459       };
460       if (caching_enabled) {
461         // We didn't load the resource from the cache, so set the
462         // client_cache_id and max_age in the response_accessor.
463         response_accessors.push_back(
464             {.accessor = response_accessing_fn,
465              .client_cache_id = std::string(resource.uri().client_cache_id),
466              .max_age = resource.uri().max_age});
467       } else {
468         response_accessors.push_back({.accessor = response_accessing_fn});
469       }
470     } else {
471       // The data is available inline. Make the accessor just return a "fake"
472       // successful HTTP response (that way the caller can have unified error
473       // handling logic and doesn't have to know whether a resource was truly
474       // fetched via HTTP or not). Because the inline_data field is an
475       // absl::Cord, making a copy of it should be very cheap.
476       response_accessors.push_back({.accessor = [resource]() {
477         std::string content_type(kOctetStream);
478         switch (resource.inline_data().compression_format) {
479           case UriOrInlineData::InlineData::CompressionFormat::kUncompressed:
480             break;
481           case UriOrInlineData::InlineData::CompressionFormat::kGzip:
482             absl::StrAppend(&content_type, kClientDecodedGzipSuffix);
483             break;
484         }
485         return InMemoryHttpResponse{kHttpOk, "", content_type,
486                                     resource.inline_data().data};
487       }});
488     }
489   }
490 
491   // Perform the requests.
492   auto resource_fetch_result = PerformMultipleRequestsInMemory(
493       http_client, interruptible_runner, std::move(http_requests),
494       bytes_received_acc, bytes_sent_acc);
495   // Check whether issuing the requests failed as a whole (generally indicating
496   // a programming error).
497   FCP_RETURN_IF_ERROR(resource_fetch_result);
498   http_responses = std::move(*resource_fetch_result);
499 
500   // Compile the result vector by getting each resource's response using the
501   // corresponding accessor.
502   // Note that the order of results returned corresponds to the order of
503   // resources in the vector we originally received.
504   std::vector<absl::StatusOr<InMemoryHttpResponse>> result;
505   result.reserve(response_accessors.size());
506   for (const auto& response_accessor : response_accessors) {
507     absl::StatusOr<InMemoryHttpResponse> response =
508         response_accessor.accessor();
509       if (response.ok()) {
510         bool encoded_with_gzip = absl::EndsWithIgnoreCase(
511             response->content_type, kClientDecodedGzipSuffix);
512         if (!response_accessor.client_cache_id.empty()) {
513           TryPutResourceInCache(response_accessor.client_cache_id,
514                                 response->body, encoded_with_gzip,
515                                 response_accessor.max_age, *resource_cache)
516               .IgnoreError();
517         }
518         if (encoded_with_gzip) {
519           std::string response_body_temp(response->body);
520           // We're going to overwrite the response body with the decoded
521           // contents shortly, no need to keep an extra copy of it in memory.
522           response->body.Clear();
523           absl::StatusOr<absl::Cord> decoded_response_body =
524               internal::UncompressWithGzip(response_body_temp);
525           if (!decoded_response_body.ok()) {
526             response = decoded_response_body.status();
527           } else {
528             response->body = *std::move(decoded_response_body);
529           }
530         }
531       }
532       result.push_back(response);
533     }
534   return result;
535 }
536 
537 namespace internal {
CompressWithGzip(const std::string & uncompressed_data)538 absl::StatusOr<std::string> CompressWithGzip(
539     const std::string& uncompressed_data) {
540   int starting_pos = 0;
541   size_t str_size = uncompressed_data.length();
542   size_t in_size = str_size;
543   std::string output;
544   StringOutputStream string_output_stream(&output);
545   GzipOutputStream::Options options;
546   options.format = GzipOutputStream::GZIP;
547   GzipOutputStream compressed_stream(&string_output_stream, options);
548   void* out;
549   int out_size;
550   while (starting_pos < str_size) {
551     if (!compressed_stream.Next(&out, &out_size) || out_size <= 0) {
552       return absl::InternalError(
553           absl::StrCat("An error has occurred during compression: ",
554                        compressed_stream.ZlibErrorMessage()));
555     }
556 
557     if (in_size <= out_size) {
558       uncompressed_data.copy(static_cast<char*>(out), in_size, starting_pos);
559       // Ensure that the stream's output buffer is truncated to match the total
560       // amount of data.
561       compressed_stream.BackUp(out_size - static_cast<int>(in_size));
562       break;
563     }
564     uncompressed_data.copy(static_cast<char*>(out), out_size, starting_pos);
565     starting_pos += out_size;
566     in_size -= out_size;
567   }
568 
569   if (!compressed_stream.Close()) {
570     return absl::InternalError(absl::StrCat(
571         "Failed to close the stream: ", compressed_stream.ZlibErrorMessage()));
572   }
573   return output;
574 }
575 
UncompressWithGzip(const std::string & compressed_data)576 absl::StatusOr<absl::Cord> UncompressWithGzip(
577     const std::string& compressed_data) {
578   absl::Cord out;
579   const void* buffer;
580   int size;
581   ArrayInputStream sub_stream(compressed_data.data(),
582                               static_cast<int>(compressed_data.size()));
583   GzipInputStream input_stream(&sub_stream, GzipInputStream::GZIP);
584 
585   while (input_stream.Next(&buffer, &size)) {
586     if (size <= -1) {
587       return absl::InternalError(
588           "Uncompress failed: invalid input size returned by the "
589           "GzipInputStream.");
590     }
591     out.Append(absl::string_view(reinterpret_cast<const char*>(buffer), size));
592   }
593 
594   if (input_stream.ZlibErrorMessage() != nullptr) {
595     // Some real error happened during decompression.
596     return absl::InternalError(
597         absl::StrCat("An error has occurred during decompression:",
598                      input_stream.ZlibErrorMessage()));
599   }
600 
601   return out;
602 }
603 
604 }  // namespace internal
605 }  // namespace http
606 }  // namespace client
607 }  // namespace fcp
608