xref: /aosp_15_r20/system/libfmq/include/fmq/MessageQueueBase.h (revision be431cd81a9a2349eaea34eb56fcf6d1608da596)
1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <android-base/unique_fd.h>
20 #include <cutils/ashmem.h>
21 #include <fmq/EventFlag.h>
22 #include <sys/mman.h>
23 #include <sys/user.h>
24 #include <utils/Log.h>
25 #include <utils/SystemClock.h>
26 #include <atomic>
27 #include <functional>
28 
29 using android::hardware::kSynchronizedReadWrite;
30 using android::hardware::kUnsynchronizedWrite;
31 using android::hardware::MQFlavor;
32 
33 namespace android {
34 
35 /* sentinel payload type that indicates the MQ will be used with a mismatching
36 MQDescriptor type, where type safety must be enforced elsewhere because the real
37 element type T is not statically known. This is used to instantiate
38 MessageQueueBase instances for Rust where we cannot generate additional template
39 instantiations across the language boundary. */
40 enum MQErased {};
41 
42 template <template <typename, MQFlavor> class MQDescriptorType, typename T, MQFlavor flavor>
43 struct MessageQueueBase {
44     typedef MQDescriptorType<T, flavor> Descriptor;
45     enum Error : int {
46         NONE,
47         POINTER_CORRUPTION, /** Read/write pointers mismatch */
48     };
49     using ErrorHandler = std::function<void(Error, std::string&&)>;
50 
51     /**
52      * @param Desc MQDescriptor describing the FMQ.
53      * @param resetPointers bool indicating whether the read/write pointers
54      * should be reset or not.
55      */
56     MessageQueueBase(const Descriptor& Desc, bool resetPointers = true);
57 
58     ~MessageQueueBase();
59 
60     /**
61      * This constructor uses Ashmem shared memory to create an FMQ
62      * that can contain a maximum of 'numElementsInQueue' elements of type T.
63      *
64      * @param numElementsInQueue Capacity of the MessageQueue in terms of T.
65      * @param configureEventFlagWord Boolean that specifies if memory should
66      * also be allocated and mapped for an EventFlag word.
67      * @param bufferFd User-supplied file descriptor to map the memory for the ringbuffer
68      * By default, bufferFd=-1 means library will allocate ashmem region for ringbuffer.
69      * MessageQueue takes ownership of the file descriptor.
70      * @param bufferSize size of buffer in bytes that bufferFd represents. This
71      * size must be larger than or equal to (numElementsInQueue * sizeof(T)).
72      * Otherwise, operations will cause out-of-bounds memory access.
73      */
74 
MessageQueueBaseMessageQueueBase75     MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
76                      android::base::unique_fd bufferFd, size_t bufferSize)
77         : MessageQueueBase(numElementsInQueue, configureEventFlagWord, std::move(bufferFd),
78                            bufferSize, sizeof(T)) {
79         /* We must not pass sizeof(T) as quantum for MQErased element type. */
80         static_assert(!std::is_same_v<T, MQErased>,
81                       "MessageQueueBase<..., MQErased, ...> must be constructed via a"
82                       " constructor that accepts a descriptor or a quantum size");
83     };
84 
85     MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord = false)
86         : MessageQueueBase(numElementsInQueue, configureEventFlagWord, android::base::unique_fd(),
87                            0) {}
88 
89     /**
90      * Set a client side error handler function which will be invoked when the FMQ detects
91      * one of the error situations defined by the 'Error' type.
92      */
setErrorHandlerMessageQueueBase93     void setErrorHandler(ErrorHandler&& handler) { mErrorHandler.swap(handler); }
94 
95     /**
96      * @return Number of items of type T that can be written into the FMQ
97      * without a read.
98      */
99     size_t availableToWrite() const;
100 
101     /**
102      * @return Number of items of type T that are waiting to be read from the
103      * FMQ.
104      */
105     size_t availableToRead() const;
106 
107     /**
108      * Returns the size of type T in bytes.
109      *
110      * @return Size of T.
111      */
112     size_t getQuantumSize() const;
113 
114     /**
115      * Returns the size of the FMQ in terms of the size of type T.
116      *
117      * @return Number of items of type T that will fit in the FMQ.
118      */
119     size_t getQuantumCount() const;
120 
121     /**
122      * @return Whether the FMQ is configured correctly.
123      */
124     bool isValid() const;
125 
126     /**
127      * Non-blocking write to FMQ.
128      *
129      * @param data Pointer to the object of type T to be written into the FMQ.
130      *
131      * @return Whether the write was successful.
132      */
133     bool write(const T* data);
134 
135     /**
136      * Non-blocking read from FMQ.
137      *
138      * @param data Pointer to the memory where the object read from the FMQ is
139      * copied to.
140      *
141      * @return Whether the read was successful.
142      */
143     bool read(T* data);
144 
145     /**
146      * Write some data into the FMQ without blocking.
147      *
148      * @param data Pointer to the array of items of type T.
149      * @param count Number of items in array.
150      *
151      * @return Whether the write was successful.
152      */
153     bool write(const T* data, size_t count);
154 
155     /**
156      * Perform a blocking write of 'count' items into the FMQ using EventFlags.
157      * Does not support partial writes.
158      *
159      * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
160      * associated with the FMQ and it is used in that case.
161      *
162      * The application code must ensure that 'evFlag' used by the
163      * reader(s)/writer is based upon the same EventFlag word.
164      *
165      * The method will return false without blocking if any of the following
166      * conditions are true:
167      * - If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
168      * - If the 'readNotification' bit mask is zero.
169      * - If 'count' is greater than the FMQ size.
170      *
171      * If the there is insufficient space available to write into it, the
172      * EventFlag bit mask 'readNotification' is is waited upon.
173      *
174      * This method should only be used with a MessageQueue of the flavor
175      * 'kSynchronizedReadWrite'.
176      *
177      * Upon a successful write, wake is called on 'writeNotification' (if
178      * non-zero).
179      *
180      * @param data Pointer to the array of items of type T.
181      * @param count Number of items in array.
182      * @param readNotification The EventFlag bit mask to wait on if there is not
183      * enough space in FMQ to write 'count' items.
184      * @param writeNotification The EventFlag bit mask to call wake on
185      * a successful write. No wake is called if 'writeNotification' is zero.
186      * @param timeOutNanos Number of nanoseconds after which the blocking
187      * write attempt is aborted.
188      * @param evFlag The EventFlag object to be used for blocking. If nullptr,
189      * it is checked whether the FMQ owns an EventFlag object and that is used
190      * for blocking instead.
191      *
192      * @return Whether the write was successful.
193      */
194     bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
195                        uint32_t writeNotification, int64_t timeOutNanos = 0,
196                        android::hardware::EventFlag* evFlag = nullptr);
197 
198     bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
199 
200     /**
201      * Read some data from the FMQ without blocking.
202      *
203      * @param data Pointer to the array to which read data is to be written.
204      * @param count Number of items to be read.
205      *
206      * @return Whether the read was successful.
207      */
208     bool read(T* data, size_t count);
209 
210     /**
211      * Perform a blocking read operation of 'count' items from the FMQ. Does not
212      * perform a partial read.
213      *
214      * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
215      * associated with the FMQ and it is used in that case.
216      *
217      * The application code must ensure that 'evFlag' used by the
218      * reader(s)/writer is based upon the same EventFlag word.
219      *
220      * The method will return false without blocking if any of the following
221      * conditions are true:
222      * -If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
223      * -If the 'writeNotification' bit mask is zero.
224      * -If 'count' is greater than the FMQ size.
225      *
226      * This method should only be used with a MessageQueue of the flavor
227      * 'kSynchronizedReadWrite'.
228 
229      * If FMQ does not contain 'count' items, the eventFlag bit mask
230      * 'writeNotification' is waited upon. Upon a successful read from the FMQ,
231      * wake is called on 'readNotification' (if non-zero).
232      *
233      * @param data Pointer to the array to which read data is to be written.
234      * @param count Number of items to be read.
235      * @param readNotification The EventFlag bit mask to call wake on after
236      * a successful read. No wake is called if 'readNotification' is zero.
237      * @param writeNotification The EventFlag bit mask to call a wait on
238      * if there is insufficient data in the FMQ to be read.
239      * @param timeOutNanos Number of nanoseconds after which the blocking
240      * read attempt is aborted.
241      * @param evFlag The EventFlag object to be used for blocking.
242      *
243      * @return Whether the read was successful.
244      */
245     bool readBlocking(T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
246                       int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr);
247 
248     bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);
249 
250     /**
251      * Get a pointer to the MQDescriptor object that describes this FMQ.
252      *
253      * @return Pointer to the MQDescriptor associated with the FMQ.
254      */
getDescMessageQueueBase255     const Descriptor* getDesc() const { return mDesc.get(); }
256 
257     /**
258      * Get a pointer to the EventFlag word if there is one associated with this FMQ.
259      *
260      * @return Pointer to an EventFlag word, will return nullptr if not
261      * configured. This method does not transfer ownership. The EventFlag
262      * word will be unmapped by the MessageQueue destructor.
263      */
getEventFlagWordMessageQueueBase264     std::atomic<uint32_t>* getEventFlagWord() const { return mEvFlagWord; }
265 
266     /**
267      * Describes a memory region in the FMQ.
268      */
269     struct MemRegion {
MemRegionMessageQueueBase::MemRegion270         MemRegion() : MemRegion(nullptr, 0) {}
271 
MemRegionMessageQueueBase::MemRegion272         MemRegion(T* base, size_t size) : address(base), length(size) {}
273 
274         MemRegion& operator=(const MemRegion& other) {
275             address = other.address;
276             length = other.length;
277             return *this;
278         }
279 
280         /**
281          * Gets a pointer to the base address of the MemRegion.
282          */
getAddressMessageQueueBase::MemRegion283         inline T* getAddress() const { return address; }
284 
285         /**
286          * Gets the length of the MemRegion. This would equal to the number
287          * of items of type T that can be read from/written into the MemRegion.
288          */
getLengthMessageQueueBase::MemRegion289         inline size_t getLength() const { return length; }
290 
291         /**
292          * Gets the length of the MemRegion in bytes.
293          */
294         template <class U = T>
getLengthInBytesMessageQueueBase::MemRegion295         inline std::enable_if_t<!std::is_same_v<U, MQErased>, size_t> getLengthInBytes() const {
296             return length * kQuantumValue<U>;
297         }
298 
299       private:
300         /* Base address */
301         T* address;
302 
303         /*
304          * Number of items of type T that can be written to/read from the base
305          * address.
306          */
307         size_t length;
308     };
309 
310     /**
311      * Describes the memory regions to be used for a read or write.
312      * The struct contains two MemRegion objects since the FMQ is a ring
313      * buffer and a read or write operation can wrap around. A single message
314      * of type T will never be broken between the two MemRegions.
315      */
316     struct MemTransaction {
MemTransactionMessageQueueBase::MemTransaction317         MemTransaction() : MemTransaction(MemRegion(), MemRegion()) {}
318 
MemTransactionMessageQueueBase::MemTransaction319         MemTransaction(const MemRegion& regionFirst, const MemRegion& regionSecond)
320             : first(regionFirst), second(regionSecond) {}
321 
322         MemTransaction& operator=(const MemTransaction& other) {
323             first = other.first;
324             second = other.second;
325             return *this;
326         }
327 
328         /**
329          * Helper method to calculate the address for a particular index for
330          * the MemTransaction object.
331          *
332          * @param idx Index of the slot to be read/written. If the
333          * MemTransaction object is representing the memory region to read/write
334          * N items of type T, the valid range of idx is between 0 and N-1.
335          *
336          * @return Pointer to the slot idx. Will be nullptr for an invalid idx.
337          */
338         T* getSlot(size_t idx);
339 
340         /**
341          * Helper method to write 'nMessages' items of type T into the memory
342          * regions described by the object starting from 'startIdx'. This method
343          * uses memcpy() and is not to meant to be used for a zero copy operation.
344          * Partial writes are not supported.
345          *
346          * @param data Pointer to the source buffer.
347          * @param nMessages Number of items of type T.
348          * @param startIdx The slot number to begin the write from. If the
349          * MemTransaction object is representing the memory region to read/write
350          * N items of type T, the valid range of startIdx is between 0 and N-1;
351          *
352          * @return Whether the write operation of size 'nMessages' succeeded.
353          */
354         bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
355 
356         /*
357          * Helper method to read 'nMessages' items of type T from the memory
358          * regions described by the object starting from 'startIdx'. This method uses
359          * memcpy() and is not meant to be used for a zero copy operation. Partial reads
360          * are not supported.
361          *
362          * @param data Pointer to the destination buffer.
363          * @param nMessages Number of items of type T.
364          * @param startIdx The slot number to begin the read from. If the
365          * MemTransaction object is representing the memory region to read/write
366          * N items of type T, the valid range of startIdx is between 0 and N-1.
367          *
368          * @return Whether the read operation of size 'nMessages' succeeded.
369          */
370         bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
371 
372         /**
373          * Returns a const reference to the first MemRegion in the
374          * MemTransaction object.
375          */
getFirstRegionMessageQueueBase::MemTransaction376         inline const MemRegion& getFirstRegion() const { return first; }
377 
378         /**
379          * Returns a const reference to the second MemRegion in the
380          * MemTransaction object.
381          */
getSecondRegionMessageQueueBase::MemTransaction382         inline const MemRegion& getSecondRegion() const { return second; }
383 
384       private:
385         friend MessageQueueBase<MQDescriptorType, T, flavor>;
386 
387         bool copyToSized(const T* data, size_t startIdx, size_t nMessages, size_t messageSize);
388         bool copyFromSized(T* data, size_t startIdx, size_t nMessages, size_t messageSize);
389 
390         /*
391          * Given a start index and the number of messages to be
392          * read/written, this helper method calculates the
393          * number of messages that should should be written to both the first
394          * and second MemRegions and the base addresses to be used for
395          * the read/write operation.
396          *
397          * Returns false if the 'startIdx' and 'nMessages' is
398          * invalid for the MemTransaction object.
399          */
400         bool inline getMemRegionInfo(size_t idx, size_t nMessages, size_t& firstCount,
401                                      size_t& secondCount, T** firstBaseAddress,
402                                      T** secondBaseAddress);
403         MemRegion first;
404         MemRegion second;
405     };
406 
407     /**
408      * Get a MemTransaction object to write 'nMessages' items of type T.
409      * Once the write is performed using the information from MemTransaction,
410      * the write operation is to be committed using a call to commitWrite().
411      *
412      * @param nMessages Number of messages of type T.
413      * @param Pointer to MemTransaction struct that describes memory to write 'nMessages'
414      * items of type T. If a write of size 'nMessages' is not possible, the base
415      * addresses in the MemTransaction object would be set to nullptr.
416      *
417      * @return Whether it is possible to write 'nMessages' items of type T
418      * into the FMQ.
419      */
420     bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
421 
422     /**
423      * Commit a write of size 'nMessages'. To be only used after a call to beginWrite().
424      *
425      * @param nMessages number of messages of type T to be written.
426      *
427      * @return Whether the write operation of size 'nMessages' succeeded.
428      */
429     bool commitWrite(size_t nMessages);
430 
431     /**
432      * Get a MemTransaction object to read 'nMessages' items of type T.
433      * Once the read is performed using the information from MemTransaction,
434      * the read operation is to be committed using a call to commitRead().
435      *
436      * @param nMessages Number of messages of type T.
437      * @param pointer to MemTransaction struct that describes memory to read 'nMessages'
438      * items of type T. If a read of size 'nMessages' is not possible, the base
439      * pointers in the MemTransaction object returned will be set to nullptr.
440      *
441      * @return bool Whether it is possible to read 'nMessages' items of type T
442      * from the FMQ.
443      */
444     bool beginRead(size_t nMessages, MemTransaction* memTx) const;
445 
446     /**
447      * Commit a read of size 'nMessages'. To be only used after a call to beginRead().
448      * For the unsynchronized flavor of FMQ, this method will return a failure
449      * if a write overflow happened after beginRead() was invoked.
450      *
451      * @param nMessages number of messages of type T to be read.
452      *
453      * @return bool Whether the read operation of size 'nMessages' succeeded.
454      */
455     bool commitRead(size_t nMessages);
456 
457     /**
458      * Get the pointer to the ring buffer. Useful for debugging and fuzzing.
459      */
getRingBufferPtrMessageQueueBase460     uint8_t* getRingBufferPtr() const { return mRing; }
461 
462   protected:
463     /**
464      * Protected constructor that can manually specify the quantum to use.
465      * The only external consumer of this ctor is ErasedMessageQueue, but the
466      * constructor cannot be private because this is a base class.
467      *
468      * @param quantum Size of the element type, in bytes.
469      * Other parameters have semantics given in the corresponding public ctor.
470      */
471 
472     MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
473                      android::base::unique_fd bufferFd, size_t bufferSize, size_t quantum);
474 
475   private:
476     template <class U = T,
477               typename std::enable_if<!std::is_same<U, MQErased>::value, bool>::type = true>
478     static constexpr size_t kQuantumValue = sizeof(T);
479     inline size_t quantum() const;
480     size_t availableToWriteBytes() const;
481     size_t availableToReadBytes() const;
482 
483     MessageQueueBase(const MessageQueueBase& other) = delete;
484     MessageQueueBase& operator=(const MessageQueueBase& other) = delete;
485 
486     void* mapGrantorDescr(uint32_t grantorIdx);
487     void unmapGrantorDescr(void* address, uint32_t grantorIdx);
488     void initMemory(bool resetPointers);
489     bool processOverflow(uint64_t readPtr, uint64_t writePtr) const;
490 
491     enum DefaultEventNotification : uint32_t {
492         /*
493          * These are only used internally by the readBlocking()/writeBlocking()
494          * methods and hence once other bit combinations are not required.
495          */
496         FMQ_NOT_FULL = 0x01,
497         FMQ_NOT_EMPTY = 0x02
498     };
499     std::unique_ptr<Descriptor> mDesc;
500     uint8_t* mRing = nullptr;
501     /*
502      * TODO(b/31550092): Change to 32 bit read and write pointer counters.
503      */
504     std::atomic<uint64_t>* mReadPtr = nullptr;
505     std::atomic<uint64_t>* mWritePtr = nullptr;
506 
507     std::atomic<uint32_t>* mEvFlagWord = nullptr;
508 
509     /*
510      * This EventFlag object will be owned by the FMQ and will have the same
511      * lifetime.
512      */
513     android::hardware::EventFlag* mEventFlag = nullptr;
514 
515     ErrorHandler mErrorHandler;
516 
517     const size_t kPageSize = getpagesize();
518 };
519 
520 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getSlot(size_t idx)521 T* MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getSlot(size_t idx) {
522     size_t firstRegionLength = first.getLength();
523     size_t secondRegionLength = second.getLength();
524 
525     if (idx > firstRegionLength + secondRegionLength) {
526         return nullptr;
527     }
528 
529     if (idx < firstRegionLength) {
530         return first.getAddress() + idx;
531     }
532 
533     return second.getAddress() + idx - firstRegionLength;
534 }
535 
536 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getMemRegionInfo(size_t startIdx,size_t nMessages,size_t & firstCount,size_t & secondCount,T ** firstBaseAddress,T ** secondBaseAddress)537 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getMemRegionInfo(
538         size_t startIdx, size_t nMessages, size_t& firstCount, size_t& secondCount,
539         T** firstBaseAddress, T** secondBaseAddress) {
540     size_t firstRegionLength = first.getLength();
541     size_t secondRegionLength = second.getLength();
542 
543     if (startIdx + nMessages > firstRegionLength + secondRegionLength) {
544         /*
545          * Return false if 'nMessages' starting at 'startIdx' cannot be
546          * accommodated by the MemTransaction object.
547          */
548         return false;
549     }
550 
551     /* Number of messages to be read/written to the first MemRegion. */
552     firstCount =
553             startIdx < firstRegionLength ? std::min(nMessages, firstRegionLength - startIdx) : 0;
554 
555     /* Number of messages to be read/written to the second MemRegion. */
556     secondCount = nMessages - firstCount;
557 
558     if (firstCount != 0) {
559         *firstBaseAddress = first.getAddress() + startIdx;
560     }
561 
562     if (secondCount != 0) {
563         size_t secondStartIdx = startIdx > firstRegionLength ? startIdx - firstRegionLength : 0;
564         *secondBaseAddress = second.getAddress() + secondStartIdx;
565     }
566 
567     return true;
568 }
569 
570 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFrom(T * data,size_t startIdx,size_t nMessages)571 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFrom(T* data,
572                                                                              size_t startIdx,
573                                                                              size_t nMessages) {
574     if constexpr (!std::is_same<T, MQErased>::value) {
575         return copyFromSized(data, startIdx, nMessages, kQuantumValue<T>);
576     } else {
577         /* Compile error. */
578         static_assert(!std::is_same<T, MQErased>::value,
579                       "copyFrom without messageSize argument cannot be used with MQErased (use "
580                       "copyFromSized)");
581     }
582 }
583 
584 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFromSized(T * data,size_t startIdx,size_t nMessages,size_t messageSize)585 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFromSized(
586         T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
587     if (data == nullptr) {
588         return false;
589     }
590 
591     size_t firstReadCount = 0, secondReadCount = 0;
592     T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
593 
594     if (getMemRegionInfo(startIdx, nMessages, firstReadCount, secondReadCount, &firstBaseAddress,
595                          &secondBaseAddress) == false) {
596         /*
597          * Returns false if 'startIdx' and 'nMessages' are invalid for this
598          * MemTransaction object.
599          */
600         return false;
601     }
602 
603     if (firstReadCount != 0) {
604         memcpy(data, firstBaseAddress, firstReadCount * messageSize);
605     }
606 
607     if (secondReadCount != 0) {
608         memcpy(data + firstReadCount, secondBaseAddress, secondReadCount * messageSize);
609     }
610 
611     return true;
612 }
613 
614 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyTo(const T * data,size_t startIdx,size_t nMessages)615 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyTo(const T* data,
616                                                                            size_t startIdx,
617                                                                            size_t nMessages) {
618     if constexpr (!std::is_same<T, MQErased>::value) {
619         return copyToSized(data, startIdx, nMessages, kQuantumValue<T>);
620     } else {
621         /* Compile error. */
622         static_assert(!std::is_same<T, MQErased>::value,
623                       "copyTo without messageSize argument cannot be used with MQErased (use "
624                       "copyToSized)");
625     }
626 }
627 
628 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyToSized(const T * data,size_t startIdx,size_t nMessages,size_t messageSize)629 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyToSized(
630         const T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
631     if (data == nullptr) {
632         return false;
633     }
634 
635     size_t firstWriteCount = 0, secondWriteCount = 0;
636     T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
637 
638     if (getMemRegionInfo(startIdx, nMessages, firstWriteCount, secondWriteCount, &firstBaseAddress,
639                          &secondBaseAddress) == false) {
640         /*
641          * Returns false if 'startIdx' and 'nMessages' are invalid for this
642          * MemTransaction object.
643          */
644         return false;
645     }
646 
647     if (firstWriteCount != 0) {
648         memcpy(firstBaseAddress, data, firstWriteCount * messageSize);
649     }
650 
651     if (secondWriteCount != 0) {
652         memcpy(secondBaseAddress, data + firstWriteCount, secondWriteCount * messageSize);
653     }
654 
655     return true;
656 }
657 
658 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
initMemory(bool resetPointers)659 void MessageQueueBase<MQDescriptorType, T, flavor>::initMemory(bool resetPointers) {
660     /*
661      * Verify that the Descriptor contains the minimum number of grantors
662      * the native_handle is valid and T matches quantum size.
663      */
664     if ((mDesc == nullptr) || !mDesc->isHandleValid() ||
665         (mDesc->countGrantors() < hardware::details::kMinGrantorCount)) {
666         return;
667     }
668     if (mDesc->getQuantum() != quantum()) {
669         hardware::details::logError(
670                 "Payload size differs between the queue instantiation and the "
671                 "MQDescriptor.");
672         return;
673     }
674 
675     if (flavor == kSynchronizedReadWrite) {
676         mReadPtr = reinterpret_cast<std::atomic<uint64_t>*>(
677                 mapGrantorDescr(hardware::details::READPTRPOS));
678     } else {
679         /*
680          * The unsynchronized write flavor of the FMQ may have multiple readers
681          * and each reader would have their own read pointer counter.
682          */
683         mReadPtr = new (std::nothrow) std::atomic<uint64_t>;
684     }
685     if (mReadPtr == nullptr) goto error;
686 
687     mWritePtr = reinterpret_cast<std::atomic<uint64_t>*>(
688             mapGrantorDescr(hardware::details::WRITEPTRPOS));
689     if (mWritePtr == nullptr) goto error;
690 
691     if (resetPointers) {
692         mReadPtr->store(0, std::memory_order_release);
693         mWritePtr->store(0, std::memory_order_release);
694     } else if (flavor != kSynchronizedReadWrite) {
695         // Always reset the read pointer.
696         mReadPtr->store(0, std::memory_order_release);
697     }
698 
699     mRing = reinterpret_cast<uint8_t*>(mapGrantorDescr(hardware::details::DATAPTRPOS));
700     if (mRing == nullptr) goto error;
701 
702     if (mDesc->countGrantors() > hardware::details::EVFLAGWORDPOS) {
703         mEvFlagWord = static_cast<std::atomic<uint32_t>*>(
704                 mapGrantorDescr(hardware::details::EVFLAGWORDPOS));
705         if (mEvFlagWord == nullptr) goto error;
706         android::hardware::EventFlag::createEventFlag(mEvFlagWord, &mEventFlag);
707     }
708     return;
709 error:
710     if (mReadPtr) {
711         if (flavor == kSynchronizedReadWrite) {
712             unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
713         } else {
714             delete mReadPtr;
715         }
716         mReadPtr = nullptr;
717     }
718     if (mWritePtr) {
719         unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
720         mWritePtr = nullptr;
721     }
722     if (mRing) {
723         unmapGrantorDescr(mRing, hardware::details::EVFLAGWORDPOS);
724         mRing = nullptr;
725     }
726 }
727 
728 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(const Descriptor & Desc,bool resetPointers)729 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(const Descriptor& Desc,
730                                                                 bool resetPointers) {
731     mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(Desc));
732     if (mDesc == nullptr || mDesc->getSize() == 0) {
733         hardware::details::logError("MQDescriptor is invalid or queue size is 0.");
734         return;
735     }
736 
737     initMemory(resetPointers);
738 }
739 
740 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(size_t numElementsInQueue,bool configureEventFlagWord,android::base::unique_fd bufferFd,size_t bufferSize,size_t quantum)741 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(size_t numElementsInQueue,
742                                                                 bool configureEventFlagWord,
743                                                                 android::base::unique_fd bufferFd,
744                                                                 size_t bufferSize, size_t quantum) {
745     // Check if the buffer size would not overflow size_t
746     if (numElementsInQueue > SIZE_MAX / quantum) {
747         hardware::details::logError("Requested message queue size too large. Size of elements: " +
748                                     std::to_string(quantum) +
749                                     ". Number of elements: " + std::to_string(numElementsInQueue));
750         return;
751     }
752     if (numElementsInQueue == 0) {
753         hardware::details::logError("Requested queue size of 0.");
754         return;
755     }
756     if (bufferFd != -1 && numElementsInQueue * quantum > bufferSize) {
757         hardware::details::logError("The supplied buffer size(" + std::to_string(bufferSize) +
758                                     ") is smaller than the required size(" +
759                                     std::to_string(numElementsInQueue * quantum) + ").");
760         return;
761     }
762     /*
763      * The FMQ needs to allocate memory for the ringbuffer as well as for the
764      * read and write pointer counters. If an EventFlag word is to be configured,
765      * we also need to allocate memory for the same/
766      */
767     size_t kQueueSizeBytes = numElementsInQueue * quantum;
768     size_t kMetaDataSize = 2 * sizeof(android::hardware::details::RingBufferPosition);
769 
770     if (configureEventFlagWord) {
771         kMetaDataSize += sizeof(std::atomic<uint32_t>);
772     }
773 
774     /*
775      * Ashmem memory region size needs to be specified in page-aligned bytes.
776      * kQueueSizeBytes needs to be aligned to word boundary so that all offsets
777      * in the grantorDescriptor will be word aligned.
778      */
779     size_t kAshmemSizePageAligned;
780     if (bufferFd != -1) {
781         // Allocate read counter and write counter only. User-supplied memory will be used for the
782         // ringbuffer.
783         kAshmemSizePageAligned = (kMetaDataSize + kPageSize - 1) & ~(kPageSize - 1);
784     } else {
785         // Allocate ringbuffer, read counter and write counter.
786         kAshmemSizePageAligned = (hardware::details::alignToWordBoundary(kQueueSizeBytes) +
787                                   kMetaDataSize + kPageSize - 1) &
788                                  ~(kPageSize - 1);
789     }
790 
791     /*
792      * The native handle will contain the fds to be mapped.
793      */
794     int numFds = (bufferFd != -1) ? 2 : 1;
795     native_handle_t* mqHandle = native_handle_create(numFds, 0 /* numInts */);
796     if (mqHandle == nullptr) {
797         return;
798     }
799 
800     /*
801      * Create an ashmem region to map the memory.
802      */
803     int ashmemFd = ashmem_create_region("MessageQueue", kAshmemSizePageAligned);
804     ashmem_set_prot_region(ashmemFd, PROT_READ | PROT_WRITE);
805     mqHandle->data[0] = ashmemFd;
806 
807     if (bufferFd != -1) {
808         // Use user-supplied file descriptor for fdIndex 1
809         mqHandle->data[1] = bufferFd.get();
810         // release ownership of fd. mqHandle owns it now.
811         if (bufferFd.release() < 0) {
812             hardware::details::logError("Error releasing supplied bufferFd");
813         }
814 
815         std::vector<android::hardware::GrantorDescriptor> grantors;
816         grantors.resize(configureEventFlagWord ? hardware::details::kMinGrantorCountForEvFlagSupport
817                                                : hardware::details::kMinGrantorCount);
818 
819         size_t memSize[] = {
820                 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for read
821                                                                   pointer counter */
822                 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for write
823                                                                   pointer counter */
824                 kQueueSizeBytes,              /* memory to be allocated for data buffer */
825                 sizeof(std::atomic<uint32_t>) /* memory to be allocated for EventFlag word */
826         };
827 
828         for (size_t grantorPos = 0, offset = 0; grantorPos < grantors.size(); grantorPos++) {
829             uint32_t grantorFdIndex;
830             size_t grantorOffset;
831             if (grantorPos == hardware::details::DATAPTRPOS) {
832                 grantorFdIndex = 1;
833                 grantorOffset = 0;
834             } else {
835                 grantorFdIndex = 0;
836                 grantorOffset = offset;
837                 offset += memSize[grantorPos];
838             }
839             grantors[grantorPos] = {
840                     0 /* grantor flags */, grantorFdIndex,
841                     static_cast<uint32_t>(hardware::details::alignToWordBoundary(grantorOffset)),
842                     memSize[grantorPos]};
843         }
844 
845         mDesc = std::unique_ptr<Descriptor>(new (std::nothrow)
846                                                     Descriptor(grantors, mqHandle, quantum));
847     } else {
848         mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(
849                 kQueueSizeBytes, mqHandle, quantum, configureEventFlagWord));
850     }
851     if (mDesc == nullptr) {
852         native_handle_close(mqHandle);
853         native_handle_delete(mqHandle);
854         return;
855     }
856     initMemory(true);
857 }
858 
859 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
~MessageQueueBase()860 MessageQueueBase<MQDescriptorType, T, flavor>::~MessageQueueBase() {
861     if (flavor == kSynchronizedReadWrite && mReadPtr != nullptr) {
862         unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
863     } else if (mReadPtr != nullptr) {
864         delete mReadPtr;
865     }
866     if (mWritePtr != nullptr) {
867         unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
868     }
869     if (mRing != nullptr) {
870         unmapGrantorDescr(mRing, hardware::details::DATAPTRPOS);
871     }
872     if (mEvFlagWord != nullptr) {
873         unmapGrantorDescr(mEvFlagWord, hardware::details::EVFLAGWORDPOS);
874         android::hardware::EventFlag::deleteEventFlag(&mEventFlag);
875     }
876 }
877 
878 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data)879 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data) {
880     return write(data, 1);
881 }
882 
883 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data)884 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data) {
885     return read(data, 1);
886 }
887 
888 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data,size_t nMessages)889 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data, size_t nMessages) {
890     MemTransaction tx;
891     return beginWrite(nMessages, &tx) &&
892            tx.copyToSized(data, 0 /* startIdx */, nMessages, quantum()) && commitWrite(nMessages);
893 }
894 
895 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)896 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(
897         const T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
898         int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
899     static_assert(flavor == kSynchronizedReadWrite,
900                   "writeBlocking can only be used with the "
901                   "kSynchronizedReadWrite flavor.");
902     /*
903      * If evFlag is null and the FMQ does not have its own EventFlag object
904      * return false;
905      * If the flavor is kSynchronizedReadWrite and the readNotification
906      * bit mask is zero return false;
907      * If the count is greater than queue size, return false
908      * to prevent blocking until timeOut.
909      */
910     if (evFlag == nullptr) {
911         evFlag = mEventFlag;
912         if (evFlag == nullptr) {
913             hardware::details::logError(
914                     "writeBlocking failed: called on MessageQueue with no Eventflag"
915                     "configured or provided");
916             return false;
917         }
918     }
919 
920     if (readNotification == 0 || (count > getQuantumCount())) {
921         return false;
922     }
923 
924     /*
925      * There is no need to wait for a readNotification if there is sufficient
926      * space to write is already present in the FMQ. The latter would be the case when
927      * read operations read more number of messages than write operations write.
928      * In other words, a single large read may clear the FMQ after multiple small
929      * writes. This would fail to clear a pending readNotification bit since
930      * EventFlag bits can only be cleared by a wait() call, however the bit would
931      * be correctly cleared by the next writeBlocking() call.
932      */
933 
934     bool result = write(data, count);
935     if (result) {
936         if (writeNotification) {
937             evFlag->wake(writeNotification);
938         }
939         return result;
940     }
941 
942     bool shouldTimeOut = timeOutNanos != 0;
943     int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
944 
945     while (true) {
946         /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
947         if (shouldTimeOut) {
948             /*
949              * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
950              * to Nanoseconds)
951              */
952             int64_t currentTimeNs = android::elapsedRealtimeNano();
953             /*
954              * Decrement 'timeOutNanos' to account for the time taken to complete the last
955              * iteration of the while loop.
956              */
957             timeOutNanos -= currentTimeNs - prevTimeNanos;
958             prevTimeNanos = currentTimeNs;
959 
960             if (timeOutNanos <= 0) {
961                 /*
962                  * Attempt write in case a context switch happened outside of
963                  * evFlag->wait().
964                  */
965                 result = write(data, count);
966                 break;
967             }
968         }
969 
970         /*
971          * wait() will return immediately if there was a pending read
972          * notification.
973          */
974         uint32_t efState = 0;
975         status_t status = evFlag->wait(readNotification, &efState, timeOutNanos,
976                                        true /* retry on spurious wake */);
977 
978         if (status != android::TIMED_OUT && status != android::NO_ERROR) {
979             hardware::details::logError("Unexpected error code from EventFlag Wait status " +
980                                         std::to_string(status));
981             break;
982         }
983 
984         if (status == android::TIMED_OUT) {
985             break;
986         }
987 
988         /*
989          * If there is still insufficient space to write to the FMQ,
990          * keep waiting for another readNotification.
991          */
992         if ((efState & readNotification) && write(data, count)) {
993             result = true;
994             break;
995         }
996     }
997 
998     if (result && writeNotification != 0) {
999         evFlag->wake(writeNotification);
1000     }
1001 
1002     return result;
1003 }
1004 
1005 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,int64_t timeOutNanos)1006 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(const T* data, size_t count,
1007                                                                   int64_t timeOutNanos) {
1008     return writeBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1009 }
1010 
1011 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)1012 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(
1013         T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
1014         int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
1015     static_assert(flavor == kSynchronizedReadWrite,
1016                   "readBlocking can only be used with the "
1017                   "kSynchronizedReadWrite flavor.");
1018 
1019     /*
1020      * If evFlag is null and the FMQ does not own its own EventFlag object
1021      * return false;
1022      * If the writeNotification bit mask is zero return false;
1023      * If the count is greater than queue size, return false to prevent
1024      * blocking until timeOut.
1025      */
1026     if (evFlag == nullptr) {
1027         evFlag = mEventFlag;
1028         if (evFlag == nullptr) {
1029             hardware::details::logError(
1030                     "readBlocking failed: called on MessageQueue with no Eventflag"
1031                     "configured or provided");
1032             return false;
1033         }
1034     }
1035 
1036     if (writeNotification == 0 || count > getQuantumCount()) {
1037         return false;
1038     }
1039 
1040     /*
1041      * There is no need to wait for a write notification if sufficient
1042      * data to read is already present in the FMQ. This would be the
1043      * case when read operations read lesser number of messages than
1044      * a write operation and multiple reads would be required to clear the queue
1045      * after a single write operation. This check would fail to clear a pending
1046      * writeNotification bit since EventFlag bits can only be cleared
1047      * by a wait() call, however the bit would be correctly cleared by the next
1048      * readBlocking() call.
1049      */
1050 
1051     bool result = read(data, count);
1052     if (result) {
1053         if (readNotification) {
1054             evFlag->wake(readNotification);
1055         }
1056         return result;
1057     }
1058 
1059     bool shouldTimeOut = timeOutNanos != 0;
1060     int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
1061 
1062     while (true) {
1063         /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
1064         if (shouldTimeOut) {
1065             /*
1066              * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
1067              * to Nanoseconds)
1068              */
1069             int64_t currentTimeNs = android::elapsedRealtimeNano();
1070             /*
1071              * Decrement 'timeOutNanos' to account for the time taken to complete the last
1072              * iteration of the while loop.
1073              */
1074             timeOutNanos -= currentTimeNs - prevTimeNanos;
1075             prevTimeNanos = currentTimeNs;
1076 
1077             if (timeOutNanos <= 0) {
1078                 /*
1079                  * Attempt read in case a context switch happened outside of
1080                  * evFlag->wait().
1081                  */
1082                 result = read(data, count);
1083                 break;
1084             }
1085         }
1086 
1087         /*
1088          * wait() will return immediately if there was a pending write
1089          * notification.
1090          */
1091         uint32_t efState = 0;
1092         status_t status = evFlag->wait(writeNotification, &efState, timeOutNanos,
1093                                        true /* retry on spurious wake */);
1094 
1095         if (status != android::TIMED_OUT && status != android::NO_ERROR) {
1096             hardware::details::logError("Unexpected error code from EventFlag Wait status " +
1097                                         std::to_string(status));
1098             break;
1099         }
1100 
1101         if (status == android::TIMED_OUT) {
1102             break;
1103         }
1104 
1105         /*
1106          * If the data in FMQ is still insufficient, go back to waiting
1107          * for another write notification.
1108          */
1109         if ((efState & writeNotification) && read(data, count)) {
1110             result = true;
1111             break;
1112         }
1113     }
1114 
1115     if (result && readNotification != 0) {
1116         evFlag->wake(readNotification);
1117     }
1118     return result;
1119 }
1120 
1121 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,int64_t timeOutNanos)1122 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(T* data, size_t count,
1123                                                                  int64_t timeOutNanos) {
1124     return readBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1125 }
1126 
1127 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
quantum()1128 inline size_t MessageQueueBase<MQDescriptorType, T, flavor>::quantum() const {
1129     if constexpr (std::is_same<T, MQErased>::value) {
1130         return mDesc->getQuantum();
1131     } else {
1132         return kQuantumValue<T>;
1133     }
1134 }
1135 
1136 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWriteBytes()1137 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWriteBytes() const {
1138     size_t queueSizeBytes = mDesc->getSize();
1139     size_t availableBytes = availableToReadBytes();
1140     if (queueSizeBytes < availableBytes) {
1141         std::string errorMsg =
1142                 "The write or read pointer has become corrupted. Writing to the queue is no "
1143                 "longer possible. Queue size: " +
1144                 std::to_string(queueSizeBytes) + ", available: " + std::to_string(availableBytes);
1145         hardware::details::logError(errorMsg);
1146         if (mErrorHandler) {
1147             mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
1148         }
1149         return 0;
1150     }
1151     return queueSizeBytes - availableBytes;
1152 }
1153 
1154 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWrite()1155 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWrite() const {
1156     return availableToWriteBytes() / quantum();
1157 }
1158 
1159 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToRead()1160 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToRead() const {
1161     return availableToReadBytes() / quantum();
1162 }
1163 
1164 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
beginWrite(size_t nMessages,MemTransaction * result)1165 bool MessageQueueBase<MQDescriptorType, T, flavor>::beginWrite(size_t nMessages,
1166                                                                MemTransaction* result) const {
1167     /*
1168      * If nMessages is greater than size of FMQ or in case of the synchronized
1169      * FMQ flavor, if there is not enough space to write nMessages, then return
1170      * result with null addresses.
1171      */
1172     if ((flavor == kSynchronizedReadWrite && (availableToWrite() < nMessages)) ||
1173         nMessages > getQuantumCount()) {
1174         *result = MemTransaction();
1175         return false;
1176     }
1177 
1178     auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1179     if (writePtr % quantum() != 0) {
1180         std::string errorMsg =
1181                 "The write pointer has become misaligned. Writing to the queue is not possible. "
1182                 "Pointer: " +
1183                 std::to_string(writePtr) + ", quantum: " + std::to_string(quantum());
1184         hardware::details::logError(errorMsg);
1185         hardware::details::errorWriteLog(0x534e4554, "184963385");
1186         if (mErrorHandler) {
1187             mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
1188         }
1189         return false;
1190     }
1191     size_t writeOffset = writePtr % mDesc->getSize();
1192 
1193     /*
1194      * From writeOffset, the number of messages that can be written
1195      * contiguously without wrapping around the ring buffer are calculated.
1196      */
1197     size_t contiguousMessages = (mDesc->getSize() - writeOffset) / quantum();
1198 
1199     if (contiguousMessages < nMessages) {
1200         /*
1201          * Wrap around is required. Both result.first and result.second are
1202          * populated.
1203          */
1204         *result = MemTransaction(
1205                 MemRegion(reinterpret_cast<T*>(mRing + writeOffset), contiguousMessages),
1206                 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1207     } else {
1208         /*
1209          * A wrap around is not required to write nMessages. Only result.first
1210          * is populated.
1211          */
1212         *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + writeOffset), nMessages),
1213                                  MemRegion());
1214     }
1215 
1216     return true;
1217 }
1218 
1219 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1220 /*
1221  * Disable integer sanitization since integer overflow here is allowed
1222  * and legal.
1223  */
1224 __attribute__((no_sanitize("integer"))) bool
commitWrite(size_t nMessages)1225 MessageQueueBase<MQDescriptorType, T, flavor>::commitWrite(size_t nMessages) {
1226     size_t nBytesWritten = nMessages * quantum();
1227     auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1228     writePtr += nBytesWritten;
1229     mWritePtr->store(writePtr, std::memory_order_release);
1230     /*
1231      * This method cannot fail now since we are only incrementing the writePtr
1232      * counter.
1233      */
1234     return true;
1235 }
1236 
1237 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToReadBytes()1238 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToReadBytes() const {
1239     /*
1240      * This method is invoked by implementations of both read() and write() and
1241      * hence requires a memory_order_acquired load for both mReadPtr and
1242      * mWritePtr.
1243      */
1244     uint64_t writePtr = mWritePtr->load(std::memory_order_acquire);
1245     uint64_t readPtr = mReadPtr->load(std::memory_order_acquire);
1246     if (writePtr < readPtr) {
1247         std::string errorMsg =
1248                 "The write or read pointer has become corrupted. Reading from the queue is no "
1249                 "longer possible. Write pointer: " +
1250                 std::to_string(writePtr) + ", read pointer: " + std::to_string(readPtr);
1251         hardware::details::logError(errorMsg);
1252         if (mErrorHandler) {
1253             mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
1254         }
1255         return 0;
1256     }
1257     return writePtr - readPtr;
1258 }
1259 
1260 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data,size_t nMessages)1261 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data, size_t nMessages) {
1262     MemTransaction tx;
1263     return beginRead(nMessages, &tx) &&
1264            tx.copyFromSized(data, 0 /* startIdx */, nMessages, quantum()) && commitRead(nMessages);
1265 }
1266 
1267 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1268 /*
1269  * Disable integer sanitization since integer overflow here is allowed
1270  * and legal.
1271  */
1272 __attribute__((no_sanitize("integer"))) bool
processOverflow(uint64_t readPtr,uint64_t writePtr)1273 MessageQueueBase<MQDescriptorType, T, flavor>::processOverflow(uint64_t readPtr,
1274                                                                uint64_t writePtr) const {
1275     if (writePtr - readPtr > mDesc->getSize()) {
1276         /*
1277          * Preserved history can be as big as mDesc->getSize() but we expose only half of that.
1278          * Half of the buffer will be discarded to make space for fast writers and
1279          * reduce chance of repeated overflows. The other half is available to read.
1280          */
1281         size_t historyOffset = getQuantumCount() / 2 * getQuantumSize();
1282         mReadPtr->store(writePtr - historyOffset, std::memory_order_release);
1283         hardware::details::logError("Read failed after an overflow. Resetting read pointer.");
1284         return true;
1285     }
1286     return false;
1287 }
1288 
1289 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1290 /*
1291  * Disable integer sanitization since integer overflow here is allowed
1292  * and legal.
1293  */
1294 __attribute__((no_sanitize("integer"))) bool
beginRead(size_t nMessages,MemTransaction * result)1295 MessageQueueBase<MQDescriptorType, T, flavor>::beginRead(size_t nMessages,
1296                                                          MemTransaction* result) const {
1297     *result = MemTransaction();
1298     /*
1299      * If it is detected that the data in the queue was overwritten
1300      * due to the reader process being too slow, the read pointer counter
1301      * is set to the same as the write pointer counter to indicate error
1302      * and the read returns false;
1303      * Need acquire/release memory ordering for mWritePtr.
1304      */
1305     auto writePtr = mWritePtr->load(std::memory_order_acquire);
1306     /*
1307      * A relaxed load is sufficient for mReadPtr since there will be no
1308      * stores to mReadPtr from a different thread.
1309      */
1310     auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1311     if (writePtr % quantum() != 0 || readPtr % quantum() != 0) {
1312         hardware::details::logError(
1313                 "The write or read pointer has become misaligned. Reading from the queue is no "
1314                 "longer possible.");
1315         hardware::details::errorWriteLog(0x534e4554, "184963385");
1316         return false;
1317     }
1318 
1319     if (processOverflow(readPtr, writePtr)) {
1320         return false;
1321     }
1322 
1323     size_t nBytesDesired = nMessages * quantum();
1324     /*
1325      * Return if insufficient data to read in FMQ.
1326      */
1327     if (writePtr - readPtr < nBytesDesired) {
1328         return false;
1329     }
1330 
1331     size_t readOffset = readPtr % mDesc->getSize();
1332     /*
1333      * From readOffset, the number of messages that can be read contiguously
1334      * without wrapping around the ring buffer are calculated.
1335      */
1336     size_t contiguousMessages = (mDesc->getSize() - readOffset) / quantum();
1337 
1338     if (contiguousMessages < nMessages) {
1339         /*
1340          * A wrap around is required. Both result.first and result.second
1341          * are populated.
1342          */
1343         *result = MemTransaction(
1344                 MemRegion(reinterpret_cast<T*>(mRing + readOffset), contiguousMessages),
1345                 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1346     } else {
1347         /*
1348          * A wrap around is not required. Only result.first need to be
1349          * populated.
1350          */
1351         *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset), nMessages),
1352                                  MemRegion());
1353     }
1354 
1355     return true;
1356 }
1357 
1358 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1359 /*
1360  * Disable integer sanitization since integer overflow here is allowed
1361  * and legal.
1362  */
1363 __attribute__((no_sanitize("integer"))) bool
commitRead(size_t nMessages)1364 MessageQueueBase<MQDescriptorType, T, flavor>::commitRead(size_t nMessages) {
1365     // TODO: Use a local copy of readPtr to avoid relazed mReadPtr loads.
1366     auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1367     auto writePtr = mWritePtr->load(std::memory_order_acquire);
1368 
1369     /*
1370      * If the flavor is unsynchronized, it is possible that a write overflow may
1371      * have occurred between beginRead() and commitRead().
1372      */
1373     if (processOverflow(readPtr, writePtr)) {
1374         return false;
1375     }
1376 
1377     size_t nBytesRead = nMessages * quantum();
1378     readPtr += nBytesRead;
1379     mReadPtr->store(readPtr, std::memory_order_release);
1380     return true;
1381 }
1382 
1383 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumSize()1384 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumSize() const {
1385     return mDesc->getQuantum();
1386 }
1387 
1388 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumCount()1389 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumCount() const {
1390     return mDesc->getSize() / mDesc->getQuantum();
1391 }
1392 
1393 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
isValid()1394 bool MessageQueueBase<MQDescriptorType, T, flavor>::isValid() const {
1395     return mRing != nullptr && mReadPtr != nullptr && mWritePtr != nullptr;
1396 }
1397 
1398 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
mapGrantorDescr(uint32_t grantorIdx)1399 void* MessageQueueBase<MQDescriptorType, T, flavor>::mapGrantorDescr(uint32_t grantorIdx) {
1400     const native_handle_t* handle = mDesc->handle();
1401     const std::vector<android::hardware::GrantorDescriptor> grantors = mDesc->grantors();
1402     if (handle == nullptr) {
1403         hardware::details::logError("mDesc->handle is null");
1404         return nullptr;
1405     }
1406 
1407     if (grantorIdx >= grantors.size()) {
1408         hardware::details::logError(std::string("grantorIdx must be less than ") +
1409                                     std::to_string(grantors.size()));
1410         return nullptr;
1411     }
1412 
1413     int fdIndex = grantors[grantorIdx].fdIndex;
1414     if (fdIndex < 0 || fdIndex >= handle->numFds) {
1415         hardware::details::logError(
1416                 std::string("fdIndex (" + std::to_string(fdIndex) + ") from grantor (index " +
1417                             std::to_string(grantorIdx) +
1418                             ") must be smaller than the number of fds in the handle: " +
1419                             std::to_string(handle->numFds)));
1420         return nullptr;
1421     }
1422 
1423     /*
1424      * Offset for mmap must be a multiple of kPageSize.
1425      */
1426     if (!hardware::details::isAlignedToWordBoundary(grantors[grantorIdx].offset)) {
1427         hardware::details::logError("Grantor (index " + std::to_string(grantorIdx) +
1428                                     ") offset needs to be aligned to word boundary but is: " +
1429                                     std::to_string(grantors[grantorIdx].offset));
1430         return nullptr;
1431     }
1432 
1433     /*
1434      * Expect some grantors to be at least a min size
1435      */
1436     for (uint32_t i = 0; i < grantors.size(); i++) {
1437         switch (i) {
1438             case hardware::details::READPTRPOS:
1439                 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1440                 break;
1441             case hardware::details::WRITEPTRPOS:
1442                 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1443                 break;
1444             case hardware::details::DATAPTRPOS:
1445                 // We don't expect specific data size
1446                 break;
1447             case hardware::details::EVFLAGWORDPOS:
1448                 if (grantors[i].extent < sizeof(uint32_t)) return nullptr;
1449                 break;
1450             default:
1451                 // We don't care about unknown grantors
1452                 break;
1453         }
1454     }
1455 
1456     int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1457     if (grantors[grantorIdx].extent < 0 || grantors[grantorIdx].extent > INT_MAX - kPageSize) {
1458         hardware::details::logError(std::string("Grantor (index " + std::to_string(grantorIdx) +
1459                                                 ") extent value is too large or negative: " +
1460                                                 std::to_string(grantors[grantorIdx].extent)));
1461         return nullptr;
1462     }
1463     int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1464 
1465     void* address = mmap(0, mapLength, PROT_READ | PROT_WRITE, MAP_SHARED, handle->data[fdIndex],
1466                          mapOffset);
1467     if (address == MAP_FAILED && errno == EPERM && flavor == kUnsynchronizedWrite) {
1468         // If the supplied memory is read-only, it would fail with EPERM.
1469         // Try again to mmap read-only for the kUnsynchronizedWrite case.
1470         // kSynchronizedReadWrite cannot use read-only memory because the
1471         // read pointer is stored in the shared memory as well.
1472         address = mmap(0, mapLength, PROT_READ, MAP_SHARED, handle->data[fdIndex], mapOffset);
1473     }
1474     if (address == MAP_FAILED) {
1475         hardware::details::logError(std::string("mmap failed: ") + std::to_string(errno));
1476         return nullptr;
1477     }
1478     return reinterpret_cast<uint8_t*>(address) + (grantors[grantorIdx].offset - mapOffset);
1479 }
1480 
1481 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
unmapGrantorDescr(void * address,uint32_t grantorIdx)1482 void MessageQueueBase<MQDescriptorType, T, flavor>::unmapGrantorDescr(void* address,
1483                                                                       uint32_t grantorIdx) {
1484     auto grantors = mDesc->grantors();
1485     if ((address == nullptr) || (grantorIdx >= grantors.size())) {
1486         return;
1487     }
1488 
1489     int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1490     int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1491     void* baseAddress =
1492             reinterpret_cast<uint8_t*>(address) - (grantors[grantorIdx].offset - mapOffset);
1493     if (baseAddress) munmap(baseAddress, mapLength);
1494 }
1495 
1496 }  // namespace android
1497