1 /* 2 * Copyright 2022 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef FCP_CLIENT_FEDERATED_SELECT_H_ 17 #define FCP_CLIENT_FEDERATED_SELECT_H_ 18 19 #include <deque> 20 #include <functional> 21 #include <memory> 22 #include <string> 23 #include <utility> 24 25 #include "absl/status/statusor.h" 26 #include "absl/strings/cord.h" 27 #include "absl/time/time.h" 28 #include "fcp/base/wall_clock_stopwatch.h" 29 #include "fcp/client/engine/example_iterator_factory.h" 30 #include "fcp/client/files.h" 31 #include "fcp/client/http/http_client.h" 32 #include "fcp/client/interruptible_runner.h" 33 #include "fcp/client/log_manager.h" 34 #include "fcp/client/simple_task_environment.h" 35 #include "fcp/client/stats.h" 36 #include "fcp/protos/plan.pb.h" 37 38 namespace fcp { 39 namespace client { 40 41 // The example query collection URI via which slice fetch requests will arrive. 42 inline static constexpr char kFederatedSelectCollectionUri[] = 43 "internal:/federated_select"; 44 45 // An interface via which a Federated Select `ExampleIteratorFactory` can be 46 // created. Each factory is expected to fetch slice data using the given 47 // `uri_template`, and to then serve the slice data by writing it to a file and 48 // by then returning that filename as a tf.Example to the plan. 49 class FederatedSelectManager { 50 public: 51 virtual std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory> 52 CreateExampleIteratorFactoryForUriTemplate( 53 absl::string_view uri_template) = 0; 54 55 // The best estimate of the over-the-wire bytes downloaded and uploadeded over 56 // the network, and the total duration of wall clock time spent waiting on 57 // network requests. 58 59 // Note that if two different slice fetches are in flight from different 60 // threads, this should measure just the wall clock time spent completing both 61 // sets of fetches (i.e. it should not double-count the wall clock time by 62 // summing each per-thread duration individually). 63 // 64 // If possible, this estimate should also include time spent decompressing 65 // payloads after reading them from the network. 66 virtual NetworkStats GetNetworkStats() = 0; 67 ~FederatedSelectManager()68 virtual ~FederatedSelectManager() {} 69 }; 70 71 // An base class for `ExampleIteratorFactory` implementations that can handle 72 // Federated Select example queries. 73 class FederatedSelectExampleIteratorFactory 74 : public ::fcp::client::engine::ExampleIteratorFactory { 75 public: CanHandle(const::google::internal::federated::plan::ExampleSelector & example_selector)76 bool CanHandle(const ::google::internal::federated::plan::ExampleSelector& 77 example_selector) override { 78 return example_selector.collection_uri() == kFederatedSelectCollectionUri; 79 } 80 ShouldCollectStats()81 bool ShouldCollectStats() override { 82 // Federated Select example queries should not be recorded in the OpStats 83 // DB, since the fact that Federated Select uses the example iterator 84 // interface is an internal implementation detail. 85 return false; 86 } 87 }; 88 89 class DisabledFederatedSelectManager : public FederatedSelectManager { 90 public: 91 explicit DisabledFederatedSelectManager(LogManager* log_manager); 92 93 std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory> 94 CreateExampleIteratorFactoryForUriTemplate( 95 absl::string_view uri_template) override; 96 GetNetworkStats()97 NetworkStats GetNetworkStats() override { return NetworkStats(); } 98 99 private: 100 LogManager& log_manager_; 101 }; 102 103 // A FederatedSelectManager implementation that actually issues HTTP requests to 104 // fetch slice data (i.e. the "real" implementation). 105 class HttpFederatedSelectManager : public FederatedSelectManager { 106 public: 107 HttpFederatedSelectManager( 108 LogManager* log_manager, Files* files, 109 fcp::client::http::HttpClient* http_client, 110 std::function<bool()> should_abort, 111 const InterruptibleRunner::TimingConfig& timing_config); 112 113 std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory> 114 CreateExampleIteratorFactoryForUriTemplate( 115 absl::string_view uri_template) override; 116 GetNetworkStats()117 NetworkStats GetNetworkStats() override { 118 return {.bytes_downloaded = bytes_received_.load(), 119 .bytes_uploaded = bytes_sent_.load(), 120 .network_duration = network_stopwatch_->GetTotalDuration()}; 121 } 122 123 private: 124 LogManager& log_manager_; 125 Files& files_; 126 std::atomic<int64_t> bytes_sent_ = 0; 127 std::atomic<int64_t> bytes_received_ = 0; 128 std::unique_ptr<WallClockStopwatch> network_stopwatch_ = 129 WallClockStopwatch::Create(); 130 fcp::client::http::HttpClient& http_client_; 131 std::unique_ptr<InterruptibleRunner> interruptible_runner_; 132 }; 133 134 // A Federated Select ExampleIterator that simply returns slice data that is 135 // already in-memory. 136 class InMemoryFederatedSelectExampleIterator : public ExampleIterator { 137 public: 138 // Each time another slice is requested by a call to Next(), the slice data at 139 // the front of the `slices` deque will be written to the `scratch_filename` 140 // and the filename will be returned as the example data. The scratch file 141 // will be deleted at the end of the iterator, or when the iterator is closed. InMemoryFederatedSelectExampleIterator(std::string scratch_filename,std::deque<absl::Cord> slices)142 InMemoryFederatedSelectExampleIterator(std::string scratch_filename, 143 std::deque<absl::Cord> slices) 144 : scratch_filename_(scratch_filename), slices_(std::move(slices)) {} 145 absl::StatusOr<std::string> Next() override; 146 void Close() override; 147 148 ~InMemoryFederatedSelectExampleIterator() override; 149 150 private: 151 void CleanupInternal() ABSL_LOCKS_EXCLUDED(mutex_); 152 153 std::string scratch_filename_; 154 155 absl::Mutex mutex_; 156 std::deque<absl::Cord> slices_ ABSL_GUARDED_BY(mutex_); 157 }; 158 159 } // namespace client 160 } // namespace fcp 161 162 #endif // FCP_CLIENT_FEDERATED_SELECT_H_ 163