1 /*
2 * Copyright 2022 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/phase_logger_impl.h"
17
18 #include <string>
19
20 #include "absl/time/time.h"
21 #include "fcp/base/monitoring.h"
22
23 namespace fcp {
24 namespace client {
25 namespace {
26 constexpr absl::string_view kInitializationErrorPrefix =
27 "Error during initialization: ";
28 constexpr absl::string_view kEligibilityCheckinErrorPrefix =
29 "Error during eligibility check-in: ";
30 constexpr absl::string_view kEligibilityComputationErrorPrefix =
31 "Error during eligibility eval computation: ";
32 constexpr absl::string_view kCheckinErrorPrefix = "Error during check-in: ";
33 constexpr absl::string_view kComputationErrorPrefix =
34 "Error during computation: ";
35 constexpr absl::string_view kResultUploadErrorPrefix =
36 "Error reporting results: ";
37 constexpr absl::string_view kFailureUploadErrorPrefix =
38 "Error reporting computation failure: ";
39 } // anonymous namespace
40
41 using ::fcp::client::opstats::OperationalStats;
42 using ::google::internal::federatedml::v2::RetryWindow;
43
UpdateRetryWindowAndNetworkStats(const RetryWindow & retry_window,const NetworkStats & network_stats)44 void PhaseLoggerImpl::UpdateRetryWindowAndNetworkStats(
45 const RetryWindow& retry_window, const NetworkStats& network_stats) {
46 opstats_logger_->SetRetryWindow(retry_window);
47
48 // Update the network stats.
49 opstats_logger_->SetNetworkStats(network_stats);
50 }
51
SetModelIdentifier(absl::string_view model_identifier)52 void PhaseLoggerImpl::SetModelIdentifier(absl::string_view model_identifier) {
53 event_publisher_->SetModelIdentifier(std::string(model_identifier));
54 log_manager_->SetModelIdentifier(std::string(model_identifier));
55 }
56
LogTaskNotStarted(absl::string_view error_message)57 void PhaseLoggerImpl::LogTaskNotStarted(absl::string_view error_message) {
58 event_publisher_->PublishTaskNotStarted(error_message);
59 opstats_logger_->AddEventWithErrorMessage(
60 OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED,
61 std::string(error_message));
62 }
63
LogNonfatalInitializationError(absl::Status error_status)64 void PhaseLoggerImpl::LogNonfatalInitializationError(
65 absl::Status error_status) {
66 std::string error_message = GetErrorMessage(
67 error_status, kInitializationErrorPrefix, /* keep_error_message= */ true);
68 event_publisher_->PublishNonfatalInitializationError(error_message);
69 opstats_logger_->AddEventWithErrorMessage(
70 OperationalStats::Event::EVENT_KIND_INITIALIZATION_ERROR_FATAL,
71 error_message);
72 }
73
LogFatalInitializationError(absl::Status error_status)74 void PhaseLoggerImpl::LogFatalInitializationError(absl::Status error_status) {
75 std::string error_message = GetErrorMessage(
76 error_status, kInitializationErrorPrefix, /* keep_error_message= */ true);
77 event_publisher_->PublishFatalInitializationError(error_message);
78 opstats_logger_->AddEventWithErrorMessage(
79 OperationalStats::Event::EVENT_KIND_INITIALIZATION_ERROR_NONFATAL,
80 error_message);
81 }
82
LogEligibilityEvalCheckinStarted()83 void PhaseLoggerImpl::LogEligibilityEvalCheckinStarted() {
84 event_publisher_->PublishEligibilityEvalCheckin();
85 opstats_logger_->AddEvent(
86 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
87 }
88
LogEligibilityEvalCheckinIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin)89 void PhaseLoggerImpl::LogEligibilityEvalCheckinIOError(
90 absl::Status error_status, const NetworkStats& network_stats,
91 absl::Time time_before_checkin) {
92 std::string error_message =
93 GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix,
94 /* keep_error_message= */ true);
95 event_publisher_->PublishEligibilityEvalCheckinIoError(
96 error_message, network_stats, absl::Now() - time_before_checkin);
97 opstats_logger_->AddEventWithErrorMessage(
98 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_ERROR_IO,
99 error_message);
100 LogEligibilityEvalCheckinLatency(time_before_checkin);
101 }
102
LogEligibilityEvalCheckinClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin)103 void PhaseLoggerImpl::LogEligibilityEvalCheckinClientInterrupted(
104 absl::Status error_status, const NetworkStats& network_stats,
105 absl::Time time_before_checkin) {
106 std::string error_message =
107 GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix,
108 /* keep_error_message= */ true);
109 event_publisher_->PublishEligibilityEvalCheckinClientInterrupted(
110 error_message, network_stats, absl::Now() - time_before_checkin);
111 opstats_logger_->AddEventWithErrorMessage(
112 OperationalStats::Event::
113 EVENT_KIND_ELIGIBILITY_CHECKIN_CLIENT_INTERRUPTED,
114 error_message);
115 LogEligibilityEvalCheckinLatency(time_before_checkin);
116 }
117
LogEligibilityEvalCheckinServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin)118 void PhaseLoggerImpl::LogEligibilityEvalCheckinServerAborted(
119 absl::Status error_status, const NetworkStats& network_stats,
120 absl::Time time_before_checkin) {
121 std::string error_message =
122 GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix,
123 /* keep_error_message= */ true);
124 event_publisher_->PublishEligibilityEvalCheckinServerAborted(
125 error_message, network_stats, absl::Now() - time_before_checkin);
126 opstats_logger_->AddEventWithErrorMessage(
127 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_SERVER_ABORTED,
128 error_message);
129 LogEligibilityEvalCheckinLatency(time_before_checkin);
130 }
131
LogEligibilityEvalNotConfigured(const NetworkStats & network_stats,absl::Time time_before_checkin)132 void PhaseLoggerImpl::LogEligibilityEvalNotConfigured(
133 const NetworkStats& network_stats, absl::Time time_before_checkin) {
134 event_publisher_->PublishEligibilityEvalNotConfigured(
135 network_stats, absl::Now() - time_before_checkin);
136 opstats_logger_->AddEvent(
137 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_DISABLED);
138 LogEligibilityEvalCheckinLatency(time_before_checkin);
139 }
140
LogEligibilityEvalCheckinTurnedAway(const NetworkStats & network_stats,absl::Time time_before_checkin)141 void PhaseLoggerImpl::LogEligibilityEvalCheckinTurnedAway(
142 const NetworkStats& network_stats, absl::Time time_before_checkin) {
143 event_publisher_->PublishEligibilityEvalRejected(
144 network_stats, absl::Now() - time_before_checkin);
145 opstats_logger_->AddEvent(
146 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
147 LogEligibilityEvalCheckinLatency(time_before_checkin);
148 }
149
LogEligibilityEvalCheckinInvalidPayloadError(absl::string_view error_message,const NetworkStats & network_stats,absl::Time time_before_checkin)150 void PhaseLoggerImpl::LogEligibilityEvalCheckinInvalidPayloadError(
151 absl::string_view error_message, const NetworkStats& network_stats,
152 absl::Time time_before_checkin) {
153 log_manager_->LogDiag(
154 ProdDiagCode::
155 BACKGROUND_TRAINING_ELIGIBILITY_EVAL_FAILED_CANNOT_PARSE_PLAN);
156 event_publisher_->PublishEligibilityEvalCheckinErrorInvalidPayload(
157 error_message, network_stats, absl::Now() - time_before_checkin);
158 opstats_logger_->AddEventWithErrorMessage(
159 OperationalStats::Event::
160 EVENT_KIND_ELIGIBILITY_CHECKIN_ERROR_INVALID_PAYLOAD,
161 std::string(error_message));
162 LogEligibilityEvalCheckinLatency(time_before_checkin);
163 }
164
LogEligibilityEvalCheckinPlanUriReceived(const NetworkStats & network_stats,absl::Time time_before_checkin)165 void PhaseLoggerImpl::LogEligibilityEvalCheckinPlanUriReceived(
166 const NetworkStats& network_stats, absl::Time time_before_checkin) {
167 opstats_logger_->AddEvent(
168 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_PLAN_URI_RECEIVED);
169 event_publisher_->PublishEligibilityEvalPlanUriReceived(
170 network_stats, absl::Now() - time_before_checkin);
171 }
172
LogEligibilityEvalCheckinCompleted(const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time time_before_plan_download)173 void PhaseLoggerImpl::LogEligibilityEvalCheckinCompleted(
174 const NetworkStats& network_stats, absl::Time time_before_checkin,
175 absl::Time time_before_plan_download) {
176 opstats_logger_->AddEvent(
177 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_ENABLED);
178 absl::Time before_time = time_before_plan_download;
179 event_publisher_->PublishEligibilityEvalPlanReceived(
180 network_stats, absl::Now() - before_time);
181
182 // The 'EligibilityEvalCheckinLatency' should cover the whole period from
183 // eligibility eval checkin to completion (and not just the period from EET
184 // plan URIs being received to completion).
185 LogEligibilityEvalCheckinLatency(time_before_checkin);
186 }
187
LogEligibilityEvalComputationStarted()188 void PhaseLoggerImpl::LogEligibilityEvalComputationStarted() {
189 event_publisher_->PublishEligibilityEvalComputationStarted();
190 opstats_logger_->AddEvent(
191 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_COMPUTATION_STARTED);
192 }
193
LogEligibilityEvalComputationInvalidArgument(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time)194 void PhaseLoggerImpl::LogEligibilityEvalComputationInvalidArgument(
195 absl::Status error_status, const ExampleStats& example_stats,
196 absl::Time run_plan_start_time) {
197 std::string error_message =
198 GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
199 /* keep_error_message= */ true);
200 log_manager_->LogDiag(
201 ProdDiagCode::BACKGROUND_TRAINING_FAILED_PLAN_FAILS_SANITY_CHECK);
202 event_publisher_->PublishEligibilityEvalComputationInvalidArgument(
203 error_message, example_stats, absl::Now() - run_plan_start_time);
204 opstats_logger_->AddEventWithErrorMessage(
205 OperationalStats::Event::
206 EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_INVALID_ARGUMENT,
207 error_message);
208 }
209
LogEligibilityEvalComputationExampleIteratorError(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time)210 void PhaseLoggerImpl::LogEligibilityEvalComputationExampleIteratorError(
211 absl::Status error_status, const ExampleStats& example_stats,
212 absl::Time run_plan_start_time) {
213 std::string error_message =
214 GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
215 /* keep_error_message= */ true);
216 event_publisher_->PublishEligibilityEvalComputationExampleIteratorError(
217 error_message, example_stats, absl::Now() - run_plan_start_time);
218 opstats_logger_->AddEventWithErrorMessage(
219 OperationalStats::Event::
220 EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_EXAMPLE_ITERATOR,
221 error_message);
222 }
223
LogEligibilityEvalComputationTensorflowError(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time,absl::Time reference_time)224 void PhaseLoggerImpl::LogEligibilityEvalComputationTensorflowError(
225 absl::Status error_status, const ExampleStats& example_stats,
226 absl::Time run_plan_start_time, absl::Time reference_time) {
227 std::string error_message =
228 GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
229 log_tensorflow_error_messages_);
230 event_publisher_->PublishEligibilityEvalComputationTensorflowError(
231 error_message, example_stats, absl::Now() - run_plan_start_time);
232 opstats_logger_->AddEventWithErrorMessage(
233 OperationalStats::Event::
234 EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_TENSORFLOW,
235 error_message);
236 LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time);
237 }
238
LogEligibilityEvalComputationInterrupted(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time,absl::Time reference_time)239 void PhaseLoggerImpl::LogEligibilityEvalComputationInterrupted(
240 absl::Status error_status, const ExampleStats& example_stats,
241 absl::Time run_plan_start_time, absl::Time reference_time) {
242 std::string error_message =
243 GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
244 /* keep_error_message= */ true);
245 event_publisher_->PublishEligibilityEvalComputationInterrupted(
246 error_message, example_stats, absl::Now() - run_plan_start_time);
247 opstats_logger_->AddEventWithErrorMessage(
248 OperationalStats::Event::
249 EVENT_KIND_ELIGIBILITY_COMPUTATION_CLIENT_INTERRUPTED,
250 error_message);
251 LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time);
252 }
253
LogEligibilityEvalComputationCompleted(const ExampleStats & example_stats,absl::Time run_plan_start_time,absl::Time reference_time)254 void PhaseLoggerImpl::LogEligibilityEvalComputationCompleted(
255 const ExampleStats& example_stats, absl::Time run_plan_start_time,
256 absl::Time reference_time) {
257 event_publisher_->PublishEligibilityEvalComputationCompleted(
258 example_stats, absl::Now() - run_plan_start_time);
259 opstats_logger_->AddEvent(
260 OperationalStats::Event::EVENT_KIND_ELIGIBILITY_COMPUTATION_FINISHED);
261 log_manager_->LogToLongHistogram(
262 HistogramCounters::TRAINING_OVERALL_EXAMPLE_SIZE,
263 example_stats.example_size_bytes);
264 log_manager_->LogToLongHistogram(
265 HistogramCounters::TRAINING_OVERALL_EXAMPLE_COUNT,
266 example_stats.example_count);
267 LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time);
268 }
269
LogCheckinStarted()270 void PhaseLoggerImpl::LogCheckinStarted() {
271 // Log that we are about to check in with the server.
272 event_publisher_->PublishCheckin();
273 opstats_logger_->AddEvent(
274 OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
275 }
276
LogCheckinIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)277 void PhaseLoggerImpl::LogCheckinIOError(absl::Status error_status,
278 const NetworkStats& network_stats,
279 absl::Time time_before_checkin,
280 absl::Time reference_time) {
281 std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix,
282 /* keep_error_message= */ true);
283 event_publisher_->PublishCheckinIoError(error_message, network_stats,
284 absl::Now() - time_before_checkin);
285 opstats_logger_->AddEventWithErrorMessage(
286 OperationalStats::Event::EVENT_KIND_CHECKIN_ERROR_IO, error_message);
287 LogCheckinLatency(time_before_checkin, reference_time);
288 }
289
LogCheckinClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)290 void PhaseLoggerImpl::LogCheckinClientInterrupted(
291 absl::Status error_status, const NetworkStats& network_stats,
292 absl::Time time_before_checkin, absl::Time reference_time) {
293 std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix,
294 /* keep_error_message= */ true);
295 event_publisher_->PublishCheckinClientInterrupted(
296 error_message, network_stats, absl::Now() - time_before_checkin);
297 opstats_logger_->AddEventWithErrorMessage(
298 OperationalStats::Event::EVENT_KIND_CHECKIN_CLIENT_INTERRUPTED,
299 error_message);
300 LogCheckinLatency(time_before_checkin, reference_time);
301 }
302
LogCheckinServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)303 void PhaseLoggerImpl::LogCheckinServerAborted(absl::Status error_status,
304 const NetworkStats& network_stats,
305 absl::Time time_before_checkin,
306 absl::Time reference_time) {
307 std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix,
308 /* keep_error_message= */ true);
309 event_publisher_->PublishCheckinServerAborted(
310 error_message, network_stats, absl::Now() - time_before_checkin);
311 opstats_logger_->AddEventWithErrorMessage(
312 OperationalStats::Event::EVENT_KIND_CHECKIN_SERVER_ABORTED,
313 error_message);
314 LogCheckinLatency(time_before_checkin, reference_time);
315 }
316
LogCheckinTurnedAway(const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)317 void PhaseLoggerImpl::LogCheckinTurnedAway(const NetworkStats& network_stats,
318 absl::Time time_before_checkin,
319 absl::Time reference_time) {
320 event_publisher_->PublishRejected(network_stats,
321 absl::Now() - time_before_checkin);
322 opstats_logger_->AddEvent(
323 OperationalStats::Event::EVENT_KIND_CHECKIN_REJECTED);
324 LogCheckinLatency(time_before_checkin, reference_time);
325 }
326
LogCheckinInvalidPayload(absl::string_view error_message,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)327 void PhaseLoggerImpl::LogCheckinInvalidPayload(
328 absl::string_view error_message, const NetworkStats& network_stats,
329 absl::Time time_before_checkin, absl::Time reference_time) {
330 log_manager_->LogDiag(
331 ProdDiagCode::BACKGROUND_TRAINING_FAILED_CANNOT_PARSE_PLAN);
332 event_publisher_->PublishCheckinInvalidPayload(
333 error_message, network_stats, absl::Now() - time_before_checkin);
334 opstats_logger_->AddEventWithErrorMessage(
335 OperationalStats::Event::EVENT_KIND_CHECKIN_ERROR_INVALID_PAYLOAD,
336 std::string(error_message));
337 LogCheckinLatency(time_before_checkin, reference_time);
338 }
339
LogCheckinPlanUriReceived(absl::string_view task_name,const NetworkStats & network_stats,absl::Time time_before_checkin)340 void PhaseLoggerImpl::LogCheckinPlanUriReceived(
341 absl::string_view task_name, const NetworkStats& network_stats,
342 absl::Time time_before_checkin) {
343 event_publisher_->PublishCheckinPlanUriReceived(
344 network_stats, absl::Now() - time_before_checkin);
345 opstats_logger_->AddEventAndSetTaskName(
346 std::string(task_name),
347 OperationalStats::Event::EVENT_KIND_CHECKIN_PLAN_URI_RECEIVED);
348 }
349
LogCheckinCompleted(absl::string_view task_name,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time time_before_plan_download,absl::Time reference_time)350 void PhaseLoggerImpl::LogCheckinCompleted(absl::string_view task_name,
351 const NetworkStats& network_stats,
352 absl::Time time_before_checkin,
353 absl::Time time_before_plan_download,
354 absl::Time reference_time) {
355 absl::Duration duration = absl::Now() - time_before_plan_download;
356 event_publisher_->PublishCheckinFinishedV2(network_stats, duration);
357 // We already have set the task name when LogCheckinPlanUriReceived was
358 // called, so we only have to add the event.
359 opstats_logger_->AddEvent(
360 OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
361 // The 'EligibilityEvalCheckinLatency' should cover the whole period from
362 // eligibility eval checkin to completion (and not just the period from EET
363 // plan URIs being received to completion).
364 LogCheckinLatency(time_before_checkin, reference_time);
365 }
366
LogComputationStarted()367 void PhaseLoggerImpl::LogComputationStarted() {
368 event_publisher_->PublishComputationStarted();
369 opstats_logger_->AddEvent(
370 OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
371 }
372
LogComputationInvalidArgument(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time)373 void PhaseLoggerImpl::LogComputationInvalidArgument(
374 absl::Status error_status, const ExampleStats& example_stats,
375 const NetworkStats& network_stats, absl::Time run_plan_start_time) {
376 std::string error_message =
377 GetErrorMessage(error_status, kComputationErrorPrefix,
378 /* keep_error_message= */ true);
379 log_manager_->LogDiag(
380 ProdDiagCode::BACKGROUND_TRAINING_FAILED_PLAN_FAILS_SANITY_CHECK);
381 event_publisher_->PublishComputationInvalidArgument(
382 error_message, example_stats, network_stats,
383 absl::Now() - run_plan_start_time);
384 opstats_logger_->AddEventWithErrorMessage(
385 OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_INVALID_ARGUMENT,
386 error_message);
387 }
388
LogComputationIOError(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time)389 void PhaseLoggerImpl::LogComputationIOError(absl::Status error_status,
390 const ExampleStats& example_stats,
391 const NetworkStats& network_stats,
392 absl::Time run_plan_start_time) {
393 std::string error_message =
394 GetErrorMessage(error_status, kComputationErrorPrefix,
395 /* keep_error_message= */ true);
396 event_publisher_->PublishComputationIOError(
397 error_message, example_stats, network_stats,
398 absl::Now() - run_plan_start_time);
399 opstats_logger_->AddEventWithErrorMessage(
400 OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_IO, error_message);
401 }
402
LogComputationExampleIteratorError(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time)403 void PhaseLoggerImpl::LogComputationExampleIteratorError(
404 absl::Status error_status, const ExampleStats& example_stats,
405 const NetworkStats& network_stats, absl::Time run_plan_start_time) {
406 std::string error_message = GetErrorMessage(
407 error_status, kComputationErrorPrefix, /* keep_error_message= */ true);
408 event_publisher_->PublishComputationExampleIteratorError(
409 error_message, example_stats, network_stats,
410 absl::Now() - run_plan_start_time);
411 opstats_logger_->AddEventWithErrorMessage(
412 OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_EXAMPLE_ITERATOR,
413 error_message);
414 }
415
LogComputationTensorflowError(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time,absl::Time reference_time)416 void PhaseLoggerImpl::LogComputationTensorflowError(
417 absl::Status error_status, const ExampleStats& example_stats,
418 const NetworkStats& network_stats, absl::Time run_plan_start_time,
419 absl::Time reference_time) {
420 std::string error_message = GetErrorMessage(
421 error_status, kComputationErrorPrefix, log_tensorflow_error_messages_);
422 event_publisher_->PublishComputationTensorflowError(
423 error_message, example_stats, network_stats,
424 absl::Now() - run_plan_start_time);
425 opstats_logger_->AddEventWithErrorMessage(
426 OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_TENSORFLOW,
427 error_message);
428 LogComputationLatency(run_plan_start_time, reference_time);
429 }
430
LogComputationInterrupted(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time,absl::Time reference_time)431 void PhaseLoggerImpl::LogComputationInterrupted(
432 absl::Status error_status, const ExampleStats& example_stats,
433 const NetworkStats& network_stats, absl::Time run_plan_start_time,
434 absl::Time reference_time) {
435 std::string error_message =
436 GetErrorMessage(error_status, kComputationErrorPrefix,
437 /* keep_error_message= */ true);
438 event_publisher_->PublishComputationInterrupted(
439 error_message, example_stats, network_stats,
440 absl::Now() - run_plan_start_time);
441 opstats_logger_->AddEventWithErrorMessage(
442 OperationalStats::Event::EVENT_KIND_COMPUTATION_CLIENT_INTERRUPTED,
443 error_message);
444 LogComputationLatency(run_plan_start_time, reference_time);
445 }
446
LogComputationCompleted(const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time,absl::Time reference_time)447 void PhaseLoggerImpl::LogComputationCompleted(const ExampleStats& example_stats,
448 const NetworkStats& network_stats,
449 absl::Time run_plan_start_time,
450 absl::Time reference_time) {
451 event_publisher_->PublishComputationCompleted(
452 example_stats, network_stats, absl::Now() - run_plan_start_time);
453 opstats_logger_->AddEvent(
454 OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
455 log_manager_->LogToLongHistogram(
456 HistogramCounters::TRAINING_OVERALL_EXAMPLE_SIZE,
457 example_stats.example_size_bytes);
458 log_manager_->LogToLongHistogram(
459 HistogramCounters::TRAINING_OVERALL_EXAMPLE_COUNT,
460 example_stats.example_count);
461 LogComputationLatency(run_plan_start_time, reference_time);
462 }
463
LogResultUploadStarted()464 absl::Status PhaseLoggerImpl::LogResultUploadStarted() {
465 opstats_logger_->AddEvent(
466 OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED);
467 // Commit the run data accumulated thus far to Opstats and fail if
468 // something goes wrong.
469 FCP_RETURN_IF_ERROR(opstats_logger_->CommitToStorage());
470 event_publisher_->PublishResultUploadStarted();
471 return absl::OkStatus();
472 }
473
LogResultUploadIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)474 void PhaseLoggerImpl::LogResultUploadIOError(
475 absl::Status error_status, const NetworkStats& network_stats,
476 absl::Time time_before_result_upload, absl::Time reference_time) {
477 std::string error_message =
478 GetErrorMessage(error_status, kResultUploadErrorPrefix,
479 /* keep_error_message= */ true);
480 event_publisher_->PublishResultUploadIOError(
481 error_message, network_stats, absl::Now() - time_before_result_upload);
482 opstats_logger_->AddEventWithErrorMessage(
483 OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_ERROR_IO,
484 error_message);
485 LogReportLatency(time_before_result_upload, reference_time);
486 }
487
LogResultUploadClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)488 void PhaseLoggerImpl::LogResultUploadClientInterrupted(
489 absl::Status error_status, const NetworkStats& network_stats,
490 absl::Time time_before_result_upload, absl::Time reference_time) {
491 std::string error_message =
492 GetErrorMessage(error_status, kResultUploadErrorPrefix,
493 /* keep_error_message= */ true);
494 event_publisher_->PublishResultUploadClientInterrupted(
495 error_message, network_stats, absl::Now() - time_before_result_upload);
496 opstats_logger_->AddEventWithErrorMessage(
497 OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_CLIENT_INTERRUPTED,
498 error_message);
499 LogReportLatency(time_before_result_upload, reference_time);
500 }
501
LogResultUploadServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)502 void PhaseLoggerImpl::LogResultUploadServerAborted(
503 absl::Status error_status, const NetworkStats& network_stats,
504 absl::Time time_before_result_upload, absl::Time reference_time) {
505 std::string error_message =
506 GetErrorMessage(error_status, kResultUploadErrorPrefix,
507 /* keep_error_message= */ true);
508 event_publisher_->PublishResultUploadServerAborted(
509 error_message, network_stats, absl::Now() - time_before_result_upload);
510 opstats_logger_->AddEventWithErrorMessage(
511 OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_SERVER_ABORTED,
512 error_message);
513 LogReportLatency(time_before_result_upload, reference_time);
514 }
515
LogResultUploadCompleted(const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)516 void PhaseLoggerImpl::LogResultUploadCompleted(
517 const NetworkStats& network_stats, absl::Time time_before_result_upload,
518 absl::Time reference_time) {
519 event_publisher_->PublishResultUploadCompleted(
520 network_stats, absl::Now() - time_before_result_upload);
521 opstats_logger_->AddEvent(
522 OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_FINISHED);
523 LogReportLatency(time_before_result_upload, reference_time);
524 }
525
LogFailureUploadStarted()526 absl::Status PhaseLoggerImpl::LogFailureUploadStarted() {
527 opstats_logger_->AddEvent(
528 OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_STARTED);
529 // Commit the run data accumulated thus far to Opstats and fail if
530 // something goes wrong.
531 FCP_RETURN_IF_ERROR(opstats_logger_->CommitToStorage());
532 event_publisher_->PublishFailureUploadStarted();
533 return absl::OkStatus();
534 }
535
LogFailureUploadIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)536 void PhaseLoggerImpl::LogFailureUploadIOError(
537 absl::Status error_status, const NetworkStats& network_stats,
538 absl::Time time_before_failure_upload, absl::Time reference_time) {
539 std::string error_message =
540 GetErrorMessage(error_status, kFailureUploadErrorPrefix,
541 /* keep_error_message= */ true);
542 event_publisher_->PublishFailureUploadIOError(
543 error_message, network_stats, absl::Now() - time_before_failure_upload);
544 opstats_logger_->AddEventWithErrorMessage(
545 OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_ERROR_IO,
546 error_message);
547 LogReportLatency(time_before_failure_upload, reference_time);
548 }
549
LogFailureUploadClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)550 void PhaseLoggerImpl::LogFailureUploadClientInterrupted(
551 absl::Status error_status, const NetworkStats& network_stats,
552 absl::Time time_before_failure_upload, absl::Time reference_time) {
553 std::string error_message =
554 GetErrorMessage(error_status, kFailureUploadErrorPrefix,
555 /* keep_error_message= */ true);
556 event_publisher_->PublishFailureUploadClientInterrupted(
557 error_message, network_stats, absl::Now() - time_before_failure_upload);
558 opstats_logger_->AddEventWithErrorMessage(
559 OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_CLIENT_INTERRUPTED,
560 error_message);
561 LogReportLatency(time_before_failure_upload, reference_time);
562 }
563
LogFailureUploadServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)564 void PhaseLoggerImpl::LogFailureUploadServerAborted(
565 absl::Status error_status, const NetworkStats& network_stats,
566 absl::Time time_before_failure_upload, absl::Time reference_time) {
567 std::string error_message =
568 GetErrorMessage(error_status, kFailureUploadErrorPrefix,
569 /* keep_error_message= */ true);
570 event_publisher_->PublishFailureUploadServerAborted(
571 error_message, network_stats, absl::Now() - time_before_failure_upload);
572 opstats_logger_->AddEventWithErrorMessage(
573 OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_SERVER_ABORTED,
574 error_message);
575 LogReportLatency(time_before_failure_upload, reference_time);
576 }
577
LogFailureUploadCompleted(const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)578 void PhaseLoggerImpl::LogFailureUploadCompleted(
579 const NetworkStats& network_stats, absl::Time time_before_failure_upload,
580 absl::Time reference_time) {
581 event_publisher_->PublishFailureUploadCompleted(
582 network_stats, absl::Now() - time_before_failure_upload);
583 opstats_logger_->AddEvent(
584 OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_FINISHED);
585 LogReportLatency(time_before_failure_upload, reference_time);
586 }
587
LogTimeSince(HistogramCounters histogram_counter,absl::Time reference_time)588 void PhaseLoggerImpl::LogTimeSince(HistogramCounters histogram_counter,
589 absl::Time reference_time) {
590 absl::Duration duration = absl::Now() - reference_time;
591 log_manager_->LogToLongHistogram(histogram_counter,
592 absl::ToInt64Milliseconds(duration));
593 }
594
LogEligibilityEvalCheckinLatency(absl::Time time_before_checkin)595 void PhaseLoggerImpl::LogEligibilityEvalCheckinLatency(
596 absl::Time time_before_checkin) {
597 LogTimeSince(HistogramCounters::TRAINING_FL_ELIGIBILITY_EVAL_CHECKIN_LATENCY,
598 time_before_checkin);
599 }
600
LogEligibilityEvalComputationLatency(absl::Time run_plan_start_time,absl::Time reference_time)601 void PhaseLoggerImpl::LogEligibilityEvalComputationLatency(
602 absl::Time run_plan_start_time, absl::Time reference_time) {
603 LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_LATENCY,
604 run_plan_start_time);
605 LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_END_TIME, reference_time);
606 }
607
LogCheckinLatency(absl::Time time_before_checkin,absl::Time reference_time)608 void PhaseLoggerImpl::LogCheckinLatency(absl::Time time_before_checkin,
609 absl::Time reference_time) {
610 LogTimeSince(HistogramCounters::TRAINING_FL_CHECKIN_LATENCY,
611 time_before_checkin);
612 LogTimeSince(HistogramCounters::TRAINING_FL_CHECKIN_END_TIME, reference_time);
613 }
614
LogComputationLatency(absl::Time run_plan_start_time,absl::Time reference_time)615 void PhaseLoggerImpl::LogComputationLatency(absl::Time run_plan_start_time,
616 absl::Time reference_time) {
617 LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_LATENCY,
618 run_plan_start_time);
619 LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_END_TIME, reference_time);
620 }
621
LogReportLatency(absl::Time time_before_report,absl::Time reference_time)622 void PhaseLoggerImpl::LogReportLatency(absl::Time time_before_report,
623 absl::Time reference_time) {
624 LogTimeSince(HistogramCounters::TRAINING_FL_REPORT_RESULTS_LATENCY,
625 time_before_report);
626 LogTimeSince(HistogramCounters::TRAINING_FL_REPORT_RESULTS_END_TIME,
627 reference_time);
628 }
629
GetErrorMessage(absl::Status error_status,absl::string_view error_prefix,bool keep_error_message)630 std::string PhaseLoggerImpl::GetErrorMessage(absl::Status error_status,
631 absl::string_view error_prefix,
632 bool keep_error_message) {
633 return absl::StrCat(error_prefix, "code: ", error_status.code(), ", error: ",
634 keep_error_message ? error_status.message() : "");
635 }
636
637 } // namespace client
638 } // namespace fcp
639