1 /*
2 * Copyright 2022 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/http/protocol_request_helper.h"
17
18 #include "absl/strings/substitute.h"
19 #include "fcp/base/time_util.h"
20 #include "fcp/client/http/http_client_util.h"
21 #include "fcp/protos/federatedcompute/secure_aggregations.pb.h"
22 #include "fcp/protos/federatedcompute/task_assignments.pb.h"
23 #include "google/protobuf/any.pb.h"
24
25 namespace fcp {
26 namespace client {
27 namespace http {
28
29 // The default interval when polling pending operations.
30 const absl::Duration kDefaultLroPollingInterval = absl::Milliseconds(500);
31 // The maximum interval when polling pending operations.
32 const absl::Duration kMaxLroPollingInterval = absl::Minutes(1);
33
34 constexpr absl::string_view kStartTaskAssignmentMetadata =
35 "type.googleapis.com/"
36 "google.internal.federatedcompute.v1.StartTaskAssignmentMetadata"; // NOLINT
37 constexpr absl::string_view kAdvertiseKeysMetadata =
38 "type.googleapis.com/"
39 "google.internal.federatedcompute.v1.AdvertiseKeysMetadata"; // NOLINT
40 constexpr absl::string_view kShareKeysMetadata =
41 "type.googleapis.com/google.internal.federatedcompute.v1.ShareKeysMetadata";
42 constexpr absl::string_view kSubmitSecureAggregationResultMetadata =
43 "type.googleapis.com/"
44 "google.internal.federatedcompute.v1."
45 "SubmitSecureAggregationResultMetadata"; // NOLINT
46
47 using ::google::internal::federatedcompute::v1::AdvertiseKeysMetadata;
48 using ::google::internal::federatedcompute::v1::ForwardingInfo;
49 using ::google::internal::federatedcompute::v1::ShareKeysMetadata;
50 using ::google::internal::federatedcompute::v1::StartTaskAssignmentMetadata;
51 using ::google::internal::federatedcompute::v1::
52 SubmitSecureAggregationResultMetadata;
53 // using ::google::longrunning::Operation;
54
55 namespace {
56 // A note on error handling:
57 //
58 // The implementation here makes a distinction between what we call 'transient'
59 // and 'permanent' errors. While the exact categorization of transient vs.
60 // permanent errors is defined by a flag, the intent is that transient errors
61 // are those types of errors that may occur in the regular course of business,
62 // e.g. due to an interrupted network connection, a load balancer temporarily
63 // rejecting our request etc. Generally, these are expected to be resolvable by
64 // merely retrying the request at a slightly later time. Permanent errors are
65 // intended to be those that are not expected to be resolvable as quickly or by
66 // merely retrying the request. E.g. if a client checks in to the server with a
67 // population name that doesn't exist, then the server may return NOT_FOUND, and
68 // until the server-side configuration is changed, it will continue returning
69 // such an error. Hence, such errors can warrant a longer retry period (to waste
70 // less of both the client's and server's resources).
71 //
72 // The errors also differ in how they interact with the server-specified retry
73 // windows that are returned via the EligbilityEvalTaskResponse message.
74 // - If a permanent error occurs, then we will always return a retry window
75 // based on the target 'permanent errors retry period' flag, regardless of
76 // whether we received an EligbilityEvalTaskResponse from the server at an
77 // earlier time.
78 // - If a transient error occurs, then we will only return a retry window
79 // based on the target 'transient errors retry period' flag if the server
80 // didn't already return an EligibilityEvalTaskResponse. If it did return such
81 // a response, then one of the retry windows in that message will be used
82 // instead.
83 //
84 // Finally, note that for simplicity's sake we generally check whether a
85 // permanent error was received at the level of this class's public methods,
86 // rather than deeper down in each of our helper methods that actually call
87 // directly into the HTTP stack. This keeps our state-managing code simpler, but
88 // does mean that if any of our helper methods like
89 // PerformEligibilityEvalTaskRequest produce a permanent error code locally
90 // (i.e. without it being sent by the server), it will be treated as if the
91 // server sent it and the permanent error retry period will be used. We consider
92 // this a reasonable tradeoff.
93
CreateUriSuffixFromPathAndParams(absl::string_view path,const QueryParams & params)94 std::string CreateUriSuffixFromPathAndParams(absl::string_view path,
95 const QueryParams& params) {
96 return absl::StrCat(path, "?",
97 absl::StrJoin(params.begin(), params.end(), "&",
98 absl::PairFormatter("=")));
99 }
100
101 // Creates the URI suffix for a GetOperation protocol request.
CreateGetOperationUriSuffix(absl::string_view operation_name)102 absl::StatusOr<std::string> CreateGetOperationUriSuffix(
103 absl::string_view operation_name) {
104 constexpr absl::string_view kGetOperationUriSuffix = "/v1/$0";
105 FCP_ASSIGN_OR_RETURN(std::string encoded_operation_name,
106 EncodeUriMultiplePathSegments(operation_name));
107 return absl::Substitute(kGetOperationUriSuffix, encoded_operation_name);
108 }
109
110 // Creates the URI suffix for a CancelOperation protocol request.
CreateCancelOperationUriSuffix(absl::string_view operation_name)111 absl::StatusOr<std::string> CreateCancelOperationUriSuffix(
112 absl::string_view operation_name) {
113 constexpr absl::string_view kCancelOperationUriSuffix = "/v1/$0:cancel";
114 FCP_ASSIGN_OR_RETURN(std::string encoded_operation_name,
115 EncodeUriMultiplePathSegments(operation_name));
116 return absl::Substitute(kCancelOperationUriSuffix, encoded_operation_name);
117 }
118
CheckResponseContentEncoding(absl::StatusOr<InMemoryHttpResponse> response)119 absl::StatusOr<InMemoryHttpResponse> CheckResponseContentEncoding(
120 absl::StatusOr<InMemoryHttpResponse> response) {
121 if (response.ok() && !response->content_encoding.empty()) {
122 // Note that the `HttpClient` API contract ensures that if we don't specify
123 // an Accept-Encoding request header, then the response should be delivered
124 // to us without any Content-Encoding applied to it. Hence, if we somehow do
125 // still see a Content-Encoding response header then the `HttpClient`
126 // implementation isn't adhering to its part of the API contract.
127 return absl::UnavailableError(
128 "HTTP response unexpectedly has a Content-Encoding");
129 }
130 return response;
131 }
132
133 // Extract polling interval from the operation proto.
134 // The returned polling interval will be within the range of [1ms, 1min]. If
135 // the polling interval inside the operation proto is outside this range, it'll
136 // be clipped to the nearest boundary. If the polling interval is unset, 1ms
137 // will be returned.
138 // absl::Duration GetPollingInterval(Operation operation) {
139 // absl::string_view type_url = operation.metadata().type_url();
140 // google::protobuf::Duration polling_interval_proto;
141 // if (type_url == kStartTaskAssignmentMetadata) {
142 // StartTaskAssignmentMetadata metadata;
143 // if (!operation.metadata().UnpackTo(&metadata)) {
144 // return kDefaultLroPollingInterval;
145 // }
146 // polling_interval_proto = metadata.polling_interval();
147 // } else if (type_url == kAdvertiseKeysMetadata) {
148 // AdvertiseKeysMetadata metadata;
149 // if (!operation.metadata().UnpackTo(&metadata)) {
150 // return kDefaultLroPollingInterval;
151 // }
152 // polling_interval_proto = metadata.polling_interval();
153 // } else if (type_url == kShareKeysMetadata) {
154 // ShareKeysMetadata metadata;
155 // if (!operation.metadata().UnpackTo(&metadata)) {
156 // return kDefaultLroPollingInterval;
157 // }
158 // polling_interval_proto = metadata.polling_interval();
159 // } else if (type_url == kSubmitSecureAggregationResultMetadata) {
160 // SubmitSecureAggregationResultMetadata metadata;
161 // if (!operation.metadata().UnpackTo(&metadata)) {
162 // return kDefaultLroPollingInterval;
163 // }
164 // polling_interval_proto = metadata.polling_interval();
165 // } else {
166 // // Unknown type
167 // return kDefaultLroPollingInterval;
168 // }
169
170 // absl::Duration polling_interval =
171 // TimeUtil::ConvertProtoToAbslDuration(polling_interval_proto);
172 // if (polling_interval < absl::ZeroDuration()) {
173 // return kDefaultLroPollingInterval;
174 // } else if (polling_interval > kMaxLroPollingInterval) {
175 // return kMaxLroPollingInterval;
176 // } else {
177 // return polling_interval;
178 // }
179 // }
180
181 } // anonymous namespace
182
ProtocolRequestCreator(absl::string_view request_base_uri,absl::string_view api_key,HeaderList request_headers,bool use_compression)183 ProtocolRequestCreator::ProtocolRequestCreator(
184 absl::string_view request_base_uri, absl::string_view api_key,
185 HeaderList request_headers, bool use_compression)
186 : next_request_base_uri_(request_base_uri),
187 api_key_(api_key),
188 next_request_headers_(std::move(request_headers)),
189 use_compression_(use_compression) {}
190
191 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateProtocolRequest(absl::string_view uri_path_suffix,QueryParams params,HttpRequest::Method method,std::string request_body,bool is_protobuf_encoded) const192 ProtocolRequestCreator::CreateProtocolRequest(absl::string_view uri_path_suffix,
193 QueryParams params,
194 HttpRequest::Method method,
195 std::string request_body,
196 bool is_protobuf_encoded) const {
197 return CreateHttpRequest(uri_path_suffix, std::move(params), method,
198 std::move(request_body), is_protobuf_encoded,
199 use_compression_);
200 }
201
202 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateGetOperationRequest(absl::string_view operation_name) const203 ProtocolRequestCreator::CreateGetOperationRequest(
204 absl::string_view operation_name) const {
205 FCP_ASSIGN_OR_RETURN(std::string uri_path_suffix,
206 CreateGetOperationUriSuffix(operation_name));
207 return CreateHttpRequest(uri_path_suffix, {}, HttpRequest::Method::kGet, "",
208 /*is_protobuf_encoded=*/true,
209 /*use_compression=*/false);
210 }
211
212 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateCancelOperationRequest(absl::string_view operation_name) const213 ProtocolRequestCreator::CreateCancelOperationRequest(
214 absl::string_view operation_name) const {
215 FCP_ASSIGN_OR_RETURN(std::string uri_path_suffix,
216 CreateCancelOperationUriSuffix(operation_name));
217 return CreateHttpRequest(uri_path_suffix, {}, HttpRequest::Method::kGet, "",
218 /*is_protobuf_encoded=*/true,
219 /*use_compression=*/false);
220 }
221
222 absl::StatusOr<std::unique_ptr<HttpRequest>>
CreateHttpRequest(absl::string_view uri_path_suffix,QueryParams params,HttpRequest::Method method,std::string request_body,bool is_protobuf_encoded,bool use_compression) const223 ProtocolRequestCreator::CreateHttpRequest(absl::string_view uri_path_suffix,
224 QueryParams params,
225 HttpRequest::Method method,
226 std::string request_body,
227 bool is_protobuf_encoded,
228 bool use_compression) const {
229 HeaderList request_headers = next_request_headers_;
230 request_headers.push_back({kApiKeyHdr, api_key_});
231 if (is_protobuf_encoded) {
232 if (!request_body.empty()) {
233 request_headers.push_back({kContentTypeHdr, kProtobufContentType});
234 }
235
236 // %24alt is the percent encoded $alt. "$" is prepended to alt to indicate
237 // that "alt" is a system parameter.
238 // https://cloud.google.com/apis/docs/system-parameters#http_mapping
239 params["%24alt"] = "proto";
240 }
241 std::string uri_with_params = std::string(uri_path_suffix);
242 if (!params.empty()) {
243 uri_with_params = CreateUriSuffixFromPathAndParams(uri_path_suffix, params);
244 }
245 FCP_ASSIGN_OR_RETURN(
246 std::string uri,
247 JoinBaseUriWithSuffix(next_request_base_uri_, uri_with_params));
248
249 return InMemoryHttpRequest::Create(uri, method, request_headers,
250 std::move(request_body), use_compression);
251 }
252
253 absl::StatusOr<std::unique_ptr<ProtocolRequestCreator>>
Create(absl::string_view api_key,const ForwardingInfo & forwarding_info,bool use_compression)254 ProtocolRequestCreator::Create(absl::string_view api_key,
255 const ForwardingInfo& forwarding_info,
256 bool use_compression) {
257 // Extract the base URI and headers to use for the subsequent request.
258 if (forwarding_info.target_uri_prefix().empty()) {
259 return absl::InvalidArgumentError(
260 "Missing `ForwardingInfo.target_uri_prefix`");
261 }
262 const auto& new_headers = forwarding_info.extra_request_headers();
263 return std::make_unique<ProtocolRequestCreator>(ProtocolRequestCreator(
264 forwarding_info.target_uri_prefix(), api_key,
265 HeaderList(new_headers.begin(), new_headers.end()), use_compression));
266 }
267
ProtocolRequestHelper(HttpClient * http_client,int64_t * bytes_downloaded,int64_t * bytes_uploaded,WallClockStopwatch * network_stopwatch,Clock * clock)268 ProtocolRequestHelper::ProtocolRequestHelper(
269 HttpClient* http_client, int64_t* bytes_downloaded, int64_t* bytes_uploaded,
270 WallClockStopwatch* network_stopwatch, Clock* clock)
271 : http_client_(*http_client),
272 bytes_downloaded_(*bytes_downloaded),
273 bytes_uploaded_(*bytes_uploaded),
274 network_stopwatch_(*network_stopwatch),
275 clock_(*clock) {}
276
277 absl::StatusOr<InMemoryHttpResponse>
PerformProtocolRequest(std::unique_ptr<HttpRequest> request,InterruptibleRunner & runner)278 ProtocolRequestHelper::PerformProtocolRequest(
279 std::unique_ptr<HttpRequest> request, InterruptibleRunner& runner) {
280 std::vector<std::unique_ptr<HttpRequest>> requests;
281 requests.push_back(std::move(request));
282 FCP_ASSIGN_OR_RETURN(
283 std::vector<absl::StatusOr<InMemoryHttpResponse>> response,
284 PerformMultipleProtocolRequests(std::move(requests), runner));
285 return std::move(response[0]);
286 }
287
288 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
PerformMultipleProtocolRequests(std::vector<std::unique_ptr<http::HttpRequest>> requests,InterruptibleRunner & runner)289 ProtocolRequestHelper::PerformMultipleProtocolRequests(
290 std::vector<std::unique_ptr<http::HttpRequest>> requests,
291 InterruptibleRunner& runner) {
292 // Check whether issuing the request failed as a whole (generally indicating
293 // a programming error).
294 std::vector<absl::StatusOr<InMemoryHttpResponse>> responses;
295 {
296 auto started_stopwatch = network_stopwatch_.Start();
297 FCP_ASSIGN_OR_RETURN(responses,
298 PerformMultipleRequestsInMemory(
299 http_client_, runner, std::move(requests),
300 &bytes_downloaded_, &bytes_uploaded_));
301 }
302 std::vector<absl::StatusOr<InMemoryHttpResponse>> results;
303 std::transform(responses.begin(), responses.end(),
304 std::back_inserter(results), CheckResponseContentEncoding);
305 return results;
306 }
307
308 // absl::StatusOr<::google::longrunning::Operation>
309 // ProtocolRequestHelper::PollOperationResponseUntilDone(
310 // const Operation& initial_operation,
311 // const ProtocolRequestCreator& request_creator,
312 // InterruptibleRunner& runner) {
313 // // There are three cases that lead to this method returning:
314 // // - The HTTP response indicates an error.
315 // // - The HTTP response cannot be parsed into an Operation proto.
316 // // - The response `Operation.done` field is true.
317 // //
318 // // In all other cases we continue to poll the Operation via a subsequent
319 // // GetOperationRequest.
320 // Operation response_operation_proto = initial_operation;
321 // while (true) {
322 // // If the Operation is done then return it.
323 // if (response_operation_proto.done()) {
324 // return std::move(response_operation_proto);
325 // }
326
327 // FCP_ASSIGN_OR_RETURN(std::string operation_name,
328 // ExtractOperationName(response_operation_proto));
329
330 // // Wait for server returned polling interval before sending next request.
331 // clock_.Sleep(GetPollingInterval(response_operation_proto));
332 // // The response Operation indicates that the result isn't ready yet. Poll
333 // // again.
334 // FCP_ASSIGN_OR_RETURN(
335 // std::unique_ptr<HttpRequest> get_operation_request,
336 // request_creator.CreateGetOperationRequest(operation_name));
337 // absl::StatusOr<InMemoryHttpResponse> http_response =
338 // PerformProtocolRequest(std::move(get_operation_request), runner);
339 // FCP_ASSIGN_OR_RETURN(response_operation_proto,
340 // ParseOperationProtoFromHttpResponse(http_response));
341 // }
342 // }
343
344 // absl::StatusOr<InMemoryHttpResponse> ProtocolRequestHelper::CancelOperation(
345 // absl::string_view operation_name,
346 // const ProtocolRequestCreator& request_creator,
347 // InterruptibleRunner& runner) {
348 // FCP_ASSIGN_OR_RETURN(
349 // std::unique_ptr<HttpRequest> cancel_operation_request,
350 // request_creator.CreateCancelOperationRequest(operation_name));
351 // return PerformProtocolRequest(std::move(cancel_operation_request), runner);
352 // }
353
354 // absl::StatusOr<Operation> ParseOperationProtoFromHttpResponse(
355 // absl::StatusOr<InMemoryHttpResponse> http_response) {
356 // // If the HTTP response indicates an error then return that error.
357 // FCP_RETURN_IF_ERROR(http_response);
358 // Operation response_operation_proto;
359 // // Parse the response.
360 // if (!response_operation_proto.ParseFromString(
361 // std::string(http_response->body))) {
362 // return absl::InvalidArgumentError("could not parse Operation proto");
363 // }
364 // return response_operation_proto;
365 // }
366
367 // absl::StatusOr<std::string> ExtractOperationName(const Operation& operation)
368 // {
369 // if (!absl::StartsWith(operation.name(), "operations/")) {
370 // return absl::InvalidArgumentError(
371 // "Cannot cancel an Operation with an invalid name");
372 // }
373 // return operation.name();
374 // }
375 } // namespace http
376 } // namespace client
377 } // namespace fcp
378