xref: /aosp_15_r20/frameworks/native/libs/input/InputConsumerNoResampling.cpp (revision 38e8c45f13ce32b0dcecb25141ffecaf386fa17f)
1 /**
2  * Copyright 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "InputConsumerNoResampling"
18 #define ATRACE_TAG ATRACE_TAG_INPUT
19 
20 #include <inttypes.h>
21 
22 #include <android-base/logging.h>
23 #include <android-base/properties.h>
24 #include <android-base/stringprintf.h>
25 #include <cutils/properties.h>
26 #include <ftl/enum.h>
27 #include <utils/Trace.h>
28 
29 #include <com_android_input_flags.h>
30 #include <input/InputConsumerNoResampling.h>
31 #include <input/PrintTools.h>
32 #include <input/TraceTools.h>
33 
34 namespace android {
35 
36 namespace {
37 
38 using std::chrono::nanoseconds;
39 
40 using android::base::Result;
41 
/**
 * Log debug messages relating to the consumer end of the transport channel.
 * The tag that is checked is LOG_TAG with "Consumer" appended, i.e.
 * "InputConsumerNoResamplingConsumer". Enable this via
 * "adb shell setprop log.tag.InputConsumerNoResamplingConsumer DEBUG" (requires restart)
 */
const bool DEBUG_TRANSPORT_CONSUMER =
        __android_log_is_loggable(ANDROID_LOG_DEBUG, LOG_TAG "Consumer", ANDROID_LOG_INFO);
