1 /*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <android-base/unique_fd.h>
20 #include <cutils/ashmem.h>
21 #include <fmq/EventFlag.h>
22 #include <sys/mman.h>
23 #include <sys/user.h>
24 #include <utils/Log.h>
25 #include <utils/SystemClock.h>
26 #include <atomic>
27 #include <functional>
28
29 using android::hardware::kSynchronizedReadWrite;
30 using android::hardware::kUnsynchronizedWrite;
31 using android::hardware::MQFlavor;
32
33 namespace android {
34
/*
 * Sentinel payload type. Instantiating a queue with MQErased indicates that the
 * MQ will be used with a mismatching MQDescriptor type: the real element type T
 * is not statically known, so type safety must be enforced elsewhere. It is used
 * to instantiate MessageQueueBase for Rust, where additional template
 * instantiations cannot be generated across the language boundary.
 */
enum MQErased {};
41
42 template <template <typename, MQFlavor> class MQDescriptorType, typename T, MQFlavor flavor>
43 struct MessageQueueBase {
44 typedef MQDescriptorType<T, flavor> Descriptor;
45 enum Error : int {
46 NONE,
47 POINTER_CORRUPTION, /** Read/write pointers mismatch */
48 };
49 using ErrorHandler = std::function<void(Error, std::string&&)>;
50
51 /**
52 * @param Desc MQDescriptor describing the FMQ.
53 * @param resetPointers bool indicating whether the read/write pointers
54 * should be reset or not.
55 */
56 MessageQueueBase(const Descriptor& Desc, bool resetPointers = true);
57
58 ~MessageQueueBase();
59
60 /**
61 * This constructor uses Ashmem shared memory to create an FMQ
62 * that can contain a maximum of 'numElementsInQueue' elements of type T.
63 *
64 * @param numElementsInQueue Capacity of the MessageQueue in terms of T.
65 * @param configureEventFlagWord Boolean that specifies if memory should
66 * also be allocated and mapped for an EventFlag word.
67 * @param bufferFd User-supplied file descriptor to map the memory for the ringbuffer
68 * By default, bufferFd=-1 means library will allocate ashmem region for ringbuffer.
69 * MessageQueue takes ownership of the file descriptor.
70 * @param bufferSize size of buffer in bytes that bufferFd represents. This
71 * size must be larger than or equal to (numElementsInQueue * sizeof(T)).
72 * Otherwise, operations will cause out-of-bounds memory access.
73 */
74
MessageQueueBaseMessageQueueBase75 MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
76 android::base::unique_fd bufferFd, size_t bufferSize)
77 : MessageQueueBase(numElementsInQueue, configureEventFlagWord, std::move(bufferFd),
78 bufferSize, sizeof(T)) {
79 /* We must not pass sizeof(T) as quantum for MQErased element type. */
80 static_assert(!std::is_same_v<T, MQErased>,
81 "MessageQueueBase<..., MQErased, ...> must be constructed via a"
82 " constructor that accepts a descriptor or a quantum size");
83 };
84
85 MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord = false)
86 : MessageQueueBase(numElementsInQueue, configureEventFlagWord, android::base::unique_fd(),
87 0) {}
88
89 /**
90 * Set a client side error handler function which will be invoked when the FMQ detects
91 * one of the error situations defined by the 'Error' type.
92 */
setErrorHandlerMessageQueueBase93 void setErrorHandler(ErrorHandler&& handler) { mErrorHandler.swap(handler); }
94
95 /**
96 * @return Number of items of type T that can be written into the FMQ
97 * without a read.
98 */
99 size_t availableToWrite() const;
100
101 /**
102 * @return Number of items of type T that are waiting to be read from the
103 * FMQ.
104 */
105 size_t availableToRead() const;
106
107 /**
108 * Returns the size of type T in bytes.
109 *
110 * @return Size of T.
111 */
112 size_t getQuantumSize() const;
113
114 /**
115 * Returns the size of the FMQ in terms of the size of type T.
116 *
117 * @return Number of items of type T that will fit in the FMQ.
118 */
119 size_t getQuantumCount() const;
120
121 /**
122 * @return Whether the FMQ is configured correctly.
123 */
124 bool isValid() const;
125
126 /**
127 * Non-blocking write to FMQ.
128 *
129 * @param data Pointer to the object of type T to be written into the FMQ.
130 *
131 * @return Whether the write was successful.
132 */
133 bool write(const T* data);
134
135 /**
136 * Non-blocking read from FMQ.
137 *
138 * @param data Pointer to the memory where the object read from the FMQ is
139 * copied to.
140 *
141 * @return Whether the read was successful.
142 */
143 bool read(T* data);
144
145 /**
146 * Write some data into the FMQ without blocking.
147 *
148 * @param data Pointer to the array of items of type T.
149 * @param count Number of items in array.
150 *
151 * @return Whether the write was successful.
152 */
153 bool write(const T* data, size_t count);
154
155 /**
156 * Perform a blocking write of 'count' items into the FMQ using EventFlags.
157 * Does not support partial writes.
158 *
159 * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
160 * associated with the FMQ and it is used in that case.
161 *
162 * The application code must ensure that 'evFlag' used by the
163 * reader(s)/writer is based upon the same EventFlag word.
164 *
165 * The method will return false without blocking if any of the following
166 * conditions are true:
167 * - If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
168 * - If the 'readNotification' bit mask is zero.
169 * - If 'count' is greater than the FMQ size.
170 *
 * If there is insufficient space available to write into the FMQ, the
 * EventFlag bit mask 'readNotification' is waited upon.
173 *
174 * This method should only be used with a MessageQueue of the flavor
175 * 'kSynchronizedReadWrite'.
176 *
177 * Upon a successful write, wake is called on 'writeNotification' (if
178 * non-zero).
179 *
180 * @param data Pointer to the array of items of type T.
181 * @param count Number of items in array.
182 * @param readNotification The EventFlag bit mask to wait on if there is not
183 * enough space in FMQ to write 'count' items.
184 * @param writeNotification The EventFlag bit mask to call wake on
185 * a successful write. No wake is called if 'writeNotification' is zero.
186 * @param timeOutNanos Number of nanoseconds after which the blocking
187 * write attempt is aborted.
188 * @param evFlag The EventFlag object to be used for blocking. If nullptr,
189 * it is checked whether the FMQ owns an EventFlag object and that is used
190 * for blocking instead.
191 *
192 * @return Whether the write was successful.
193 */
194 bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
195 uint32_t writeNotification, int64_t timeOutNanos = 0,
196 android::hardware::EventFlag* evFlag = nullptr);
197
198 bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
199
200 /**
201 * Read some data from the FMQ without blocking.
202 *
203 * @param data Pointer to the array to which read data is to be written.
204 * @param count Number of items to be read.
205 *
206 * @return Whether the read was successful.
207 */
208 bool read(T* data, size_t count);
209
210 /**
211 * Perform a blocking read operation of 'count' items from the FMQ. Does not
212 * perform a partial read.
213 *
214 * If 'evFlag' is nullptr, it is checked whether there is an EventFlag object
215 * associated with the FMQ and it is used in that case.
216 *
217 * The application code must ensure that 'evFlag' used by the
218 * reader(s)/writer is based upon the same EventFlag word.
219 *
220 * The method will return false without blocking if any of the following
221 * conditions are true:
 * - If 'evFlag' is nullptr and the FMQ does not own an EventFlag object.
 * - If the 'writeNotification' bit mask is zero.
 * - If 'count' is greater than the FMQ size.
225 *
 * This method should only be used with a MessageQueue of the flavor
 * 'kSynchronizedReadWrite'.
 *
 * If the FMQ does not contain 'count' items, the EventFlag bit mask
 * 'writeNotification' is waited upon. Upon a successful read from the FMQ,
 * wake is called on 'readNotification' (if non-zero).
232 *
233 * @param data Pointer to the array to which read data is to be written.
234 * @param count Number of items to be read.
235 * @param readNotification The EventFlag bit mask to call wake on after
236 * a successful read. No wake is called if 'readNotification' is zero.
237 * @param writeNotification The EventFlag bit mask to call a wait on
238 * if there is insufficient data in the FMQ to be read.
239 * @param timeOutNanos Number of nanoseconds after which the blocking
240 * read attempt is aborted.
241 * @param evFlag The EventFlag object to be used for blocking.
242 *
243 * @return Whether the read was successful.
244 */
245 bool readBlocking(T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
246 int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr);
247
248 bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);
249
250 /**
251 * Get a pointer to the MQDescriptor object that describes this FMQ.
252 *
253 * @return Pointer to the MQDescriptor associated with the FMQ.
254 */
getDescMessageQueueBase255 const Descriptor* getDesc() const { return mDesc.get(); }
256
257 /**
258 * Get a pointer to the EventFlag word if there is one associated with this FMQ.
259 *
260 * @return Pointer to an EventFlag word, will return nullptr if not
261 * configured. This method does not transfer ownership. The EventFlag
262 * word will be unmapped by the MessageQueue destructor.
263 */
getEventFlagWordMessageQueueBase264 std::atomic<uint32_t>* getEventFlagWord() const { return mEvFlagWord; }
265
266 /**
267 * Describes a memory region in the FMQ.
268 */
269 struct MemRegion {
MemRegionMessageQueueBase::MemRegion270 MemRegion() : MemRegion(nullptr, 0) {}
271
MemRegionMessageQueueBase::MemRegion272 MemRegion(T* base, size_t size) : address(base), length(size) {}
273
274 MemRegion& operator=(const MemRegion& other) {
275 address = other.address;
276 length = other.length;
277 return *this;
278 }
279
280 /**
281 * Gets a pointer to the base address of the MemRegion.
282 */
getAddressMessageQueueBase::MemRegion283 inline T* getAddress() const { return address; }
284
285 /**
286 * Gets the length of the MemRegion. This would equal to the number
287 * of items of type T that can be read from/written into the MemRegion.
288 */
getLengthMessageQueueBase::MemRegion289 inline size_t getLength() const { return length; }
290
291 /**
292 * Gets the length of the MemRegion in bytes.
293 */
294 template <class U = T>
getLengthInBytesMessageQueueBase::MemRegion295 inline std::enable_if_t<!std::is_same_v<U, MQErased>, size_t> getLengthInBytes() const {
296 return length * kQuantumValue<U>;
297 }
298
299 private:
300 /* Base address */
301 T* address;
302
303 /*
304 * Number of items of type T that can be written to/read from the base
305 * address.
306 */
307 size_t length;
308 };
309
310 /**
311 * Describes the memory regions to be used for a read or write.
312 * The struct contains two MemRegion objects since the FMQ is a ring
313 * buffer and a read or write operation can wrap around. A single message
314 * of type T will never be broken between the two MemRegions.
315 */
316 struct MemTransaction {
MemTransactionMessageQueueBase::MemTransaction317 MemTransaction() : MemTransaction(MemRegion(), MemRegion()) {}
318
MemTransactionMessageQueueBase::MemTransaction319 MemTransaction(const MemRegion& regionFirst, const MemRegion& regionSecond)
320 : first(regionFirst), second(regionSecond) {}
321
322 MemTransaction& operator=(const MemTransaction& other) {
323 first = other.first;
324 second = other.second;
325 return *this;
326 }
327
328 /**
329 * Helper method to calculate the address for a particular index for
330 * the MemTransaction object.
331 *
332 * @param idx Index of the slot to be read/written. If the
333 * MemTransaction object is representing the memory region to read/write
334 * N items of type T, the valid range of idx is between 0 and N-1.
335 *
336 * @return Pointer to the slot idx. Will be nullptr for an invalid idx.
337 */
338 T* getSlot(size_t idx);
339
340 /**
341 * Helper method to write 'nMessages' items of type T into the memory
342 * regions described by the object starting from 'startIdx'. This method
343 * uses memcpy() and is not to meant to be used for a zero copy operation.
344 * Partial writes are not supported.
345 *
346 * @param data Pointer to the source buffer.
347 * @param nMessages Number of items of type T.
348 * @param startIdx The slot number to begin the write from. If the
349 * MemTransaction object is representing the memory region to read/write
350 * N items of type T, the valid range of startIdx is between 0 and N-1;
351 *
352 * @return Whether the write operation of size 'nMessages' succeeded.
353 */
354 bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
355
356 /*
357 * Helper method to read 'nMessages' items of type T from the memory
358 * regions described by the object starting from 'startIdx'. This method uses
359 * memcpy() and is not meant to be used for a zero copy operation. Partial reads
360 * are not supported.
361 *
362 * @param data Pointer to the destination buffer.
363 * @param nMessages Number of items of type T.
364 * @param startIdx The slot number to begin the read from. If the
365 * MemTransaction object is representing the memory region to read/write
366 * N items of type T, the valid range of startIdx is between 0 and N-1.
367 *
368 * @return Whether the read operation of size 'nMessages' succeeded.
369 */
370 bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
371
372 /**
373 * Returns a const reference to the first MemRegion in the
374 * MemTransaction object.
375 */
getFirstRegionMessageQueueBase::MemTransaction376 inline const MemRegion& getFirstRegion() const { return first; }
377
378 /**
379 * Returns a const reference to the second MemRegion in the
380 * MemTransaction object.
381 */
getSecondRegionMessageQueueBase::MemTransaction382 inline const MemRegion& getSecondRegion() const { return second; }
383
384 private:
385 friend MessageQueueBase<MQDescriptorType, T, flavor>;
386
387 bool copyToSized(const T* data, size_t startIdx, size_t nMessages, size_t messageSize);
388 bool copyFromSized(T* data, size_t startIdx, size_t nMessages, size_t messageSize);
389
390 /*
391 * Given a start index and the number of messages to be
392 * read/written, this helper method calculates the
393 * number of messages that should should be written to both the first
394 * and second MemRegions and the base addresses to be used for
395 * the read/write operation.
396 *
397 * Returns false if the 'startIdx' and 'nMessages' is
398 * invalid for the MemTransaction object.
399 */
400 bool inline getMemRegionInfo(size_t idx, size_t nMessages, size_t& firstCount,
401 size_t& secondCount, T** firstBaseAddress,
402 T** secondBaseAddress);
403 MemRegion first;
404 MemRegion second;
405 };
406
407 /**
408 * Get a MemTransaction object to write 'nMessages' items of type T.
409 * Once the write is performed using the information from MemTransaction,
410 * the write operation is to be committed using a call to commitWrite().
411 *
412 * @param nMessages Number of messages of type T.
 * @param memTx Pointer to MemTransaction struct that describes memory to write 'nMessages'
 * items of type T. If a write of size 'nMessages' is not possible, the base
 * addresses in the MemTransaction object would be set to nullptr.
416 *
417 * @return Whether it is possible to write 'nMessages' items of type T
418 * into the FMQ.
419 */
420 bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
421
422 /**
423 * Commit a write of size 'nMessages'. To be only used after a call to beginWrite().
424 *
425 * @param nMessages number of messages of type T to be written.
426 *
427 * @return Whether the write operation of size 'nMessages' succeeded.
428 */
429 bool commitWrite(size_t nMessages);
430
431 /**
432 * Get a MemTransaction object to read 'nMessages' items of type T.
433 * Once the read is performed using the information from MemTransaction,
434 * the read operation is to be committed using a call to commitRead().
435 *
436 * @param nMessages Number of messages of type T.
 * @param memTx Pointer to MemTransaction struct that describes memory to read 'nMessages'
 * items of type T. If a read of size 'nMessages' is not possible, the base
 * pointers in the MemTransaction object returned will be set to nullptr.
440 *
441 * @return bool Whether it is possible to read 'nMessages' items of type T
442 * from the FMQ.
443 */
444 bool beginRead(size_t nMessages, MemTransaction* memTx) const;
445
446 /**
447 * Commit a read of size 'nMessages'. To be only used after a call to beginRead().
448 * For the unsynchronized flavor of FMQ, this method will return a failure
449 * if a write overflow happened after beginRead() was invoked.
450 *
451 * @param nMessages number of messages of type T to be read.
452 *
453 * @return bool Whether the read operation of size 'nMessages' succeeded.
454 */
455 bool commitRead(size_t nMessages);
456
457 /**
458 * Get the pointer to the ring buffer. Useful for debugging and fuzzing.
459 */
getRingBufferPtrMessageQueueBase460 uint8_t* getRingBufferPtr() const { return mRing; }
461
462 protected:
463 /**
464 * Protected constructor that can manually specify the quantum to use.
465 * The only external consumer of this ctor is ErasedMessageQueue, but the
466 * constructor cannot be private because this is a base class.
467 *
468 * @param quantum Size of the element type, in bytes.
469 * Other parameters have semantics given in the corresponding public ctor.
470 */
471
472 MessageQueueBase(size_t numElementsInQueue, bool configureEventFlagWord,
473 android::base::unique_fd bufferFd, size_t bufferSize, size_t quantum);
474
475 private:
476 template <class U = T,
477 typename std::enable_if<!std::is_same<U, MQErased>::value, bool>::type = true>
478 static constexpr size_t kQuantumValue = sizeof(T);
479 inline size_t quantum() const;
480 size_t availableToWriteBytes() const;
481 size_t availableToReadBytes() const;
482
483 MessageQueueBase(const MessageQueueBase& other) = delete;
484 MessageQueueBase& operator=(const MessageQueueBase& other) = delete;
485
486 void* mapGrantorDescr(uint32_t grantorIdx);
487 void unmapGrantorDescr(void* address, uint32_t grantorIdx);
488 void initMemory(bool resetPointers);
489 bool processOverflow(uint64_t readPtr, uint64_t writePtr) const;
490
491 enum DefaultEventNotification : uint32_t {
/*
 * These are only used internally by the readBlocking()/writeBlocking()
 * methods and hence other bit combinations are not required.
 */
496 FMQ_NOT_FULL = 0x01,
497 FMQ_NOT_EMPTY = 0x02
498 };
499 std::unique_ptr<Descriptor> mDesc;
500 uint8_t* mRing = nullptr;
501 /*
502 * TODO(b/31550092): Change to 32 bit read and write pointer counters.
503 */
504 std::atomic<uint64_t>* mReadPtr = nullptr;
505 std::atomic<uint64_t>* mWritePtr = nullptr;
506
507 std::atomic<uint32_t>* mEvFlagWord = nullptr;
508
509 /*
510 * This EventFlag object will be owned by the FMQ and will have the same
511 * lifetime.
512 */
513 android::hardware::EventFlag* mEventFlag = nullptr;
514
515 ErrorHandler mErrorHandler;
516
517 const size_t kPageSize = getpagesize();
518 };
519
520 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getSlot(size_t idx)521 T* MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getSlot(size_t idx) {
522 size_t firstRegionLength = first.getLength();
523 size_t secondRegionLength = second.getLength();
524
525 if (idx > firstRegionLength + secondRegionLength) {
526 return nullptr;
527 }
528
529 if (idx < firstRegionLength) {
530 return first.getAddress() + idx;
531 }
532
533 return second.getAddress() + idx - firstRegionLength;
534 }
535
536 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getMemRegionInfo(size_t startIdx,size_t nMessages,size_t & firstCount,size_t & secondCount,T ** firstBaseAddress,T ** secondBaseAddress)537 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::getMemRegionInfo(
538 size_t startIdx, size_t nMessages, size_t& firstCount, size_t& secondCount,
539 T** firstBaseAddress, T** secondBaseAddress) {
540 size_t firstRegionLength = first.getLength();
541 size_t secondRegionLength = second.getLength();
542
543 if (startIdx + nMessages > firstRegionLength + secondRegionLength) {
544 /*
545 * Return false if 'nMessages' starting at 'startIdx' cannot be
546 * accommodated by the MemTransaction object.
547 */
548 return false;
549 }
550
551 /* Number of messages to be read/written to the first MemRegion. */
552 firstCount =
553 startIdx < firstRegionLength ? std::min(nMessages, firstRegionLength - startIdx) : 0;
554
555 /* Number of messages to be read/written to the second MemRegion. */
556 secondCount = nMessages - firstCount;
557
558 if (firstCount != 0) {
559 *firstBaseAddress = first.getAddress() + startIdx;
560 }
561
562 if (secondCount != 0) {
563 size_t secondStartIdx = startIdx > firstRegionLength ? startIdx - firstRegionLength : 0;
564 *secondBaseAddress = second.getAddress() + secondStartIdx;
565 }
566
567 return true;
568 }
569
570 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFrom(T * data,size_t startIdx,size_t nMessages)571 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFrom(T* data,
572 size_t startIdx,
573 size_t nMessages) {
574 if constexpr (!std::is_same<T, MQErased>::value) {
575 return copyFromSized(data, startIdx, nMessages, kQuantumValue<T>);
576 } else {
577 /* Compile error. */
578 static_assert(!std::is_same<T, MQErased>::value,
579 "copyFrom without messageSize argument cannot be used with MQErased (use "
580 "copyFromSized)");
581 }
582 }
583
584 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyFromSized(T * data,size_t startIdx,size_t nMessages,size_t messageSize)585 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyFromSized(
586 T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
587 if (data == nullptr) {
588 return false;
589 }
590
591 size_t firstReadCount = 0, secondReadCount = 0;
592 T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
593
594 if (getMemRegionInfo(startIdx, nMessages, firstReadCount, secondReadCount, &firstBaseAddress,
595 &secondBaseAddress) == false) {
596 /*
597 * Returns false if 'startIdx' and 'nMessages' are invalid for this
598 * MemTransaction object.
599 */
600 return false;
601 }
602
603 if (firstReadCount != 0) {
604 memcpy(data, firstBaseAddress, firstReadCount * messageSize);
605 }
606
607 if (secondReadCount != 0) {
608 memcpy(data + firstReadCount, secondBaseAddress, secondReadCount * messageSize);
609 }
610
611 return true;
612 }
613
614 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyTo(const T * data,size_t startIdx,size_t nMessages)615 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyTo(const T* data,
616 size_t startIdx,
617 size_t nMessages) {
618 if constexpr (!std::is_same<T, MQErased>::value) {
619 return copyToSized(data, startIdx, nMessages, kQuantumValue<T>);
620 } else {
621 /* Compile error. */
622 static_assert(!std::is_same<T, MQErased>::value,
623 "copyTo without messageSize argument cannot be used with MQErased (use "
624 "copyToSized)");
625 }
626 }
627
628 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
copyToSized(const T * data,size_t startIdx,size_t nMessages,size_t messageSize)629 bool MessageQueueBase<MQDescriptorType, T, flavor>::MemTransaction::copyToSized(
630 const T* data, size_t startIdx, size_t nMessages, size_t messageSize) {
631 if (data == nullptr) {
632 return false;
633 }
634
635 size_t firstWriteCount = 0, secondWriteCount = 0;
636 T *firstBaseAddress = nullptr, *secondBaseAddress = nullptr;
637
638 if (getMemRegionInfo(startIdx, nMessages, firstWriteCount, secondWriteCount, &firstBaseAddress,
639 &secondBaseAddress) == false) {
640 /*
641 * Returns false if 'startIdx' and 'nMessages' are invalid for this
642 * MemTransaction object.
643 */
644 return false;
645 }
646
647 if (firstWriteCount != 0) {
648 memcpy(firstBaseAddress, data, firstWriteCount * messageSize);
649 }
650
651 if (secondWriteCount != 0) {
652 memcpy(secondBaseAddress, data + firstWriteCount, secondWriteCount * messageSize);
653 }
654
655 return true;
656 }
657
658 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
initMemory(bool resetPointers)659 void MessageQueueBase<MQDescriptorType, T, flavor>::initMemory(bool resetPointers) {
660 /*
661 * Verify that the Descriptor contains the minimum number of grantors
662 * the native_handle is valid and T matches quantum size.
663 */
664 if ((mDesc == nullptr) || !mDesc->isHandleValid() ||
665 (mDesc->countGrantors() < hardware::details::kMinGrantorCount)) {
666 return;
667 }
668 if (mDesc->getQuantum() != quantum()) {
669 hardware::details::logError(
670 "Payload size differs between the queue instantiation and the "
671 "MQDescriptor.");
672 return;
673 }
674
675 if (flavor == kSynchronizedReadWrite) {
676 mReadPtr = reinterpret_cast<std::atomic<uint64_t>*>(
677 mapGrantorDescr(hardware::details::READPTRPOS));
678 } else {
679 /*
680 * The unsynchronized write flavor of the FMQ may have multiple readers
681 * and each reader would have their own read pointer counter.
682 */
683 mReadPtr = new (std::nothrow) std::atomic<uint64_t>;
684 }
685 if (mReadPtr == nullptr) goto error;
686
687 mWritePtr = reinterpret_cast<std::atomic<uint64_t>*>(
688 mapGrantorDescr(hardware::details::WRITEPTRPOS));
689 if (mWritePtr == nullptr) goto error;
690
691 if (resetPointers) {
692 mReadPtr->store(0, std::memory_order_release);
693 mWritePtr->store(0, std::memory_order_release);
694 } else if (flavor != kSynchronizedReadWrite) {
695 // Always reset the read pointer.
696 mReadPtr->store(0, std::memory_order_release);
697 }
698
699 mRing = reinterpret_cast<uint8_t*>(mapGrantorDescr(hardware::details::DATAPTRPOS));
700 if (mRing == nullptr) goto error;
701
702 if (mDesc->countGrantors() > hardware::details::EVFLAGWORDPOS) {
703 mEvFlagWord = static_cast<std::atomic<uint32_t>*>(
704 mapGrantorDescr(hardware::details::EVFLAGWORDPOS));
705 if (mEvFlagWord == nullptr) goto error;
706 android::hardware::EventFlag::createEventFlag(mEvFlagWord, &mEventFlag);
707 }
708 return;
709 error:
710 if (mReadPtr) {
711 if (flavor == kSynchronizedReadWrite) {
712 unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
713 } else {
714 delete mReadPtr;
715 }
716 mReadPtr = nullptr;
717 }
718 if (mWritePtr) {
719 unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
720 mWritePtr = nullptr;
721 }
722 if (mRing) {
723 unmapGrantorDescr(mRing, hardware::details::EVFLAGWORDPOS);
724 mRing = nullptr;
725 }
726 }
727
728 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(const Descriptor & Desc,bool resetPointers)729 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(const Descriptor& Desc,
730 bool resetPointers) {
731 mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(Desc));
732 if (mDesc == nullptr || mDesc->getSize() == 0) {
733 hardware::details::logError("MQDescriptor is invalid or queue size is 0.");
734 return;
735 }
736
737 initMemory(resetPointers);
738 }
739
740 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
MessageQueueBase(size_t numElementsInQueue,bool configureEventFlagWord,android::base::unique_fd bufferFd,size_t bufferSize,size_t quantum)741 MessageQueueBase<MQDescriptorType, T, flavor>::MessageQueueBase(size_t numElementsInQueue,
742 bool configureEventFlagWord,
743 android::base::unique_fd bufferFd,
744 size_t bufferSize, size_t quantum) {
745 // Check if the buffer size would not overflow size_t
746 if (numElementsInQueue > SIZE_MAX / quantum) {
747 hardware::details::logError("Requested message queue size too large. Size of elements: " +
748 std::to_string(quantum) +
749 ". Number of elements: " + std::to_string(numElementsInQueue));
750 return;
751 }
752 if (numElementsInQueue == 0) {
753 hardware::details::logError("Requested queue size of 0.");
754 return;
755 }
756 if (bufferFd != -1 && numElementsInQueue * quantum > bufferSize) {
757 hardware::details::logError("The supplied buffer size(" + std::to_string(bufferSize) +
758 ") is smaller than the required size(" +
759 std::to_string(numElementsInQueue * quantum) + ").");
760 return;
761 }
762 /*
763 * The FMQ needs to allocate memory for the ringbuffer as well as for the
764 * read and write pointer counters. If an EventFlag word is to be configured,
765 * we also need to allocate memory for the same/
766 */
767 size_t kQueueSizeBytes = numElementsInQueue * quantum;
768 size_t kMetaDataSize = 2 * sizeof(android::hardware::details::RingBufferPosition);
769
770 if (configureEventFlagWord) {
771 kMetaDataSize += sizeof(std::atomic<uint32_t>);
772 }
773
774 /*
775 * Ashmem memory region size needs to be specified in page-aligned bytes.
776 * kQueueSizeBytes needs to be aligned to word boundary so that all offsets
777 * in the grantorDescriptor will be word aligned.
778 */
779 size_t kAshmemSizePageAligned;
780 if (bufferFd != -1) {
781 // Allocate read counter and write counter only. User-supplied memory will be used for the
782 // ringbuffer.
783 kAshmemSizePageAligned = (kMetaDataSize + kPageSize - 1) & ~(kPageSize - 1);
784 } else {
785 // Allocate ringbuffer, read counter and write counter.
786 kAshmemSizePageAligned = (hardware::details::alignToWordBoundary(kQueueSizeBytes) +
787 kMetaDataSize + kPageSize - 1) &
788 ~(kPageSize - 1);
789 }
790
791 /*
792 * The native handle will contain the fds to be mapped.
793 */
794 int numFds = (bufferFd != -1) ? 2 : 1;
795 native_handle_t* mqHandle = native_handle_create(numFds, 0 /* numInts */);
796 if (mqHandle == nullptr) {
797 return;
798 }
799
800 /*
801 * Create an ashmem region to map the memory.
802 */
803 int ashmemFd = ashmem_create_region("MessageQueue", kAshmemSizePageAligned);
804 ashmem_set_prot_region(ashmemFd, PROT_READ | PROT_WRITE);
805 mqHandle->data[0] = ashmemFd;
806
807 if (bufferFd != -1) {
808 // Use user-supplied file descriptor for fdIndex 1
809 mqHandle->data[1] = bufferFd.get();
810 // release ownership of fd. mqHandle owns it now.
811 if (bufferFd.release() < 0) {
812 hardware::details::logError("Error releasing supplied bufferFd");
813 }
814
815 std::vector<android::hardware::GrantorDescriptor> grantors;
816 grantors.resize(configureEventFlagWord ? hardware::details::kMinGrantorCountForEvFlagSupport
817 : hardware::details::kMinGrantorCount);
818
819 size_t memSize[] = {
820 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for read
821 pointer counter */
822 sizeof(hardware::details::RingBufferPosition), /* memory to be allocated for write
823 pointer counter */
824 kQueueSizeBytes, /* memory to be allocated for data buffer */
825 sizeof(std::atomic<uint32_t>) /* memory to be allocated for EventFlag word */
826 };
827
828 for (size_t grantorPos = 0, offset = 0; grantorPos < grantors.size(); grantorPos++) {
829 uint32_t grantorFdIndex;
830 size_t grantorOffset;
831 if (grantorPos == hardware::details::DATAPTRPOS) {
832 grantorFdIndex = 1;
833 grantorOffset = 0;
834 } else {
835 grantorFdIndex = 0;
836 grantorOffset = offset;
837 offset += memSize[grantorPos];
838 }
839 grantors[grantorPos] = {
840 0 /* grantor flags */, grantorFdIndex,
841 static_cast<uint32_t>(hardware::details::alignToWordBoundary(grantorOffset)),
842 memSize[grantorPos]};
843 }
844
845 mDesc = std::unique_ptr<Descriptor>(new (std::nothrow)
846 Descriptor(grantors, mqHandle, quantum));
847 } else {
848 mDesc = std::unique_ptr<Descriptor>(new (std::nothrow) Descriptor(
849 kQueueSizeBytes, mqHandle, quantum, configureEventFlagWord));
850 }
851 if (mDesc == nullptr) {
852 native_handle_close(mqHandle);
853 native_handle_delete(mqHandle);
854 return;
855 }
856 initMemory(true);
857 }
858
859 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
~MessageQueueBase()860 MessageQueueBase<MQDescriptorType, T, flavor>::~MessageQueueBase() {
861 if (flavor == kSynchronizedReadWrite && mReadPtr != nullptr) {
862 unmapGrantorDescr(mReadPtr, hardware::details::READPTRPOS);
863 } else if (mReadPtr != nullptr) {
864 delete mReadPtr;
865 }
866 if (mWritePtr != nullptr) {
867 unmapGrantorDescr(mWritePtr, hardware::details::WRITEPTRPOS);
868 }
869 if (mRing != nullptr) {
870 unmapGrantorDescr(mRing, hardware::details::DATAPTRPOS);
871 }
872 if (mEvFlagWord != nullptr) {
873 unmapGrantorDescr(mEvFlagWord, hardware::details::EVFLAGWORDPOS);
874 android::hardware::EventFlag::deleteEventFlag(&mEventFlag);
875 }
876 }
877
878 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data)879 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data) {
880 return write(data, 1);
881 }
882
883 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data)884 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data) {
885 return read(data, 1);
886 }
887
888 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
write(const T * data,size_t nMessages)889 bool MessageQueueBase<MQDescriptorType, T, flavor>::write(const T* data, size_t nMessages) {
890 MemTransaction tx;
891 return beginWrite(nMessages, &tx) &&
892 tx.copyToSized(data, 0 /* startIdx */, nMessages, quantum()) && commitWrite(nMessages);
893 }
894
895 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)896 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(
897 const T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
898 int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
899 static_assert(flavor == kSynchronizedReadWrite,
900 "writeBlocking can only be used with the "
901 "kSynchronizedReadWrite flavor.");
902 /*
903 * If evFlag is null and the FMQ does not have its own EventFlag object
904 * return false;
905 * If the flavor is kSynchronizedReadWrite and the readNotification
906 * bit mask is zero return false;
907 * If the count is greater than queue size, return false
908 * to prevent blocking until timeOut.
909 */
910 if (evFlag == nullptr) {
911 evFlag = mEventFlag;
912 if (evFlag == nullptr) {
913 hardware::details::logError(
914 "writeBlocking failed: called on MessageQueue with no Eventflag"
915 "configured or provided");
916 return false;
917 }
918 }
919
920 if (readNotification == 0 || (count > getQuantumCount())) {
921 return false;
922 }
923
924 /*
925 * There is no need to wait for a readNotification if there is sufficient
926 * space to write is already present in the FMQ. The latter would be the case when
927 * read operations read more number of messages than write operations write.
928 * In other words, a single large read may clear the FMQ after multiple small
929 * writes. This would fail to clear a pending readNotification bit since
930 * EventFlag bits can only be cleared by a wait() call, however the bit would
931 * be correctly cleared by the next writeBlocking() call.
932 */
933
934 bool result = write(data, count);
935 if (result) {
936 if (writeNotification) {
937 evFlag->wake(writeNotification);
938 }
939 return result;
940 }
941
942 bool shouldTimeOut = timeOutNanos != 0;
943 int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
944
945 while (true) {
946 /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
947 if (shouldTimeOut) {
948 /*
949 * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
950 * to Nanoseconds)
951 */
952 int64_t currentTimeNs = android::elapsedRealtimeNano();
953 /*
954 * Decrement 'timeOutNanos' to account for the time taken to complete the last
955 * iteration of the while loop.
956 */
957 timeOutNanos -= currentTimeNs - prevTimeNanos;
958 prevTimeNanos = currentTimeNs;
959
960 if (timeOutNanos <= 0) {
961 /*
962 * Attempt write in case a context switch happened outside of
963 * evFlag->wait().
964 */
965 result = write(data, count);
966 break;
967 }
968 }
969
970 /*
971 * wait() will return immediately if there was a pending read
972 * notification.
973 */
974 uint32_t efState = 0;
975 status_t status = evFlag->wait(readNotification, &efState, timeOutNanos,
976 true /* retry on spurious wake */);
977
978 if (status != android::TIMED_OUT && status != android::NO_ERROR) {
979 hardware::details::logError("Unexpected error code from EventFlag Wait status " +
980 std::to_string(status));
981 break;
982 }
983
984 if (status == android::TIMED_OUT) {
985 break;
986 }
987
988 /*
989 * If there is still insufficient space to write to the FMQ,
990 * keep waiting for another readNotification.
991 */
992 if ((efState & readNotification) && write(data, count)) {
993 result = true;
994 break;
995 }
996 }
997
998 if (result && writeNotification != 0) {
999 evFlag->wake(writeNotification);
1000 }
1001
1002 return result;
1003 }
1004
1005 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
writeBlocking(const T * data,size_t count,int64_t timeOutNanos)1006 bool MessageQueueBase<MQDescriptorType, T, flavor>::writeBlocking(const T* data, size_t count,
1007 int64_t timeOutNanos) {
1008 return writeBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1009 }
1010
1011 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,uint32_t readNotification,uint32_t writeNotification,int64_t timeOutNanos,android::hardware::EventFlag * evFlag)1012 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(
1013 T* data, size_t count, uint32_t readNotification, uint32_t writeNotification,
1014 int64_t timeOutNanos, android::hardware::EventFlag* evFlag) {
1015 static_assert(flavor == kSynchronizedReadWrite,
1016 "readBlocking can only be used with the "
1017 "kSynchronizedReadWrite flavor.");
1018
1019 /*
1020 * If evFlag is null and the FMQ does not own its own EventFlag object
1021 * return false;
1022 * If the writeNotification bit mask is zero return false;
1023 * If the count is greater than queue size, return false to prevent
1024 * blocking until timeOut.
1025 */
1026 if (evFlag == nullptr) {
1027 evFlag = mEventFlag;
1028 if (evFlag == nullptr) {
1029 hardware::details::logError(
1030 "readBlocking failed: called on MessageQueue with no Eventflag"
1031 "configured or provided");
1032 return false;
1033 }
1034 }
1035
1036 if (writeNotification == 0 || count > getQuantumCount()) {
1037 return false;
1038 }
1039
1040 /*
1041 * There is no need to wait for a write notification if sufficient
1042 * data to read is already present in the FMQ. This would be the
1043 * case when read operations read lesser number of messages than
1044 * a write operation and multiple reads would be required to clear the queue
1045 * after a single write operation. This check would fail to clear a pending
1046 * writeNotification bit since EventFlag bits can only be cleared
1047 * by a wait() call, however the bit would be correctly cleared by the next
1048 * readBlocking() call.
1049 */
1050
1051 bool result = read(data, count);
1052 if (result) {
1053 if (readNotification) {
1054 evFlag->wake(readNotification);
1055 }
1056 return result;
1057 }
1058
1059 bool shouldTimeOut = timeOutNanos != 0;
1060 int64_t prevTimeNanos = shouldTimeOut ? android::elapsedRealtimeNano() : 0;
1061
1062 while (true) {
1063 /* It is not required to adjust 'timeOutNanos' if 'shouldTimeOut' is false */
1064 if (shouldTimeOut) {
1065 /*
1066 * The current time and 'prevTimeNanos' are both CLOCK_BOOTTIME clock values(converted
1067 * to Nanoseconds)
1068 */
1069 int64_t currentTimeNs = android::elapsedRealtimeNano();
1070 /*
1071 * Decrement 'timeOutNanos' to account for the time taken to complete the last
1072 * iteration of the while loop.
1073 */
1074 timeOutNanos -= currentTimeNs - prevTimeNanos;
1075 prevTimeNanos = currentTimeNs;
1076
1077 if (timeOutNanos <= 0) {
1078 /*
1079 * Attempt read in case a context switch happened outside of
1080 * evFlag->wait().
1081 */
1082 result = read(data, count);
1083 break;
1084 }
1085 }
1086
1087 /*
1088 * wait() will return immediately if there was a pending write
1089 * notification.
1090 */
1091 uint32_t efState = 0;
1092 status_t status = evFlag->wait(writeNotification, &efState, timeOutNanos,
1093 true /* retry on spurious wake */);
1094
1095 if (status != android::TIMED_OUT && status != android::NO_ERROR) {
1096 hardware::details::logError("Unexpected error code from EventFlag Wait status " +
1097 std::to_string(status));
1098 break;
1099 }
1100
1101 if (status == android::TIMED_OUT) {
1102 break;
1103 }
1104
1105 /*
1106 * If the data in FMQ is still insufficient, go back to waiting
1107 * for another write notification.
1108 */
1109 if ((efState & writeNotification) && read(data, count)) {
1110 result = true;
1111 break;
1112 }
1113 }
1114
1115 if (result && readNotification != 0) {
1116 evFlag->wake(readNotification);
1117 }
1118 return result;
1119 }
1120
1121 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
readBlocking(T * data,size_t count,int64_t timeOutNanos)1122 bool MessageQueueBase<MQDescriptorType, T, flavor>::readBlocking(T* data, size_t count,
1123 int64_t timeOutNanos) {
1124 return readBlocking(data, count, FMQ_NOT_FULL, FMQ_NOT_EMPTY, timeOutNanos);
1125 }
1126
1127 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
quantum()1128 inline size_t MessageQueueBase<MQDescriptorType, T, flavor>::quantum() const {
1129 if constexpr (std::is_same<T, MQErased>::value) {
1130 return mDesc->getQuantum();
1131 } else {
1132 return kQuantumValue<T>;
1133 }
1134 }
1135
1136 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWriteBytes()1137 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWriteBytes() const {
1138 size_t queueSizeBytes = mDesc->getSize();
1139 size_t availableBytes = availableToReadBytes();
1140 if (queueSizeBytes < availableBytes) {
1141 std::string errorMsg =
1142 "The write or read pointer has become corrupted. Writing to the queue is no "
1143 "longer possible. Queue size: " +
1144 std::to_string(queueSizeBytes) + ", available: " + std::to_string(availableBytes);
1145 hardware::details::logError(errorMsg);
1146 if (mErrorHandler) {
1147 mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
1148 }
1149 return 0;
1150 }
1151 return queueSizeBytes - availableBytes;
1152 }
1153
1154 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToWrite()1155 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWrite() const {
1156 return availableToWriteBytes() / quantum();
1157 }
1158
1159 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToRead()1160 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToRead() const {
1161 return availableToReadBytes() / quantum();
1162 }
1163
1164 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
beginWrite(size_t nMessages,MemTransaction * result)1165 bool MessageQueueBase<MQDescriptorType, T, flavor>::beginWrite(size_t nMessages,
1166 MemTransaction* result) const {
1167 /*
1168 * If nMessages is greater than size of FMQ or in case of the synchronized
1169 * FMQ flavor, if there is not enough space to write nMessages, then return
1170 * result with null addresses.
1171 */
1172 if ((flavor == kSynchronizedReadWrite && (availableToWrite() < nMessages)) ||
1173 nMessages > getQuantumCount()) {
1174 *result = MemTransaction();
1175 return false;
1176 }
1177
1178 auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1179 if (writePtr % quantum() != 0) {
1180 std::string errorMsg =
1181 "The write pointer has become misaligned. Writing to the queue is not possible. "
1182 "Pointer: " +
1183 std::to_string(writePtr) + ", quantum: " + std::to_string(quantum());
1184 hardware::details::logError(errorMsg);
1185 hardware::details::errorWriteLog(0x534e4554, "184963385");
1186 if (mErrorHandler) {
1187 mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
1188 }
1189 return false;
1190 }
1191 size_t writeOffset = writePtr % mDesc->getSize();
1192
1193 /*
1194 * From writeOffset, the number of messages that can be written
1195 * contiguously without wrapping around the ring buffer are calculated.
1196 */
1197 size_t contiguousMessages = (mDesc->getSize() - writeOffset) / quantum();
1198
1199 if (contiguousMessages < nMessages) {
1200 /*
1201 * Wrap around is required. Both result.first and result.second are
1202 * populated.
1203 */
1204 *result = MemTransaction(
1205 MemRegion(reinterpret_cast<T*>(mRing + writeOffset), contiguousMessages),
1206 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1207 } else {
1208 /*
1209 * A wrap around is not required to write nMessages. Only result.first
1210 * is populated.
1211 */
1212 *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + writeOffset), nMessages),
1213 MemRegion());
1214 }
1215
1216 return true;
1217 }
1218
1219 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1220 /*
1221 * Disable integer sanitization since integer overflow here is allowed
1222 * and legal.
1223 */
1224 __attribute__((no_sanitize("integer"))) bool
commitWrite(size_t nMessages)1225 MessageQueueBase<MQDescriptorType, T, flavor>::commitWrite(size_t nMessages) {
1226 size_t nBytesWritten = nMessages * quantum();
1227 auto writePtr = mWritePtr->load(std::memory_order_relaxed);
1228 writePtr += nBytesWritten;
1229 mWritePtr->store(writePtr, std::memory_order_release);
1230 /*
1231 * This method cannot fail now since we are only incrementing the writePtr
1232 * counter.
1233 */
1234 return true;
1235 }
1236
1237 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
availableToReadBytes()1238 size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToReadBytes() const {
1239 /*
1240 * This method is invoked by implementations of both read() and write() and
1241 * hence requires a memory_order_acquired load for both mReadPtr and
1242 * mWritePtr.
1243 */
1244 uint64_t writePtr = mWritePtr->load(std::memory_order_acquire);
1245 uint64_t readPtr = mReadPtr->load(std::memory_order_acquire);
1246 if (writePtr < readPtr) {
1247 std::string errorMsg =
1248 "The write or read pointer has become corrupted. Reading from the queue is no "
1249 "longer possible. Write pointer: " +
1250 std::to_string(writePtr) + ", read pointer: " + std::to_string(readPtr);
1251 hardware::details::logError(errorMsg);
1252 if (mErrorHandler) {
1253 mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
1254 }
1255 return 0;
1256 }
1257 return writePtr - readPtr;
1258 }
1259
1260 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
read(T * data,size_t nMessages)1261 bool MessageQueueBase<MQDescriptorType, T, flavor>::read(T* data, size_t nMessages) {
1262 MemTransaction tx;
1263 return beginRead(nMessages, &tx) &&
1264 tx.copyFromSized(data, 0 /* startIdx */, nMessages, quantum()) && commitRead(nMessages);
1265 }
1266
1267 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1268 /*
1269 * Disable integer sanitization since integer overflow here is allowed
1270 * and legal.
1271 */
1272 __attribute__((no_sanitize("integer"))) bool
processOverflow(uint64_t readPtr,uint64_t writePtr)1273 MessageQueueBase<MQDescriptorType, T, flavor>::processOverflow(uint64_t readPtr,
1274 uint64_t writePtr) const {
1275 if (writePtr - readPtr > mDesc->getSize()) {
1276 /*
1277 * Preserved history can be as big as mDesc->getSize() but we expose only half of that.
1278 * Half of the buffer will be discarded to make space for fast writers and
1279 * reduce chance of repeated overflows. The other half is available to read.
1280 */
1281 size_t historyOffset = getQuantumCount() / 2 * getQuantumSize();
1282 mReadPtr->store(writePtr - historyOffset, std::memory_order_release);
1283 hardware::details::logError("Read failed after an overflow. Resetting read pointer.");
1284 return true;
1285 }
1286 return false;
1287 }
1288
1289 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1290 /*
1291 * Disable integer sanitization since integer overflow here is allowed
1292 * and legal.
1293 */
1294 __attribute__((no_sanitize("integer"))) bool
beginRead(size_t nMessages,MemTransaction * result)1295 MessageQueueBase<MQDescriptorType, T, flavor>::beginRead(size_t nMessages,
1296 MemTransaction* result) const {
1297 *result = MemTransaction();
1298 /*
1299 * If it is detected that the data in the queue was overwritten
1300 * due to the reader process being too slow, the read pointer counter
1301 * is set to the same as the write pointer counter to indicate error
1302 * and the read returns false;
1303 * Need acquire/release memory ordering for mWritePtr.
1304 */
1305 auto writePtr = mWritePtr->load(std::memory_order_acquire);
1306 /*
1307 * A relaxed load is sufficient for mReadPtr since there will be no
1308 * stores to mReadPtr from a different thread.
1309 */
1310 auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1311 if (writePtr % quantum() != 0 || readPtr % quantum() != 0) {
1312 hardware::details::logError(
1313 "The write or read pointer has become misaligned. Reading from the queue is no "
1314 "longer possible.");
1315 hardware::details::errorWriteLog(0x534e4554, "184963385");
1316 return false;
1317 }
1318
1319 if (processOverflow(readPtr, writePtr)) {
1320 return false;
1321 }
1322
1323 size_t nBytesDesired = nMessages * quantum();
1324 /*
1325 * Return if insufficient data to read in FMQ.
1326 */
1327 if (writePtr - readPtr < nBytesDesired) {
1328 return false;
1329 }
1330
1331 size_t readOffset = readPtr % mDesc->getSize();
1332 /*
1333 * From readOffset, the number of messages that can be read contiguously
1334 * without wrapping around the ring buffer are calculated.
1335 */
1336 size_t contiguousMessages = (mDesc->getSize() - readOffset) / quantum();
1337
1338 if (contiguousMessages < nMessages) {
1339 /*
1340 * A wrap around is required. Both result.first and result.second
1341 * are populated.
1342 */
1343 *result = MemTransaction(
1344 MemRegion(reinterpret_cast<T*>(mRing + readOffset), contiguousMessages),
1345 MemRegion(reinterpret_cast<T*>(mRing), nMessages - contiguousMessages));
1346 } else {
1347 /*
1348 * A wrap around is not required. Only result.first need to be
1349 * populated.
1350 */
1351 *result = MemTransaction(MemRegion(reinterpret_cast<T*>(mRing + readOffset), nMessages),
1352 MemRegion());
1353 }
1354
1355 return true;
1356 }
1357
1358 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
1359 /*
1360 * Disable integer sanitization since integer overflow here is allowed
1361 * and legal.
1362 */
1363 __attribute__((no_sanitize("integer"))) bool
commitRead(size_t nMessages)1364 MessageQueueBase<MQDescriptorType, T, flavor>::commitRead(size_t nMessages) {
1365 // TODO: Use a local copy of readPtr to avoid relazed mReadPtr loads.
1366 auto readPtr = mReadPtr->load(std::memory_order_relaxed);
1367 auto writePtr = mWritePtr->load(std::memory_order_acquire);
1368
1369 /*
1370 * If the flavor is unsynchronized, it is possible that a write overflow may
1371 * have occurred between beginRead() and commitRead().
1372 */
1373 if (processOverflow(readPtr, writePtr)) {
1374 return false;
1375 }
1376
1377 size_t nBytesRead = nMessages * quantum();
1378 readPtr += nBytesRead;
1379 mReadPtr->store(readPtr, std::memory_order_release);
1380 return true;
1381 }
1382
1383 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumSize()1384 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumSize() const {
1385 return mDesc->getQuantum();
1386 }
1387
1388 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
getQuantumCount()1389 size_t MessageQueueBase<MQDescriptorType, T, flavor>::getQuantumCount() const {
1390 return mDesc->getSize() / mDesc->getQuantum();
1391 }
1392
1393 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
isValid()1394 bool MessageQueueBase<MQDescriptorType, T, flavor>::isValid() const {
1395 return mRing != nullptr && mReadPtr != nullptr && mWritePtr != nullptr;
1396 }
1397
1398 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
mapGrantorDescr(uint32_t grantorIdx)1399 void* MessageQueueBase<MQDescriptorType, T, flavor>::mapGrantorDescr(uint32_t grantorIdx) {
1400 const native_handle_t* handle = mDesc->handle();
1401 const std::vector<android::hardware::GrantorDescriptor> grantors = mDesc->grantors();
1402 if (handle == nullptr) {
1403 hardware::details::logError("mDesc->handle is null");
1404 return nullptr;
1405 }
1406
1407 if (grantorIdx >= grantors.size()) {
1408 hardware::details::logError(std::string("grantorIdx must be less than ") +
1409 std::to_string(grantors.size()));
1410 return nullptr;
1411 }
1412
1413 int fdIndex = grantors[grantorIdx].fdIndex;
1414 if (fdIndex < 0 || fdIndex >= handle->numFds) {
1415 hardware::details::logError(
1416 std::string("fdIndex (" + std::to_string(fdIndex) + ") from grantor (index " +
1417 std::to_string(grantorIdx) +
1418 ") must be smaller than the number of fds in the handle: " +
1419 std::to_string(handle->numFds)));
1420 return nullptr;
1421 }
1422
1423 /*
1424 * Offset for mmap must be a multiple of kPageSize.
1425 */
1426 if (!hardware::details::isAlignedToWordBoundary(grantors[grantorIdx].offset)) {
1427 hardware::details::logError("Grantor (index " + std::to_string(grantorIdx) +
1428 ") offset needs to be aligned to word boundary but is: " +
1429 std::to_string(grantors[grantorIdx].offset));
1430 return nullptr;
1431 }
1432
1433 /*
1434 * Expect some grantors to be at least a min size
1435 */
1436 for (uint32_t i = 0; i < grantors.size(); i++) {
1437 switch (i) {
1438 case hardware::details::READPTRPOS:
1439 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1440 break;
1441 case hardware::details::WRITEPTRPOS:
1442 if (grantors[i].extent < sizeof(uint64_t)) return nullptr;
1443 break;
1444 case hardware::details::DATAPTRPOS:
1445 // We don't expect specific data size
1446 break;
1447 case hardware::details::EVFLAGWORDPOS:
1448 if (grantors[i].extent < sizeof(uint32_t)) return nullptr;
1449 break;
1450 default:
1451 // We don't care about unknown grantors
1452 break;
1453 }
1454 }
1455
1456 int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1457 if (grantors[grantorIdx].extent < 0 || grantors[grantorIdx].extent > INT_MAX - kPageSize) {
1458 hardware::details::logError(std::string("Grantor (index " + std::to_string(grantorIdx) +
1459 ") extent value is too large or negative: " +
1460 std::to_string(grantors[grantorIdx].extent)));
1461 return nullptr;
1462 }
1463 int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1464
1465 void* address = mmap(0, mapLength, PROT_READ | PROT_WRITE, MAP_SHARED, handle->data[fdIndex],
1466 mapOffset);
1467 if (address == MAP_FAILED && errno == EPERM && flavor == kUnsynchronizedWrite) {
1468 // If the supplied memory is read-only, it would fail with EPERM.
1469 // Try again to mmap read-only for the kUnsynchronizedWrite case.
1470 // kSynchronizedReadWrite cannot use read-only memory because the
1471 // read pointer is stored in the shared memory as well.
1472 address = mmap(0, mapLength, PROT_READ, MAP_SHARED, handle->data[fdIndex], mapOffset);
1473 }
1474 if (address == MAP_FAILED) {
1475 hardware::details::logError(std::string("mmap failed: ") + std::to_string(errno));
1476 return nullptr;
1477 }
1478 return reinterpret_cast<uint8_t*>(address) + (grantors[grantorIdx].offset - mapOffset);
1479 }
1480
1481 template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
unmapGrantorDescr(void * address,uint32_t grantorIdx)1482 void MessageQueueBase<MQDescriptorType, T, flavor>::unmapGrantorDescr(void* address,
1483 uint32_t grantorIdx) {
1484 auto grantors = mDesc->grantors();
1485 if ((address == nullptr) || (grantorIdx >= grantors.size())) {
1486 return;
1487 }
1488
1489 int mapOffset = (grantors[grantorIdx].offset / kPageSize) * kPageSize;
1490 int mapLength = grantors[grantorIdx].offset - mapOffset + grantors[grantorIdx].extent;
1491 void* baseAddress =
1492 reinterpret_cast<uint8_t*>(address) - (grantors[grantorIdx].offset - mapOffset);
1493 if (baseAddress) munmap(baseAddress, mapLength);
1494 }
1495
1496 } // namespace android
1497