1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <condition_variable>
20 #include <functional>
21 #include <memory>
22 #include <vector>
23 
24 #include "common/libs/concurrency/semaphore.h"
25 #include "common/libs/concurrency/thread_safe_queue.h"
26 
27 namespace cuttlefish {
28 template <typename T, typename Queue>
29 class Multiplexer {
30  public:
31   using QueuePtr = std::unique_ptr<Queue>;
32   using QueueSelector = std::function<int(void)>;
33 
34   template <typename... Args>
CreateQueue(Args &&...args)35   static QueuePtr CreateQueue(Args&&... args) {
36     auto raw_ptr = new Queue(std::forward<Args>(args)...);
37     return QueuePtr(raw_ptr);
38   }
39 
Multiplexer()40   Multiplexer() : sem_items_{0} {}
41 
RegisterQueue(QueuePtr && queue)42   int RegisterQueue(QueuePtr&& queue) {
43     const int id_to_return = queues_.size();
44     queues_.push_back(std::move(queue));
45     return id_to_return;
46   }
47 
Push(const int idx,T && t)48   void Push(const int idx, T&& t) {
49     CheckIdx(idx);
50     queues_[idx]->Push(std::move(t));
51     sem_items_.SemPost();
52   }
53 
Pop(QueueSelector selector)54   T Pop(QueueSelector selector) {
55     SemWait();
56     int q_id = selector();
57     CheckIdx(q_id);  // check, if weird, will die there
58     QueuePtr& queue = queues_[q_id];
59     CHECK(queue) << "queue must not be null.";
60     return queue->Pop();
61   }
62 
Pop()63   T Pop() {
64     auto default_selector = [this]() -> int {
65       for (int i = 0; i < queues_.size(); i++) {
66         if (!queues_[i]->IsEmpty()) {
67           return i;
68         }
69       }
70       return -1;
71     };
72     return Pop(default_selector);
73   }
74 
IsEmpty(const int idx)75   bool IsEmpty(const int idx) { return queues_[idx]->IsEmpty(); }
76 
SemWait()77   void SemWait() { sem_items_.SemWait(); }
78 
79  private:
CheckIdx(const int idx)80   void CheckIdx(const int idx) {
81     CHECK(idx >= 0 && idx < queues_.size()) << "queues_ array out of bound";
82   }
83   // total items across the queues
84   Semaphore sem_items_;
85   std::vector<QueuePtr> queues_;
86   QueuePtr null_ptr_;
87 };
88 }  // end of namespace cuttlefish
89