/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <optional>

// Single consumer multi producer queue. We can understand the two operations independently to see
// why they are without race condition.
//
// push is responsible for maintaining a linked list stored in mPush, and is called from multiple
// threads without lock. We can see that if two threads never observe the same value from
// mPush.load, it just functions as a normal linked list. In the case where two threads observe the
// same value, one of them has to execute the compare_exchange first. The one that doesn't execute
// the compare exchange first, will receive false from compare_exchange. previousHead is updated (by
// compare_exchange) to the most recent value of mPush, and we try again. It's relatively clear to
// see that the process can repeat with an arbitrary number of threads.
//
// Pop is much simpler. If mPop is empty (as it begins) it atomically exchanges
// the entire push list with null. This is safe, since the only other reader (push)
// of mPush will retry if it changes in between its read and atomic compare. We
// then store the list and pop one element.
//
// If we already had something in the pop list we just pop directly.
/**
 * Multi-producer, single-consumer FIFO queue built from two singly linked lists.
 *
 * Producers prepend nodes to mPush with a compare-exchange loop, so mPush holds the pushed
 * elements newest-first. The consumer serves elements from mPop; when mPop is empty it takes
 * the whole mPush list with one atomic exchange, reverses it into oldest-first order, returns
 * the oldest element and parks the remainder in mPop.
 *
 * Threading contract: push() may be called from any number of threads concurrently. pop() is
 * written for exactly one consumer thread (it reads and then writes mPop without a CAS).
 * The destructor must only run once no other thread is using the queue.
 *
 * All atomic operations use the default sequentially consistent ordering. The weaker orderings
 * named in the inline comments are candidates only and are deliberately not applied.
 *
 * @tparam T element type; it must be move-constructible (copyability is not required).
 */
template <typename T>
class LocklessQueue {
  public:
    LocklessQueue() = default;

    // The queue owns raw heap nodes, so copying it would double-free them.
    LocklessQueue(const LocklessQueue&) = delete;
    LocklessQueue& operator=(const LocklessQueue&) = delete;

    /**
     * Releases every node that was pushed but never popped, from both internal lists.
     */
    ~LocklessQueue() {
        freeList(mPop.exchange(nullptr));
        freeList(mPush.exchange(nullptr));
    }

    /**
     * @return true when both internal lists were observed to be empty. The two loads are
     *         separate atomic operations, so with concurrent producers the answer may already
     *         be stale by the time the caller looks at it.
     */
    bool isEmpty() const { return (mPush.load() == nullptr) && (mPop.load() == nullptr); }

    /**
     * Appends a value to the queue. Safe to call from multiple threads at once.
     *
     * @param value element to enqueue; it is moved into a newly allocated node.
     */
    void push(T value) {
        Entry* entry = new Entry(std::move(value));
        Entry* previousHead = mPush.load(/*std::memory_order_relaxed*/);
        do {
            // On a failed exchange previousHead is refreshed with the current head, so the
            // link is simply rewritten before the next attempt.
            entry->mNext.store(previousHead);
        } while (!mPush.compare_exchange_weak(previousHead, entry)); /*std::memory_order_release*/
    }

    /**
     * Removes and returns the oldest element. Must only be called by the single consumer.
     *
     * @return the oldest queued value, or std::nullopt when the queue holds nothing.
     */
    std::optional<T> pop() {
        // Fast path: an earlier drain left elements in the pop list.
        if (Entry* head = mPop.load(/*std::memory_order_acquire*/)) {
            // Single consumer, so nobody else can modify mPop between the load and the store.
            mPop.store(head->mNext.load() /* , std::memory_order_release */);
            return takeValue(head);
        }

        // Slow path: take everything the producers have queued so far in one step.
        Entry* oldest = mPush.exchange(nullptr /* , std::memory_order_acquire */);
        if (!oldest) return std::nullopt;

        // The push list is newest-first. Walk to its tail, reversing the links behind us, so
        // that `reversed` ends up oldest-first and `oldest` ends on the element to return.
        Entry* reversed = nullptr;
        while (Entry* next = oldest->mNext.load()) {
            oldest->mNext.store(reversed);
            reversed = oldest;
            oldest = next;
        }
        mPop.store(reversed /* , std::memory_order_release */);
        return takeValue(oldest);
    }

  private:
    // One heap-allocated list node.
    class Entry {
      public:
        T mValue;
        std::atomic<Entry*> mNext{nullptr};
        // Moves rather than copies, so move-only element types are supported.
        explicit Entry(T value) : mValue(std::move(value)) {}
    };

    // Moves the payload out of an already unlinked node and frees the node.
    static std::optional<T> takeValue(Entry* entry) {
        std::optional<T> result(std::in_place, std::move(entry->mValue));
        delete entry;
        return result;
    }

    // Deletes every node reachable from head.
    static void freeList(Entry* head) {
        while (head) {
            Entry* next = head->mNext.load();
            delete head;
            head = next;
        }
    }

    std::atomic<Entry*> mPush{nullptr};  // producer side, newest element first
    std::atomic<Entry*> mPop{nullptr};   // consumer side, oldest element first
};