xref: /aosp_15_r20/external/federated-compute/fcp/client/engine/example_query_plan_engine_test.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2023 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/engine/example_query_plan_engine.h"
17 
18 #include <fcntl.h>
19 
20 #include <cstdint>
21 #include <filesystem>
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/container/flat_hash_map.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_format.h"
32 #include "fcp/client/client_runner.h"
33 #include "fcp/client/engine/common.h"
34 #include "fcp/client/example_query_result.pb.h"
35 #include "fcp/client/test_helpers.h"
36 #include "fcp/protos/plan.pb.h"
37 #include "gmock/gmock.h"
38 #include "gtest/gtest.h"
39 #include "tensorflow/c/checkpoint_reader.h"
40 #include "tensorflow/c/tf_status.h"
41 #include "tensorflow/c/tf_status_helper.h"
42 #include "tensorflow/core/framework/tensor.h"
43 #include "tensorflow/core/framework/tensor_shape.h"
44 #include "tensorflow/core/framework/types.pb.h"
45 
46 namespace fcp {
47 namespace client {
48 namespace engine {
49 namespace {
50 
51 namespace tf = ::tensorflow;
52 
53 using ::fcp::client::ExampleQueryResult;
54 using ::google::internal::federated::plan::AggregationConfig;
55 using ::google::internal::federated::plan::ClientOnlyPlan;
56 using ::google::internal::federated::plan::Dataset;
57 using ::google::internal::federated::plan::ExampleQuerySpec;
58 using ::google::internal::federated::plan::ExampleSelector;
59 using ::testing::StrictMock;
60 
// Collection URI selected by the default example query built in the fixture.
constexpr const char* kCollectionUri = "app:/test_collection";
// Keys of the vectors stored inside the serialized ExampleQueryResult.
constexpr const char* kOutputStringVectorName = "vector1";
constexpr const char* kOutputIntVectorName = "vector2";
// Keys of the output vector specs; the tests read tensors back from the
// output checkpoint under these names.
constexpr const char* kOutputStringTensorName = "tensor1";
constexpr const char* kOutputIntTensorName = "tensor2";
66 
67 class InvalidExampleIteratorFactory : public ExampleIteratorFactory {
68  public:
69   InvalidExampleIteratorFactory() = default;
70 
CanHandle(const google::internal::federated::plan::ExampleSelector & example_selector)71   bool CanHandle(const google::internal::federated::plan::ExampleSelector&
72                      example_selector) override {
73     return false;
74   }
75 
CreateExampleIterator(const ExampleSelector & example_selector)76   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
77       const ExampleSelector& example_selector) override {
78     absl::Status error(absl::StatusCode::kInternal, "");
79     return error;
80   }
81 
ShouldCollectStats()82   bool ShouldCollectStats() override { return false; }
83 };
84 
85 class NoIteratorExampleIteratorFactory : public ExampleIteratorFactory {
86  public:
87   NoIteratorExampleIteratorFactory() = default;
88 
CanHandle(const google::internal::federated::plan::ExampleSelector & example_selector)89   bool CanHandle(const google::internal::federated::plan::ExampleSelector&
90                      example_selector) override {
91     return true;
92   }
93 
CreateExampleIterator(const ExampleSelector & example_selector)94   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
95       const ExampleSelector& example_selector) override {
96     absl::Status error(absl::StatusCode::kInternal, "");
97     return error;
98   }
99 
ShouldCollectStats()100   bool ShouldCollectStats() override { return false; }
101 };
102 
103 class TwoExampleIteratorsFactory : public ExampleIteratorFactory {
104  public:
TwoExampleIteratorsFactory(std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>> (const google::internal::federated::plan::ExampleSelector &)> create_first_iterator_func,std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>> (const google::internal::federated::plan::ExampleSelector &)> create_second_iterator_func,const std::string & first_collection_uri,const std::string & second_collection_uri)105   explicit TwoExampleIteratorsFactory(
106       std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
107           const google::internal::federated::plan::ExampleSelector&
108 
109           )>
110           create_first_iterator_func,
111       std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
112           const google::internal::federated::plan::ExampleSelector&
113 
114           )>
115           create_second_iterator_func,
116       const std::string& first_collection_uri,
117       const std::string& second_collection_uri)
118       : create_first_iterator_func_(create_first_iterator_func),
119         create_second_iterator_func_(create_second_iterator_func),
120         first_collection_uri_(first_collection_uri),
121         second_collection_uri_(second_collection_uri) {}
122 
CanHandle(const google::internal::federated::plan::ExampleSelector & example_selector)123   bool CanHandle(const google::internal::federated::plan::ExampleSelector&
124                      example_selector) override {
125     return true;
126   }
127 
CreateExampleIterator(const google::internal::federated::plan::ExampleSelector & example_selector)128   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
129       const google::internal::federated::plan::ExampleSelector&
130           example_selector) override {
131     if (example_selector.collection_uri() == first_collection_uri_) {
132       return create_first_iterator_func_(example_selector);
133     } else if (example_selector.collection_uri() == second_collection_uri_) {
134       return create_second_iterator_func_(example_selector);
135     }
136     return absl::InvalidArgumentError("Unknown collection URI");
137   }
138 
ShouldCollectStats()139   bool ShouldCollectStats() override { return false; }
140 
141  private:
142   std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
143       const google::internal::federated::plan::ExampleSelector&)>
144       create_first_iterator_func_;
145   std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
146       const google::internal::federated::plan::ExampleSelector&)>
147       create_second_iterator_func_;
148   std::string first_collection_uri_;
149   std::string second_collection_uri_;
150 };
151 
ReadTensors(std::string checkpoint_path)152 absl::StatusOr<absl::flat_hash_map<std::string, tf::Tensor>> ReadTensors(
153     std::string checkpoint_path) {
154   absl::flat_hash_map<std::string, tf::Tensor> tensors;
155   tf::TF_StatusPtr tf_status(TF_NewStatus());
156   tf::checkpoint::CheckpointReader tf_checkpoint_reader(checkpoint_path,
157                                                         tf_status.get());
158   if (TF_GetCode(tf_status.get()) != TF_OK) {
159     return absl::NotFoundError("Couldn't read an input checkpoint");
160   }
161   for (const auto& [name, tf_dtype] :
162        tf_checkpoint_reader.GetVariableToDataTypeMap()) {
163     std::unique_ptr<tf::Tensor> tensor;
164     tf_checkpoint_reader.GetTensor(name, &tensor, tf_status.get());
165     if (TF_GetCode(tf_status.get()) != TF_OK) {
166       return absl::NotFoundError(
167           absl::StrFormat("Checkpoint doesn't have tensor %s", name));
168     }
169     tensors[name] = *tensor;
170   }
171 
172   return tensors;
173 }
174 
175 class ExampleQueryPlanEngineTest : public testing::Test {
176  protected:
Initialize()177   void Initialize() {
178     std::filesystem::path root_dir(testing::TempDir());
179     std::filesystem::path output_path = root_dir / std::string("output.ckpt");
180     output_checkpoint_filename_ = output_path.string();
181 
182     ExampleQuerySpec::OutputVectorSpec string_vector_spec;
183     string_vector_spec.set_vector_name(kOutputStringVectorName);
184     string_vector_spec.set_data_type(
185         ExampleQuerySpec::OutputVectorSpec::STRING);
186     ExampleQuerySpec::OutputVectorSpec int_vector_spec;
187     int_vector_spec.set_vector_name(kOutputIntVectorName);
188     int_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::INT64);
189 
190     ExampleQuerySpec::ExampleQuery example_query;
191     example_query.mutable_example_selector()->set_collection_uri(
192         kCollectionUri);
193     (*example_query.mutable_output_vector_specs())[kOutputStringTensorName] =
194         string_vector_spec;
195     (*example_query.mutable_output_vector_specs())[kOutputIntTensorName] =
196         int_vector_spec;
197     client_only_plan_.mutable_phase()
198         ->mutable_example_query_spec()
199         ->mutable_example_queries()
200         ->Add(std::move(example_query));
201 
202     AggregationConfig aggregation_config;
203     aggregation_config.mutable_tf_v1_checkpoint_aggregation();
204     (*client_only_plan_.mutable_phase()
205           ->mutable_federated_example_query()
206           ->mutable_aggregations())[kOutputStringTensorName] =
207         aggregation_config;
208     (*client_only_plan_.mutable_phase()
209           ->mutable_federated_example_query()
210           ->mutable_aggregations())[kOutputIntTensorName] = aggregation_config;
211 
212     ExampleQueryResult::VectorData::Values int_values;
213     int_values.mutable_int64_values()->add_value(42);
214     int_values.mutable_int64_values()->add_value(24);
215     (*example_query_result_.mutable_vector_data()
216           ->mutable_vectors())[kOutputIntVectorName] = int_values;
217     ExampleQueryResult::VectorData::Values string_values;
218     string_values.mutable_string_values()->add_value("value1");
219     string_values.mutable_string_values()->add_value("value2");
220     (*example_query_result_.mutable_vector_data()
221           ->mutable_vectors())[kOutputStringVectorName] = string_values;
222     std::string example = example_query_result_.SerializeAsString();
223 
224     Dataset::ClientDataset client_dataset;
225     client_dataset.set_client_id("client_id");
226     client_dataset.add_example(example);
227     dataset_.mutable_client_data()->Add(std::move(client_dataset));
228 
229     num_examples_ = 1;
230     example_bytes_ = example.size();
231 
232     example_iterator_factory_ =
233         std::make_unique<FunctionalExampleIteratorFactory>(
234             [&dataset = dataset_](
235                 const google::internal::federated::plan::ExampleSelector&
236                     selector) {
237               return std::make_unique<SimpleExampleIterator>(dataset);
238             });
239   }
240 
241   fcp::client::FilesImpl files_impl_;
242   StrictMock<MockOpStatsLogger> mock_opstats_logger_;
243   std::unique_ptr<ExampleIteratorFactory> example_iterator_factory_;
244 
245   ExampleQueryResult example_query_result_;
246   ClientOnlyPlan client_only_plan_;
247   Dataset dataset_;
248   std::string output_checkpoint_filename_;
249 
250   int num_examples_ = 0;
251   int64_t example_bytes_ = 0;
252 };
253 
// Happy path: the default plan runs successfully and the output checkpoint
// holds the int64 and string tensors with the values from the query result.
TEST_F(ExampleQueryPlanEngineTest, PlanSucceeds) {
  Initialize();

  EXPECT_CALL(
      mock_opstats_logger_,
      UpdateDatasetStats(kCollectionUri, num_examples_, example_bytes_));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kSuccess);

  auto tensors = ReadTensors(output_checkpoint_filename_);
  ASSERT_TRUE(tensors.ok());
  auto& tensors_by_name = tensors.value();

  // Int64 tensor: shape and dtype are checked first, so the element reads
  // below stay within bounds.
  tf::Tensor int_tensor = tensors_by_name[kOutputIntTensorName];
  ASSERT_EQ(int_tensor.shape(), tf::TensorShape({2}));
  ASSERT_EQ(int_tensor.dtype(), tf::DT_INT64);
  const std::vector<int64_t> expected_ints({42, 24});
  auto actual_ints = static_cast<int64_t*>(int_tensor.data());
  for (size_t idx = 0; idx < expected_ints.size(); ++idx) {
    ASSERT_EQ(actual_ints[idx], expected_ints[idx]);
  }

  // String tensor, verified the same way.
  tf::Tensor string_tensor = tensors_by_name[kOutputStringTensorName];
  ASSERT_EQ(string_tensor.shape(), tf::TensorShape({2}));
  ASSERT_EQ(string_tensor.dtype(), tf::DT_STRING);
  const std::vector<std::string> expected_strings({"value1", "value2"});
  auto actual_strings = static_cast<tf::tstring*>(string_tensor.data());
  for (size_t idx = 0; idx < expected_strings.size(); ++idx) {
    ASSERT_EQ(static_cast<std::string>(actual_strings[idx]),
              expected_strings[idx]);
  }
}
290 
TEST_F(ExampleQueryPlanEngineTest,MultipleQueries)291 TEST_F(ExampleQueryPlanEngineTest, MultipleQueries) {
292   Initialize();
293 
294   ExampleQuerySpec::OutputVectorSpec float_vector_spec;
295   float_vector_spec.set_vector_name("float_vector");
296   float_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::FLOAT);
297   ExampleQuerySpec::OutputVectorSpec string_vector_spec;
298   // Same vector name as in the other ExampleQuery, but with a different output
299   // one to make sure these vectors are distinguished in
300   // example_query_plan_engine.
301   string_vector_spec.set_vector_name(kOutputStringVectorName);
302   string_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::STRING);
303 
304   ExampleQuerySpec::ExampleQuery second_example_query;
305   second_example_query.mutable_example_selector()->set_collection_uri(
306       "app:/second_collection");
307   (*second_example_query.mutable_output_vector_specs())["float_tensor"] =
308       float_vector_spec;
309   (*second_example_query
310         .mutable_output_vector_specs())["another_string_tensor"] =
311       string_vector_spec;
312   client_only_plan_.mutable_phase()
313       ->mutable_example_query_spec()
314       ->mutable_example_queries()
315       ->Add(std::move(second_example_query));
316 
317   AggregationConfig aggregation_config;
318   aggregation_config.mutable_tf_v1_checkpoint_aggregation();
319   (*client_only_plan_.mutable_phase()
320         ->mutable_federated_example_query()
321         ->mutable_aggregations())["float_tensor"] = aggregation_config;
322 
323   ExampleQueryResult second_example_query_result;
324   ExampleQueryResult::VectorData::Values float_values;
325   float_values.mutable_float_values()->add_value(0.24f);
326   float_values.mutable_float_values()->add_value(0.42f);
327   float_values.mutable_float_values()->add_value(0.33f);
328   ExampleQueryResult::VectorData::Values string_values;
329   string_values.mutable_string_values()->add_value("another_string_value");
330   (*second_example_query_result.mutable_vector_data()
331         ->mutable_vectors())["float_vector"] = float_values;
332   (*second_example_query_result.mutable_vector_data()
333         ->mutable_vectors())[kOutputStringVectorName] = string_values;
334   std::string example = second_example_query_result.SerializeAsString();
335 
336   Dataset::ClientDataset dataset;
337   dataset.set_client_id("second_client_id");
338   dataset.add_example(example);
339   Dataset second_dataset;
340   second_dataset.mutable_client_data()->Add(std::move(dataset));
341 
342   example_iterator_factory_ = std::make_unique<TwoExampleIteratorsFactory>(
343       [&dataset = dataset_](
344           const google::internal::federated::plan::ExampleSelector& selector) {
345         return std::make_unique<SimpleExampleIterator>(dataset);
346       },
347       [&dataset = second_dataset](
348           const google::internal::federated::plan::ExampleSelector& selector) {
349         return std::make_unique<SimpleExampleIterator>(dataset);
350       },
351       kCollectionUri, "app:/second_collection");
352 
353   ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
354                                      &mock_opstats_logger_);
355   engine::PlanResult result =
356       plan_engine.RunPlan(client_only_plan_.phase().example_query_spec(),
357                           output_checkpoint_filename_);
358 
359   EXPECT_THAT(result.outcome, PlanOutcome::kSuccess);
360 
361   auto tensors = ReadTensors(output_checkpoint_filename_);
362   // ASSERT_OK is not supported in AOSP.
363   ASSERT_TRUE(tensors.ok());
364   tf::Tensor int_tensor = tensors.value()[kOutputIntTensorName];
365   ASSERT_EQ(int_tensor.shape(), tf::TensorShape({2}));
366   ASSERT_EQ(int_tensor.dtype(), tf::DT_INT64);
367   auto int_data = static_cast<int64_t*>(int_tensor.data());
368   std::vector<int64_t> expected_int_data({42, 24});
369   for (int i = 0; i < 2; ++i) {
370     ASSERT_EQ(int_data[i], expected_int_data[i]);
371   }
372 
373   tf::Tensor string_tensor = tensors.value()[kOutputStringTensorName];
374   ASSERT_EQ(string_tensor.shape(), tf::TensorShape({2}));
375   ASSERT_EQ(string_tensor.dtype(), tf::DT_STRING);
376   auto string_data = static_cast<tf::tstring*>(string_tensor.data());
377   std::vector<std::string> expected_string_data({"value1", "value2"});
378   for (int i = 0; i < 2; ++i) {
379     ASSERT_EQ(static_cast<std::string>(string_data[i]),
380               expected_string_data[i]);
381   }
382 
383   tf::Tensor float_tensor = tensors.value()["float_tensor"];
384   ASSERT_EQ(float_tensor.shape(), tf::TensorShape({3}));
385   ASSERT_EQ(float_tensor.dtype(), tf::DT_FLOAT);
386   auto float_data = static_cast<float*>(float_tensor.data());
387   std::vector<float> expected_float_data({0.24f, 0.42f, 0.33f});
388   for (int i = 0; i < 3; ++i) {
389     ASSERT_EQ(float_data[i], expected_float_data[i]);
390   }
391 
392   tf::Tensor second_query_string_tensor =
393       tensors.value()["another_string_tensor"];
394   ASSERT_EQ(second_query_string_tensor.shape(), tf::TensorShape({1}));
395   ASSERT_EQ(second_query_string_tensor.dtype(), tf::DT_STRING);
396   auto second_query_string_data =
397       static_cast<tf::tstring*>(second_query_string_tensor.data());
398   ASSERT_EQ(static_cast<std::string>(*second_query_string_data),
399             "another_string_value");
400 }
401 
// Adds an extra DOUBLE output vector spec ("new_vector" -> "new_tensor") to
// the query and expects the engine to fail with kExampleIteratorError.
//
// NOTE(review): in this body the result DOES contain a vector named
// "new_vector", but it holds bool values while the spec asks for DOUBLE. That
// looks like a type mismatch rather than a missing vector, whereas the
// sibling OutputVectorSpecTypeMismatch test never adds the vector at all. The
// two test names may be swapped; confirm before renaming.
TEST_F(ExampleQueryPlanEngineTest, OutputVectorSpecMissingInResult) {
  Initialize();

  ExampleQuerySpec::OutputVectorSpec new_vector_spec;
  new_vector_spec.set_vector_name("new_vector");
  new_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::DOUBLE);

  // Replace the single query with a copy that carries the additional spec.
  ExampleQuerySpec::ExampleQuery example_query =
      client_only_plan_.phase().example_query_spec().example_queries().at(0);
  (*example_query.mutable_output_vector_specs())["new_tensor"] =
      new_vector_spec;
  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->clear_example_queries();
  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->mutable_example_queries()
      ->Add(std::move(example_query));

  // Extend the fixture's result (the member, not a fresh local) with bool
  // values under the new vector name, then re-serialize it.
  ExampleQueryResult::VectorData::Values bool_values;
  bool_values.mutable_bool_values()->add_value(true);
  (*example_query_result_.mutable_vector_data()
        ->mutable_vectors())["new_vector"] = bool_values;
  std::string example = example_query_result_.SerializeAsString();

  // Rebuild the dataset so that it serves only the updated result.
  Dataset::ClientDataset client_dataset;
  client_dataset.set_client_id("client_id");
  client_dataset.add_example(example);
  dataset_.clear_client_data();
  dataset_.mutable_client_data()->Add(std::move(client_dataset));

  // The serialized size changed, so refresh the expected logger arguments.
  num_examples_ = 1;
  example_bytes_ = example.size();

  example_iterator_factory_ =
      std::make_unique<FunctionalExampleIteratorFactory>(
          [&dataset = dataset_](
              const google::internal::federated::plan::ExampleSelector&
                  selector) {
            return std::make_unique<SimpleExampleIterator>(dataset);
          });

  EXPECT_CALL(
      mock_opstats_logger_,
      UpdateDatasetStats(kCollectionUri, num_examples_, example_bytes_));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  engine::PlanResult result =
      plan_engine.RunPlan(client_only_plan_.phase().example_query_spec(),
                          output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
457 
// Adds an extra DOUBLE output vector spec ("new_vector" -> "new_tensor") to
// the query while leaving the fixture's result untouched, and expects the
// engine to fail with kExampleIteratorError.
//
// NOTE(review): the result built by Initialize() has no vector named
// "new_vector", so this body looks like the "missing in result" scenario
// rather than a type mismatch; the names of this test and of
// OutputVectorSpecMissingInResult may be swapped. Confirm before renaming.
TEST_F(ExampleQueryPlanEngineTest, OutputVectorSpecTypeMismatch) {
  Initialize();

  ExampleQuerySpec::OutputVectorSpec extra_spec;
  extra_spec.set_vector_name("new_vector");
  extra_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::DOUBLE);

  // Copy the only query, attach the extra spec and put the copy back as the
  // sole query of the plan.
  auto* query_spec =
      client_only_plan_.mutable_phase()->mutable_example_query_spec();
  ExampleQuerySpec::ExampleQuery updated_query =
      query_spec->example_queries().at(0);
  (*updated_query.mutable_output_vector_specs())["new_tensor"] = extra_spec;
  query_spec->clear_example_queries();
  query_spec->mutable_example_queries()->Add(std::move(updated_query));

  EXPECT_CALL(
      mock_opstats_logger_,
      UpdateDatasetStats(kCollectionUri, num_examples_, example_bytes_));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  engine::PlanResult result =
      plan_engine.RunPlan(client_only_plan_.phase().example_query_spec(),
                          output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
489 
// When the only available factory cannot handle the selector, the plan must
// end with kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, FactoryNotFound) {
  Initialize();
  InvalidExampleIteratorFactory rejecting_factory;

  ExampleQueryPlanEngine plan_engine({&rejecting_factory},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
503 
// When the factory accepts the selector but fails to create an iterator, the
// plan must end with kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, NoIteratorCreated) {
  Initialize();
  NoIteratorExampleIteratorFactory failing_factory;

  ExampleQueryPlanEngine plan_engine({&failing_factory},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
517 
// Serves an example that is the plain string "invalid_example" instead of a
// serialized ExampleQueryResult and expects kExampleIteratorError. The
// dataset stats are still expected to be logged for that one example.
TEST_F(ExampleQueryPlanEngineTest, InvalidExampleQueryResultFormat) {
  Initialize();

  // Replace the fixture's dataset contents with the single bogus example.
  const std::string invalid_example = "invalid_example";
  dataset_.clear_client_data();
  dataset_.add_client_data()->add_example(invalid_example);

  example_iterator_factory_ =
      std::make_unique<FunctionalExampleIteratorFactory>(
          [&source = dataset_](const ExampleSelector& selector) {
            return std::make_unique<SimpleExampleIterator>(source);
          });

  EXPECT_CALL(mock_opstats_logger_,
              UpdateDatasetStats(kCollectionUri, 1, invalid_example.size()));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
543 
544 }  // anonymous namespace
545 }  // namespace engine
546 }  // namespace client
547 }  // namespace fcp