48 
createKeyEvent(const InputMessage & msg)49 std::unique_ptr<KeyEvent> createKeyEvent(const InputMessage& msg) {
50     std::unique_ptr<KeyEvent> event = std::make_unique<KeyEvent>();
51     event->initialize(msg.body.key.eventId, msg.body.key.deviceId, msg.body.key.source,
52                       ui::LogicalDisplayId{msg.body.key.displayId}, msg.body.key.hmac,
53                       msg.body.key.action, msg.body.key.flags, msg.body.key.keyCode,
54                       msg.body.key.scanCode, msg.body.key.metaState, msg.body.key.repeatCount,
55                       msg.body.key.downTime, msg.body.key.eventTime);
56     return event;
57 }
58 
createFocusEvent(const InputMessage & msg)59 std::unique_ptr<FocusEvent> createFocusEvent(const InputMessage& msg) {
60     std::unique_ptr<FocusEvent> event = std::make_unique<FocusEvent>();
61     event->initialize(msg.body.focus.eventId, msg.body.focus.hasFocus);
62     return event;
63 }
64 
createCaptureEvent(const InputMessage & msg)65 std::unique_ptr<CaptureEvent> createCaptureEvent(const InputMessage& msg) {
66     std::unique_ptr<CaptureEvent> event = std::make_unique<CaptureEvent>();
67     event->initialize(msg.body.capture.eventId, msg.body.capture.pointerCaptureEnabled);
68     return event;
69 }
70 
createDragEvent(const InputMessage & msg)71 std::unique_ptr<DragEvent> createDragEvent(const InputMessage& msg) {
72     std::unique_ptr<DragEvent> event = std::make_unique<DragEvent>();
73     event->initialize(msg.body.drag.eventId, msg.body.drag.x, msg.body.drag.y,
74                       msg.body.drag.isExiting);
75     return event;
76 }
77 
createMotionEvent(const InputMessage & msg)78 std::unique_ptr<MotionEvent> createMotionEvent(const InputMessage& msg) {
79     std::unique_ptr<MotionEvent> event = std::make_unique<MotionEvent>();
80     const uint32_t pointerCount = msg.body.motion.pointerCount;
81     std::vector<PointerProperties> pointerProperties;
82     pointerProperties.reserve(pointerCount);
83     std::vector<PointerCoords> pointerCoords;
84     pointerCoords.reserve(pointerCount);
85     for (uint32_t i = 0; i < pointerCount; i++) {
86         pointerProperties.push_back(msg.body.motion.pointers[i].properties);
87         pointerCoords.push_back(msg.body.motion.pointers[i].coords);
88     }
89 
90     ui::Transform transform;
91     transform.set({msg.body.motion.dsdx, msg.body.motion.dtdx, msg.body.motion.tx,
92                    msg.body.motion.dtdy, msg.body.motion.dsdy, msg.body.motion.ty, 0, 0, 1});
93     ui::Transform displayTransform;
94     displayTransform.set({msg.body.motion.dsdxRaw, msg.body.motion.dtdxRaw, msg.body.motion.txRaw,
95                           msg.body.motion.dtdyRaw, msg.body.motion.dsdyRaw, msg.body.motion.tyRaw,
96                           0, 0, 1});
97     event->initialize(msg.body.motion.eventId, msg.body.motion.deviceId, msg.body.motion.source,
98                       ui::LogicalDisplayId{msg.body.motion.displayId}, msg.body.motion.hmac,
99                       msg.body.motion.action, msg.body.motion.actionButton, msg.body.motion.flags,
100                       msg.body.motion.edgeFlags, msg.body.motion.metaState,
101                       msg.body.motion.buttonState, msg.body.motion.classification, transform,
102                       msg.body.motion.xPrecision, msg.body.motion.yPrecision,
103                       msg.body.motion.xCursorPosition, msg.body.motion.yCursorPosition,
104                       displayTransform, msg.body.motion.downTime, msg.body.motion.eventTime,
105                       pointerCount, pointerProperties.data(), pointerCoords.data());
106     return event;
107 }
108 
addSample(MotionEvent & event,const InputMessage & msg)109 void addSample(MotionEvent& event, const InputMessage& msg) {
110     uint32_t pointerCount = msg.body.motion.pointerCount;
111     std::vector<PointerCoords> pointerCoords;
112     pointerCoords.reserve(pointerCount);
113     for (uint32_t i = 0; i < pointerCount; i++) {
114         pointerCoords.push_back(msg.body.motion.pointers[i].coords);
115     }
116 
117     // TODO(b/329770983): figure out if it's safe to combine events with mismatching metaState
118     event.setMetaState(event.getMetaState() | msg.body.motion.metaState);
119     event.addSample(msg.body.motion.eventTime, pointerCoords.data(), msg.body.motion.eventId);
120 }
121 
createTouchModeEvent(const InputMessage & msg)122 std::unique_ptr<TouchModeEvent> createTouchModeEvent(const InputMessage& msg) {
123     std::unique_ptr<TouchModeEvent> event = std::make_unique<TouchModeEvent>();
124     event->initialize(msg.body.touchMode.eventId, msg.body.touchMode.isInTouchMode);
125     return event;
126 }
127 
outboundMessageToString(const InputMessage & outboundMsg)128 std::string outboundMessageToString(const InputMessage& outboundMsg) {
129     switch (outboundMsg.header.type) {
130         case InputMessage::Type::FINISHED: {
131             return android::base::StringPrintf("  Finish: seq=%" PRIu32 " handled=%s",
132                                                outboundMsg.header.seq,
133                                                toString(outboundMsg.body.finished.handled));
134         }
135         case InputMessage::Type::TIMELINE: {
136             return android::base::
137                     StringPrintf("  Timeline: inputEventId=%" PRId32 " gpuCompletedTime=%" PRId64
138                                  ", presentTime=%" PRId64,
139                                  outboundMsg.body.timeline.eventId,
140                                  outboundMsg.body.timeline
141                                          .graphicsTimeline[GraphicsTimeline::GPU_COMPLETED_TIME],
142                                  outboundMsg.body.timeline
143                                          .graphicsTimeline[GraphicsTimeline::PRESENT_TIME]);
144         }
145         default: {
146             LOG(FATAL) << "Outbound message must be FINISHED or TIMELINE, got "
147                        << ftl::enum_string(outboundMsg.header.type);
148             return "Unreachable";
149         }
150     }
151 }
152 
createFinishedMessage(uint32_t seq,bool handled,nsecs_t consumeTime)153 InputMessage createFinishedMessage(uint32_t seq, bool handled, nsecs_t consumeTime) {
154     InputMessage msg;
155     msg.header.type = InputMessage::Type::FINISHED;
156     msg.header.seq = seq;
157     msg.body.finished.handled = handled;
158     msg.body.finished.consumeTime = consumeTime;
159     return msg;
160 }
161 
createTimelineMessage(int32_t inputEventId,nsecs_t gpuCompletedTime,nsecs_t presentTime)162 InputMessage createTimelineMessage(int32_t inputEventId, nsecs_t gpuCompletedTime,
163                                    nsecs_t presentTime) {
164     InputMessage msg;
165     msg.header.type = InputMessage::Type::TIMELINE;
166     msg.header.seq = 0;
167     msg.body.timeline.eventId = inputEventId;
168     msg.body.timeline.graphicsTimeline[GraphicsTimeline::GPU_COMPLETED_TIME] = gpuCompletedTime;
169     msg.body.timeline.graphicsTimeline[GraphicsTimeline::PRESENT_TIME] = presentTime;
170     return msg;
171 }
172 
operator <<(std::ostream & out,const InputMessage & msg)173 std::ostream& operator<<(std::ostream& out, const InputMessage& msg) {
174     out << ftl::enum_string(msg.header.type);
175     return out;
176 }
177 
178 } // namespace
179 
180 // --- InputConsumerNoResampling ---
181 
InputConsumerNoResampling(const std::shared_ptr<InputChannel> & channel,sp<Looper> looper,InputConsumerCallbacks & callbacks,std::function<std::unique_ptr<Resampler> ()> resamplerCreator)182 InputConsumerNoResampling::InputConsumerNoResampling(
183         const std::shared_ptr<InputChannel>& channel, sp<Looper> looper,
184         InputConsumerCallbacks& callbacks,
185         std::function<std::unique_ptr<Resampler>()> resamplerCreator)
186       : mChannel{channel},
187         mLooper{looper},
188         mCallbacks{callbacks},
189         mResamplerCreator{std::move(resamplerCreator)},
190         mFdEvents(0) {
191     LOG_ALWAYS_FATAL_IF(mLooper == nullptr);
192     mCallback = sp<LooperEventCallback>::make(
193             std::bind(&InputConsumerNoResampling::handleReceiveCallback, this,
194                       std::placeholders::_1));
195     // In the beginning, there are no pending outbounds events; we only care about receiving
196     // incoming data.
197     setFdEvents(ALOOPER_EVENT_INPUT);
198 }
199 
/**
 * Tear down the consumer. Must run on the looper thread.
 * In order: acks (as not handled) every message that is still waiting in a batch, flushes the
 * outbound queue, logs how many delivered events were never finished, and finally unregisters
 * the channel fd from the looper.
 */
