xref: /aosp_15_r20/external/federated-compute/fcp/client/opstats/opstats_example_store_test.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/opstats/opstats_example_store.h"
17 
18 #include <string>
19 #include <utility>
20 
21 #include "google/protobuf/any.pb.h"
22 #include "google/protobuf/util/time_util.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 #include "fcp/client/test_helpers.h"
26 #include "fcp/protos/federated_api.pb.h"
27 #include "fcp/protos/plan.pb.h"
28 #include "fcp/testing/testing.h"
29 #include "tensorflow/core/example/example.pb.h"
30 #include "tensorflow/core/example/feature.pb.h"
31 
32 namespace fcp {
33 namespace client {
34 namespace opstats {
35 namespace {
36 
37 using ::google::internal::federated::plan::ExampleSelector;
38 using ::google::internal::federatedml::v2::RetryWindow;
39 using ::google::protobuf::util::TimeUtil;
40 using ::testing::Return;
41 
42 constexpr char kTestTaskName[] = "stefans_really_cool_task";
43 
44 class OpStatsExampleStoreTest : public testing::Test {
45  public:
OpStatsExampleStoreTest()46   OpStatsExampleStoreTest() {
47     EXPECT_CALL(mock_opstats_logger_, IsOpStatsEnabled())
48         .WillRepeatedly(Return(true));
49     EXPECT_CALL(mock_opstats_logger_, GetOpStatsDb())
50         .WillRepeatedly(Return(&mock_db_));
51     EXPECT_CALL(mock_opstats_logger_, GetCurrentTaskName())
52         .WillRepeatedly(Return(kTestTaskName));
53   }
54 
55  protected:
CreateEvent(OperationalStats::Event::EventKind event_kind,int64_t event_time_ms)56   static OperationalStats::Event CreateEvent(
57       OperationalStats::Event::EventKind event_kind, int64_t event_time_ms) {
58     OperationalStats::Event event;
59     event.set_event_type(event_kind);
60     *event.mutable_timestamp() =
61         TimeUtil::MillisecondsToTimestamp(event_time_ms);
62     return event;
63   }
64 
CreateDatasetStats(int64_t num_examples_read,int64_t num_bytes_read)65   static OperationalStats::DatasetStats CreateDatasetStats(
66       int64_t num_examples_read, int64_t num_bytes_read) {
67     OperationalStats::DatasetStats stats;
68     stats.set_num_bytes_read(num_bytes_read);
69     stats.set_num_examples_read(num_examples_read);
70     return stats;
71   }
72 
73   testing::StrictMock<MockOpStatsLogger> mock_opstats_logger_;
74   testing::StrictMock<MockOpStatsDb> mock_db_;
75   testing::StrictMock<MockLogManager> mock_log_manager_;
76   OpStatsExampleIteratorFactory iterator_factory_ =
77       OpStatsExampleIteratorFactory(
78           &mock_opstats_logger_, &mock_log_manager_,
79           /*opstats_last_successful_contribution_criteria=*/false);
80 };
81 
TEST_F(OpStatsExampleStoreTest,TestInvalidCollectionUrl)82 TEST_F(OpStatsExampleStoreTest, TestInvalidCollectionUrl) {
83   ExampleSelector selector;
84   selector.set_collection_uri("INVALID");
85   EXPECT_CALL(mock_log_manager_,
86               LogDiag(ProdDiagCode::OPSTATS_INCORRECT_COLLECTION_URI));
87 
88   EXPECT_FALSE(iterator_factory_.CanHandle(selector));
89 
90   absl::StatusOr<std::unique_ptr<ExampleIterator>> status_or =
91       iterator_factory_.CreateExampleIterator(selector);
92   EXPECT_THAT(status_or.status(), IsCode(absl::StatusCode::kInvalidArgument));
93 }
94 
TEST_F(OpStatsExampleStoreTest,TestMalformedCriteria)95 TEST_F(OpStatsExampleStoreTest, TestMalformedCriteria) {
96   ExampleSelector selector;
97   selector.set_collection_uri(kOpStatsCollectionUri);
98   selector.mutable_criteria()->set_value("NOT_A_PROTO");
99   EXPECT_CALL(mock_log_manager_,
100               LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA));
101   absl::StatusOr<std::unique_ptr<ExampleIterator>> status_or =
102       iterator_factory_.CreateExampleIterator(selector);
103   EXPECT_THAT(status_or.status(), IsCode(absl::StatusCode::kInvalidArgument));
104 }
105 
TEST_F(OpStatsExampleStoreTest,TestInvalidCriteria)106 TEST_F(OpStatsExampleStoreTest, TestInvalidCriteria) {
107   ExampleSelector selector;
108   selector.set_collection_uri(kOpStatsCollectionUri);
109   OpStatsSelectionCriteria criteria;
110   *criteria.mutable_start_time() = TimeUtil::MillisecondsToTimestamp(2000L);
111   *criteria.mutable_end_time() = TimeUtil::MillisecondsToTimestamp(1000L);
112   selector.mutable_criteria()->PackFrom(criteria);
113   EXPECT_CALL(mock_log_manager_,
114               LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA));
115   absl::StatusOr<std::unique_ptr<ExampleIterator>> status_or =
116       iterator_factory_.CreateExampleIterator(selector);
117   EXPECT_THAT(status_or.status(), IsCode(absl::StatusCode::kInvalidArgument));
118 }
119 
TEST_F(OpStatsExampleStoreTest,TestReadFromDbFailed)120 TEST_F(OpStatsExampleStoreTest, TestReadFromDbFailed) {
121   ExampleSelector selector;
122   selector.set_collection_uri(kOpStatsCollectionUri);
123   EXPECT_CALL(mock_db_, Read())
124       .WillOnce(Return(absl::InternalError("Something's wrong.")));
125   absl::StatusOr<std::unique_ptr<ExampleIterator>> status_or =
126       iterator_factory_.CreateExampleIterator(selector);
127   EXPECT_THAT(status_or.status(), IsCode(absl::StatusCode::kInternal));
128 }
129 
TEST_F(OpStatsExampleStoreTest,Success)130 TEST_F(OpStatsExampleStoreTest, Success) {
131   // Prepare some data
132   OpStatsSequence opstats_sequence;
133 
134   OperationalStats* stats_first = opstats_sequence.add_opstats();
135   std::string session_first = "session_first";
136   std::string population_first = "population_first";
137   stats_first->set_session_name(session_first);
138   stats_first->set_population_name(population_first);
139 
140   OperationalStats* stats_last = opstats_sequence.add_opstats();
141   std::string session_last = "session_last";
142   std::string population_last = "population_last";
143   stats_last->set_session_name(session_last);
144   stats_last->set_population_name(population_last);
145 
146   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
147 
148   ExampleSelector selector;
149   selector.set_collection_uri(kOpStatsCollectionUri);
150   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
151       iterator_factory_.CreateExampleIterator(selector);
152   ASSERT_TRUE(iterator_or.ok());
153   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
154   absl::StatusOr<std::string> example_or = iterator->Next();
155   ASSERT_TRUE(example_or.ok());
156   tensorflow::Example example_last;
157   ASSERT_TRUE(example_last.ParseFromString(example_or.value()));
158   example_or = iterator->Next();
159   ASSERT_TRUE(example_or.ok());
160   tensorflow::Example example_first;
161   ASSERT_TRUE(example_first.ParseFromString(example_or.value()));
162 
163   // Check if the examples contain the expected data. Opstats examples are
164   // returned in last in, first out order.
165   std::set<std::string> actual_session_names;
166   actual_session_names.insert(ExtractSingleString(example_last, kSessionName));
167   actual_session_names.insert(ExtractSingleString(example_first, kSessionName));
168   std::set<std::string> expected_session_names = {session_last, session_first};
169   EXPECT_EQ(actual_session_names, expected_session_names);
170 
171   std::set<std::string> actual_population_names;
172   actual_population_names.insert(
173       ExtractSingleString(example_last, kPopulationName));
174   actual_population_names.insert(
175       ExtractSingleString(example_first, kPopulationName));
176   std::set<std::string> expected_population_names = {population_last,
177                                                      population_first};
178   EXPECT_EQ(actual_population_names, expected_population_names);
179 
180   // We should have arrived at the end of the iterator.
181   example_or = iterator->Next();
182   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
183 
184   // Subsequent Next() calls should all return OUT_OF_RANGE.
185   example_or = iterator->Next();
186   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
187 
188   // Close() should work without exceptions.
189   iterator->Close();
190 }
191 
TEST_F(OpStatsExampleStoreTest,EmptyData)192 TEST_F(OpStatsExampleStoreTest, EmptyData) {
193   EXPECT_CALL(mock_db_, Read())
194       .WillOnce(Return(OpStatsSequence::default_instance()));
195 
196   ExampleSelector selector;
197   selector.set_collection_uri(kOpStatsCollectionUri);
198   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
199       iterator_factory_.CreateExampleIterator(selector);
200   ASSERT_TRUE(iterator_or.ok());
201   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
202   absl::StatusOr<std::string> status_or = iterator->Next();
203   EXPECT_THAT(status_or.status(), IsCode(absl::StatusCode::kOutOfRange));
204 }
205 
TEST_F(OpStatsExampleStoreTest,DataIsFilteredBySelectionCriteria)206 TEST_F(OpStatsExampleStoreTest, DataIsFilteredBySelectionCriteria) {
207   OperationalStats included;
208   included.mutable_events()->Add(CreateEvent(
209       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
210   included.mutable_events()->Add(CreateEvent(
211       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 1000L));
212 
213   OperationalStats excluded_early;
214   excluded_early.mutable_events()->Add(CreateEvent(
215       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 500L));
216   excluded_early.mutable_events()->Add(CreateEvent(
217       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 700L));
218 
219   OperationalStats excluded_late;
220   excluded_late.mutable_events()->Add(CreateEvent(
221       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 800L));
222   excluded_late.mutable_events()->Add(CreateEvent(
223       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 2001L));
224 
225   OpStatsSequence opstats_sequence;
226   *opstats_sequence.add_opstats() = std::move(excluded_early);
227   *opstats_sequence.add_opstats() = std::move(included);
228   *opstats_sequence.add_opstats() = std::move(excluded_late);
229   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
230 
231   ExampleSelector selector;
232   selector.set_collection_uri(kOpStatsCollectionUri);
233   OpStatsSelectionCriteria criteria;
234   *criteria.mutable_start_time() = TimeUtil::MillisecondsToTimestamp(1000L);
235   *criteria.mutable_end_time() = TimeUtil::MillisecondsToTimestamp(2000L);
236   selector.mutable_criteria()->PackFrom(criteria);
237   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
238       iterator_factory_.CreateExampleIterator(selector);
239 
240   ASSERT_TRUE(iterator_or.ok());
241   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
242   absl::StatusOr<std::string> example_or = iterator->Next();
243   ASSERT_TRUE(example_or.ok());
244   tensorflow::Example example;
245   example.ParseFromString(example_or.value());
246   auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
247   ASSERT_EQ(event_type_list.at(0),
248             OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
249   ASSERT_EQ(event_type_list.at(1),
250             OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
251   auto event_time_ms_list =
252       ExtractRepeatedInt64(example, kEventsTimestampMillis);
253   ASSERT_EQ(event_time_ms_list.at(0), 900);
254   ASSERT_EQ(event_time_ms_list.at(1), 1000);
255 
256   // We expect the iterator reaches the end because there's only 1 example.
257   example_or = iterator->Next();
258   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
259 }
260 
TEST_F(OpStatsExampleStoreTest,SelectionCriteriaOnlyContainsBeginTime)261 TEST_F(OpStatsExampleStoreTest, SelectionCriteriaOnlyContainsBeginTime) {
262   OperationalStats included;
263   included.mutable_events()->Add(CreateEvent(
264       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
265   included.mutable_events()->Add(CreateEvent(
266       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 1000L));
267 
268   OperationalStats excluded_early;
269   excluded_early.mutable_events()->Add(CreateEvent(
270       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 500L));
271   excluded_early.mutable_events()->Add(CreateEvent(
272       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 700L));
273 
274   OpStatsSequence opstats_sequence;
275   *opstats_sequence.add_opstats() = std::move(excluded_early);
276   *opstats_sequence.add_opstats() = std::move(included);
277   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
278 
279   ExampleSelector selector;
280   selector.set_collection_uri(kOpStatsCollectionUri);
281   OpStatsSelectionCriteria criteria;
282   *criteria.mutable_start_time() = TimeUtil::MillisecondsToTimestamp(1000L);
283   selector.mutable_criteria()->PackFrom(criteria);
284   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
285       iterator_factory_.CreateExampleIterator(selector);
286 
287   ASSERT_TRUE(iterator_or.ok());
288   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
289   absl::StatusOr<std::string> example_or = iterator->Next();
290   ASSERT_TRUE(example_or.ok());
291   tensorflow::Example example;
292   example.ParseFromString(example_or.value());
293   auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
294   ASSERT_EQ(event_type_list.at(0),
295             OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
296   ASSERT_EQ(event_type_list.at(1),
297             OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
298   auto event_time_ms_list =
299       ExtractRepeatedInt64(example, kEventsTimestampMillis);
300   ASSERT_EQ(event_time_ms_list.at(0), 900);
301   ASSERT_EQ(event_time_ms_list.at(1), 1000);
302 
303   // We expect the iterator reaches the end because there's only 1 example.
304   example_or = iterator->Next();
305   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
306 }
307 
TEST_F(OpStatsExampleStoreTest,SelectionCriteriaOnlyContainsEndTime)308 TEST_F(OpStatsExampleStoreTest, SelectionCriteriaOnlyContainsEndTime) {
309   OperationalStats included;
310   included.mutable_events()->Add(CreateEvent(
311       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
312   included.mutable_events()->Add(CreateEvent(
313       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 1000L));
314 
315   OperationalStats excluded_late;
316   excluded_late.mutable_events()->Add(CreateEvent(
317       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 800L));
318   excluded_late.mutable_events()->Add(CreateEvent(
319       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 2001L));
320 
321   OpStatsSequence opstats_sequence;
322   *opstats_sequence.add_opstats() = std::move(included);
323   *opstats_sequence.add_opstats() = std::move(excluded_late);
324   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
325 
326   ExampleSelector selector;
327   selector.set_collection_uri(kOpStatsCollectionUri);
328   OpStatsSelectionCriteria criteria;
329   *criteria.mutable_start_time() = TimeUtil::MillisecondsToTimestamp(1000L);
330   *criteria.mutable_end_time() = TimeUtil::MillisecondsToTimestamp(2000L);
331   selector.mutable_criteria()->PackFrom(criteria);
332   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
333       iterator_factory_.CreateExampleIterator(selector);
334 
335   ASSERT_TRUE(iterator_or.ok());
336   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
337   absl::StatusOr<std::string> example_or = iterator->Next();
338   ASSERT_TRUE(example_or.ok());
339   tensorflow::Example example;
340   example.ParseFromString(example_or.value());
341   auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
342   ASSERT_EQ(event_type_list.at(0),
343             OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
344   ASSERT_EQ(event_type_list.at(1),
345             OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
346   auto event_time_ms_list =
347       ExtractRepeatedInt64(example, kEventsTimestampMillis);
348   ASSERT_EQ(event_time_ms_list.at(0), 900);
349   ASSERT_EQ(event_time_ms_list.at(1), 1000);
350 
351   // We expect the iterator reaches the end because there's only 1 example.
352   example_or = iterator->Next();
353   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
354 }
355 
TEST_F(OpStatsExampleStoreTest,SelectionCriteriaLastSuccessfulContributionEnabledAndExists)356 TEST_F(OpStatsExampleStoreTest,
357        SelectionCriteriaLastSuccessfulContributionEnabledAndExists) {
358   OpStatsExampleIteratorFactory iterator_factory =
359       OpStatsExampleIteratorFactory(
360           &mock_opstats_logger_, &mock_log_manager_,
361           /*opstats_last_successful_contribution_criteria=*/true);
362   OperationalStats included;
363   included.set_task_name(kTestTaskName);
364   included.mutable_events()->Add(CreateEvent(
365       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
366   included.mutable_events()->Add(CreateEvent(
367       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 1000L));
368 
369   OperationalStats last_successful_contribution;
370   last_successful_contribution.set_task_name(kTestTaskName);
371   last_successful_contribution.mutable_events()->Add(CreateEvent(
372       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 1200L));
373   last_successful_contribution.mutable_events()->Add(CreateEvent(
374       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 2001L));
375 
376   OpStatsSequence opstats_sequence;
377   *opstats_sequence.add_opstats() = std::move(included);
378   *opstats_sequence.add_opstats() = std::move(last_successful_contribution);
379   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
380 
381   ExampleSelector selector;
382   selector.set_collection_uri(kOpStatsCollectionUri);
383   OpStatsSelectionCriteria criteria;
384   criteria.set_last_successful_contribution(true);
385   selector.mutable_criteria()->PackFrom(criteria);
386   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
387       iterator_factory.CreateExampleIterator(selector);
388 
389   EXPECT_OK(iterator_or);
390   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
391   absl::StatusOr<std::string> example_or = iterator->Next();
392   EXPECT_OK(example_or);
393   tensorflow::Example example;
394   example.ParseFromString(example_or.value());
395   auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
396   ASSERT_EQ(event_type_list.at(0),
397             OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
398   ASSERT_EQ(event_type_list.at(1),
399             OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED);
400   auto event_time_ms_list =
401       ExtractRepeatedInt64(example, kEventsTimestampMillis);
402   ASSERT_EQ(event_time_ms_list.at(0), 1200L);
403   ASSERT_EQ(event_time_ms_list.at(1), 2001L);
404 
405   // We expect the iterator reaches the end because there's only 1 example.
406   example_or = iterator->Next();
407   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
408 }
409 
TEST_F(OpStatsExampleStoreTest,SelectionCriteriaLastSuccessfulContributionEnabledAndDoesNotExist)410 TEST_F(OpStatsExampleStoreTest,
411        SelectionCriteriaLastSuccessfulContributionEnabledAndDoesNotExist) {
412   OpStatsExampleIteratorFactory iterator_factory =
413       OpStatsExampleIteratorFactory(
414           &mock_opstats_logger_, &mock_log_manager_,
415           /*opstats_last_successful_contribution_criteria=*/true);
416   OperationalStats non_matching;
417   non_matching.set_task_name("non_matching_task_name");
418   non_matching.mutable_events()->Add(CreateEvent(
419       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
420   non_matching.mutable_events()->Add(CreateEvent(
421       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 1000L));
422 
423   OperationalStats matching_but_no_upload;
424   matching_but_no_upload.set_task_name(kTestTaskName);
425   matching_but_no_upload.mutable_events()->Add(CreateEvent(
426       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 1200L));
427 
428   OpStatsSequence opstats_sequence;
429   *opstats_sequence.add_opstats() = std::move(non_matching);
430   *opstats_sequence.add_opstats() = std::move(matching_but_no_upload);
431   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
432 
433   ExampleSelector selector;
434   selector.set_collection_uri(kOpStatsCollectionUri);
435   OpStatsSelectionCriteria criteria;
436   criteria.set_last_successful_contribution(true);
437   selector.mutable_criteria()->PackFrom(criteria);
438   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
439       iterator_factory.CreateExampleIterator(selector);
440 
441   EXPECT_OK(iterator_or);
442   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
443   absl::StatusOr<std::string> example_or = iterator->Next();
444   EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
445 }
446 
TEST_F(OpStatsExampleStoreTest,SelectionCriteriaLastSuccessfulContributionDisabled)447 TEST_F(OpStatsExampleStoreTest,
448        SelectionCriteriaLastSuccessfulContributionDisabled) {
449   // disable the feature but put in some matching entries.
450   OpStatsExampleIteratorFactory iterator_factory =
451       OpStatsExampleIteratorFactory(
452           &mock_opstats_logger_, &mock_log_manager_,
453           /*opstats_last_successful_contribution_criteria=*/false);
454 
455   OperationalStats included;
456   included.set_task_name(kTestTaskName);
457   included.mutable_events()->Add(CreateEvent(
458       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
459   included.mutable_events()->Add(CreateEvent(
460       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 1000L));
461 
462   OperationalStats last_successful_contribution;
463   last_successful_contribution.set_task_name(kTestTaskName);
464   last_successful_contribution.mutable_events()->Add(CreateEvent(
465       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 1200L));
466   last_successful_contribution.mutable_events()->Add(CreateEvent(
467       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 2001L));
468 
469   OpStatsSequence opstats_sequence;
470   *opstats_sequence.add_opstats() = std::move(included);
471   *opstats_sequence.add_opstats() = std::move(last_successful_contribution);
472   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
473 
474   ExampleSelector selector;
475   selector.set_collection_uri(kOpStatsCollectionUri);
476   OpStatsSelectionCriteria criteria;
477   criteria.set_last_successful_contribution(true);
478   selector.mutable_criteria()->PackFrom(criteria);
479   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
480       iterator_factory.CreateExampleIterator(selector);
481   // Enabling last successful contribution in the criteria when it's not enabled
482   // in the client returns INVALID_ARGUMENT.
483   EXPECT_THAT(iterator_or.status(), IsCode(absl::StatusCode::kInvalidArgument));
484 }
485 
TEST_F(OpStatsExampleStoreTest,FullSerialization)486 TEST_F(OpStatsExampleStoreTest, FullSerialization) {
487   OperationalStats stats;
488   // Set singular fields
489   std::string session = "session";
490   std::string population = "population";
491   std::string task_name = "task";
492   std::string error = "error";
493   int64_t chunking_layer_bytes_downloaded = 200;
494   int64_t chunking_layer_bytes_uploaded = 600;
495   int64_t network_duration_ms = 700;
496   stats.set_session_name(session);
497   stats.set_population_name(population);
498   stats.set_task_name(task_name);
499   stats.set_error_message(error);
500   stats.set_chunking_layer_bytes_downloaded(chunking_layer_bytes_downloaded);
501   stats.set_chunking_layer_bytes_uploaded(chunking_layer_bytes_uploaded);
502   *stats.mutable_network_duration() =
503       TimeUtil::MillisecondsToDuration(network_duration_ms);
504 
505   // Set two events
506   OperationalStats::Event::EventKind event_kind_a =
507       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED;
508   int64_t event_time_ms_a = 1000;
509   OperationalStats::Event::EventKind event_kind_b =
510       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED;
511   int64_t event_time_ms_b = 1500;
512   stats.mutable_events()->Add(CreateEvent(event_kind_a, event_time_ms_a));
513   stats.mutable_events()->Add(CreateEvent(event_kind_b, event_time_ms_b));
514 
515   // Set two dataset stats
516   std::string uri_a = "app:/train";
517   int64_t num_examples_a = 10;
518   int64_t example_bytes_a = 1000;
519   std::string uri_b = "app:/test";
520   int64_t num_examples_b = 5;
521   int64_t example_bytes_b = 500;
522   (*stats.mutable_dataset_stats())[uri_a] =
523       CreateDatasetStats(num_examples_a, example_bytes_a);
524   (*stats.mutable_dataset_stats())[uri_b] =
525       CreateDatasetStats(num_examples_b, example_bytes_b);
526 
527   // Set retry window
528   int64_t min_delay_ms = 5000;
529   int64_t max_delay_ms = 9000;
530   RetryWindow retry;
531   retry.set_retry_token("token");
532   *retry.mutable_delay_min() = TimeUtil::MillisecondsToDuration(min_delay_ms);
533   *retry.mutable_delay_max() = TimeUtil::MillisecondsToDuration(max_delay_ms);
534   *stats.mutable_retry_window() = retry;
535 
536   OpStatsSequence opstats_sequence;
537   ::google::protobuf::Timestamp currentTime = TimeUtil::GetCurrentTime();
538   *opstats_sequence.mutable_earliest_trustworthy_time() = currentTime;
539   *opstats_sequence.add_opstats() = std::move(stats);
540   EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));
541 
542   ExampleSelector selector;
543   selector.set_collection_uri(kOpStatsCollectionUri);
544   absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
545       iterator_factory_.CreateExampleIterator(selector);
546 
547   ASSERT_TRUE(iterator_or.ok());
548   std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
549   absl::StatusOr<std::string> example_or = iterator->Next();
550   ASSERT_TRUE(example_or.ok());
551   tensorflow::Example example;
552   example.ParseFromString(example_or.value());
553 
554   // Verify the example contains all the correct information.
555   // Singular fields
556   ASSERT_EQ(ExtractSingleString(example, kSessionName), session);
557   ASSERT_EQ(ExtractSingleString(example, kPopulationName), population);
558   ASSERT_EQ(ExtractSingleString(example, kTaskName), task_name);
559   ASSERT_EQ(ExtractSingleString(example, kErrorMessage), error);
560   ASSERT_EQ(ExtractSingleInt64(example, kChunkingLayerBytesDownloaded),
561             chunking_layer_bytes_downloaded);
562   ASSERT_EQ(ExtractSingleInt64(example, kChunkingLayerBytesUploaded),
563             chunking_layer_bytes_uploaded);
564   ASSERT_EQ(ExtractSingleInt64(example, kNetworkDuration), network_duration_ms);
565   ASSERT_EQ(ExtractSingleInt64(example, kEarliestTrustWorthyTimeMillis),
566             TimeUtil::TimestampToMilliseconds(currentTime));
567 
568   // Events
569   auto event_types = ExtractRepeatedInt64(example, kEventsEventType);
570   ASSERT_EQ(event_types.at(0), event_kind_a);
571   ASSERT_EQ(event_types.at(1), event_kind_b);
572   auto event_times = ExtractRepeatedInt64(example, kEventsTimestampMillis);
573   ASSERT_EQ(event_times.at(0), event_time_ms_a);
574   ASSERT_EQ(event_times.at(1), event_time_ms_b);
575 
576   // Dataset stats
577   auto dataset_urls = ExtractRepeatedString(example, kDatasetStatsUri);
578   // The order of the dataset stats doesn't matter, but should be consistent
579   // across the individual features.
580   int index_a = dataset_urls.at(1) == uri_a;
581   ASSERT_EQ(dataset_urls.at(index_a), uri_a);
582   ASSERT_EQ(dataset_urls.at(1 - index_a), uri_b);
583   auto example_counts =
584       ExtractRepeatedInt64(example, kDatasetStatsNumExamplesRead);
585   ASSERT_EQ(example_counts.at(index_a), num_examples_a);
586   ASSERT_EQ(example_counts.at(1 - index_a), num_examples_b);
587   auto example_bytes = ExtractRepeatedInt64(example, kDatasetStatsNumBytesRead);
588   ASSERT_EQ(example_bytes.at(index_a), example_bytes_a);
589   ASSERT_EQ(example_bytes.at(1 - index_a), example_bytes_b);
590 
591   // RetryWindow
592   ASSERT_EQ(ExtractSingleInt64(example, kRetryWindowDelayMinMillis),
593             min_delay_ms);
594   ASSERT_EQ(ExtractSingleInt64(example, kRetryWindowDelayMaxMillis),
595             max_delay_ms);
596 }
597 
598 }  // anonymous namespace
599 }  // namespace opstats
600 }  // namespace client
601 }  // namespace fcp
602