1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <condition_variable> 20 #include <functional> 21 #include <memory> 22 #include <vector> 23 24 #include "common/libs/concurrency/semaphore.h" 25 #include "common/libs/concurrency/thread_safe_queue.h" 26 27 namespace cuttlefish { 28 template <typename T, typename Queue> 29 class Multiplexer { 30 public: 31 using QueuePtr = std::unique_ptr<Queue>; 32 using QueueSelector = std::function<int(void)>; 33 34 template <typename... Args> CreateQueue(Args &&...args)35 static QueuePtr CreateQueue(Args&&... args) { 36 auto raw_ptr = new Queue(std::forward<Args>(args)...); 37 return QueuePtr(raw_ptr); 38 } 39 Multiplexer()40 Multiplexer() : sem_items_{0} {} 41 RegisterQueue(QueuePtr && queue)42 int RegisterQueue(QueuePtr&& queue) { 43 const int id_to_return = queues_.size(); 44 queues_.push_back(std::move(queue)); 45 return id_to_return; 46 } 47 Push(const int idx,T && t)48 void Push(const int idx, T&& t) { 49 CheckIdx(idx); 50 queues_[idx]->Push(std::move(t)); 51 sem_items_.SemPost(); 52 } 53 Pop(QueueSelector selector)54 T Pop(QueueSelector selector) { 55 SemWait(); 56 int q_id = selector(); 57 CheckIdx(q_id); // check, if weird, will die there 58 QueuePtr& queue = queues_[q_id]; 59 CHECK(queue) << "queue must not be null."; 60 return queue->Pop(); 61 } 62 Pop()63 T Pop() { 64 auto default_selector = [this]() -> int { 65 for (int i = 0; i < queues_.size(); i++) { 66 if (!queues_[i]->IsEmpty()) { 67 return i; 68 } 69 } 70 return -1; 71 }; 72 return Pop(default_selector); 73 } 74 IsEmpty(const int idx)75 bool IsEmpty(const int idx) { return queues_[idx]->IsEmpty(); } 76 SemWait()77 void SemWait() { sem_items_.SemWait(); } 78 79 private: CheckIdx(const int idx)80 void CheckIdx(const int idx) { 81 CHECK(idx >= 0 && idx < queues_.size()) << "queues_ array out of bound"; 82 } 83 // total items across the queues 84 Semaphore sem_items_; 85 std::vector<QueuePtr> queues_; 86 QueuePtr null_ptr_; 87 }; 88 } // end of namespace cuttlefish 89