1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // #define LOG_NDEBUG 0
18 #define LOG_TAG "GCH_ResultDispatcher"
19 #define ATRACE_TAG ATRACE_TAG_CAMERA
20 
21 #include "result_dispatcher.h"
22 
23 #include <inttypes.h>
24 #include <log/log.h>
25 #include <sys/resource.h>
26 #include <utils/Trace.h>
27 
28 #include <string>
29 #include <string_view>
30 
31 #include "hal_types.h"
32 #include "utils.h"
33 
34 namespace android {
35 namespace google_camera_hal {
36 
Create(uint32_t partial_result_count,ProcessCaptureResultFunc process_capture_result,ProcessBatchCaptureResultFunc process_batch_capture_result,NotifyFunc notify,const StreamConfiguration & stream_config,std::string_view name)37 std::unique_ptr<ResultDispatcher> ResultDispatcher::Create(
38     uint32_t partial_result_count,
39     ProcessCaptureResultFunc process_capture_result,
40     ProcessBatchCaptureResultFunc process_batch_capture_result,
41     NotifyFunc notify, const StreamConfiguration& stream_config,
42     std::string_view name) {
43   ATRACE_CALL();
44   auto dispatcher = std::make_unique<ResultDispatcher>(
45       partial_result_count, process_capture_result,
46       process_batch_capture_result, notify, stream_config, name);
47   if (dispatcher == nullptr) {
48     ALOGE("[%s] %s: Creating ResultDispatcher failed.",
49           std::string(name).c_str(), __FUNCTION__);
50     return nullptr;
51   }
52 
53   return dispatcher;
54 }
55 
ResultDispatcher(uint32_t partial_result_count,ProcessCaptureResultFunc process_capture_result,ProcessBatchCaptureResultFunc process_batch_capture_result,NotifyFunc notify,const StreamConfiguration & stream_config,std::string_view name)56 ResultDispatcher::ResultDispatcher(
57     uint32_t partial_result_count,
58     ProcessCaptureResultFunc process_capture_result,
59     ProcessBatchCaptureResultFunc process_batch_capture_result,
60     NotifyFunc notify, const StreamConfiguration& stream_config,
61     std::string_view name)
62     : kPartialResultCount(partial_result_count),
63       name_(name),
64       process_capture_result_(process_capture_result),
65       process_batch_capture_result_(process_batch_capture_result),
66       notify_(notify) {
67   ATRACE_CALL();
68   pending_shutters_ = DispatchQueue<PendingShutter>(name_, "shutter");
69   pending_early_metadata_ =
70       DispatchQueue<PendingResultMetadata>(name_, "early result metadata");
71   pending_final_metadata_ =
72       DispatchQueue<PendingResultMetadata>(name_, "final result metadata");
73 
74   notify_callback_thread_ =
75       std::thread([this] { this->NotifyCallbackThreadLoop(); });
76 
77   // Assign higher priority to reduce the preemption when CPU usage is high
78   //
79   // As from b/295977499, we need to make it realtime for priority inheritance
80   // to avoid CameraServer thread being the bottleneck
81   status_t res =
82       utils::SetRealtimeThread(notify_callback_thread_.native_handle());
83   if (res != OK) {
84     ALOGE("[%s] %s: SetRealtimeThread fail", name_.c_str(), __FUNCTION__);
85   } else {
86     ALOGI("[%s] %s: SetRealtimeThread OK", name_.c_str(), __FUNCTION__);
87   }
88   InitializeGroupStreamIdsMap(stream_config);
89 }
90 
~ResultDispatcher()91 ResultDispatcher::~ResultDispatcher() {
92   ATRACE_CALL();
93   {
94     std::unique_lock<std::mutex> lock(notify_callback_lock_);
95     notify_callback_thread_exiting_ = true;
96   }
97 
98   notify_callback_condition_.notify_one();
99   notify_callback_thread_.join();
100 }
101 
RemovePendingRequest(uint32_t frame_number)102 void ResultDispatcher::RemovePendingRequest(uint32_t frame_number) {
103   ATRACE_CALL();
104   std::lock_guard<std::mutex> lock(result_lock_);
105   RemovePendingRequestLocked(frame_number);
106 }
107 
AddPendingRequest(const CaptureRequest & pending_request)108 status_t ResultDispatcher::AddPendingRequest(
109     const CaptureRequest& pending_request) {
110   ATRACE_CALL();
111   std::lock_guard<std::mutex> lock(result_lock_);
112 
113   status_t res = AddPendingRequestLocked(pending_request);
114   if (res != OK) {
115     ALOGE("[%s] %s: Adding a pending request failed: %s(%d).", name_.c_str(),
116           __FUNCTION__, strerror(-res), res);
117     RemovePendingRequestLocked(pending_request.frame_number);
118     return res;
119   }
120 
121   return OK;
122 }
123 
AddPendingRequestLocked(const CaptureRequest & pending_request)124 status_t ResultDispatcher::AddPendingRequestLocked(
125     const CaptureRequest& pending_request) {
126   ATRACE_CALL();
127   uint32_t frame_number = pending_request.frame_number;
128   const RequestType request_type = pending_request.input_buffers.empty()
129                                        ? RequestType::kNormal
130                                        : RequestType::kReprocess;
131 
132   status_t res = pending_shutters_.AddRequest(frame_number, request_type);
133   if (res != OK) {
134     ALOGE("[%s] %s: Adding pending shutter for frame %u failed: %s(%d)",
135           name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
136     return res;
137   }
138 
139   res = pending_early_metadata_.AddRequest(frame_number, request_type);
140   if (res != OK) {
141     ALOGE("[%s] %s: Adding pending early metadata for frame %u failed: %s(%d)",
142           name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
143     return res;
144   }
145 
146   res = pending_final_metadata_.AddRequest(frame_number, request_type);
147   if (res != OK) {
148     ALOGE("[%s] %s: Adding pending final metadata for frame %u failed: %s(%d)",
149           name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
150     return res;
151   }
152 
153   for (auto& buffer : pending_request.input_buffers) {
154     res = AddPendingBufferLocked(frame_number, buffer, request_type);
155     if (res != OK) {
156       ALOGE("[%s] %s: Adding pending input buffer for frame %u failed: %s(%d)",
157             name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
158       return res;
159     }
160   }
161 
162   for (auto& buffer : pending_request.output_buffers) {
163     res = AddPendingBufferLocked(frame_number, buffer, request_type);
164     if (res != OK) {
165       ALOGE("[%s] %s: Adding pending output buffer for frame %u failed: %s(%d)",
166             name_.c_str(), __FUNCTION__, frame_number, strerror(-res), res);
167       return res;
168     }
169   }
170 
171   return OK;
172 }
173 
AddPendingBufferLocked(uint32_t frame_number,const StreamBuffer & buffer,RequestType request_type)174 status_t ResultDispatcher::AddPendingBufferLocked(uint32_t frame_number,
175                                                   const StreamBuffer& buffer,
176                                                   RequestType request_type) {
177   ATRACE_CALL();
178   StreamKey stream_key = CreateStreamKey(buffer.stream_id);
179   if (!stream_pending_buffers_map_.contains(stream_key)) {
180     stream_pending_buffers_map_[stream_key] = DispatchQueue<PendingBuffer>(
181         name_, "buffer of stream " + DumpStreamKey(stream_key));
182   }
183 
184   return stream_pending_buffers_map_[stream_key].AddRequest(frame_number,
185                                                             request_type);
186 }
187 
RemovePendingRequestLocked(uint32_t frame_number)188 void ResultDispatcher::RemovePendingRequestLocked(uint32_t frame_number) {
189   ATRACE_CALL();
190   pending_shutters_.RemoveRequest(frame_number);
191   pending_early_metadata_.RemoveRequest(frame_number);
192   pending_final_metadata_.RemoveRequest(frame_number);
193 
194   for (auto& pending_buffers : stream_pending_buffers_map_) {
195     pending_buffers.second.RemoveRequest(frame_number);
196   }
197 }
198 
AddResultImpl(std::unique_ptr<CaptureResult> result)199 status_t ResultDispatcher::AddResultImpl(std::unique_ptr<CaptureResult> result) {
200   status_t res;
201   bool failed = false;
202   uint32_t frame_number = result->frame_number;
203 
204   if (result->result_metadata != nullptr) {
205     res = AddResultMetadata(frame_number, std::move(result->result_metadata),
206                             std::move(result->physical_metadata),
207                             result->partial_result);
208     if (res != OK) {
209       ALOGE("[%s] %s: Adding result metadata failed: %s (%d)", name_.c_str(),
210             __FUNCTION__, strerror(-res), res);
211       failed = true;
212     }
213   }
214 
215   for (auto& buffer : result->output_buffers) {
216     res = AddBuffer(frame_number, buffer, /*is_input=*/false);
217     if (res != OK) {
218       ALOGE("[%s] %s: Adding an output buffer failed: %s (%d)", name_.c_str(),
219             __FUNCTION__, strerror(-res), res);
220       failed = true;
221     }
222   }
223 
224   for (auto& buffer : result->input_buffers) {
225     res = AddBuffer(frame_number, buffer, /*is_input=*/true);
226     if (res != OK) {
227       ALOGE("[%s] %s: Adding an input buffer failed: %s (%d)", name_.c_str(),
228             __FUNCTION__, strerror(-res), res);
229       failed = true;
230     }
231   }
232 
233   return failed ? UNKNOWN_ERROR : OK;
234 }
235 
AddResult(std::unique_ptr<CaptureResult> result)236 status_t ResultDispatcher::AddResult(std::unique_ptr<CaptureResult> result) {
237   ATRACE_CALL();
238   const status_t res = AddResultImpl(std::move(result));
239   {
240     std::unique_lock<std::mutex> lock(notify_callback_lock_);
241     is_result_shutter_updated_ = true;
242     notify_callback_condition_.notify_one();
243   }
244   return res;
245 }
246 
AddBatchResult(std::vector<std::unique_ptr<CaptureResult>> results)247 status_t ResultDispatcher::AddBatchResult(
248     std::vector<std::unique_ptr<CaptureResult>> results) {
249   std::optional<status_t> last_error;
250   for (auto& result : results) {
251     const status_t res = AddResultImpl(std::move(result));
252     if (res != OK) {
253       last_error = res;
254     }
255   }
256   {
257     std::unique_lock<std::mutex> lock(notify_callback_lock_);
258     is_result_shutter_updated_ = true;
259     notify_callback_condition_.notify_one();
260   }
261   return last_error.value_or(OK);
262 }
263 
AddShutter(uint32_t frame_number,int64_t timestamp_ns,int64_t readout_timestamp_ns)264 status_t ResultDispatcher::AddShutter(uint32_t frame_number,
265                                       int64_t timestamp_ns,
266                                       int64_t readout_timestamp_ns) {
267   ATRACE_CALL();
268 
269   {
270     std::lock_guard<std::mutex> lock(result_lock_);
271     status_t res = pending_shutters_.AddResult(
272         frame_number, PendingShutter{
273                           .timestamp_ns = timestamp_ns,
274                           .readout_timestamp_ns = readout_timestamp_ns,
275                           .ready = true,
276                       });
277     if (res != OK) {
278       ALOGE(
279           "[%s] %s: Failed to add shutter for frame %u , New timestamp "
280           "%" PRId64,
281           name_.c_str(), __FUNCTION__, frame_number, timestamp_ns);
282       return res;
283     }
284   }
285   {
286     std::unique_lock<std::mutex> lock(notify_callback_lock_);
287     is_result_shutter_updated_ = true;
288     notify_callback_condition_.notify_one();
289   }
290   return OK;
291 }
292 
AddError(const ErrorMessage & error)293 status_t ResultDispatcher::AddError(const ErrorMessage& error) {
294   ATRACE_CALL();
295   std::lock_guard<std::mutex> lock(result_lock_);
296   uint32_t frame_number = error.frame_number;
297   // No need to deliver the shutter message on an error
298   if (error.error_code == ErrorCode::kErrorDevice ||
299       error.error_code == ErrorCode::kErrorResult ||
300       error.error_code == ErrorCode::kErrorRequest) {
301     pending_shutters_.RemoveRequest(frame_number);
302   }
303   // No need to deliver the result metadata on a result metadata error
304   if (error.error_code == ErrorCode::kErrorResult ||
305       error.error_code == ErrorCode::kErrorRequest) {
306     pending_early_metadata_.RemoveRequest(frame_number);
307     pending_final_metadata_.RemoveRequest(frame_number);
308   }
309 
310   NotifyMessage message = {.type = MessageType::kError, .message.error = error};
311   ALOGV("[%s] %s: Notify error %u for frame %u stream %d", name_.c_str(),
312         __FUNCTION__, error.error_code, frame_number, error.error_stream_id);
313   notify_(message);
314 
315   return OK;
316 }
317 
MakeResultMetadata(uint32_t frame_number,std::unique_ptr<HalCameraMetadata> metadata,std::vector<PhysicalCameraMetadata> physical_metadata,uint32_t partial_result)318 std::unique_ptr<CaptureResult> ResultDispatcher::MakeResultMetadata(
319     uint32_t frame_number, std::unique_ptr<HalCameraMetadata> metadata,
320     std::vector<PhysicalCameraMetadata> physical_metadata,
321     uint32_t partial_result) {
322   ATRACE_CALL();
323   auto result = std::make_unique<CaptureResult>(CaptureResult({}));
324   result->frame_number = frame_number;
325   result->result_metadata = std::move(metadata);
326   result->physical_metadata = std::move(physical_metadata);
327   result->partial_result = partial_result;
328   return result;
329 }
330 
AddResultMetadata(uint32_t frame_number,std::unique_ptr<HalCameraMetadata> metadata,std::vector<PhysicalCameraMetadata> physical_metadata,uint32_t partial_result)331 status_t ResultDispatcher::AddResultMetadata(
332     uint32_t frame_number, std::unique_ptr<HalCameraMetadata> metadata,
333     std::vector<PhysicalCameraMetadata> physical_metadata,
334     uint32_t partial_result) {
335   ATRACE_CALL();
336   if (metadata == nullptr) {
337     ALOGE("[%s] %s: metadata is nullptr.", name_.c_str(), __FUNCTION__);
338     return BAD_VALUE;
339   }
340 
341   if (partial_result > kPartialResultCount) {
342     ALOGE(
343         "[%s] %s: partial_result %u cannot be larger than partial result count "
344         "%u",
345         name_.c_str(), __FUNCTION__, partial_result, kPartialResultCount);
346     return BAD_VALUE;
347   }
348 
349   std::lock_guard<std::mutex> lock(result_lock_);
350   DispatchQueue<PendingResultMetadata>& queue =
351       partial_result < kPartialResultCount ? pending_early_metadata_
352                                            : pending_final_metadata_;
353   return queue.AddResult(frame_number,
354                          PendingResultMetadata{
355                              .metadata = std::move(metadata),
356                              .physical_metadata = std::move(physical_metadata),
357                              .partial_result_count = partial_result,
358                              .ready = true,
359                          });
360 }
361 
AddBuffer(uint32_t frame_number,StreamBuffer buffer,bool is_input)362 status_t ResultDispatcher::AddBuffer(uint32_t frame_number, StreamBuffer buffer,
363                                      bool is_input) {
364   ATRACE_CALL();
365   std::lock_guard<std::mutex> lock(result_lock_);
366 
367   StreamKey stream_key = CreateStreamKey(buffer.stream_id);
368   auto pending_buffers_it = stream_pending_buffers_map_.find(stream_key);
369   if (pending_buffers_it == stream_pending_buffers_map_.end()) {
370     ALOGE("[%s] %s: Cannot find the pending buffer for stream %s",
371           name_.c_str(), __FUNCTION__, DumpStreamKey(stream_key).c_str());
372     return NAME_NOT_FOUND;
373   }
374 
375   return pending_buffers_it->second.AddResult(frame_number,
376                                               PendingBuffer{
377                                                   .buffer = buffer,
378                                                   .is_input = is_input,
379                                                   .ready = true,
380                                               });
381 }
382 
NotifyCallbackThreadLoop()383 void ResultDispatcher::NotifyCallbackThreadLoop() {
384   // '\0' counts toward the 16-character restriction.
385   constexpr int kPthreadNameLenMinusOne = 16 - 1;
386   pthread_setname_np(
387       pthread_self(),
388       name_.substr(/*pos=*/0, /*count=*/kPthreadNameLenMinusOne).c_str());
389 
390   while (1) {
391     NotifyShutters();
392     NotifyResultMetadata();
393     NotifyBuffers();
394 
395     std::unique_lock<std::mutex> lock(notify_callback_lock_);
396     if (notify_callback_thread_exiting_) {
397       ALOGV("[%s] %s: NotifyCallbackThreadLoop exits.", name_.c_str(),
398             __FUNCTION__);
399       return;
400     }
401     if (!is_result_shutter_updated_) {
402       if (notify_callback_condition_.wait_for(
403               lock, std::chrono::milliseconds(kCallbackThreadTimeoutMs)) ==
404           std::cv_status::timeout) {
405         PrintTimeoutMessages();
406       }
407     }
408     is_result_shutter_updated_ = false;
409   }
410 }
411 
PrintTimeoutMessages()412 void ResultDispatcher::PrintTimeoutMessages() {
413   std::lock_guard<std::mutex> lock(result_lock_);
414   pending_shutters_.PrintTimeoutMessages();
415   pending_early_metadata_.PrintTimeoutMessages();
416   pending_final_metadata_.PrintTimeoutMessages();
417 
418   for (auto& [stream_key, pending_buffers] : stream_pending_buffers_map_) {
419     pending_buffers.PrintTimeoutMessages();
420   }
421 }
422 
InitializeGroupStreamIdsMap(const StreamConfiguration & stream_config)423 void ResultDispatcher::InitializeGroupStreamIdsMap(
424     const StreamConfiguration& stream_config) {
425   std::lock_guard<std::mutex> lock(result_lock_);
426   for (const auto& stream : stream_config.streams) {
427     if (stream.group_id != -1) {
428       group_stream_map_[stream.id] = stream.group_id;
429     }
430   }
431 }
432 
CreateStreamKey(int32_t stream_id) const433 ResultDispatcher::StreamKey ResultDispatcher::CreateStreamKey(
434     int32_t stream_id) const {
435   if (group_stream_map_.count(stream_id) == 0) {
436     return StreamKey(stream_id, StreamKeyType::kSingleStream);
437   } else {
438     return StreamKey(group_stream_map_.at(stream_id),
439                      StreamKeyType::kGroupStream);
440   }
441 }
442 
DumpStreamKey(const StreamKey & stream_key) const443 std::string ResultDispatcher::DumpStreamKey(const StreamKey& stream_key) const {
444   switch (stream_key.second) {
445     case StreamKeyType::kSingleStream:
446       return std::to_string(stream_key.first);
447     case StreamKeyType::kGroupStream:
448       return "group " + std::to_string(stream_key.first);
449     default:
450       return "Invalid stream key type";
451   }
452 }
453 
NotifyShutters()454 void ResultDispatcher::NotifyShutters() {
455   ATRACE_CALL();
456   NotifyMessage message = {};
457   // TODO: b/347771898 - Update to not depend on running faster than data is
458   // ready
459   while (true) {
460     uint32_t frame_number = 0;
461     PendingShutter pending_shutter;
462     std::lock_guard<std::mutex> lock(result_lock_);
463     if (pending_shutters_.GetReadyData(frame_number, pending_shutter) != OK) {
464       break;
465     }
466     message.type = MessageType::kShutter;
467     message.message.shutter.frame_number = frame_number;
468     message.message.shutter.timestamp_ns = pending_shutter.timestamp_ns;
469     message.message.shutter.readout_timestamp_ns =
470         pending_shutter.readout_timestamp_ns;
471     ALOGV("[%s] %s: Notify shutter for frame %u timestamp %" PRIu64
472           " readout_timestamp %" PRIu64,
473           name_.c_str(), __FUNCTION__, message.message.shutter.frame_number,
474           message.message.shutter.timestamp_ns,
475           message.message.shutter.readout_timestamp_ns);
476     notify_(message);
477   }
478 }
479 
NotifyCaptureResults(std::vector<std::unique_ptr<CaptureResult>> results)480 void ResultDispatcher::NotifyCaptureResults(
481     std::vector<std::unique_ptr<CaptureResult>> results) {
482   ATRACE_CALL();
483   std::lock_guard<std::mutex> lock(process_capture_result_lock_);
484   if (process_batch_capture_result_ != nullptr) {
485     process_batch_capture_result_(std::move(results));
486   } else {
487     for (auto& result : results) {
488       process_capture_result_(std::move(result));
489     }
490   }
491 }
492 
NotifyResultMetadata()493 void ResultDispatcher::NotifyResultMetadata() {
494   ATRACE_CALL();
495   uint32_t frame_number = 0;
496   std::vector<std::unique_ptr<CaptureResult>> early_results;
497   std::vector<std::unique_ptr<CaptureResult>> final_results;
498   PendingResultMetadata early_result_metadata;
499   PendingResultMetadata final_result_metadata;
500   // TODO: b/347771898 - Assess if notify can hold the lock for less time
501   {
502     std::lock_guard<std::mutex> lock(result_lock_);
503     while (pending_early_metadata_.GetReadyData(frame_number,
504                                                 early_result_metadata) == OK) {
505       ALOGV("[%s] %s: Notify early metadata for frame %u", name_.c_str(),
506             __FUNCTION__, frame_number);
507       early_results.push_back(MakeResultMetadata(
508           frame_number, std::move(early_result_metadata.metadata),
509           std::move(early_result_metadata.physical_metadata),
510           early_result_metadata.partial_result_count));
511     }
512 
513     while (pending_final_metadata_.GetReadyData(frame_number,
514                                                 final_result_metadata) == OK) {
515       ALOGV("[%s] %s: Notify final metadata for frame %u", name_.c_str(),
516             __FUNCTION__, frame_number);
517       // Removes the pending early metadata if it exists, in case the HAL only
518       // sent the final metadata
519       pending_early_metadata_.RemoveRequest(frame_number);
520 
521       final_results.push_back(MakeResultMetadata(
522           frame_number, std::move(final_result_metadata.metadata),
523           std::move(final_result_metadata.physical_metadata),
524           final_result_metadata.partial_result_count));
525     }
526   }
527   if (!early_results.empty()) {
528     NotifyCaptureResults(std::move(early_results));
529   }
530   if (!final_results.empty()) {
531     NotifyCaptureResults(std::move(final_results));
532   }
533 }
534 
GetReadyBufferResult(std::unique_ptr<CaptureResult> * result)535 status_t ResultDispatcher::GetReadyBufferResult(
536     std::unique_ptr<CaptureResult>* result) {
537   ATRACE_CALL();
538   std::lock_guard<std::mutex> lock(result_lock_);
539   if (result == nullptr) {
540     ALOGE("[%s] %s: result is nullptr.", name_.c_str(), __FUNCTION__);
541     return BAD_VALUE;
542   }
543 
544   *result = nullptr;
545 
546   for (auto& pending_buffers : stream_pending_buffers_map_) {
547     uint32_t frame_number = 0;
548     PendingBuffer buffer_data;
549     if (pending_buffers.second.GetReadyData(frame_number, buffer_data) == OK) {
550       std::unique_ptr<CaptureResult> buffer_result =
551           std::make_unique<CaptureResult>(CaptureResult({}));
552 
553       buffer_result->frame_number = frame_number;
554       if (buffer_data.is_input) {
555         buffer_result->input_buffers.push_back(buffer_data.buffer);
556       } else {
557         buffer_result->output_buffers.push_back(buffer_data.buffer);
558       }
559       *result = std::move(buffer_result);
560       return OK;
561     }
562   }
563 
564   return NAME_NOT_FOUND;
565 }
566 
NotifyBuffers()567 void ResultDispatcher::NotifyBuffers() {
568   ATRACE_CALL();
569   std::vector<std::unique_ptr<CaptureResult>> results;
570   std::unique_ptr<CaptureResult> result;
571 
572   // TODO: b/347771898 - Update to not depend on running faster than data is
573   // ready
574   while (GetReadyBufferResult(&result) == OK) {
575     if (result == nullptr) {
576       ALOGE("[%s] %s: result is nullptr", name_.c_str(), __FUNCTION__);
577       return;
578     }
579     ALOGV("[%s] %s: Notify Buffer for frame %u", name_.c_str(), __FUNCTION__,
580           result->frame_number);
581     results.push_back(std::move(result));
582   }
583   if (!results.empty()) {
584     NotifyCaptureResults(std::move(results));
585   }
586 }
587 
588 template <typename FrameData>
DispatchQueue(std::string_view dispatcher_name,std::string_view data_name)589 ResultDispatcher::DispatchQueue<FrameData>::DispatchQueue(
590     std::string_view dispatcher_name, std::string_view data_name)
591     : dispatcher_name_(dispatcher_name), data_name_(data_name) {
592 }
593 
594 template <typename FrameData>
AddRequest(uint32_t frame_number,RequestType request_type)595 status_t ResultDispatcher::DispatchQueue<FrameData>::AddRequest(
596     uint32_t frame_number, RequestType request_type) {
597   if (normal_request_map_.contains(frame_number) ||
598       reprocess_request_map_.contains(frame_number)) {
599     ALOGE("[%s] %s: Pending %s for frame %u already exists.",
600           std::string(dispatcher_name_).c_str(), __FUNCTION__,
601           data_name_.c_str(), frame_number);
602     return ALREADY_EXISTS;
603   }
604   if (request_type == RequestType::kNormal) {
605     normal_request_map_[frame_number] = FrameData();
606   } else {
607     reprocess_request_map_[frame_number] = FrameData();
608   }
609   return OK;
610 }
611 
612 template <typename FrameData>
RemoveRequest(uint32_t frame_number)613 void ResultDispatcher::DispatchQueue<FrameData>::RemoveRequest(
614     uint32_t frame_number) {
615   normal_request_map_.erase(frame_number);
616   reprocess_request_map_.erase(frame_number);
617 }
618 
619 template <typename FrameData>
AddResult(uint32_t frame_number,FrameData result)620 status_t ResultDispatcher::DispatchQueue<FrameData>::AddResult(
621     uint32_t frame_number, FrameData result) {
622   auto it = normal_request_map_.find(frame_number);
623   if (it == normal_request_map_.end()) {
624     it = reprocess_request_map_.find(frame_number);
625     if (it == reprocess_request_map_.end()) {
626       ALOGE("[%s] %s: Cannot find the pending %s for frame %u",
627             std::string(dispatcher_name_).c_str(), __FUNCTION__,
628             data_name_.c_str(), frame_number);
629       return NAME_NOT_FOUND;
630     }
631   }
632 
633   if (it->second.ready) {
634     ALOGE("[%s] %s: Already received %s for frame %u",
635           std::string(dispatcher_name_).c_str(), __FUNCTION__,
636           data_name_.c_str(), frame_number);
637     return ALREADY_EXISTS;
638   }
639 
640   it->second = std::move(result);
641   return OK;
642 }
643 
644 template <typename FrameData>
GetReadyData(uint32_t & frame_number,FrameData & ready_data)645 status_t ResultDispatcher::DispatchQueue<FrameData>::GetReadyData(
646     uint32_t& frame_number, FrameData& ready_data) {
647   auto it = normal_request_map_.begin();
648   if (it != normal_request_map_.end() && it->second.ready) {
649     frame_number = it->first;
650     ready_data = std::move(it->second);
651     normal_request_map_.erase(it);
652     return OK;
653   }
654 
655   it = reprocess_request_map_.begin();
656   if (it != reprocess_request_map_.end() && it->second.ready) {
657     frame_number = it->first;
658     ready_data = std::move(it->second);
659     reprocess_request_map_.erase(it);
660     return OK;
661   }
662   // The first pending data is not ready
663   return NAME_NOT_FOUND;
664 }
665 
666 template <typename FrameData>
PrintTimeoutMessages()667 void ResultDispatcher::DispatchQueue<FrameData>::PrintTimeoutMessages() {
668   for (auto& [frame_number, pending_data] : normal_request_map_) {
669     ALOGW("[%s] %s: pending %s for frame %u ready %d",
670           std::string(dispatcher_name_).c_str(), __FUNCTION__,
671           data_name_.c_str(), frame_number, pending_data.ready);
672   }
673   for (auto& [frame_number, pending_data] : reprocess_request_map_) {
674     ALOGW("[%s] %s: pending %s for frame %u ready %d",
675           std::string(dispatcher_name_).c_str(), __FUNCTION__,
676           data_name_.c_str(), frame_number, pending_data.ready);
677   }
678 }
679 template class ResultDispatcher::DispatchQueue<ResultDispatcher::PendingShutter>;
680 template class ResultDispatcher::DispatchQueue<ResultDispatcher::PendingBuffer>;
681 template class ResultDispatcher::DispatchQueue<
682     ResultDispatcher::PendingResultMetadata>;
683 
684 }  // namespace google_camera_hal
685 }  // namespace android
686