1 /*
2 * Copyright 2023 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/engine/example_query_plan_engine.h"
17
18 #include <fcntl.h>
19
20 #include <cstdint>
21 #include <filesystem>
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/container/flat_hash_map.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_format.h"
32 #include "fcp/client/client_runner.h"
33 #include "fcp/client/engine/common.h"
34 #include "fcp/client/example_query_result.pb.h"
35 #include "fcp/client/test_helpers.h"
36 #include "fcp/protos/plan.pb.h"
37 #include "gmock/gmock.h"
38 #include "gtest/gtest.h"
39 #include "tensorflow/c/checkpoint_reader.h"
40 #include "tensorflow/c/tf_status.h"
41 #include "tensorflow/c/tf_status_helper.h"
42 #include "tensorflow/core/framework/tensor.h"
43 #include "tensorflow/core/framework/tensor_shape.h"
44 #include "tensorflow/core/framework/types.pb.h"
45
46 namespace fcp {
47 namespace client {
48 namespace engine {
49 namespace {
50
51 namespace tf = ::tensorflow;
52
53 using ::fcp::client::ExampleQueryResult;
54 using ::google::internal::federated::plan::AggregationConfig;
55 using ::google::internal::federated::plan::ClientOnlyPlan;
56 using ::google::internal::federated::plan::Dataset;
57 using ::google::internal::federated::plan::ExampleQuerySpec;
58 using ::google::internal::federated::plan::ExampleSelector;
59 using ::testing::StrictMock;
60
// Collection URI selected by the example query that the fixture builds.
constexpr const char* kCollectionUri = "app:/test_collection";
// Names of the vectors stored in the ExampleQueryResult built by the fixture.
constexpr const char* kOutputStringVectorName = "vector1";
constexpr const char* kOutputIntVectorName = "vector2";
// Names of the output tensors that the vectors above are mapped to.
constexpr const char* kOutputStringTensorName = "tensor1";
constexpr const char* kOutputIntTensorName = "tensor2";
66
67 class InvalidExampleIteratorFactory : public ExampleIteratorFactory {
68 public:
69 InvalidExampleIteratorFactory() = default;
70
CanHandle(const google::internal::federated::plan::ExampleSelector & example_selector)71 bool CanHandle(const google::internal::federated::plan::ExampleSelector&
72 example_selector) override {
73 return false;
74 }
75
CreateExampleIterator(const ExampleSelector & example_selector)76 absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
77 const ExampleSelector& example_selector) override {
78 absl::Status error(absl::StatusCode::kInternal, "");
79 return error;
80 }
81
ShouldCollectStats()82 bool ShouldCollectStats() override { return false; }
83 };
84
85 class NoIteratorExampleIteratorFactory : public ExampleIteratorFactory {
86 public:
87 NoIteratorExampleIteratorFactory() = default;
88
CanHandle(const google::internal::federated::plan::ExampleSelector & example_selector)89 bool CanHandle(const google::internal::federated::plan::ExampleSelector&
90 example_selector) override {
91 return true;
92 }
93
CreateExampleIterator(const ExampleSelector & example_selector)94 absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
95 const ExampleSelector& example_selector) override {
96 absl::Status error(absl::StatusCode::kInternal, "");
97 return error;
98 }
99
ShouldCollectStats()100 bool ShouldCollectStats() override { return false; }
101 };
102
103 class TwoExampleIteratorsFactory : public ExampleIteratorFactory {
104 public:
TwoExampleIteratorsFactory(std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>> (const google::internal::federated::plan::ExampleSelector &)> create_first_iterator_func,std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>> (const google::internal::federated::plan::ExampleSelector &)> create_second_iterator_func,const std::string & first_collection_uri,const std::string & second_collection_uri)105 explicit TwoExampleIteratorsFactory(
106 std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
107 const google::internal::federated::plan::ExampleSelector&
108
109 )>
110 create_first_iterator_func,
111 std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
112 const google::internal::federated::plan::ExampleSelector&
113
114 )>
115 create_second_iterator_func,
116 const std::string& first_collection_uri,
117 const std::string& second_collection_uri)
118 : create_first_iterator_func_(create_first_iterator_func),
119 create_second_iterator_func_(create_second_iterator_func),
120 first_collection_uri_(first_collection_uri),
121 second_collection_uri_(second_collection_uri) {}
122
CanHandle(const google::internal::federated::plan::ExampleSelector & example_selector)123 bool CanHandle(const google::internal::federated::plan::ExampleSelector&
124 example_selector) override {
125 return true;
126 }
127
CreateExampleIterator(const google::internal::federated::plan::ExampleSelector & example_selector)128 absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
129 const google::internal::federated::plan::ExampleSelector&
130 example_selector) override {
131 if (example_selector.collection_uri() == first_collection_uri_) {
132 return create_first_iterator_func_(example_selector);
133 } else if (example_selector.collection_uri() == second_collection_uri_) {
134 return create_second_iterator_func_(example_selector);
135 }
136 return absl::InvalidArgumentError("Unknown collection URI");
137 }
138
ShouldCollectStats()139 bool ShouldCollectStats() override { return false; }
140
141 private:
142 std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
143 const google::internal::federated::plan::ExampleSelector&)>
144 create_first_iterator_func_;
145 std::function<absl::StatusOr<std::unique_ptr<ExampleIterator>>(
146 const google::internal::federated::plan::ExampleSelector&)>
147 create_second_iterator_func_;
148 std::string first_collection_uri_;
149 std::string second_collection_uri_;
150 };
151
ReadTensors(std::string checkpoint_path)152 absl::StatusOr<absl::flat_hash_map<std::string, tf::Tensor>> ReadTensors(
153 std::string checkpoint_path) {
154 absl::flat_hash_map<std::string, tf::Tensor> tensors;
155 tf::TF_StatusPtr tf_status(TF_NewStatus());
156 tf::checkpoint::CheckpointReader tf_checkpoint_reader(checkpoint_path,
157 tf_status.get());
158 if (TF_GetCode(tf_status.get()) != TF_OK) {
159 return absl::NotFoundError("Couldn't read an input checkpoint");
160 }
161 for (const auto& [name, tf_dtype] :
162 tf_checkpoint_reader.GetVariableToDataTypeMap()) {
163 std::unique_ptr<tf::Tensor> tensor;
164 tf_checkpoint_reader.GetTensor(name, &tensor, tf_status.get());
165 if (TF_GetCode(tf_status.get()) != TF_OK) {
166 return absl::NotFoundError(
167 absl::StrFormat("Checkpoint doesn't have tensor %s", name));
168 }
169 tensors[name] = *tensor;
170 }
171
172 return tensors;
173 }
174
175 class ExampleQueryPlanEngineTest : public testing::Test {
176 protected:
Initialize()177 void Initialize() {
178 std::filesystem::path root_dir(testing::TempDir());
179 std::filesystem::path output_path = root_dir / std::string("output.ckpt");
180 output_checkpoint_filename_ = output_path.string();
181
182 ExampleQuerySpec::OutputVectorSpec string_vector_spec;
183 string_vector_spec.set_vector_name(kOutputStringVectorName);
184 string_vector_spec.set_data_type(
185 ExampleQuerySpec::OutputVectorSpec::STRING);
186 ExampleQuerySpec::OutputVectorSpec int_vector_spec;
187 int_vector_spec.set_vector_name(kOutputIntVectorName);
188 int_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::INT64);
189
190 ExampleQuerySpec::ExampleQuery example_query;
191 example_query.mutable_example_selector()->set_collection_uri(
192 kCollectionUri);
193 (*example_query.mutable_output_vector_specs())[kOutputStringTensorName] =
194 string_vector_spec;
195 (*example_query.mutable_output_vector_specs())[kOutputIntTensorName] =
196 int_vector_spec;
197 client_only_plan_.mutable_phase()
198 ->mutable_example_query_spec()
199 ->mutable_example_queries()
200 ->Add(std::move(example_query));
201
202 AggregationConfig aggregation_config;
203 aggregation_config.mutable_tf_v1_checkpoint_aggregation();
204 (*client_only_plan_.mutable_phase()
205 ->mutable_federated_example_query()
206 ->mutable_aggregations())[kOutputStringTensorName] =
207 aggregation_config;
208 (*client_only_plan_.mutable_phase()
209 ->mutable_federated_example_query()
210 ->mutable_aggregations())[kOutputIntTensorName] = aggregation_config;
211
212 ExampleQueryResult::VectorData::Values int_values;
213 int_values.mutable_int64_values()->add_value(42);
214 int_values.mutable_int64_values()->add_value(24);
215 (*example_query_result_.mutable_vector_data()
216 ->mutable_vectors())[kOutputIntVectorName] = int_values;
217 ExampleQueryResult::VectorData::Values string_values;
218 string_values.mutable_string_values()->add_value("value1");
219 string_values.mutable_string_values()->add_value("value2");
220 (*example_query_result_.mutable_vector_data()
221 ->mutable_vectors())[kOutputStringVectorName] = string_values;
222 std::string example = example_query_result_.SerializeAsString();
223
224 Dataset::ClientDataset client_dataset;
225 client_dataset.set_client_id("client_id");
226 client_dataset.add_example(example);
227 dataset_.mutable_client_data()->Add(std::move(client_dataset));
228
229 num_examples_ = 1;
230 example_bytes_ = example.size();
231
232 example_iterator_factory_ =
233 std::make_unique<FunctionalExampleIteratorFactory>(
234 [&dataset = dataset_](
235 const google::internal::federated::plan::ExampleSelector&
236 selector) {
237 return std::make_unique<SimpleExampleIterator>(dataset);
238 });
239 }
240
241 fcp::client::FilesImpl files_impl_;
242 StrictMock<MockOpStatsLogger> mock_opstats_logger_;
243 std::unique_ptr<ExampleIteratorFactory> example_iterator_factory_;
244
245 ExampleQueryResult example_query_result_;
246 ClientOnlyPlan client_only_plan_;
247 Dataset dataset_;
248 std::string output_checkpoint_filename_;
249
250 int num_examples_ = 0;
251 int64_t example_bytes_ = 0;
252 };
253
// Runs the default single-query plan and checks that the output checkpoint
// holds the int64 and string tensors with the values put into the dataset.
TEST_F(ExampleQueryPlanEngineTest, PlanSucceeds) {
  Initialize();

  EXPECT_CALL(
      mock_opstats_logger_,
      UpdateDatasetStats(kCollectionUri, num_examples_, example_bytes_));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kSuccess);

  auto tensors = ReadTensors(output_checkpoint_filename_);
  ASSERT_TRUE(tensors.ok());

  // Int64 tensor: shape {2}, values {42, 24}.
  tf::Tensor int_tensor = tensors.value()[kOutputIntTensorName];
  ASSERT_EQ(int_tensor.shape(), tf::TensorShape({2}));
  ASSERT_EQ(int_tensor.dtype(), tf::DT_INT64);
  auto int_data = static_cast<int64_t*>(int_tensor.data());
  const std::vector<int64_t> expected_int_data = {42, 24};
  for (int idx = 0; idx < 2; ++idx) {
    ASSERT_EQ(int_data[idx], expected_int_data[idx]);
  }

  // String tensor: shape {2}, values {"value1", "value2"}.
  tf::Tensor string_tensor = tensors.value()[kOutputStringTensorName];
  ASSERT_EQ(string_tensor.shape(), tf::TensorShape({2}));
  ASSERT_EQ(string_tensor.dtype(), tf::DT_STRING);
  auto string_data = static_cast<tf::tstring*>(string_tensor.data());
  const std::vector<std::string> expected_string_data = {"value1", "value2"};
  for (int idx = 0; idx < 2; ++idx) {
    ASSERT_EQ(static_cast<std::string>(string_data[idx]),
              expected_string_data[idx]);
  }
}
290
// Adds a second example query over another collection and checks that the
// output checkpoint holds the tensors of both queries.
TEST_F(ExampleQueryPlanEngineTest, MultipleQueries) {
  Initialize();

  // Second query: a float vector plus a string vector.
  ExampleQuerySpec::ExampleQuery second_query;
  second_query.mutable_example_selector()->set_collection_uri(
      "app:/second_collection");
  auto& second_specs = *second_query.mutable_output_vector_specs();

  ExampleQuerySpec::OutputVectorSpec float_spec;
  float_spec.set_vector_name("float_vector");
  float_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::FLOAT);
  second_specs["float_tensor"] = float_spec;

  // Reuses the vector name of the first query but maps it to a different
  // output tensor name, to make sure the engine keeps vectors from different
  // queries apart.
  ExampleQuerySpec::OutputVectorSpec string_spec;
  string_spec.set_vector_name(kOutputStringVectorName);
  string_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::STRING);
  second_specs["another_string_tensor"] = string_spec;

  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->mutable_example_queries()
      ->Add(std::move(second_query));

  AggregationConfig aggregation_config;
  aggregation_config.mutable_tf_v1_checkpoint_aggregation();
  (*client_only_plan_.mutable_phase()
        ->mutable_federated_example_query()
        ->mutable_aggregations())["float_tensor"] = aggregation_config;

  // Result served for the second collection.
  ExampleQueryResult second_result;
  auto& second_vectors =
      *second_result.mutable_vector_data()->mutable_vectors();

  ExampleQueryResult::VectorData::Values float_values;
  auto* float_list = float_values.mutable_float_values();
  float_list->add_value(0.24f);
  float_list->add_value(0.42f);
  float_list->add_value(0.33f);
  ExampleQueryResult::VectorData::Values string_values;
  string_values.mutable_string_values()->add_value("another_string_value");
  second_vectors["float_vector"] = float_values;
  second_vectors[kOutputStringVectorName] = string_values;
  const std::string serialized_second_result =
      second_result.SerializeAsString();

  Dataset::ClientDataset second_client_dataset;
  second_client_dataset.set_client_id("second_client_id");
  second_client_dataset.add_example(serialized_second_result);
  Dataset second_dataset;
  second_dataset.mutable_client_data()->Add(std::move(second_client_dataset));

  // Route each collection URI to its own dataset.
  example_iterator_factory_ = std::make_unique<TwoExampleIteratorsFactory>(
      [this](
          const google::internal::federated::plan::ExampleSelector& selector) {
        return std::make_unique<SimpleExampleIterator>(dataset_);
      },
      [&second_dataset](
          const google::internal::federated::plan::ExampleSelector& selector) {
        return std::make_unique<SimpleExampleIterator>(second_dataset);
      },
      kCollectionUri, "app:/second_collection");

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kSuccess);

  auto tensors = ReadTensors(output_checkpoint_filename_);
  // ASSERT_OK is not supported in AOSP.
  ASSERT_TRUE(tensors.ok());

  // Tensors produced by the first query.
  tf::Tensor int_tensor = tensors.value()[kOutputIntTensorName];
  ASSERT_EQ(int_tensor.shape(), tf::TensorShape({2}));
  ASSERT_EQ(int_tensor.dtype(), tf::DT_INT64);
  auto int_data = static_cast<int64_t*>(int_tensor.data());
  const std::vector<int64_t> expected_int_data = {42, 24};
  for (int idx = 0; idx < 2; ++idx) {
    ASSERT_EQ(int_data[idx], expected_int_data[idx]);
  }

  tf::Tensor string_tensor = tensors.value()[kOutputStringTensorName];
  ASSERT_EQ(string_tensor.shape(), tf::TensorShape({2}));
  ASSERT_EQ(string_tensor.dtype(), tf::DT_STRING);
  auto string_data = static_cast<tf::tstring*>(string_tensor.data());
  const std::vector<std::string> expected_string_data = {"value1", "value2"};
  for (int idx = 0; idx < 2; ++idx) {
    ASSERT_EQ(static_cast<std::string>(string_data[idx]),
              expected_string_data[idx]);
  }

  // Tensors produced by the second query.
  tf::Tensor float_tensor = tensors.value()["float_tensor"];
  ASSERT_EQ(float_tensor.shape(), tf::TensorShape({3}));
  ASSERT_EQ(float_tensor.dtype(), tf::DT_FLOAT);
  auto float_data = static_cast<float*>(float_tensor.data());
  const std::vector<float> expected_float_data = {0.24f, 0.42f, 0.33f};
  for (int idx = 0; idx < 3; ++idx) {
    ASSERT_EQ(float_data[idx], expected_float_data[idx]);
  }

  tf::Tensor second_query_string_tensor =
      tensors.value()["another_string_tensor"];
  ASSERT_EQ(second_query_string_tensor.shape(), tf::TensorShape({1}));
  ASSERT_EQ(second_query_string_tensor.dtype(), tf::DT_STRING);
  auto second_query_string_data =
      static_cast<tf::tstring*>(second_query_string_tensor.data());
  ASSERT_EQ(static_cast<std::string>(*second_query_string_data),
            "another_string_value");
}
401
// The plan asks for an extra output vector ("new_vector") that the query
// result served by the fixture does not contain at all. The engine is expected
// to report kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, OutputVectorSpecMissingInResult) {
  Initialize();

  ExampleQuerySpec::OutputVectorSpec new_vector_spec;
  new_vector_spec.set_vector_name("new_vector");
  new_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::DOUBLE);

  // Replace the plan's only query with a copy that also requests "new_tensor".
  ExampleQuerySpec::ExampleQuery example_query =
      client_only_plan_.phase().example_query_spec().example_queries().at(0);
  (*example_query.mutable_output_vector_specs())["new_tensor"] =
      new_vector_spec;
  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->clear_example_queries();
  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->mutable_example_queries()
      ->Add(std::move(example_query));

  // The dataset from Initialize() is left untouched, so "new_vector" is
  // absent from the served result.
  EXPECT_CALL(
      mock_opstats_logger_,
      UpdateDatasetStats(kCollectionUri, num_examples_, example_bytes_));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  engine::PlanResult result =
      plan_engine.RunPlan(client_only_plan_.phase().example_query_spec(),
                          output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}

// The plan asks for "new_vector" as DOUBLE, but the served query result stores
// bool values under that name. The engine is expected to report
// kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, OutputVectorSpecTypeMismatch) {
  Initialize();

  ExampleQuerySpec::OutputVectorSpec new_vector_spec;
  new_vector_spec.set_vector_name("new_vector");
  new_vector_spec.set_data_type(ExampleQuerySpec::OutputVectorSpec::DOUBLE);

  // Replace the plan's only query with a copy that also requests "new_tensor".
  ExampleQuerySpec::ExampleQuery example_query =
      client_only_plan_.phase().example_query_spec().example_queries().at(0);
  (*example_query.mutable_output_vector_specs())["new_tensor"] =
      new_vector_spec;
  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->clear_example_queries();
  client_only_plan_.mutable_phase()
      ->mutable_example_query_spec()
      ->mutable_example_queries()
      ->Add(std::move(example_query));

  // Store "new_vector" in the result with a type (bool) that disagrees with
  // the DOUBLE requested above, then rebuild the dataset from it.
  ExampleQueryResult::VectorData::Values bool_values;
  bool_values.mutable_bool_values()->add_value(true);
  (*example_query_result_.mutable_vector_data()
        ->mutable_vectors())["new_vector"] = bool_values;
  std::string example = example_query_result_.SerializeAsString();

  Dataset::ClientDataset client_dataset;
  client_dataset.set_client_id("client_id");
  client_dataset.add_example(example);
  dataset_.clear_client_data();
  dataset_.mutable_client_data()->Add(std::move(client_dataset));

  num_examples_ = 1;
  example_bytes_ = example.size();

  example_iterator_factory_ =
      std::make_unique<FunctionalExampleIteratorFactory>(
          [&dataset = dataset_](
              const google::internal::federated::plan::ExampleSelector&
                  selector) {
            return std::make_unique<SimpleExampleIterator>(dataset);
          });

  EXPECT_CALL(
      mock_opstats_logger_,
      UpdateDatasetStats(kCollectionUri, num_examples_, example_bytes_));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  engine::PlanResult result =
      plan_engine.RunPlan(client_only_plan_.phase().example_query_spec(),
                          output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
489
// The only registered factory reports that it cannot handle any selector, so
// the plan is expected to end with kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, FactoryNotFound) {
  Initialize();
  InvalidExampleIteratorFactory rejecting_factory;

  ExampleQueryPlanEngine plan_engine({&rejecting_factory},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
503
// The registered factory accepts the selector but returns an error instead of
// an iterator, so the plan is expected to end with kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, NoIteratorCreated) {
  Initialize();
  NoIteratorExampleIteratorFactory failing_factory;

  ExampleQueryPlanEngine plan_engine({&failing_factory},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
517
// Serves an example that is plain text rather than a serialized
// ExampleQueryResult and expects the plan to end with kExampleIteratorError.
TEST_F(ExampleQueryPlanEngineTest, InvalidExampleQueryResultFormat) {
  Initialize();

  // Swap the dataset contents for a single example with arbitrary bytes.
  const std::string invalid_example = "invalid_example";
  Dataset::ClientDataset bad_client_dataset;
  bad_client_dataset.add_example(invalid_example);
  dataset_.clear_client_data();
  dataset_.mutable_client_data()->Add(std::move(bad_client_dataset));

  example_iterator_factory_ =
      std::make_unique<FunctionalExampleIteratorFactory>(
          [this](const google::internal::federated::plan::ExampleSelector&
                     selector) {
            return std::make_unique<SimpleExampleIterator>(dataset_);
          });

  EXPECT_CALL(mock_opstats_logger_,
              UpdateDatasetStats(kCollectionUri, 1, invalid_example.size()));

  ExampleQueryPlanEngine plan_engine({example_iterator_factory_.get()},
                                     &mock_opstats_logger_);
  const ExampleQuerySpec& query_spec =
      client_only_plan_.phase().example_query_spec();
  engine::PlanResult result =
      plan_engine.RunPlan(query_spec, output_checkpoint_filename_);

  EXPECT_THAT(result.outcome, PlanOutcome::kExampleIteratorError);
}
543
544 } // anonymous namespace
545 } // namespace engine
546 } // namespace client
547 } // namespace fcp