InputConsumerNoResampling::~InputConsumerNoResampling() {
    ensureCalledOnLooperThread(__func__);
    // If there are any remaining unread batches, send an ack for them and don't deliver
    // them to callbacks.
    for (auto& [_, batches] : mBatches) {
        while (!batches.empty()) {
            // finishInputEvent queues a FINISHED message and immediately attempts to send it.
            finishInputEvent(batches.front().header.seq, /*handled=*/false);
            batches.pop();
        }
    }

    // processOutboundEvents returns early when the channel would block, leaving messages in the
    // queue, so it is called repeatedly until the queue is drained.
    while (!mOutboundQueue.empty()) {
        processOutboundEvents();
        // This is our last chance to ack the events. If we don't ack them here, we will get an ANR,
        // so keep trying to send the events as long as they are present in the queue.
    }
    // However, it is still up to the app to finish any events that have already been delivered
    // to the callbacks. If we wanted to change that behaviour and auto-finish all unfinished events
    // that were already sent to callbacks, we could potentially loop through "mConsumeTimes"
    // instead. We can't use "mBatchedSequenceNumbers" for this purpose, because it only contains
    // batchable (i.e., ACTION_MOVE) events that were sent to the callbacks.
    const size_t unfinishedEvents = mConsumeTimes.size();
    LOG_IF(INFO, unfinishedEvents != 0)
            << getName() << " has " << unfinishedEvents << " unfinished event(s)";
    // Remove the fd from epoll, so that Looper does not call 'handleReceiveCallback' anymore.
    // This must be done at the end of the destructor; otherwise, some of the other functions may
    // call 'setFdEvents' as a side-effect, thus adding the fd back to the epoll set of the looper.
    setFdEvents(0);
}
229 
handleReceiveCallback(int events)230 int InputConsumerNoResampling::handleReceiveCallback(int events) {
231     // Allowed return values of this function as documented in LooperCallback::handleEvent
232     constexpr int REMOVE_CALLBACK = 0;
233     constexpr int KEEP_CALLBACK = 1;
234 
235     if (events & (ALOOPER_EVENT_ERROR | ALOOPER_EVENT_HANGUP)) {
236         // This error typically occurs when the publisher has closed the input channel
237         // as part of removing a window or finishing an IME session, in which case
238         // the consumer will soon be disposed as well.
239         if (DEBUG_TRANSPORT_CONSUMER) {
240             LOG(INFO) << "The channel was hung up or an error occurred: " << mChannel->getName();
241         }
242         return REMOVE_CALLBACK;
243     }
244 
245     int handledEvents = 0;
246     if (events & ALOOPER_EVENT_INPUT) {
247         handleMessages(readAllMessages());
248         handledEvents |= ALOOPER_EVENT_INPUT;
249     }
250 
251     if (events & ALOOPER_EVENT_OUTPUT) {
252         processOutboundEvents();
253         handledEvents |= ALOOPER_EVENT_OUTPUT;
254     }
255     if (handledEvents != events) {
256         LOG(FATAL) << "Mismatch: handledEvents=" << handledEvents << ", events=" << events;
257     }
258     return KEEP_CALLBACK;
259 }
260 
processOutboundEvents()261 void InputConsumerNoResampling::processOutboundEvents() {
262     while (!mOutboundQueue.empty()) {
263         const InputMessage& outboundMsg = mOutboundQueue.front();
264 
265         const status_t result = mChannel->sendMessage(&outboundMsg);
266         if (result == OK) {
267             if (outboundMsg.header.type == InputMessage::Type::FINISHED) {
268                 ATRACE_ASYNC_END("InputConsumer processing", /*cookie=*/outboundMsg.header.seq);
269             }
270             // Successful send. Erase the entry and keep trying to send more
271             mOutboundQueue.pop();
272             continue;
273         }
274 
275         // Publisher is busy, try again later. Keep this entry (do not erase)
276         if (result == WOULD_BLOCK) {
277             setFdEvents(ALOOPER_EVENT_INPUT | ALOOPER_EVENT_OUTPUT);
278             return; // try again later
279         }
280 
281         if (result == DEAD_OBJECT) {
282             // If there's no one to receive events in the channel, there's no point in sending them.
283             // Drop all outbound events.
284             LOG(INFO) << "Channel " << mChannel->getName() << " died. Dropping outbound event "
285                       << outboundMsg;
286             mOutboundQueue.pop();
287             setFdEvents(0);
288             continue;
289         }
290         // Some other error. Give up
291         LOG(FATAL) << "Failed to send outbound event on channel '" << mChannel->getName()
292                    << "'.  status=" << statusToString(result) << "(" << result << ")";
293     }
294 
295     // The queue is now empty. Tell looper there's no more output to expect.
296     setFdEvents(ALOOPER_EVENT_INPUT);
297 }
298 
finishInputEvent(uint32_t seq,bool handled)299 void InputConsumerNoResampling::finishInputEvent(uint32_t seq, bool handled) {
300     ensureCalledOnLooperThread(__func__);
301     mOutboundQueue.push(createFinishedMessage(seq, handled, popConsumeTime(seq)));
302     // also produce finish events for all batches for this seq (if any)
303     const auto it = mBatchedSequenceNumbers.find(seq);
304     if (it != mBatchedSequenceNumbers.end()) {
305         for (uint32_t subSeq : it->second) {
306             mOutboundQueue.push(createFinishedMessage(subSeq, handled, popConsumeTime(subSeq)));
307         }
308         mBatchedSequenceNumbers.erase(it);
309     }
310     processOutboundEvents();
311 }
312 
probablyHasInput() const313 bool InputConsumerNoResampling::probablyHasInput() const {
314     // Ideally, this would only be allowed to run on the looper thread, and in production, it will.
315     // However, for testing, it's convenient to call this while the looper thread is blocked, so
316     // we do not call ensureCalledOnLooperThread here.
317     return (!mBatches.empty()) || mChannel->probablyHasInput();
318 }
319 
reportTimeline(int32_t inputEventId,nsecs_t gpuCompletedTime,nsecs_t presentTime)320 void InputConsumerNoResampling::reportTimeline(int32_t inputEventId, nsecs_t gpuCompletedTime,
321                                                nsecs_t presentTime) {
322     ensureCalledOnLooperThread(__func__);
323     mOutboundQueue.push(createTimelineMessage(inputEventId, gpuCompletedTime, presentTime));
324     processOutboundEvents();
325 }
326 
popConsumeTime(uint32_t seq)327 nsecs_t InputConsumerNoResampling::popConsumeTime(uint32_t seq) {
328     auto it = mConsumeTimes.find(seq);
329     // Consume time will be missing if either 'finishInputEvent' is called twice, or if it was
330     // called for the wrong (synthetic?) input event. Either way, it is a bug that should be fixed.
331     LOG_ALWAYS_FATAL_IF(it == mConsumeTimes.end(), "Could not find consume time for seq=%" PRIu32,
332                         seq);
333     nsecs_t consumeTime = it->second;
334     mConsumeTimes.erase(it);
335     return consumeTime;
336 }
337 
setFdEvents(int events)338 void InputConsumerNoResampling::setFdEvents(int events) {
339     if (mFdEvents != events) {
340         mFdEvents = events;
341         if (events != 0) {
342             mLooper->addFd(mChannel->getFd(), 0, events, mCallback, nullptr);
343         } else {
344             mLooper->removeFd(mChannel->getFd());
345         }
346     }
347 }
348 
handleMessages(std::vector<InputMessage> && messages)349 void InputConsumerNoResampling::handleMessages(std::vector<InputMessage>&& messages) {
350     for (const InputMessage& msg : messages) {
351         if (msg.header.type == InputMessage::Type::MOTION) {
352             const int32_t action = msg.body.motion.action;
353             const DeviceId deviceId = msg.body.motion.deviceId;
354             const int32_t source = msg.body.motion.source;
355             const bool batchableEvent = (action == AMOTION_EVENT_ACTION_MOVE ||
356                                          action == AMOTION_EVENT_ACTION_HOVER_MOVE) &&
357                     (isFromSource(source, AINPUT_SOURCE_CLASS_POINTER) ||
358                      isFromSource(source, AINPUT_SOURCE_CLASS_JOYSTICK));
359 
360             const bool canResample = (mResamplerCreator != nullptr) &&
361                     (isFromSource(source, AINPUT_SOURCE_CLASS_POINTER));
362             if (canResample) {
363                 if (action == AMOTION_EVENT_ACTION_DOWN) {
364                     if (std::unique_ptr<Resampler> resampler = mResamplerCreator();
365                         resampler != nullptr) {
366                         const auto [_, inserted] =
367                                 mResamplers.insert(std::pair(deviceId, std::move(resampler)));
368                         LOG_IF(WARNING, !inserted) << deviceId << "already exists in mResamplers";
369                     }
370                 }
371             }
372 
373             if (batchableEvent) {
374                 // add it to batch
375                 mBatches[deviceId].emplace(msg);
376             } else {
377                 // consume all pending batches for this device immediately
378                 consumeBatchedInputEvents(deviceId, /*requestedFrameTime=*/
379                                           std::numeric_limits<nsecs_t>::max());
380                 if (canResample &&
381                     (action == AMOTION_EVENT_ACTION_UP || action == AMOTION_EVENT_ACTION_CANCEL)) {
382                     LOG_IF(INFO, mResamplers.erase(deviceId) == 0)
383                             << deviceId << "does not exist in mResamplers";
384                 }
385                 handleMessage(msg);
386             }
387         } else {
388             // Non-motion events shouldn't force the consumption of pending batched events
389             handleMessage(msg);
390         }
391     }
392     // At the end of this, if we still have pending batches, notify the receiver about it.
393 
394     // We need to carefully notify the InputConsumerCallbacks about the pending batch. The receiver
395     // could choose to consume all events when notified about the batch. That means that the
396     // "mBatches" variable could change when 'InputConsumerCallbacks::onBatchedInputEventPending' is
397     // invoked. We also can't notify the InputConsumerCallbacks in a while loop until mBatches is
398     // empty, because the receiver could choose to not consume the batch immediately.
399     std::set<int32_t> pendingBatchSources;
400     for (const auto& [_, pendingMessages] : mBatches) {
401         // Assume that all messages for a given device has the same source.
402         pendingBatchSources.insert(pendingMessages.front().body.motion.source);
403     }
404     for (const int32_t source : pendingBatchSources) {
405         const bool sourceStillRemaining =
406                 std::any_of(mBatches.begin(), mBatches.end(), [=](const auto& pair) {
407                     return pair.second.front().body.motion.source == source;
408                 });
409         if (sourceStillRemaining) {
410             mCallbacks.onBatchedInputEventPending(source);
411         }
412     }
413 }
414 
readAllMessages()415 std::vector<InputMessage> InputConsumerNoResampling::readAllMessages() {
416     std::vector<InputMessage> messages;
417     while (true) {
418         android::base::Result<InputMessage> result = mChannel->receiveMessage();
419         if (result.ok()) {
420             const InputMessage& msg = *result;
421             const auto [_, inserted] =
422                     mConsumeTimes.emplace(msg.header.seq, systemTime(SYSTEM_TIME_MONOTONIC));
423             LOG_ALWAYS_FATAL_IF(!inserted, "Already have a consume time for seq=%" PRIu32,
424                                 msg.header.seq);
425 
426             // Trace the event processing timeline - event was just read from the socket
427             // TODO(b/329777420): distinguish between multiple instances of InputConsumer
428             // in the same process.
429             ATRACE_ASYNC_BEGIN("InputConsumer processing", /*cookie=*/msg.header.seq);
430             messages.push_back(msg);
431         } else { // !result.ok()
432             switch (result.error().code()) {
433                 case WOULD_BLOCK: {
434                     return messages;
435                 }
436                 case DEAD_OBJECT: {
437                     LOG(FATAL) << "Got a dead object for " << mChannel->getName();
438                     break;
439                 }
440                 case BAD_VALUE: {
441                     LOG(FATAL) << "Got a bad value for " << mChannel->getName();
442                     break;
443                 }
444                 default: {
445                     LOG(FATAL) << "Unexpected error: " << result.error().message();
446                     break;
447                 }
448             }
449         }
450     }
451 }
452 
handleMessage(const InputMessage & msg) const453 void InputConsumerNoResampling::handleMessage(const InputMessage& msg) const {
454     switch (msg.header.type) {
455         case InputMessage::Type::KEY: {
456             std::unique_ptr<KeyEvent> keyEvent = createKeyEvent(msg);
457             mCallbacks.onKeyEvent(std::move(keyEvent), msg.header.seq);
458             break;
459         }
460 
461         case InputMessage::Type::MOTION: {
462             std::unique_ptr<MotionEvent> motionEvent = createMotionEvent(msg);
463             mCallbacks.onMotionEvent(std::move(motionEvent), msg.header.seq);
464             break;
465         }
466 
467         case InputMessage::Type::FINISHED:
468         case InputMessage::Type::TIMELINE: {
469             LOG(FATAL) << "Consumed a " << ftl::enum_string(msg.header.type)
470                        << " message, which should never be seen by InputConsumer on "
471                        << mChannel->getName();
472             break;
473         }
474 
475         case InputMessage::Type::FOCUS: {
476             std::unique_ptr<FocusEvent> focusEvent = createFocusEvent(msg);
477             mCallbacks.onFocusEvent(std::move(focusEvent), msg.header.seq);
478             break;
479         }
480 
481         case InputMessage::Type::CAPTURE: {
482             std::unique_ptr<CaptureEvent> captureEvent = createCaptureEvent(msg);
483             mCallbacks.onCaptureEvent(std::move(captureEvent), msg.header.seq);
484             break;
485         }
486 
487         case InputMessage::Type::DRAG: {
488             std::unique_ptr<DragEvent> dragEvent = createDragEvent(msg);
489             mCallbacks.onDragEvent(std::move(dragEvent), msg.header.seq);
490             break;
491         }
492 
493         case InputMessage::Type::TOUCH_MODE: {
494             std::unique_ptr<TouchModeEvent> touchModeEvent = createTouchModeEvent(msg);
495             mCallbacks.onTouchModeEvent(std::move(touchModeEvent), msg.header.seq);
496             break;
497         }
498     }
499 }
500 
501 std::pair<std::unique_ptr<MotionEvent>, std::optional<uint32_t>>
createBatchedMotionEvent(const std::optional<nsecs_t> requestedFrameTime,std::queue<InputMessage> & messages)502 InputConsumerNoResampling::createBatchedMotionEvent(const std::optional<nsecs_t> requestedFrameTime,
503                                                     std::queue<InputMessage>& messages) {
504     std::unique_ptr<MotionEvent> motionEvent;
505     std::optional<uint32_t> firstSeqForBatch;
506 
507     LOG_IF(FATAL, messages.empty()) << "messages queue is empty!";
508     const DeviceId deviceId = messages.front().body.motion.deviceId;
509     const auto resampler = mResamplers.find(deviceId);
510     const nanoseconds resampleLatency = (resampler != mResamplers.cend())
511             ? resampler->second->getResampleLatency()
512             : nanoseconds{0};
513     // When batching is not enabled, we want to consume all events. That's equivalent to having an
514     // infinite requestedFrameTime.
515     const nanoseconds adjustedFrameTime = (requestedFrameTime.has_value())
516             ? (nanoseconds{*requestedFrameTime} - resampleLatency)
517             : nanoseconds{std::numeric_limits<nsecs_t>::max()};
518 
519     while (!messages.empty() &&
520            (messages.front().body.motion.eventTime <= adjustedFrameTime.count())) {
521         if (motionEvent == nullptr) {
522             motionEvent = createMotionEvent(messages.front());
523             firstSeqForBatch = messages.front().header.seq;
524             const auto [_, inserted] = mBatchedSequenceNumbers.insert({*firstSeqForBatch, {}});
525             LOG_IF(FATAL, !inserted)
526                     << "The sequence " << messages.front().header.seq << " was already present!";
527         } else {
528             addSample(*motionEvent, messages.front());
529             mBatchedSequenceNumbers[*firstSeqForBatch].push_back(messages.front().header.seq);
530         }
531         messages.pop();
532     }
533 
534     // Check if resampling should be performed.
535     InputMessage* futureSample = nullptr;
536     if (!messages.empty()) {
537         futureSample = &messages.front();
538     }
539     if ((motionEvent != nullptr) && (resampler != mResamplers.cend()) &&
540         (requestedFrameTime.has_value())) {
541         resampler->second->resampleMotionEvent(nanoseconds{*requestedFrameTime}, *motionEvent,
542                                                futureSample);
543     }
544 
545     return std::make_pair(std::move(motionEvent), firstSeqForBatch);
546 }
547 
consumeBatchedInputEvents(std::optional<DeviceId> deviceId,std::optional<nsecs_t> requestedFrameTime)548 bool InputConsumerNoResampling::consumeBatchedInputEvents(
549         std::optional<DeviceId> deviceId, std::optional<nsecs_t> requestedFrameTime) {
550     ensureCalledOnLooperThread(__func__);
551     bool producedEvents = false;
552 
553     for (auto deviceIdIter = (deviceId.has_value()) ? (mBatches.find(*deviceId))
554                                                     : (mBatches.begin());
555          deviceIdIter != mBatches.cend(); ++deviceIdIter) {
556         std::queue<InputMessage>& messages = deviceIdIter->second;
557         auto [motion, firstSeqForBatch] = createBatchedMotionEvent(requestedFrameTime, messages);
558         if (motion != nullptr) {
559             LOG_ALWAYS_FATAL_IF(!firstSeqForBatch.has_value());
560             mCallbacks.onMotionEvent(std::move(motion), *firstSeqForBatch);
561             producedEvents = true;
562         } else {
563             // This is OK, it just means that the requestedFrameTime is too old (all events that we
564             // have pending are in the future of the requestedFrameTime). Maybe print a warning? If
565             // there are multiple devices active though, this might be normal and can just be
566             // ignored, unless none of them resulted in any consumption (in that case, this function
567             // would already return "false" so we could just leave it up to the caller).
568         }
569 
570         if (deviceId.has_value()) {
571             // We already consumed events for this device. Break here to prevent iterating over the
572             // other devices.
573             break;
574         }
575     }
576     std::erase_if(mBatches, [](const auto& pair) { return pair.second.empty(); });
577     return producedEvents;
578 }
579 
consumeBatchedInputEvents(std::optional<nsecs_t> requestedFrameTime)580 bool InputConsumerNoResampling::consumeBatchedInputEvents(
581         std::optional<nsecs_t> requestedFrameTime) {
582     return consumeBatchedInputEvents(/*deviceId=*/std::nullopt, requestedFrameTime);
583 }
584 
ensureCalledOnLooperThread(const char * func) const585 void InputConsumerNoResampling::ensureCalledOnLooperThread(const char* func) const {
586     sp<Looper> callingThreadLooper = Looper::getForThread();
587     if (callingThreadLooper != mLooper) {
588         LOG(FATAL) << "The function " << func << " can only be called on the looper thread";
589     }
590 }
591 
dump() const592 std::string InputConsumerNoResampling::dump() const {
593     ensureCalledOnLooperThread(__func__);
594     std::string out;
595     if (mOutboundQueue.empty()) {
596         out += "mOutboundQueue: <empty>\n";
597     } else {
598         out += "mOutboundQueue:\n";
599         // Make a copy of mOutboundQueue for printing destructively. Unfortunately std::queue
600         // doesn't provide a good way to iterate over the entire container.
601         std::queue<InputMessage> tmpQueue = mOutboundQueue;
602         while (!tmpQueue.empty()) {
603             out += std::string("  ") + outboundMessageToString(tmpQueue.front()) + "\n";
604             tmpQueue.pop();
605         }
606     }
607 
608     if (mBatches.empty()) {
609         out += "mBatches: <empty>\n";
610     } else {
611         out += "mBatches:\n";
612         for (const auto& [deviceId, messages] : mBatches) {
613             out += "  Device id ";
614             out += std::to_string(deviceId);
615             out += ":\n";
616             // Make a copy of mOutboundQueue for printing destructively. Unfortunately std::queue
617             // doesn't provide a good way to iterate over the entire container.
618             std::queue<InputMessage> tmpQueue = messages;
619             while (!tmpQueue.empty()) {
620                 LOG_ALWAYS_FATAL_IF(tmpQueue.front().header.type != InputMessage::Type::MOTION);
621                 std::unique_ptr<MotionEvent> motion = createMotionEvent(tmpQueue.front());
622                 out += std::string("    ") + streamableToString(*motion) + "\n";
623                 tmpQueue.pop();
624             }
625         }
626     }
627 
628     return out;
629 }
630 
631 } // namespace android
632