1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/opstats/opstats_example_store.h"
17
18 #include <string>
19 #include <utility>
20
21 #include "google/protobuf/any.pb.h"
22 #include "google/protobuf/util/time_util.h"
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 #include "fcp/client/test_helpers.h"
26 #include "fcp/protos/federated_api.pb.h"
27 #include "fcp/protos/plan.pb.h"
28 #include "fcp/testing/testing.h"
29 #include "tensorflow/core/example/example.pb.h"
30 #include "tensorflow/core/example/feature.pb.h"
31
32 namespace fcp {
33 namespace client {
34 namespace opstats {
35 namespace {
36
37 using ::google::internal::federated::plan::ExampleSelector;
38 using ::google::internal::federatedml::v2::RetryWindow;
39 using ::google::protobuf::util::TimeUtil;
40 using ::testing::Return;
41
// Task name that the mocked logger returns from GetCurrentTaskName() and that
// tests set on OperationalStats entries meant to belong to the current task.
constexpr char kTestTaskName[] = "stefans_really_cool_task";
43
44 class OpStatsExampleStoreTest : public testing::Test {
45 public:
OpStatsExampleStoreTest()46 OpStatsExampleStoreTest() {
47 EXPECT_CALL(mock_opstats_logger_, IsOpStatsEnabled())
48 .WillRepeatedly(Return(true));
49 EXPECT_CALL(mock_opstats_logger_, GetOpStatsDb())
50 .WillRepeatedly(Return(&mock_db_));
51 EXPECT_CALL(mock_opstats_logger_, GetCurrentTaskName())
52 .WillRepeatedly(Return(kTestTaskName));
53 }
54
55 protected:
CreateEvent(OperationalStats::Event::EventKind event_kind,int64_t event_time_ms)56 static OperationalStats::Event CreateEvent(
57 OperationalStats::Event::EventKind event_kind, int64_t event_time_ms) {
58 OperationalStats::Event event;
59 event.set_event_type(event_kind);
60 *event.mutable_timestamp() =
61 TimeUtil::MillisecondsToTimestamp(event_time_ms);
62 return event;
63 }
64
CreateDatasetStats(int64_t num_examples_read,int64_t num_bytes_read)65 static OperationalStats::DatasetStats CreateDatasetStats(
66 int64_t num_examples_read, int64_t num_bytes_read) {
67 OperationalStats::DatasetStats stats;
68 stats.set_num_bytes_read(num_bytes_read);
69 stats.set_num_examples_read(num_examples_read);
70 return stats;
71 }
72
73 testing::StrictMock<MockOpStatsLogger> mock_opstats_logger_;
74 testing::StrictMock<MockOpStatsDb> mock_db_;
75 testing::StrictMock<MockLogManager> mock_log_manager_;
76 OpStatsExampleIteratorFactory iterator_factory_ =
77 OpStatsExampleIteratorFactory(
78 &mock_opstats_logger_, &mock_log_manager_,
79 /*opstats_last_successful_contribution_criteria=*/false);
80 };
81
// A selector with an unrecognized collection URI: CanHandle() is false,
// CreateExampleIterator() fails with INVALID_ARGUMENT, and exactly one
// OPSTATS_INCORRECT_COLLECTION_URI diag is expected on the log manager.
TEST_F(OpStatsExampleStoreTest, TestInvalidCollectionUrl) {
  EXPECT_CALL(mock_log_manager_,
              LogDiag(ProdDiagCode::OPSTATS_INCORRECT_COLLECTION_URI));

  ExampleSelector bad_selector;
  bad_selector.set_collection_uri("INVALID");

  EXPECT_FALSE(iterator_factory_.CanHandle(bad_selector));

  const auto result = iterator_factory_.CreateExampleIterator(bad_selector);
  EXPECT_THAT(result.status(), IsCode(absl::StatusCode::kInvalidArgument));
}
94
// Criteria bytes that are not a serialized proto must be rejected with
// INVALID_ARGUMENT and an OPSTATS_INVALID_SELECTION_CRITERIA diag.
TEST_F(OpStatsExampleStoreTest, TestMalformedCriteria) {
  EXPECT_CALL(mock_log_manager_,
              LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA));

  ExampleSelector malformed_selector;
  malformed_selector.set_collection_uri(kOpStatsCollectionUri);
  malformed_selector.mutable_criteria()->set_value("NOT_A_PROTO");

  const auto result =
      iterator_factory_.CreateExampleIterator(malformed_selector);
  EXPECT_THAT(result.status(), IsCode(absl::StatusCode::kInvalidArgument));
}
105
// Well-formed criteria whose start time (2000 ms) lies after its end time
// (1000 ms) must be rejected with INVALID_ARGUMENT and an
// OPSTATS_INVALID_SELECTION_CRITERIA diag.
TEST_F(OpStatsExampleStoreTest, TestInvalidCriteria) {
  EXPECT_CALL(mock_log_manager_,
              LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA));

  OpStatsSelectionCriteria inverted_window;
  *inverted_window.mutable_start_time() =
      TimeUtil::MillisecondsToTimestamp(2000L);
  *inverted_window.mutable_end_time() =
      TimeUtil::MillisecondsToTimestamp(1000L);

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  selector.mutable_criteria()->PackFrom(inverted_window);

  const auto result = iterator_factory_.CreateExampleIterator(selector);
  EXPECT_THAT(result.status(), IsCode(absl::StatusCode::kInvalidArgument));
}
119
// An INTERNAL error returned by the database's Read() is expected to surface
// as an INTERNAL status from CreateExampleIterator().
TEST_F(OpStatsExampleStoreTest, TestReadFromDbFailed) {
  EXPECT_CALL(mock_db_, Read())
      .WillOnce(Return(absl::InternalError("Something's wrong.")));

  ExampleSelector opstats_selector;
  opstats_selector.set_collection_uri(kOpStatsCollectionUri);

  const auto result = iterator_factory_.CreateExampleIterator(opstats_selector);
  EXPECT_THAT(result.status(), IsCode(absl::StatusCode::kInternal));
}
129
// With no selection criteria, both stored entries come back as parseable
// tf.Examples, after which the iterator keeps reporting OUT_OF_RANGE.
TEST_F(OpStatsExampleStoreTest, Success) {
  // Two database entries distinguished by session and population name.
  const std::string session_first = "session_first";
  const std::string population_first = "population_first";
  const std::string session_last = "session_last";
  const std::string population_last = "population_last";

  OpStatsSequence opstats_sequence;
  OperationalStats* first_entry = opstats_sequence.add_opstats();
  first_entry->set_session_name(session_first);
  first_entry->set_population_name(population_first);
  OperationalStats* last_entry = opstats_sequence.add_opstats();
  last_entry->set_session_name(session_last);
  last_entry->set_population_name(population_last);

  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  auto iterator_or = iterator_factory_.CreateExampleIterator(selector);
  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or).value();

  // Pull and decode both examples.
  absl::StatusOr<std::string> serialized = iterator->Next();
  ASSERT_TRUE(serialized.ok());
  tensorflow::Example example_last;
  ASSERT_TRUE(example_last.ParseFromString(serialized.value()));

  serialized = iterator->Next();
  ASSERT_TRUE(serialized.ok());
  tensorflow::Example example_first;
  ASSERT_TRUE(example_first.ParseFromString(serialized.value()));

  // Opstats examples are described as being returned in last in, first out
  // order; the comparisons below go through std::set, so they only verify
  // that both entries are present, not the order they arrived in.
  const std::set<std::string> expected_session_names = {session_last,
                                                        session_first};
  const std::set<std::string> actual_session_names = {
      ExtractSingleString(example_last, kSessionName),
      ExtractSingleString(example_first, kSessionName)};
  EXPECT_EQ(actual_session_names, expected_session_names);

  const std::set<std::string> expected_population_names = {population_last,
                                                           population_first};
  const std::set<std::string> actual_population_names = {
      ExtractSingleString(example_last, kPopulationName),
      ExtractSingleString(example_first, kPopulationName)};
  EXPECT_EQ(actual_population_names, expected_population_names);

  // The iterator is exhausted now, and stays exhausted on repeated calls.
  serialized = iterator->Next();
  EXPECT_THAT(serialized.status(), IsCode(absl::StatusCode::kOutOfRange));
  serialized = iterator->Next();
  EXPECT_THAT(serialized.status(), IsCode(absl::StatusCode::kOutOfRange));

  // Close() should work without exceptions.
  iterator->Close();
}
191
// An empty OpStatsSequence still yields a valid iterator, whose first Next()
// reports OUT_OF_RANGE.
TEST_F(OpStatsExampleStoreTest, EmptyData) {
  EXPECT_CALL(mock_db_, Read())
      .WillOnce(Return(OpStatsSequence::default_instance()));

  ExampleSelector opstats_selector;
  opstats_selector.set_collection_uri(kOpStatsCollectionUri);

  auto iterator_or = iterator_factory_.CreateExampleIterator(opstats_selector);
  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or).value();

  const absl::StatusOr<std::string> next_result = iterator->Next();
  EXPECT_THAT(next_result.status(), IsCode(absl::StatusCode::kOutOfRange));
}
205
// With a [1000 ms, 2000 ms] selection window, only the entry whose events end
// at 1000 ms is expected back; the entries ending at 700 ms and 2001 ms are
// expected to be filtered out.
TEST_F(OpStatsExampleStoreTest, DataIsFilteredBySelectionCriteria) {
  OperationalStats included;
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 1000L));

  OperationalStats excluded_early;
  excluded_early.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 500L));
  excluded_early.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 700L));

  OperationalStats excluded_late;
  excluded_late.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 800L));
  excluded_late.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 2001L));

  OpStatsSequence opstats_sequence;
  *opstats_sequence.add_opstats() = std::move(excluded_early);
  *opstats_sequence.add_opstats() = std::move(included);
  *opstats_sequence.add_opstats() = std::move(excluded_late);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  OpStatsSelectionCriteria criteria;
  *criteria.mutable_start_time() = TimeUtil::MillisecondsToTimestamp(1000L);
  *criteria.mutable_end_time() = TimeUtil::MillisecondsToTimestamp(2000L);
  selector.mutable_criteria()->PackFrom(criteria);
  absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
      iterator_factory_.CreateExampleIterator(selector);

  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
  absl::StatusOr<std::string> example_or = iterator->Next();
  ASSERT_TRUE(example_or.ok());
  tensorflow::Example example;
  // Fail here on an unparseable payload rather than on confusing feature
  // mismatches further down.
  ASSERT_TRUE(example.ParseFromString(example_or.value()));
  auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
  ASSERT_EQ(event_type_list.at(0),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
  ASSERT_EQ(event_type_list.at(1),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
  auto event_time_ms_list =
      ExtractRepeatedInt64(example, kEventsTimestampMillis);
  ASSERT_EQ(event_time_ms_list.at(0), 900);
  ASSERT_EQ(event_time_ms_list.at(1), 1000);

  // We expect the iterator reaches the end because there's only 1 example.
  example_or = iterator->Next();
  EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
}
260
// Criteria with only a start time (1000 ms) and no end time: the entry whose
// events end at 1000 ms is expected back, while the entry ending at 700 ms is
// expected to be filtered out.
TEST_F(OpStatsExampleStoreTest, SelectionCriteriaOnlyContainsBeginTime) {
  OperationalStats included;
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 1000L));

  OperationalStats excluded_early;
  excluded_early.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 500L));
  excluded_early.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 700L));

  OpStatsSequence opstats_sequence;
  *opstats_sequence.add_opstats() = std::move(excluded_early);
  *opstats_sequence.add_opstats() = std::move(included);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  OpStatsSelectionCriteria criteria;
  *criteria.mutable_start_time() = TimeUtil::MillisecondsToTimestamp(1000L);
  selector.mutable_criteria()->PackFrom(criteria);
  absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
      iterator_factory_.CreateExampleIterator(selector);

  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
  absl::StatusOr<std::string> example_or = iterator->Next();
  ASSERT_TRUE(example_or.ok());
  tensorflow::Example example;
  // Fail here on an unparseable payload rather than on confusing feature
  // mismatches further down.
  ASSERT_TRUE(example.ParseFromString(example_or.value()));
  auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
  ASSERT_EQ(event_type_list.at(0),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
  ASSERT_EQ(event_type_list.at(1),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
  auto event_time_ms_list =
      ExtractRepeatedInt64(example, kEventsTimestampMillis);
  ASSERT_EQ(event_time_ms_list.at(0), 900);
  ASSERT_EQ(event_time_ms_list.at(1), 1000);

  // We expect the iterator reaches the end because there's only 1 example.
  example_or = iterator->Next();
  EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
}
307
// Criteria with only an end time (2000 ms) and no start time: the entry whose
// events end at 1000 ms is expected back, while the entry ending at 2001 ms
// is expected to be filtered out.
TEST_F(OpStatsExampleStoreTest, SelectionCriteriaOnlyContainsEndTime) {
  OperationalStats included;
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 1000L));

  OperationalStats excluded_late;
  excluded_late.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 800L));
  excluded_late.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED, 2001L));

  OpStatsSequence opstats_sequence;
  *opstats_sequence.add_opstats() = std::move(included);
  *opstats_sequence.add_opstats() = std::move(excluded_late);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  OpStatsSelectionCriteria criteria;
  // Deliberately leave start_time unset: this test covers the case where the
  // criteria carry an upper bound only.
  *criteria.mutable_end_time() = TimeUtil::MillisecondsToTimestamp(2000L);
  selector.mutable_criteria()->PackFrom(criteria);
  absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
      iterator_factory_.CreateExampleIterator(selector);

  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
  absl::StatusOr<std::string> example_or = iterator->Next();
  ASSERT_TRUE(example_or.ok());
  tensorflow::Example example;
  // Fail here on an unparseable payload rather than on confusing feature
  // mismatches further down.
  ASSERT_TRUE(example.ParseFromString(example_or.value()));
  auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
  ASSERT_EQ(event_type_list.at(0),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
  ASSERT_EQ(event_type_list.at(1),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
  auto event_time_ms_list =
      ExtractRepeatedInt64(example, kEventsTimestampMillis);
  ASSERT_EQ(event_time_ms_list.at(0), 900);
  ASSERT_EQ(event_time_ms_list.at(1), 1000);

  // We expect the iterator reaches the end because there's only 1 example.
  example_or = iterator->Next();
  EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
}
355
// With the last-successful-contribution feature enabled in the factory and
// requested in the criteria, two entries for the current task that both
// contain a RESULT_UPLOAD_STARTED event are stored; only the later-added one
// (events at 1200 ms / 2001 ms) is expected back.
TEST_F(OpStatsExampleStoreTest,
       SelectionCriteriaLastSuccessfulContributionEnabledAndExists) {
  OpStatsExampleIteratorFactory iterator_factory =
      OpStatsExampleIteratorFactory(
          &mock_opstats_logger_, &mock_log_manager_,
          /*opstats_last_successful_contribution_criteria=*/true);
  OperationalStats included;
  included.set_task_name(kTestTaskName);
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
  included.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 1000L));

  OperationalStats last_successful_contribution;
  last_successful_contribution.set_task_name(kTestTaskName);
  last_successful_contribution.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 1200L));
  last_successful_contribution.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 2001L));

  OpStatsSequence opstats_sequence;
  *opstats_sequence.add_opstats() = std::move(included);
  *opstats_sequence.add_opstats() = std::move(last_successful_contribution);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  OpStatsSelectionCriteria criteria;
  criteria.set_last_successful_contribution(true);
  selector.mutable_criteria()->PackFrom(criteria);
  absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
      iterator_factory.CreateExampleIterator(selector);

  // Fatal assertions: the StatusOr values are dereferenced right below, which
  // must not happen when they hold an error.
  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
  absl::StatusOr<std::string> example_or = iterator->Next();
  ASSERT_TRUE(example_or.ok());
  tensorflow::Example example;
  ASSERT_TRUE(example.ParseFromString(example_or.value()));
  auto event_type_list = ExtractRepeatedInt64(example, kEventsEventType);
  ASSERT_EQ(event_type_list.at(0),
            OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
  ASSERT_EQ(event_type_list.at(1),
            OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED);
  auto event_time_ms_list =
      ExtractRepeatedInt64(example, kEventsTimestampMillis);
  ASSERT_EQ(event_time_ms_list.at(0), 1200L);
  ASSERT_EQ(event_time_ms_list.at(1), 2001L);

  // We expect the iterator reaches the end because there's only 1 example.
  example_or = iterator->Next();
  EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
}
409
// With the last-successful-contribution feature enabled and requested, but
// no stored entry that both belongs to the current task and contains a
// RESULT_UPLOAD_STARTED event, the iterator is created successfully and is
// immediately exhausted.
TEST_F(OpStatsExampleStoreTest,
       SelectionCriteriaLastSuccessfulContributionEnabledAndDoesNotExist) {
  OpStatsExampleIteratorFactory iterator_factory =
      OpStatsExampleIteratorFactory(
          &mock_opstats_logger_, &mock_log_manager_,
          /*opstats_last_successful_contribution_criteria=*/true);
  // Has an upload event, but belongs to a different task.
  OperationalStats non_matching;
  non_matching.set_task_name("non_matching_task_name");
  non_matching.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
  non_matching.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 1000L));

  // Belongs to the current task, but has no upload event.
  OperationalStats matching_but_no_upload;
  matching_but_no_upload.set_task_name(kTestTaskName);
  matching_but_no_upload.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 1200L));

  OpStatsSequence opstats_sequence;
  *opstats_sequence.add_opstats() = std::move(non_matching);
  *opstats_sequence.add_opstats() = std::move(matching_but_no_upload);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  OpStatsSelectionCriteria criteria;
  criteria.set_last_successful_contribution(true);
  selector.mutable_criteria()->PackFrom(criteria);
  absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
      iterator_factory.CreateExampleIterator(selector);

  // Fatal assertion: value() is taken right below, which must not happen
  // when the StatusOr holds an error.
  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
  absl::StatusOr<std::string> example_or = iterator->Next();
  EXPECT_THAT(example_or.status(), IsCode(absl::StatusCode::kOutOfRange));
}
446
// Requesting last_successful_contribution in the criteria while the factory
// has the feature turned off must fail with INVALID_ARGUMENT, even though the
// database holds entries that would otherwise match.
TEST_F(OpStatsExampleStoreTest,
       SelectionCriteriaLastSuccessfulContributionDisabled) {
  // Factory with the feature disabled.
  OpStatsExampleIteratorFactory feature_off_factory(
      &mock_opstats_logger_, &mock_log_manager_,
      /*opstats_last_successful_contribution_criteria=*/false);

  // Two entries for the current task, each containing an upload event.
  OperationalStats earlier_run;
  earlier_run.set_task_name(kTestTaskName);
  earlier_run.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 900L));
  earlier_run.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 1000L));

  OperationalStats later_run;
  later_run.set_task_name(kTestTaskName);
  later_run.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED, 1200L));
  later_run.mutable_events()->Add(CreateEvent(
      OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED, 2001L));

  OpStatsSequence stored_sequence;
  *stored_sequence.add_opstats() = std::move(earlier_run);
  *stored_sequence.add_opstats() = std::move(later_run);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(stored_sequence));

  OpStatsSelectionCriteria criteria;
  criteria.set_last_successful_contribution(true);
  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  selector.mutable_criteria()->PackFrom(criteria);

  // Enabling last successful contribution in the criteria when it's not
  // enabled in the client returns INVALID_ARGUMENT.
  const auto result = feature_off_factory.CreateExampleIterator(selector);
  EXPECT_THAT(result.status(), IsCode(absl::StatusCode::kInvalidArgument));
}
485
// Populates every OperationalStats field the test knows about (singular
// fields, two events, two dataset stats, a retry window) plus the sequence's
// earliest trustworthy time, and checks that each value can be read back from
// the tf.Example produced by the iterator.
TEST_F(OpStatsExampleStoreTest, FullSerialization) {
  OperationalStats stats;
  // Set singular fields
  std::string session = "session";
  std::string population = "population";
  std::string task_name = "task";
  std::string error = "error";
  int64_t chunking_layer_bytes_downloaded = 200;
  int64_t chunking_layer_bytes_uploaded = 600;
  int64_t network_duration_ms = 700;
  stats.set_session_name(session);
  stats.set_population_name(population);
  stats.set_task_name(task_name);
  stats.set_error_message(error);
  stats.set_chunking_layer_bytes_downloaded(chunking_layer_bytes_downloaded);
  stats.set_chunking_layer_bytes_uploaded(chunking_layer_bytes_uploaded);
  *stats.mutable_network_duration() =
      TimeUtil::MillisecondsToDuration(network_duration_ms);

  // Set two events
  OperationalStats::Event::EventKind event_kind_a =
      OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED;
  int64_t event_time_ms_a = 1000;
  OperationalStats::Event::EventKind event_kind_b =
      OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED;
  int64_t event_time_ms_b = 1500;
  stats.mutable_events()->Add(CreateEvent(event_kind_a, event_time_ms_a));
  stats.mutable_events()->Add(CreateEvent(event_kind_b, event_time_ms_b));

  // Set two dataset stats
  std::string uri_a = "app:/train";
  int64_t num_examples_a = 10;
  int64_t example_bytes_a = 1000;
  std::string uri_b = "app:/test";
  int64_t num_examples_b = 5;
  int64_t example_bytes_b = 500;
  (*stats.mutable_dataset_stats())[uri_a] =
      CreateDatasetStats(num_examples_a, example_bytes_a);
  (*stats.mutable_dataset_stats())[uri_b] =
      CreateDatasetStats(num_examples_b, example_bytes_b);

  // Set retry window
  int64_t min_delay_ms = 5000;
  int64_t max_delay_ms = 9000;
  RetryWindow retry;
  retry.set_retry_token("token");
  *retry.mutable_delay_min() = TimeUtil::MillisecondsToDuration(min_delay_ms);
  *retry.mutable_delay_max() = TimeUtil::MillisecondsToDuration(max_delay_ms);
  *stats.mutable_retry_window() = retry;

  OpStatsSequence opstats_sequence;
  ::google::protobuf::Timestamp currentTime = TimeUtil::GetCurrentTime();
  *opstats_sequence.mutable_earliest_trustworthy_time() = currentTime;
  *opstats_sequence.add_opstats() = std::move(stats);
  EXPECT_CALL(mock_db_, Read()).WillOnce(Return(opstats_sequence));

  ExampleSelector selector;
  selector.set_collection_uri(kOpStatsCollectionUri);
  absl::StatusOr<std::unique_ptr<ExampleIterator>> iterator_or =
      iterator_factory_.CreateExampleIterator(selector);

  ASSERT_TRUE(iterator_or.ok());
  std::unique_ptr<ExampleIterator> iterator = std::move(iterator_or.value());
  absl::StatusOr<std::string> example_or = iterator->Next();
  ASSERT_TRUE(example_or.ok());
  tensorflow::Example example;
  // Fail here on an unparseable payload rather than on confusing feature
  // mismatches further down.
  ASSERT_TRUE(example.ParseFromString(example_or.value()));

  // Verify the example contains all the correct information.
  // Singular fields
  ASSERT_EQ(ExtractSingleString(example, kSessionName), session);
  ASSERT_EQ(ExtractSingleString(example, kPopulationName), population);
  ASSERT_EQ(ExtractSingleString(example, kTaskName), task_name);
  ASSERT_EQ(ExtractSingleString(example, kErrorMessage), error);
  ASSERT_EQ(ExtractSingleInt64(example, kChunkingLayerBytesDownloaded),
            chunking_layer_bytes_downloaded);
  ASSERT_EQ(ExtractSingleInt64(example, kChunkingLayerBytesUploaded),
            chunking_layer_bytes_uploaded);
  ASSERT_EQ(ExtractSingleInt64(example, kNetworkDuration), network_duration_ms);
  ASSERT_EQ(ExtractSingleInt64(example, kEarliestTrustWorthyTimeMillis),
            TimeUtil::TimestampToMilliseconds(currentTime));

  // Events
  auto event_types = ExtractRepeatedInt64(example, kEventsEventType);
  ASSERT_EQ(event_types.at(0), event_kind_a);
  ASSERT_EQ(event_types.at(1), event_kind_b);
  auto event_times = ExtractRepeatedInt64(example, kEventsTimestampMillis);
  ASSERT_EQ(event_times.at(0), event_time_ms_a);
  ASSERT_EQ(event_times.at(1), event_time_ms_b);

  // Dataset stats
  auto dataset_urls = ExtractRepeatedString(example, kDatasetStatsUri);
  // The order of the dataset stats doesn't matter, but should be consistent
  // across the individual features. index_a is the position (0 or 1) at which
  // uri_a was emitted; uri_b must then sit at the other position.
  const int index_a = (dataset_urls.at(1) == uri_a) ? 1 : 0;
  ASSERT_EQ(dataset_urls.at(index_a), uri_a);
  ASSERT_EQ(dataset_urls.at(1 - index_a), uri_b);
  auto example_counts =
      ExtractRepeatedInt64(example, kDatasetStatsNumExamplesRead);
  ASSERT_EQ(example_counts.at(index_a), num_examples_a);
  ASSERT_EQ(example_counts.at(1 - index_a), num_examples_b);
  auto example_bytes = ExtractRepeatedInt64(example, kDatasetStatsNumBytesRead);
  ASSERT_EQ(example_bytes.at(index_a), example_bytes_a);
  ASSERT_EQ(example_bytes.at(1 - index_a), example_bytes_b);

  // RetryWindow
  ASSERT_EQ(ExtractSingleInt64(example, kRetryWindowDelayMinMillis),
            min_delay_ms);
  ASSERT_EQ(ExtractSingleInt64(example, kRetryWindowDelayMaxMillis),
            max_delay_ms);
}
597
598 } // anonymous namespace
599 } // namespace opstats
600 } // namespace client
601 } // namespace fcp
602