xref: /aosp_15_r20/external/federated-compute/fcp/client/phase_logger_impl.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/phase_logger_impl.h"
17 
18 #include <string>
19 
20 #include "absl/time/time.h"
21 #include "fcp/base/monitoring.h"
22 
23 namespace fcp {
24 namespace client {
25 namespace {
26 constexpr absl::string_view kInitializationErrorPrefix =
27     "Error during initialization: ";
28 constexpr absl::string_view kEligibilityCheckinErrorPrefix =
29     "Error during eligibility check-in: ";
30 constexpr absl::string_view kEligibilityComputationErrorPrefix =
31     "Error during eligibility eval computation: ";
32 constexpr absl::string_view kCheckinErrorPrefix = "Error during check-in: ";
33 constexpr absl::string_view kComputationErrorPrefix =
34     "Error during computation: ";
35 constexpr absl::string_view kResultUploadErrorPrefix =
36     "Error reporting results: ";
37 constexpr absl::string_view kFailureUploadErrorPrefix =
38     "Error reporting computation failure: ";
39 }  // anonymous namespace
40 
41 using ::fcp::client::opstats::OperationalStats;
42 using ::google::internal::federatedml::v2::RetryWindow;
43 
UpdateRetryWindowAndNetworkStats(const RetryWindow & retry_window,const NetworkStats & network_stats)44 void PhaseLoggerImpl::UpdateRetryWindowAndNetworkStats(
45     const RetryWindow& retry_window, const NetworkStats& network_stats) {
46   opstats_logger_->SetRetryWindow(retry_window);
47 
48   // Update the network stats.
49   opstats_logger_->SetNetworkStats(network_stats);
50 }
51 
SetModelIdentifier(absl::string_view model_identifier)52 void PhaseLoggerImpl::SetModelIdentifier(absl::string_view model_identifier) {
53   event_publisher_->SetModelIdentifier(std::string(model_identifier));
54   log_manager_->SetModelIdentifier(std::string(model_identifier));
55 }
56 
LogTaskNotStarted(absl::string_view error_message)57 void PhaseLoggerImpl::LogTaskNotStarted(absl::string_view error_message) {
58   event_publisher_->PublishTaskNotStarted(error_message);
59   opstats_logger_->AddEventWithErrorMessage(
60       OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED,
61       std::string(error_message));
62 }
63 
LogNonfatalInitializationError(absl::Status error_status)64 void PhaseLoggerImpl::LogNonfatalInitializationError(
65     absl::Status error_status) {
66   std::string error_message = GetErrorMessage(
67       error_status, kInitializationErrorPrefix, /* keep_error_message= */ true);
68   event_publisher_->PublishNonfatalInitializationError(error_message);
69   opstats_logger_->AddEventWithErrorMessage(
70       OperationalStats::Event::EVENT_KIND_INITIALIZATION_ERROR_FATAL,
71       error_message);
72 }
73 
LogFatalInitializationError(absl::Status error_status)74 void PhaseLoggerImpl::LogFatalInitializationError(absl::Status error_status) {
75   std::string error_message = GetErrorMessage(
76       error_status, kInitializationErrorPrefix, /* keep_error_message= */ true);
77   event_publisher_->PublishFatalInitializationError(error_message);
78   opstats_logger_->AddEventWithErrorMessage(
79       OperationalStats::Event::EVENT_KIND_INITIALIZATION_ERROR_NONFATAL,
80       error_message);
81 }
82 
LogEligibilityEvalCheckinStarted()83 void PhaseLoggerImpl::LogEligibilityEvalCheckinStarted() {
84   event_publisher_->PublishEligibilityEvalCheckin();
85   opstats_logger_->AddEvent(
86       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
87 }
88 
LogEligibilityEvalCheckinIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin)89 void PhaseLoggerImpl::LogEligibilityEvalCheckinIOError(
90     absl::Status error_status, const NetworkStats& network_stats,
91     absl::Time time_before_checkin) {
92   std::string error_message =
93       GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix,
94                       /* keep_error_message= */ true);
95   event_publisher_->PublishEligibilityEvalCheckinIoError(
96       error_message, network_stats, absl::Now() - time_before_checkin);
97   opstats_logger_->AddEventWithErrorMessage(
98       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_ERROR_IO,
99       error_message);
100   LogEligibilityEvalCheckinLatency(time_before_checkin);
101 }
102 
LogEligibilityEvalCheckinClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin)103 void PhaseLoggerImpl::LogEligibilityEvalCheckinClientInterrupted(
104     absl::Status error_status, const NetworkStats& network_stats,
105     absl::Time time_before_checkin) {
106   std::string error_message =
107       GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix,
108                       /* keep_error_message= */ true);
109   event_publisher_->PublishEligibilityEvalCheckinClientInterrupted(
110       error_message, network_stats, absl::Now() - time_before_checkin);
111   opstats_logger_->AddEventWithErrorMessage(
112       OperationalStats::Event::
113           EVENT_KIND_ELIGIBILITY_CHECKIN_CLIENT_INTERRUPTED,
114       error_message);
115   LogEligibilityEvalCheckinLatency(time_before_checkin);
116 }
117 
LogEligibilityEvalCheckinServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin)118 void PhaseLoggerImpl::LogEligibilityEvalCheckinServerAborted(
119     absl::Status error_status, const NetworkStats& network_stats,
120     absl::Time time_before_checkin) {
121   std::string error_message =
122       GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix,
123                       /* keep_error_message= */ true);
124   event_publisher_->PublishEligibilityEvalCheckinServerAborted(
125       error_message, network_stats, absl::Now() - time_before_checkin);
126   opstats_logger_->AddEventWithErrorMessage(
127       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_SERVER_ABORTED,
128       error_message);
129   LogEligibilityEvalCheckinLatency(time_before_checkin);
130 }
131 
LogEligibilityEvalNotConfigured(const NetworkStats & network_stats,absl::Time time_before_checkin)132 void PhaseLoggerImpl::LogEligibilityEvalNotConfigured(
133     const NetworkStats& network_stats, absl::Time time_before_checkin) {
134   event_publisher_->PublishEligibilityEvalNotConfigured(
135       network_stats, absl::Now() - time_before_checkin);
136   opstats_logger_->AddEvent(
137       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_DISABLED);
138   LogEligibilityEvalCheckinLatency(time_before_checkin);
139 }
140 
LogEligibilityEvalCheckinTurnedAway(const NetworkStats & network_stats,absl::Time time_before_checkin)141 void PhaseLoggerImpl::LogEligibilityEvalCheckinTurnedAway(
142     const NetworkStats& network_stats, absl::Time time_before_checkin) {
143   event_publisher_->PublishEligibilityEvalRejected(
144       network_stats, absl::Now() - time_before_checkin);
145   opstats_logger_->AddEvent(
146       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
147   LogEligibilityEvalCheckinLatency(time_before_checkin);
148 }
149 
LogEligibilityEvalCheckinInvalidPayloadError(absl::string_view error_message,const NetworkStats & network_stats,absl::Time time_before_checkin)150 void PhaseLoggerImpl::LogEligibilityEvalCheckinInvalidPayloadError(
151     absl::string_view error_message, const NetworkStats& network_stats,
152     absl::Time time_before_checkin) {
153   log_manager_->LogDiag(
154       ProdDiagCode::
155           BACKGROUND_TRAINING_ELIGIBILITY_EVAL_FAILED_CANNOT_PARSE_PLAN);
156   event_publisher_->PublishEligibilityEvalCheckinErrorInvalidPayload(
157       error_message, network_stats, absl::Now() - time_before_checkin);
158   opstats_logger_->AddEventWithErrorMessage(
159       OperationalStats::Event::
160           EVENT_KIND_ELIGIBILITY_CHECKIN_ERROR_INVALID_PAYLOAD,
161       std::string(error_message));
162   LogEligibilityEvalCheckinLatency(time_before_checkin);
163 }
164 
LogEligibilityEvalCheckinPlanUriReceived(const NetworkStats & network_stats,absl::Time time_before_checkin)165 void PhaseLoggerImpl::LogEligibilityEvalCheckinPlanUriReceived(
166     const NetworkStats& network_stats, absl::Time time_before_checkin) {
167   opstats_logger_->AddEvent(
168       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_PLAN_URI_RECEIVED);
169   event_publisher_->PublishEligibilityEvalPlanUriReceived(
170       network_stats, absl::Now() - time_before_checkin);
171 }
172 
LogEligibilityEvalCheckinCompleted(const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time time_before_plan_download)173 void PhaseLoggerImpl::LogEligibilityEvalCheckinCompleted(
174     const NetworkStats& network_stats, absl::Time time_before_checkin,
175     absl::Time time_before_plan_download) {
176   opstats_logger_->AddEvent(
177       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_ENABLED);
178   absl::Time before_time = time_before_plan_download;
179   event_publisher_->PublishEligibilityEvalPlanReceived(
180       network_stats, absl::Now() - before_time);
181 
182   // The 'EligibilityEvalCheckinLatency' should cover the whole period from
183   // eligibility eval checkin to completion (and not just the period from EET
184   // plan URIs being received to completion).
185   LogEligibilityEvalCheckinLatency(time_before_checkin);
186 }
187 
LogEligibilityEvalComputationStarted()188 void PhaseLoggerImpl::LogEligibilityEvalComputationStarted() {
189   event_publisher_->PublishEligibilityEvalComputationStarted();
190   opstats_logger_->AddEvent(
191       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_COMPUTATION_STARTED);
192 }
193 
LogEligibilityEvalComputationInvalidArgument(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time)194 void PhaseLoggerImpl::LogEligibilityEvalComputationInvalidArgument(
195     absl::Status error_status, const ExampleStats& example_stats,
196     absl::Time run_plan_start_time) {
197   std::string error_message =
198       GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
199                       /* keep_error_message= */ true);
200   log_manager_->LogDiag(
201       ProdDiagCode::BACKGROUND_TRAINING_FAILED_PLAN_FAILS_SANITY_CHECK);
202   event_publisher_->PublishEligibilityEvalComputationInvalidArgument(
203       error_message, example_stats, absl::Now() - run_plan_start_time);
204   opstats_logger_->AddEventWithErrorMessage(
205       OperationalStats::Event::
206           EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_INVALID_ARGUMENT,
207       error_message);
208 }
209 
LogEligibilityEvalComputationExampleIteratorError(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time)210 void PhaseLoggerImpl::LogEligibilityEvalComputationExampleIteratorError(
211     absl::Status error_status, const ExampleStats& example_stats,
212     absl::Time run_plan_start_time) {
213   std::string error_message =
214       GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
215                       /* keep_error_message= */ true);
216   event_publisher_->PublishEligibilityEvalComputationExampleIteratorError(
217       error_message, example_stats, absl::Now() - run_plan_start_time);
218   opstats_logger_->AddEventWithErrorMessage(
219       OperationalStats::Event::
220           EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_EXAMPLE_ITERATOR,
221       error_message);
222 }
223 
LogEligibilityEvalComputationTensorflowError(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time,absl::Time reference_time)224 void PhaseLoggerImpl::LogEligibilityEvalComputationTensorflowError(
225     absl::Status error_status, const ExampleStats& example_stats,
226     absl::Time run_plan_start_time, absl::Time reference_time) {
227   std::string error_message =
228       GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
229                       log_tensorflow_error_messages_);
230   event_publisher_->PublishEligibilityEvalComputationTensorflowError(
231       error_message, example_stats, absl::Now() - run_plan_start_time);
232   opstats_logger_->AddEventWithErrorMessage(
233       OperationalStats::Event::
234           EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_TENSORFLOW,
235       error_message);
236   LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time);
237 }
238 
LogEligibilityEvalComputationInterrupted(absl::Status error_status,const ExampleStats & example_stats,absl::Time run_plan_start_time,absl::Time reference_time)239 void PhaseLoggerImpl::LogEligibilityEvalComputationInterrupted(
240     absl::Status error_status, const ExampleStats& example_stats,
241     absl::Time run_plan_start_time, absl::Time reference_time) {
242   std::string error_message =
243       GetErrorMessage(error_status, kEligibilityComputationErrorPrefix,
244                       /* keep_error_message= */ true);
245   event_publisher_->PublishEligibilityEvalComputationInterrupted(
246       error_message, example_stats, absl::Now() - run_plan_start_time);
247   opstats_logger_->AddEventWithErrorMessage(
248       OperationalStats::Event::
249           EVENT_KIND_ELIGIBILITY_COMPUTATION_CLIENT_INTERRUPTED,
250       error_message);
251   LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time);
252 }
253 
LogEligibilityEvalComputationCompleted(const ExampleStats & example_stats,absl::Time run_plan_start_time,absl::Time reference_time)254 void PhaseLoggerImpl::LogEligibilityEvalComputationCompleted(
255     const ExampleStats& example_stats, absl::Time run_plan_start_time,
256     absl::Time reference_time) {
257   event_publisher_->PublishEligibilityEvalComputationCompleted(
258       example_stats, absl::Now() - run_plan_start_time);
259   opstats_logger_->AddEvent(
260       OperationalStats::Event::EVENT_KIND_ELIGIBILITY_COMPUTATION_FINISHED);
261   log_manager_->LogToLongHistogram(
262       HistogramCounters::TRAINING_OVERALL_EXAMPLE_SIZE,
263       example_stats.example_size_bytes);
264   log_manager_->LogToLongHistogram(
265       HistogramCounters::TRAINING_OVERALL_EXAMPLE_COUNT,
266       example_stats.example_count);
267   LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time);
268 }
269 
LogCheckinStarted()270 void PhaseLoggerImpl::LogCheckinStarted() {
271   // Log that we are about to check in with the server.
272   event_publisher_->PublishCheckin();
273   opstats_logger_->AddEvent(
274       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
275 }
276 
LogCheckinIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)277 void PhaseLoggerImpl::LogCheckinIOError(absl::Status error_status,
278                                         const NetworkStats& network_stats,
279                                         absl::Time time_before_checkin,
280                                         absl::Time reference_time) {
281   std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix,
282                                               /* keep_error_message= */ true);
283   event_publisher_->PublishCheckinIoError(error_message, network_stats,
284                                           absl::Now() - time_before_checkin);
285   opstats_logger_->AddEventWithErrorMessage(
286       OperationalStats::Event::EVENT_KIND_CHECKIN_ERROR_IO, error_message);
287   LogCheckinLatency(time_before_checkin, reference_time);
288 }
289 
LogCheckinClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)290 void PhaseLoggerImpl::LogCheckinClientInterrupted(
291     absl::Status error_status, const NetworkStats& network_stats,
292     absl::Time time_before_checkin, absl::Time reference_time) {
293   std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix,
294                                               /* keep_error_message= */ true);
295   event_publisher_->PublishCheckinClientInterrupted(
296       error_message, network_stats, absl::Now() - time_before_checkin);
297   opstats_logger_->AddEventWithErrorMessage(
298       OperationalStats::Event::EVENT_KIND_CHECKIN_CLIENT_INTERRUPTED,
299       error_message);
300   LogCheckinLatency(time_before_checkin, reference_time);
301 }
302 
LogCheckinServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)303 void PhaseLoggerImpl::LogCheckinServerAborted(absl::Status error_status,
304                                               const NetworkStats& network_stats,
305                                               absl::Time time_before_checkin,
306                                               absl::Time reference_time) {
307   std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix,
308                                               /* keep_error_message= */ true);
309   event_publisher_->PublishCheckinServerAborted(
310       error_message, network_stats, absl::Now() - time_before_checkin);
311   opstats_logger_->AddEventWithErrorMessage(
312       OperationalStats::Event::EVENT_KIND_CHECKIN_SERVER_ABORTED,
313       error_message);
314   LogCheckinLatency(time_before_checkin, reference_time);
315 }
316 
LogCheckinTurnedAway(const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)317 void PhaseLoggerImpl::LogCheckinTurnedAway(const NetworkStats& network_stats,
318                                            absl::Time time_before_checkin,
319                                            absl::Time reference_time) {
320   event_publisher_->PublishRejected(network_stats,
321                                     absl::Now() - time_before_checkin);
322   opstats_logger_->AddEvent(
323       OperationalStats::Event::EVENT_KIND_CHECKIN_REJECTED);
324   LogCheckinLatency(time_before_checkin, reference_time);
325 }
326 
LogCheckinInvalidPayload(absl::string_view error_message,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time reference_time)327 void PhaseLoggerImpl::LogCheckinInvalidPayload(
328     absl::string_view error_message, const NetworkStats& network_stats,
329     absl::Time time_before_checkin, absl::Time reference_time) {
330   log_manager_->LogDiag(
331       ProdDiagCode::BACKGROUND_TRAINING_FAILED_CANNOT_PARSE_PLAN);
332   event_publisher_->PublishCheckinInvalidPayload(
333       error_message, network_stats, absl::Now() - time_before_checkin);
334   opstats_logger_->AddEventWithErrorMessage(
335       OperationalStats::Event::EVENT_KIND_CHECKIN_ERROR_INVALID_PAYLOAD,
336       std::string(error_message));
337   LogCheckinLatency(time_before_checkin, reference_time);
338 }
339 
LogCheckinPlanUriReceived(absl::string_view task_name,const NetworkStats & network_stats,absl::Time time_before_checkin)340 void PhaseLoggerImpl::LogCheckinPlanUriReceived(
341     absl::string_view task_name, const NetworkStats& network_stats,
342     absl::Time time_before_checkin) {
343   event_publisher_->PublishCheckinPlanUriReceived(
344       network_stats, absl::Now() - time_before_checkin);
345   opstats_logger_->AddEventAndSetTaskName(
346       std::string(task_name),
347       OperationalStats::Event::EVENT_KIND_CHECKIN_PLAN_URI_RECEIVED);
348 }
349 
LogCheckinCompleted(absl::string_view task_name,const NetworkStats & network_stats,absl::Time time_before_checkin,absl::Time time_before_plan_download,absl::Time reference_time)350 void PhaseLoggerImpl::LogCheckinCompleted(absl::string_view task_name,
351                                           const NetworkStats& network_stats,
352                                           absl::Time time_before_checkin,
353                                           absl::Time time_before_plan_download,
354                                           absl::Time reference_time) {
355   absl::Duration duration = absl::Now() - time_before_plan_download;
356   event_publisher_->PublishCheckinFinishedV2(network_stats, duration);
357   // We already have set the task name when LogCheckinPlanUriReceived was
358   // called, so we only have to add the event.
359   opstats_logger_->AddEvent(
360       OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
361   // The 'EligibilityEvalCheckinLatency' should cover the whole period from
362   // eligibility eval checkin to completion (and not just the period from EET
363   // plan URIs being received to completion).
364   LogCheckinLatency(time_before_checkin, reference_time);
365 }
366 
LogComputationStarted()367 void PhaseLoggerImpl::LogComputationStarted() {
368   event_publisher_->PublishComputationStarted();
369   opstats_logger_->AddEvent(
370       OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED);
371 }
372 
LogComputationInvalidArgument(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time)373 void PhaseLoggerImpl::LogComputationInvalidArgument(
374     absl::Status error_status, const ExampleStats& example_stats,
375     const NetworkStats& network_stats, absl::Time run_plan_start_time) {
376   std::string error_message =
377       GetErrorMessage(error_status, kComputationErrorPrefix,
378                       /* keep_error_message= */ true);
379   log_manager_->LogDiag(
380       ProdDiagCode::BACKGROUND_TRAINING_FAILED_PLAN_FAILS_SANITY_CHECK);
381   event_publisher_->PublishComputationInvalidArgument(
382       error_message, example_stats, network_stats,
383       absl::Now() - run_plan_start_time);
384   opstats_logger_->AddEventWithErrorMessage(
385       OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_INVALID_ARGUMENT,
386       error_message);
387 }
388 
LogComputationIOError(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time)389 void PhaseLoggerImpl::LogComputationIOError(absl::Status error_status,
390                                             const ExampleStats& example_stats,
391                                             const NetworkStats& network_stats,
392                                             absl::Time run_plan_start_time) {
393   std::string error_message =
394       GetErrorMessage(error_status, kComputationErrorPrefix,
395                       /* keep_error_message= */ true);
396   event_publisher_->PublishComputationIOError(
397       error_message, example_stats, network_stats,
398       absl::Now() - run_plan_start_time);
399   opstats_logger_->AddEventWithErrorMessage(
400       OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_IO, error_message);
401 }
402 
LogComputationExampleIteratorError(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time)403 void PhaseLoggerImpl::LogComputationExampleIteratorError(
404     absl::Status error_status, const ExampleStats& example_stats,
405     const NetworkStats& network_stats, absl::Time run_plan_start_time) {
406   std::string error_message = GetErrorMessage(
407       error_status, kComputationErrorPrefix, /* keep_error_message= */ true);
408   event_publisher_->PublishComputationExampleIteratorError(
409       error_message, example_stats, network_stats,
410       absl::Now() - run_plan_start_time);
411   opstats_logger_->AddEventWithErrorMessage(
412       OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_EXAMPLE_ITERATOR,
413       error_message);
414 }
415 
LogComputationTensorflowError(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time,absl::Time reference_time)416 void PhaseLoggerImpl::LogComputationTensorflowError(
417     absl::Status error_status, const ExampleStats& example_stats,
418     const NetworkStats& network_stats, absl::Time run_plan_start_time,
419     absl::Time reference_time) {
420   std::string error_message = GetErrorMessage(
421       error_status, kComputationErrorPrefix, log_tensorflow_error_messages_);
422   event_publisher_->PublishComputationTensorflowError(
423       error_message, example_stats, network_stats,
424       absl::Now() - run_plan_start_time);
425   opstats_logger_->AddEventWithErrorMessage(
426       OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_TENSORFLOW,
427       error_message);
428   LogComputationLatency(run_plan_start_time, reference_time);
429 }
430 
LogComputationInterrupted(absl::Status error_status,const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time,absl::Time reference_time)431 void PhaseLoggerImpl::LogComputationInterrupted(
432     absl::Status error_status, const ExampleStats& example_stats,
433     const NetworkStats& network_stats, absl::Time run_plan_start_time,
434     absl::Time reference_time) {
435   std::string error_message =
436       GetErrorMessage(error_status, kComputationErrorPrefix,
437                       /* keep_error_message= */ true);
438   event_publisher_->PublishComputationInterrupted(
439       error_message, example_stats, network_stats,
440       absl::Now() - run_plan_start_time);
441   opstats_logger_->AddEventWithErrorMessage(
442       OperationalStats::Event::EVENT_KIND_COMPUTATION_CLIENT_INTERRUPTED,
443       error_message);
444   LogComputationLatency(run_plan_start_time, reference_time);
445 }
446 
LogComputationCompleted(const ExampleStats & example_stats,const NetworkStats & network_stats,absl::Time run_plan_start_time,absl::Time reference_time)447 void PhaseLoggerImpl::LogComputationCompleted(const ExampleStats& example_stats,
448                                               const NetworkStats& network_stats,
449                                               absl::Time run_plan_start_time,
450                                               absl::Time reference_time) {
451   event_publisher_->PublishComputationCompleted(
452       example_stats, network_stats, absl::Now() - run_plan_start_time);
453   opstats_logger_->AddEvent(
454       OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED);
455   log_manager_->LogToLongHistogram(
456       HistogramCounters::TRAINING_OVERALL_EXAMPLE_SIZE,
457       example_stats.example_size_bytes);
458   log_manager_->LogToLongHistogram(
459       HistogramCounters::TRAINING_OVERALL_EXAMPLE_COUNT,
460       example_stats.example_count);
461   LogComputationLatency(run_plan_start_time, reference_time);
462 }
463 
LogResultUploadStarted()464 absl::Status PhaseLoggerImpl::LogResultUploadStarted() {
465   opstats_logger_->AddEvent(
466       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED);
467   // Commit the run data accumulated thus far to Opstats and fail if
468   // something goes wrong.
469   FCP_RETURN_IF_ERROR(opstats_logger_->CommitToStorage());
470   event_publisher_->PublishResultUploadStarted();
471   return absl::OkStatus();
472 }
473 
LogResultUploadIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)474 void PhaseLoggerImpl::LogResultUploadIOError(
475     absl::Status error_status, const NetworkStats& network_stats,
476     absl::Time time_before_result_upload, absl::Time reference_time) {
477   std::string error_message =
478       GetErrorMessage(error_status, kResultUploadErrorPrefix,
479                       /* keep_error_message= */ true);
480   event_publisher_->PublishResultUploadIOError(
481       error_message, network_stats, absl::Now() - time_before_result_upload);
482   opstats_logger_->AddEventWithErrorMessage(
483       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_ERROR_IO,
484       error_message);
485   LogReportLatency(time_before_result_upload, reference_time);
486 }
487 
LogResultUploadClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)488 void PhaseLoggerImpl::LogResultUploadClientInterrupted(
489     absl::Status error_status, const NetworkStats& network_stats,
490     absl::Time time_before_result_upload, absl::Time reference_time) {
491   std::string error_message =
492       GetErrorMessage(error_status, kResultUploadErrorPrefix,
493                       /* keep_error_message= */ true);
494   event_publisher_->PublishResultUploadClientInterrupted(
495       error_message, network_stats, absl::Now() - time_before_result_upload);
496   opstats_logger_->AddEventWithErrorMessage(
497       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_CLIENT_INTERRUPTED,
498       error_message);
499   LogReportLatency(time_before_result_upload, reference_time);
500 }
501 
LogResultUploadServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)502 void PhaseLoggerImpl::LogResultUploadServerAborted(
503     absl::Status error_status, const NetworkStats& network_stats,
504     absl::Time time_before_result_upload, absl::Time reference_time) {
505   std::string error_message =
506       GetErrorMessage(error_status, kResultUploadErrorPrefix,
507                       /* keep_error_message= */ true);
508   event_publisher_->PublishResultUploadServerAborted(
509       error_message, network_stats, absl::Now() - time_before_result_upload);
510   opstats_logger_->AddEventWithErrorMessage(
511       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_SERVER_ABORTED,
512       error_message);
513   LogReportLatency(time_before_result_upload, reference_time);
514 }
515 
LogResultUploadCompleted(const NetworkStats & network_stats,absl::Time time_before_result_upload,absl::Time reference_time)516 void PhaseLoggerImpl::LogResultUploadCompleted(
517     const NetworkStats& network_stats, absl::Time time_before_result_upload,
518     absl::Time reference_time) {
519   event_publisher_->PublishResultUploadCompleted(
520       network_stats, absl::Now() - time_before_result_upload);
521   opstats_logger_->AddEvent(
522       OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_FINISHED);
523   LogReportLatency(time_before_result_upload, reference_time);
524 }
525 
LogFailureUploadStarted()526 absl::Status PhaseLoggerImpl::LogFailureUploadStarted() {
527   opstats_logger_->AddEvent(
528       OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_STARTED);
529   // Commit the run data accumulated thus far to Opstats and fail if
530   // something goes wrong.
531   FCP_RETURN_IF_ERROR(opstats_logger_->CommitToStorage());
532   event_publisher_->PublishFailureUploadStarted();
533   return absl::OkStatus();
534 }
535 
LogFailureUploadIOError(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)536 void PhaseLoggerImpl::LogFailureUploadIOError(
537     absl::Status error_status, const NetworkStats& network_stats,
538     absl::Time time_before_failure_upload, absl::Time reference_time) {
539   std::string error_message =
540       GetErrorMessage(error_status, kFailureUploadErrorPrefix,
541                       /* keep_error_message= */ true);
542   event_publisher_->PublishFailureUploadIOError(
543       error_message, network_stats, absl::Now() - time_before_failure_upload);
544   opstats_logger_->AddEventWithErrorMessage(
545       OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_ERROR_IO,
546       error_message);
547   LogReportLatency(time_before_failure_upload, reference_time);
548 }
549 
LogFailureUploadClientInterrupted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)550 void PhaseLoggerImpl::LogFailureUploadClientInterrupted(
551     absl::Status error_status, const NetworkStats& network_stats,
552     absl::Time time_before_failure_upload, absl::Time reference_time) {
553   std::string error_message =
554       GetErrorMessage(error_status, kFailureUploadErrorPrefix,
555                       /* keep_error_message= */ true);
556   event_publisher_->PublishFailureUploadClientInterrupted(
557       error_message, network_stats, absl::Now() - time_before_failure_upload);
558   opstats_logger_->AddEventWithErrorMessage(
559       OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_CLIENT_INTERRUPTED,
560       error_message);
561   LogReportLatency(time_before_failure_upload, reference_time);
562 }
563 
LogFailureUploadServerAborted(absl::Status error_status,const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)564 void PhaseLoggerImpl::LogFailureUploadServerAborted(
565     absl::Status error_status, const NetworkStats& network_stats,
566     absl::Time time_before_failure_upload, absl::Time reference_time) {
567   std::string error_message =
568       GetErrorMessage(error_status, kFailureUploadErrorPrefix,
569                       /* keep_error_message= */ true);
570   event_publisher_->PublishFailureUploadServerAborted(
571       error_message, network_stats, absl::Now() - time_before_failure_upload);
572   opstats_logger_->AddEventWithErrorMessage(
573       OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_SERVER_ABORTED,
574       error_message);
575   LogReportLatency(time_before_failure_upload, reference_time);
576 }
577 
LogFailureUploadCompleted(const NetworkStats & network_stats,absl::Time time_before_failure_upload,absl::Time reference_time)578 void PhaseLoggerImpl::LogFailureUploadCompleted(
579     const NetworkStats& network_stats, absl::Time time_before_failure_upload,
580     absl::Time reference_time) {
581   event_publisher_->PublishFailureUploadCompleted(
582       network_stats, absl::Now() - time_before_failure_upload);
583   opstats_logger_->AddEvent(
584       OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_FINISHED);
585   LogReportLatency(time_before_failure_upload, reference_time);
586 }
587 
LogTimeSince(HistogramCounters histogram_counter,absl::Time reference_time)588 void PhaseLoggerImpl::LogTimeSince(HistogramCounters histogram_counter,
589                                    absl::Time reference_time) {
590   absl::Duration duration = absl::Now() - reference_time;
591   log_manager_->LogToLongHistogram(histogram_counter,
592                                    absl::ToInt64Milliseconds(duration));
593 }
594 
LogEligibilityEvalCheckinLatency(absl::Time time_before_checkin)595 void PhaseLoggerImpl::LogEligibilityEvalCheckinLatency(
596     absl::Time time_before_checkin) {
597   LogTimeSince(HistogramCounters::TRAINING_FL_ELIGIBILITY_EVAL_CHECKIN_LATENCY,
598                time_before_checkin);
599 }
600 
LogEligibilityEvalComputationLatency(absl::Time run_plan_start_time,absl::Time reference_time)601 void PhaseLoggerImpl::LogEligibilityEvalComputationLatency(
602     absl::Time run_plan_start_time, absl::Time reference_time) {
603   LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_LATENCY,
604                run_plan_start_time);
605   LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_END_TIME, reference_time);
606 }
607 
LogCheckinLatency(absl::Time time_before_checkin,absl::Time reference_time)608 void PhaseLoggerImpl::LogCheckinLatency(absl::Time time_before_checkin,
609                                         absl::Time reference_time) {
610   LogTimeSince(HistogramCounters::TRAINING_FL_CHECKIN_LATENCY,
611                time_before_checkin);
612   LogTimeSince(HistogramCounters::TRAINING_FL_CHECKIN_END_TIME, reference_time);
613 }
614 
LogComputationLatency(absl::Time run_plan_start_time,absl::Time reference_time)615 void PhaseLoggerImpl::LogComputationLatency(absl::Time run_plan_start_time,
616                                             absl::Time reference_time) {
617   LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_LATENCY,
618                run_plan_start_time);
619   LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_END_TIME, reference_time);
620 }
621 
LogReportLatency(absl::Time time_before_report,absl::Time reference_time)622 void PhaseLoggerImpl::LogReportLatency(absl::Time time_before_report,
623                                        absl::Time reference_time) {
624   LogTimeSince(HistogramCounters::TRAINING_FL_REPORT_RESULTS_LATENCY,
625                time_before_report);
626   LogTimeSince(HistogramCounters::TRAINING_FL_REPORT_RESULTS_END_TIME,
627                reference_time);
628 }
629 
GetErrorMessage(absl::Status error_status,absl::string_view error_prefix,bool keep_error_message)630 std::string PhaseLoggerImpl::GetErrorMessage(absl::Status error_status,
631                                              absl::string_view error_prefix,
632                                              bool keep_error_message) {
633   return absl::StrCat(error_prefix, "code: ", error_status.code(), ", error: ",
634                       keep_error_message ? error_status.message() : "");
635 }
636 
637 }  // namespace client
638 }  // namespace fcp
639