xref: /aosp_15_r20/external/federated-compute/fcp/client/federated_select.h (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef FCP_CLIENT_FEDERATED_SELECT_H_
17 #define FCP_CLIENT_FEDERATED_SELECT_H_
18 
19 #include <deque>
20 #include <functional>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 
25 #include "absl/status/statusor.h"
26 #include "absl/strings/cord.h"
27 #include "absl/time/time.h"
28 #include "fcp/base/wall_clock_stopwatch.h"
29 #include "fcp/client/engine/example_iterator_factory.h"
30 #include "fcp/client/files.h"
31 #include "fcp/client/http/http_client.h"
32 #include "fcp/client/interruptible_runner.h"
33 #include "fcp/client/log_manager.h"
34 #include "fcp/client/simple_task_environment.h"
35 #include "fcp/client/stats.h"
36 #include "fcp/protos/plan.pb.h"
37 
38 namespace fcp {
39 namespace client {
40 
// The example query collection URI via which slice fetch requests will arrive.
//
// Declared `inline constexpr` (without `static`) so that all translation units
// including this header share a single definition of the constant, instead of
// each one getting its own internal-linkage copy.
inline constexpr char kFederatedSelectCollectionUri[] =
    "internal:/federated_select";
44 
45 // An interface via which a Federated Select `ExampleIteratorFactory` can be
46 // created. Each factory is expected to fetch slice data using the given
47 // `uri_template`, and to then serve the slice data by writing it to a file and
48 // by then returning that filename as a tf.Example to the plan.
49 class FederatedSelectManager {
50  public:
51   virtual std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
52   CreateExampleIteratorFactoryForUriTemplate(
53       absl::string_view uri_template) = 0;
54 
55   // The best estimate of the over-the-wire bytes downloaded and uploadeded over
56   // the network, and the total duration of wall clock time spent waiting on
57   // network requests.
58 
59   // Note that if two different slice fetches are in flight from different
60   // threads, this should measure just the wall clock time spent completing both
61   // sets of fetches (i.e. it should not double-count the wall clock time by
62   // summing each per-thread duration individually).
63   //
64   // If possible, this estimate should also include time spent decompressing
65   // payloads after reading them from the network.
66   virtual NetworkStats GetNetworkStats() = 0;
67 
~FederatedSelectManager()68   virtual ~FederatedSelectManager() {}
69 };
70 
71 // An base class for `ExampleIteratorFactory` implementations that can handle
72 // Federated Select example queries.
73 class FederatedSelectExampleIteratorFactory
74     : public ::fcp::client::engine::ExampleIteratorFactory {
75  public:
CanHandle(const::google::internal::federated::plan::ExampleSelector & example_selector)76   bool CanHandle(const ::google::internal::federated::plan::ExampleSelector&
77                      example_selector) override {
78     return example_selector.collection_uri() == kFederatedSelectCollectionUri;
79   }
80 
ShouldCollectStats()81   bool ShouldCollectStats() override {
82     // Federated Select example queries should not be recorded in the OpStats
83     // DB, since the fact that Federated Select uses the example iterator
84     // interface is an internal implementation detail.
85     return false;
86   }
87 };
88 
89 class DisabledFederatedSelectManager : public FederatedSelectManager {
90  public:
91   explicit DisabledFederatedSelectManager(LogManager* log_manager);
92 
93   std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
94   CreateExampleIteratorFactoryForUriTemplate(
95       absl::string_view uri_template) override;
96 
GetNetworkStats()97   NetworkStats GetNetworkStats() override { return NetworkStats(); }
98 
99  private:
100   LogManager& log_manager_;
101 };
102 
103 // A FederatedSelectManager implementation that actually issues HTTP requests to
104 // fetch slice data (i.e. the "real" implementation).
105 class HttpFederatedSelectManager : public FederatedSelectManager {
106  public:
107   HttpFederatedSelectManager(
108       LogManager* log_manager, Files* files,
109       fcp::client::http::HttpClient* http_client,
110       std::function<bool()> should_abort,
111       const InterruptibleRunner::TimingConfig& timing_config);
112 
113   std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
114   CreateExampleIteratorFactoryForUriTemplate(
115       absl::string_view uri_template) override;
116 
GetNetworkStats()117   NetworkStats GetNetworkStats() override {
118     return {.bytes_downloaded = bytes_received_.load(),
119             .bytes_uploaded = bytes_sent_.load(),
120             .network_duration = network_stopwatch_->GetTotalDuration()};
121   }
122 
123  private:
124   LogManager& log_manager_;
125   Files& files_;
126   std::atomic<int64_t> bytes_sent_ = 0;
127   std::atomic<int64_t> bytes_received_ = 0;
128   std::unique_ptr<WallClockStopwatch> network_stopwatch_ =
129       WallClockStopwatch::Create();
130   fcp::client::http::HttpClient& http_client_;
131   std::unique_ptr<InterruptibleRunner> interruptible_runner_;
132 };
133 
134 // A Federated Select ExampleIterator that simply returns slice data that is
135 // already in-memory.
136 class InMemoryFederatedSelectExampleIterator : public ExampleIterator {
137  public:
138   // Each time another slice is requested by a call to Next(), the slice data at
139   // the front of the `slices` deque will be written to the `scratch_filename`
140   // and the filename will be returned as the example data. The scratch file
141   // will be deleted at the end of the iterator, or when the iterator is closed.
InMemoryFederatedSelectExampleIterator(std::string scratch_filename,std::deque<absl::Cord> slices)142   InMemoryFederatedSelectExampleIterator(std::string scratch_filename,
143                                          std::deque<absl::Cord> slices)
144       : scratch_filename_(scratch_filename), slices_(std::move(slices)) {}
145   absl::StatusOr<std::string> Next() override;
146   void Close() override;
147 
148   ~InMemoryFederatedSelectExampleIterator() override;
149 
150  private:
151   void CleanupInternal() ABSL_LOCKS_EXCLUDED(mutex_);
152 
153   std::string scratch_filename_;
154 
155   absl::Mutex mutex_;
156   std::deque<absl::Cord> slices_ ABSL_GUARDED_BY(mutex_);
157 };
158 
159 }  // namespace client
160 }  // namespace fcp
161 
162 #endif  // FCP_CLIENT_FEDERATED_SELECT_H_
163