xref: /aosp_15_r20/external/federated-compute/fcp/client/http/protocol_request_helper.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/http/protocol_request_helper.h"
17 
18 #include "absl/strings/substitute.h"
19 #include "fcp/base/time_util.h"
20 #include "fcp/client/http/http_client_util.h"
21 #include "fcp/protos/federatedcompute/secure_aggregations.pb.h"
22 #include "fcp/protos/federatedcompute/task_assignments.pb.h"
23 #include "google/protobuf/any.pb.h"
24 
25 namespace fcp {
26 namespace client {
27 namespace http {
28 
29 // The default interval when polling pending operations.
30 const absl::Duration kDefaultLroPollingInterval = absl::Milliseconds(500);
31 // The maximum interval when polling pending operations.
32 const absl::Duration kMaxLroPollingInterval = absl::Minutes(1);
33 
34 constexpr absl::string_view kStartTaskAssignmentMetadata =
35     "type.googleapis.com/"
36     "google.internal.federatedcompute.v1.StartTaskAssignmentMetadata";  // NOLINT
37 constexpr absl::string_view kAdvertiseKeysMetadata =
38     "type.googleapis.com/"
39     "google.internal.federatedcompute.v1.AdvertiseKeysMetadata";  // NOLINT
40 constexpr absl::string_view kShareKeysMetadata =
41     "type.googleapis.com/google.internal.federatedcompute.v1.ShareKeysMetadata";
42 constexpr absl::string_view kSubmitSecureAggregationResultMetadata =
43     "type.googleapis.com/"
44     "google.internal.federatedcompute.v1."
45     "SubmitSecureAggregationResultMetadata";  // NOLINT
46 
47 using ::google::internal::federatedcompute::v1::AdvertiseKeysMetadata;
48 using ::google::internal::federatedcompute::v1::ForwardingInfo;
49 using ::google::internal::federatedcompute::v1::ShareKeysMetadata;
50 using ::google::internal::federatedcompute::v1::StartTaskAssignmentMetadata;
51 using ::google::internal::federatedcompute::v1::
52     SubmitSecureAggregationResultMetadata;
53 // using ::google::longrunning::Operation;
54 
55 namespace {
56 // A note on error handling:
57 //
58 // The implementation here makes a distinction between what we call 'transient'
59 // and 'permanent' errors. While the exact categorization of transient vs.
60 // permanent errors is defined by a flag, the intent is that transient errors
61 // are those types of errors that may occur in the regular course of business,
62 // e.g. due to an interrupted network connection, a load balancer temporarily
63 // rejecting our request etc. Generally, these are expected to be resolvable by
64 // merely retrying the request at a slightly later time. Permanent errors are
65 // intended to be those that are not expected to be resolvable as quickly or by
66 // merely retrying the request. E.g. if a client checks in to the server with a
67 // population name that doesn't exist, then the server may return NOT_FOUND, and
68 // until the server-side configuration is changed, it will continue returning
69 // such an error. Hence, such errors can warrant a longer retry period (to waste
70 // less of both the client's and server's resources).
71 //
// The errors also differ in how they interact with the server-specified retry
// windows that are returned via the EligibilityEvalTaskResponse message.
// - If a permanent error occurs, then we will always return a retry window
//   based on the target 'permanent errors retry period' flag, regardless of
//   whether we received an EligibilityEvalTaskResponse from the server at an
//   earlier time.
// - If a transient error occurs, then we will only return a retry window
//   based on the target 'transient errors retry period' flag if the server
//   didn't already return an EligibilityEvalTaskResponse. If it did return such
//   a response, then one of the retry windows in that message will be used
//   instead.
83 //
84 // Finally, note that for simplicity's sake we generally check whether a
85 // permanent error was received at the level of this class's public methods,
86 // rather than deeper down in each of our helper methods that actually call
87 // directly into the HTTP stack. This keeps our state-managing code simpler, but
88 // does mean that if any of our helper methods like
89 // PerformEligibilityEvalTaskRequest produce a permanent error code locally
90 // (i.e. without it being sent by the server), it will be treated as if the
91 // server sent it and the permanent error retry period will be used. We consider
92 // this a reasonable tradeoff.
93 
CreateUriSuffixFromPathAndParams(absl::string_view path,const QueryParams & params)94 std::string CreateUriSuffixFromPathAndParams(absl::string_view path,
95                                              const QueryParams& params) {
96   return absl::StrCat(path, "?",
97                       absl::StrJoin(params.begin(), params.end(), "&",
98                                     absl::PairFormatter("=")));
99 }
100 
101 // Creates the URI suffix for a GetOperation protocol request.
CreateGetOperationUriSuffix(absl::string_view operation_name)102 absl::StatusOr<std::string> CreateGetOperationUriSuffix(
103     absl::string_view operation_name) {
104   constexpr absl::string_view kGetOperationUriSuffix = "/v1/$0";
105   FCP_ASSIGN_OR_RETURN(std::string encoded_operation_name,
106                        EncodeUriMultiplePathSegments(operation_name));
107   return absl::Substitute(kGetOperationUriSuffix, encoded_operation_name);
108 }
109 
110 // Creates the URI suffix for a CancelOperation protocol request.
CreateCancelOperationUriSuffix(absl::string_view operation_name)111 absl::StatusOr<std::string> CreateCancelOperationUriSuffix(
112     absl::string_view operation_name) {
113   constexpr absl::string_view kCancelOperationUriSuffix = "/v1/$0:cancel";
114   FCP_ASSIGN_OR_RETURN(std::string encoded_operation_name,
115                        EncodeUriMultiplePathSegments(operation_name));
116   return absl::Substitute(kCancelOperationUriSuffix, encoded_operation_name);
117 }
118 
CheckResponseContentEncoding(absl::StatusOr<InMemoryHttpResponse> response)119 absl::StatusOr<InMemoryHttpResponse> CheckResponseContentEncoding(
120     absl::StatusOr<InMemoryHttpResponse> response) {
121   if (response.ok() && !response->content_encoding.empty()) {
122     // Note that the `HttpClient` API contract ensures that if we don't specify
123     // an Accept-Encoding request header, then the response should be delivered
124     // to us without any Content-Encoding applied to it. Hence, if we somehow do
125     // still see a Content-Encoding response header then the `HttpClient`
126     // implementation isn't adhering to its part of the API contract.
127     return absl::UnavailableError(
128         "HTTP response unexpectedly has a Content-Encoding");
129   }
130   return response;
131 }
132 
// Extract polling interval from the operation proto.
// The returned polling interval is at most 1min: a larger value is clipped to
// 1min. A negative value, metadata of an unknown type, or metadata that fails
// to unpack yields the default interval (500ms). An unset (zero) polling
// interval is returned unchanged.
138 // absl::Duration GetPollingInterval(Operation operation) {
139 //   absl::string_view type_url = operation.metadata().type_url();
140 //   google::protobuf::Duration polling_interval_proto;
141 //   if (type_url == kStartTaskAssignmentMetadata) {
142 //     StartTaskAssignmentMetadata metadata;
143 //     if (!operation.metadata().UnpackTo(&metadata)) {
144 //       return kDefaultLroPollingInterval;
145 //     }
146 //     polling_interval_proto = metadata.polling_interval();
147 //   } else if (type_url == kAdvertiseKeysMetadata) {
148 //     AdvertiseKeysMetadata metadata;
149 //     if (!operation.metadata().UnpackTo(&metadata)) {
150 //       return kDefaultLroPollingInterval;
151 //     }
152 //     polling_interval_proto = metadata.polling_interval();
153 //   } else if (type_url == kShareKeysMetadata) {
154 //     ShareKeysMetadata metadata;
155 //     if (!operation.metadata().UnpackTo(&metadata)) {
156 //       return kDefaultLroPollingInterval;
157 //     }
158 //     polling_interval_proto = metadata.polling_interval();
159 //   } else if (type_url == kSubmitSecureAggregationResultMetadata) {
160 //     SubmitSecureAggregationResultMetadata metadata;
161 //     if (!operation.metadata().UnpackTo(&metadata)) {
162 //       return kDefaultLroPollingInterval;
163 //     }
164 //     polling_interval_proto = metadata.polling_interval();
165 //   } else {
166 //     // Unknown type
167 //     return kDefaultLroPollingInterval;
168 //   }
169 
170 //   absl::Duration polling_interval =
171 //       TimeUtil::ConvertProtoToAbslDuration(polling_interval_proto);
172 //   if (polling_interval < absl::ZeroDuration()) {
173 //     return kDefaultLroPollingInterval;
174 //   } else if (polling_interval > kMaxLroPollingInterval) {
175 //     return kMaxLroPollingInterval;
176 //   } else {
177 //     return polling_interval;
178 //   }
179 // }
180 
181 }  // anonymous namespace
182 
ProtocolRequestCreator(absl::string_view request_base_uri,absl::string_view api_key,HeaderList request_headers,bool use_compression)183 ProtocolRequestCreator::ProtocolRequestCreator(
184     absl::string_view request_base_uri, absl::string_view api_key,
185     HeaderList request_headers, bool use_compression)
186     : next_request_base_uri_(request_base_uri),
187       api_key_(api_key),
188       next_request_headers_(std::move(request_headers)),
189       use_compression_(use_compression) {}
190 
191 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateProtocolRequest(absl::string_view uri_path_suffix,QueryParams params,HttpRequest::Method method,std::string request_body,bool is_protobuf_encoded) const192 ProtocolRequestCreator::CreateProtocolRequest(absl::string_view uri_path_suffix,
193                                               QueryParams params,
194                                               HttpRequest::Method method,
195                                               std::string request_body,
196                                               bool is_protobuf_encoded) const {
197   return CreateHttpRequest(uri_path_suffix, std::move(params), method,
198                            std::move(request_body), is_protobuf_encoded,
199                            use_compression_);
200 }
201 
202 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateGetOperationRequest(absl::string_view operation_name) const203 ProtocolRequestCreator::CreateGetOperationRequest(
204     absl::string_view operation_name) const {
205   FCP_ASSIGN_OR_RETURN(std::string uri_path_suffix,
206                        CreateGetOperationUriSuffix(operation_name));
207   return CreateHttpRequest(uri_path_suffix, {}, HttpRequest::Method::kGet, "",
208                            /*is_protobuf_encoded=*/true,
209                            /*use_compression=*/false);
210 }
211 
212 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateCancelOperationRequest(absl::string_view operation_name) const213 ProtocolRequestCreator::CreateCancelOperationRequest(
214     absl::string_view operation_name) const {
215   FCP_ASSIGN_OR_RETURN(std::string uri_path_suffix,
216                        CreateCancelOperationUriSuffix(operation_name));
217   return CreateHttpRequest(uri_path_suffix, {}, HttpRequest::Method::kGet, "",
218                            /*is_protobuf_encoded=*/true,
219                            /*use_compression=*/false);
220 }
221 
222 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateHttpRequest(absl::string_view uri_path_suffix,QueryParams params,HttpRequest::Method method,std::string request_body,bool is_protobuf_encoded,bool use_compression) const223 ProtocolRequestCreator::CreateHttpRequest(absl::string_view uri_path_suffix,
224                                           QueryParams params,
225                                           HttpRequest::Method method,
226                                           std::string request_body,
227                                           bool is_protobuf_encoded,
228                                           bool use_compression) const {
229   HeaderList request_headers = next_request_headers_;
230   request_headers.push_back({kApiKeyHdr, api_key_});
231   if (is_protobuf_encoded) {
232     if (!request_body.empty()) {
233       request_headers.push_back({kContentTypeHdr, kProtobufContentType});
234     }
235 
236     // %24alt is the percent encoded $alt.  "$" is prepended to alt to indicate
237     // that "alt" is a system parameter.
238     // https://cloud.google.com/apis/docs/system-parameters#http_mapping
239     params["%24alt"] = "proto";
240   }
241   std::string uri_with_params = std::string(uri_path_suffix);
242   if (!params.empty()) {
243     uri_with_params = CreateUriSuffixFromPathAndParams(uri_path_suffix, params);
244   }
245   FCP_ASSIGN_OR_RETURN(
246       std::string uri,
247       JoinBaseUriWithSuffix(next_request_base_uri_, uri_with_params));
248 
249   return InMemoryHttpRequest::Create(uri, method, request_headers,
250                                      std::move(request_body), use_compression);
251 }
252 
253 absl::StatusOr<std::unique_ptr<ProtocolRequestCreator>>
Create(absl::string_view api_key,const ForwardingInfo & forwarding_info,bool use_compression)254 ProtocolRequestCreator::Create(absl::string_view api_key,
255                                const ForwardingInfo& forwarding_info,
256                                bool use_compression) {
257   // Extract the base URI and headers to use for the subsequent request.
258   if (forwarding_info.target_uri_prefix().empty()) {
259     return absl::InvalidArgumentError(
260         "Missing `ForwardingInfo.target_uri_prefix`");
261   }
262   const auto& new_headers = forwarding_info.extra_request_headers();
263   return std::make_unique<ProtocolRequestCreator>(ProtocolRequestCreator(
264       forwarding_info.target_uri_prefix(), api_key,
265       HeaderList(new_headers.begin(), new_headers.end()), use_compression));
266 }
267 
ProtocolRequestHelper(HttpClient * http_client,int64_t * bytes_downloaded,int64_t * bytes_uploaded,WallClockStopwatch * network_stopwatch,Clock * clock)268 ProtocolRequestHelper::ProtocolRequestHelper(
269     HttpClient* http_client, int64_t* bytes_downloaded, int64_t* bytes_uploaded,
270     WallClockStopwatch* network_stopwatch, Clock* clock)
271     : http_client_(*http_client),
272       bytes_downloaded_(*bytes_downloaded),
273       bytes_uploaded_(*bytes_uploaded),
274       network_stopwatch_(*network_stopwatch),
275       clock_(*clock) {}
276 
277 absl::StatusOr<InMemoryHttpResponse>
PerformProtocolRequest(std::unique_ptr<HttpRequest> request,InterruptibleRunner & runner)278 ProtocolRequestHelper::PerformProtocolRequest(
279     std::unique_ptr<HttpRequest> request, InterruptibleRunner& runner) {
280   std::vector<std::unique_ptr<HttpRequest>> requests;
281   requests.push_back(std::move(request));
282   FCP_ASSIGN_OR_RETURN(
283       std::vector<absl::StatusOr<InMemoryHttpResponse>> response,
284       PerformMultipleProtocolRequests(std::move(requests), runner));
285   return std::move(response[0]);
286 }
287 
288 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
PerformMultipleProtocolRequests(std::vector<std::unique_ptr<http::HttpRequest>> requests,InterruptibleRunner & runner)289 ProtocolRequestHelper::PerformMultipleProtocolRequests(
290     std::vector<std::unique_ptr<http::HttpRequest>> requests,
291     InterruptibleRunner& runner) {
292   // Check whether issuing the request failed as a whole (generally indicating
293   // a programming error).
294   std::vector<absl::StatusOr<InMemoryHttpResponse>> responses;
295   {
296     auto started_stopwatch = network_stopwatch_.Start();
297     FCP_ASSIGN_OR_RETURN(responses,
298                          PerformMultipleRequestsInMemory(
299                              http_client_, runner, std::move(requests),
300                              &bytes_downloaded_, &bytes_uploaded_));
301   }
302   std::vector<absl::StatusOr<InMemoryHttpResponse>> results;
303   std::transform(responses.begin(), responses.end(),
304                  std::back_inserter(results), CheckResponseContentEncoding);
305   return results;
306 }
307 
308 // absl::StatusOr<::google::longrunning::Operation>
309 // ProtocolRequestHelper::PollOperationResponseUntilDone(
310 //     const Operation& initial_operation,
311 //     const ProtocolRequestCreator& request_creator,
312 //     InterruptibleRunner& runner) {
313 //   // There are three cases that lead to this method returning:
314 //   // - The HTTP response indicates an error.
315 //   // - The HTTP response cannot be parsed into an Operation proto.
316 //   // - The response `Operation.done` field is true.
317 //   //
318 //   // In all other cases we continue to poll the Operation via a subsequent
319 //   // GetOperationRequest.
320 //   Operation response_operation_proto = initial_operation;
321 //   while (true) {
322 //     // If the Operation is done then return it.
323 //     if (response_operation_proto.done()) {
324 //       return std::move(response_operation_proto);
325 //     }
326 
327 //     FCP_ASSIGN_OR_RETURN(std::string operation_name,
328 //                          ExtractOperationName(response_operation_proto));
329 
330 //     // Wait for server returned polling interval before sending next request.
331 //     clock_.Sleep(GetPollingInterval(response_operation_proto));
332 //     // The response Operation indicates that the result isn't ready yet. Poll
333 //     // again.
334 //     FCP_ASSIGN_OR_RETURN(
335 //         std::unique_ptr<HttpRequest> get_operation_request,
336 //         request_creator.CreateGetOperationRequest(operation_name));
337 //     absl::StatusOr<InMemoryHttpResponse> http_response =
338 //         PerformProtocolRequest(std::move(get_operation_request), runner);
339 //     FCP_ASSIGN_OR_RETURN(response_operation_proto,
340 //                          ParseOperationProtoFromHttpResponse(http_response));
341 //   }
342 // }
343 
344 // absl::StatusOr<InMemoryHttpResponse> ProtocolRequestHelper::CancelOperation(
345 //     absl::string_view operation_name,
346 //     const ProtocolRequestCreator& request_creator,
347 //     InterruptibleRunner& runner) {
348 //   FCP_ASSIGN_OR_RETURN(
349 //       std::unique_ptr<HttpRequest> cancel_operation_request,
350 //       request_creator.CreateCancelOperationRequest(operation_name));
351 //   return PerformProtocolRequest(std::move(cancel_operation_request), runner);
352 // }
353 
354 // absl::StatusOr<Operation> ParseOperationProtoFromHttpResponse(
355 //     absl::StatusOr<InMemoryHttpResponse> http_response) {
356 //   // If the HTTP response indicates an error then return that error.
357 //   FCP_RETURN_IF_ERROR(http_response);
358 //   Operation response_operation_proto;
359 //   // Parse the response.
360 //   if (!response_operation_proto.ParseFromString(
361 //           std::string(http_response->body))) {
362 //     return absl::InvalidArgumentError("could not parse Operation proto");
363 //   }
364 //   return response_operation_proto;
365 // }
366 
367 // absl::StatusOr<std::string> ExtractOperationName(const Operation& operation)
368 // {
369 //   if (!absl::StartsWith(operation.name(), "operations/")) {
370 //     return absl::InvalidArgumentError(
371 //         "Cannot cancel an Operation with an invalid name");
372 //   }
373 //   return operation.name();
374 // }
375 }  // namespace http
376 }  // namespace client
377 }  // namespace fcp
378