1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <condition_variable>
20 #include <deque>
21 #include <memory>
22 #include <mutex>
23 #include <thread>
24 
25 #include "common/libs/concurrency/semaphore.h"
26 
27 namespace cuttlefish {
28 // move-based concurrent queue
29 template <typename T>
30 class ScreenConnectorQueue {
31  public:
32   static_assert(is_movable<T>::value,
33                 "Items in ScreenConnectorQueue should be std::mov-able");
34 
35   ScreenConnectorQueue(const int q_max_size = 2)
q_mutex_(std::make_unique<std::mutex> ())36       : q_mutex_(std::make_unique<std::mutex>()), q_max_size_{q_max_size} {}
37   ScreenConnectorQueue(ScreenConnectorQueue&& cq) = delete;
38   ScreenConnectorQueue(const ScreenConnectorQueue& cq) = delete;
39   ScreenConnectorQueue& operator=(const ScreenConnectorQueue& cq) = delete;
40   ScreenConnectorQueue& operator=(ScreenConnectorQueue&& cq) = delete;
41 
IsEmpty()42   bool IsEmpty() const {
43     const std::lock_guard<std::mutex> lock(*q_mutex_);
44     return buffer_.empty();
45   }
46 
Size()47   auto Size() const {
48     const std::lock_guard<std::mutex> lock(*q_mutex_);
49     return buffer_.size();
50   }
51 
WaitEmpty()52   void WaitEmpty() {
53     auto is_empty = [this](void) { return buffer_.empty(); };
54     std::unique_lock<std::mutex> lock(*q_mutex_);
55     q_empty_.wait(lock, is_empty);
56   }
57 
58   /*
59    * Push( std::move(src) );
60    *
61    * Note: this queue is supposed to be used only by ScreenConnector-
62    * related components such as ScreenConnectorSource
63    *
64    * The traditional assumption was that when webRTC calls
65    * OnFrameAfter, the call should be block until it could return
66    * one frame.
67    *
68    * Thus, the producers of this queue must not produce frames
69    * much faster than the consumer, WebRTC consumes.
70    * Therefore, when the small buffer is full -- which means
71    * WebRTC would not call OnNextFrame --, the producer
72    * should stop adding items to the queue.
73    *
74    */
Push(T && item)75   void Push(T&& item) {
76     std::unique_lock<std::mutex> lock(*q_mutex_);
77     if (Full()) {
78       auto is_empty = [this](void) { return buffer_.empty(); };
79       q_empty_.wait(lock, is_empty);
80     }
81     buffer_.push_back(std::move(item));
82   }
83   void Push(T& item) = delete;
84   void Push(const T& item) = delete;
85 
Pop()86   T Pop() {
87     const std::lock_guard<std::mutex> lock(*q_mutex_);
88     auto item = std::move(buffer_.front());
89     buffer_.pop_front();
90     if (buffer_.empty()) {
91       q_empty_.notify_all();
92     }
93     return item;
94   }
95 
96  private:
Full()97   bool Full() const {
98     // call this in a critical section
99     // after acquiring q_mutex_
100     return q_max_size_ == buffer_.size();
101   }
102   std::deque<T> buffer_;
103   std::unique_ptr<std::mutex> q_mutex_;
104   std::condition_variable q_empty_;
105   const int q_max_size_;
106 };
107 
108 }  // namespace cuttlefish
109