1*795d594fSAndroid Build Coastguard Worker /* 2*795d594fSAndroid Build Coastguard Worker * Copyright (C) 2020 The Android Open Source Project 3*795d594fSAndroid Build Coastguard Worker * 4*795d594fSAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License"); 5*795d594fSAndroid Build Coastguard Worker * you may not use this file except in compliance with the License. 6*795d594fSAndroid Build Coastguard Worker * You may obtain a copy of the License at 7*795d594fSAndroid Build Coastguard Worker * 8*795d594fSAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0 9*795d594fSAndroid Build Coastguard Worker * 10*795d594fSAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software 11*795d594fSAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS, 12*795d594fSAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*795d594fSAndroid Build Coastguard Worker * See the License for the specific language governing permissions and 14*795d594fSAndroid Build Coastguard Worker * limitations under the License. 15*795d594fSAndroid Build Coastguard Worker */ 16*795d594fSAndroid Build Coastguard Worker 17*795d594fSAndroid Build Coastguard Worker #ifndef ART_RUNTIME_BASE_MESSAGE_QUEUE_H_ 18*795d594fSAndroid Build Coastguard Worker #define ART_RUNTIME_BASE_MESSAGE_QUEUE_H_ 19*795d594fSAndroid Build Coastguard Worker 20*795d594fSAndroid Build Coastguard Worker #include <deque> 21*795d594fSAndroid Build Coastguard Worker #include <optional> 22*795d594fSAndroid Build Coastguard Worker #include <variant> 23*795d594fSAndroid Build Coastguard Worker 24*795d594fSAndroid Build Coastguard Worker #include "base/time_utils.h" 25*795d594fSAndroid Build Coastguard Worker #include "mutex.h" 26*795d594fSAndroid Build Coastguard Worker #include "thread.h" 27*795d594fSAndroid Build Coastguard Worker 28*795d594fSAndroid Build Coastguard Worker #pragma clang diagnostic push 29*795d594fSAndroid Build Coastguard Worker #pragma clang diagnostic error "-Wconversion" 30*795d594fSAndroid Build Coastguard Worker 31*795d594fSAndroid Build Coastguard Worker namespace art HIDDEN { 32*795d594fSAndroid Build Coastguard Worker 33*795d594fSAndroid Build Coastguard Worker struct TimeoutExpiredMessage {}; 34*795d594fSAndroid Build Coastguard Worker 35*795d594fSAndroid Build Coastguard Worker // MessageQueue is an unbounded multiple producer, multiple consumer (MPMC) queue that can be 36*795d594fSAndroid Build Coastguard Worker // specialized to send messages between threads. The queue is parameterized by a set of types that 37*795d594fSAndroid Build Coastguard Worker // serve as the message types. Note that messages are passed by value, so smaller messages should be 38*795d594fSAndroid Build Coastguard Worker // used when possible. 39*795d594fSAndroid Build Coastguard Worker // 40*795d594fSAndroid Build Coastguard Worker // Example: 41*795d594fSAndroid Build Coastguard Worker // 42*795d594fSAndroid Build Coastguard Worker // struct IntMessage { int value; }; 43*795d594fSAndroid Build Coastguard Worker // struct DoubleMessage { double value; }; 44*795d594fSAndroid Build Coastguard Worker // 45*795d594fSAndroid Build Coastguard Worker // MessageQueue<IntMessage, DoubleMessage> queue; 46*795d594fSAndroid Build Coastguard Worker // 47*795d594fSAndroid Build Coastguard Worker // queue.SendMessage(IntMessage{42}); 48*795d594fSAndroid Build Coastguard Worker // queue.SendMessage(DoubleMessage{42.0}); 49*795d594fSAndroid Build Coastguard Worker // 50*795d594fSAndroid Build Coastguard Worker // auto message = queue.ReceiveMessage(); // message is a std::variant of the different 51*795d594fSAndroid Build Coastguard Worker // // message types. 52*795d594fSAndroid Build Coastguard Worker // 53*795d594fSAndroid Build Coastguard Worker // if (std::holds_alternative<IntMessage>(message)) { 54*795d594fSAndroid Build Coastguard Worker // cout << "Received int message with value " << std::get<IntMessage>(message) << "\n"; 55*795d594fSAndroid Build Coastguard Worker // } 56*795d594fSAndroid Build Coastguard Worker // 57*795d594fSAndroid Build Coastguard Worker // The message queue also supports a special timeout message. This is scheduled to be sent by the 58*795d594fSAndroid Build Coastguard Worker // SetTimeout method, which will cause the MessageQueue to deliver a TimeoutExpiredMessage after the 59*795d594fSAndroid Build Coastguard Worker // time period has elapsed. Note that only one timeout can be active can be active at a time, and 60*795d594fSAndroid Build Coastguard Worker // subsequent calls to SetTimeout will overwrite any existing timeout. 61*795d594fSAndroid Build Coastguard Worker // 62*795d594fSAndroid Build Coastguard Worker // Example: 63*795d594fSAndroid Build Coastguard Worker // 64*795d594fSAndroid Build Coastguard Worker // queue.SetTimeout(5000); // request to send TimeoutExpiredMessage in 5000ms. 65*795d594fSAndroid Build Coastguard Worker // 66*795d594fSAndroid Build Coastguard Worker // auto message = queue.ReceiveMessage(); // blocks for 5000ms and returns 67*795d594fSAndroid Build Coastguard Worker // // TimeoutExpiredMessage 68*795d594fSAndroid Build Coastguard Worker // 69*795d594fSAndroid Build Coastguard Worker // Note additional messages can be sent in the meantime and a ReceiveMessage call will wake up to 70*795d594fSAndroid Build Coastguard Worker // return that message. The TimeoutExpiredMessage will still be sent at the right time. 71*795d594fSAndroid Build Coastguard Worker // 72*795d594fSAndroid Build Coastguard Worker // Finally, MessageQueue has a SwitchReceive method that can be used to run different code depending 73*795d594fSAndroid Build Coastguard Worker // on the type of message received. SwitchReceive takes a set of lambda expressions that take one 74*795d594fSAndroid Build Coastguard Worker // argument of one of the allowed message types. An additional lambda expression that takes a single 75*795d594fSAndroid Build Coastguard Worker // auto argument can be used to serve as a catch-all case. 76*795d594fSAndroid Build Coastguard Worker // 77*795d594fSAndroid Build Coastguard Worker // Example: 78*795d594fSAndroid Build Coastguard Worker // 79*795d594fSAndroid Build Coastguard Worker // queue.SwitchReceive( 80*795d594fSAndroid Build Coastguard Worker // [&](IntMessage message) { 81*795d594fSAndroid Build Coastguard Worker // cout << "Received int: " << message.value << "\n"; 82*795d594fSAndroid Build Coastguard Worker // }, 83*795d594fSAndroid Build Coastguard Worker // [&](DoubleMessage message) { 84*795d594fSAndroid Build Coastguard Worker // cout << "Received double: " << message.value << "\n"; 85*795d594fSAndroid Build Coastguard Worker // }, 86*795d594fSAndroid Build Coastguard Worker // [&](auto other_message) { 87*795d594fSAndroid Build Coastguard Worker // // Another message was received. In this case, it's TimeoutExpiredMessage. 88*795d594fSAndroid Build Coastguard Worker // } 89*795d594fSAndroid Build Coastguard Worker // ) 90*795d594fSAndroid Build Coastguard Worker // 91*795d594fSAndroid Build Coastguard Worker // For additional examples, see message_queue_test.cc. 92*795d594fSAndroid Build Coastguard Worker template <typename... MessageTypes> 93*795d594fSAndroid Build Coastguard Worker class MessageQueue { 94*795d594fSAndroid Build Coastguard Worker public: 95*795d594fSAndroid Build Coastguard Worker using Message = std::variant<TimeoutExpiredMessage, MessageTypes...>; 96*795d594fSAndroid Build Coastguard Worker 97*795d594fSAndroid Build Coastguard Worker // Adds a message to the message queue, which can later be received with ReceiveMessage. See class 98*795d594fSAndroid Build Coastguard Worker // comment for more details. SendMessage(Message message)99*795d594fSAndroid Build Coastguard Worker void SendMessage(Message message) { 100*795d594fSAndroid Build Coastguard Worker // TimeoutExpiredMessage should not be sent manually. 101*795d594fSAndroid Build Coastguard Worker DCHECK(!std::holds_alternative<TimeoutExpiredMessage>(message)); 102*795d594fSAndroid Build Coastguard Worker Thread* self = Thread::Current(); 103*795d594fSAndroid Build Coastguard Worker MutexLock lock{self, mutex_}; 104*795d594fSAndroid Build Coastguard Worker messages_.push_back(message); 105*795d594fSAndroid Build Coastguard Worker cv_.Signal(self); 106*795d594fSAndroid Build Coastguard Worker } 107*795d594fSAndroid Build Coastguard Worker 108*795d594fSAndroid Build Coastguard Worker // Schedule a TimeoutExpiredMessage to be delivered in timeout_milliseconds. See class comment for 109*795d594fSAndroid Build Coastguard Worker // more details. SetTimeout(uint64_t timeout_milliseconds)110*795d594fSAndroid Build Coastguard Worker void SetTimeout(uint64_t timeout_milliseconds) { 111*795d594fSAndroid Build Coastguard Worker Thread* self = Thread::Current(); 112*795d594fSAndroid Build Coastguard Worker MutexLock lock{self, mutex_}; 113*795d594fSAndroid Build Coastguard Worker deadline_milliseconds_ = timeout_milliseconds + MilliTime(); 114*795d594fSAndroid Build Coastguard Worker cv_.Signal(self); 115*795d594fSAndroid Build Coastguard Worker } 116*795d594fSAndroid Build Coastguard Worker 117*795d594fSAndroid Build Coastguard Worker // Remove and return a message from the queue. If no message is available, ReceiveMessage will 118*795d594fSAndroid Build Coastguard Worker // block until one becomes available. See class comment for more details. ReceiveMessage()119*795d594fSAndroid Build Coastguard Worker Message ReceiveMessage() { 120*795d594fSAndroid Build Coastguard Worker Thread* self = Thread::Current(); 121*795d594fSAndroid Build Coastguard Worker MutexLock lock{self, mutex_}; 122*795d594fSAndroid Build Coastguard Worker 123*795d594fSAndroid Build Coastguard Worker // Loop until we receive a message 124*795d594fSAndroid Build Coastguard Worker while (true) { 125*795d594fSAndroid Build Coastguard Worker uint64_t const current_time = MilliTime(); 126*795d594fSAndroid Build Coastguard Worker // First check if the deadline has passed. 127*795d594fSAndroid Build Coastguard Worker if (deadline_milliseconds_.has_value() && deadline_milliseconds_.value() < current_time) { 128*795d594fSAndroid Build Coastguard Worker deadline_milliseconds_.reset(); 129*795d594fSAndroid Build Coastguard Worker return TimeoutExpiredMessage{}; 130*795d594fSAndroid Build Coastguard Worker } 131*795d594fSAndroid Build Coastguard Worker 132*795d594fSAndroid Build Coastguard Worker // Check if there is a message in the queue. 133*795d594fSAndroid Build Coastguard Worker if (messages_.size() > 0) { 134*795d594fSAndroid Build Coastguard Worker Message message = messages_.front(); 135*795d594fSAndroid Build Coastguard Worker messages_.pop_front(); 136*795d594fSAndroid Build Coastguard Worker return message; 137*795d594fSAndroid Build Coastguard Worker } 138*795d594fSAndroid Build Coastguard Worker 139*795d594fSAndroid Build Coastguard Worker // Otherwise, wait until we have a message or a timeout. 140*795d594fSAndroid Build Coastguard Worker if (deadline_milliseconds_.has_value()) { 141*795d594fSAndroid Build Coastguard Worker DCHECK_LE(current_time, deadline_milliseconds_.value()); 142*795d594fSAndroid Build Coastguard Worker int64_t timeout = static_cast<int64_t>(deadline_milliseconds_.value() - current_time); 143*795d594fSAndroid Build Coastguard Worker cv_.TimedWait(self, timeout, /*ns=*/0); 144*795d594fSAndroid Build Coastguard Worker } else { 145*795d594fSAndroid Build Coastguard Worker cv_.Wait(self); 146*795d594fSAndroid Build Coastguard Worker } 147*795d594fSAndroid Build Coastguard Worker } 148*795d594fSAndroid Build Coastguard Worker } 149*795d594fSAndroid Build Coastguard Worker 150*795d594fSAndroid Build Coastguard Worker // Waits for a message and applies the appropriate function argument to the received message. See 151*795d594fSAndroid Build Coastguard Worker // class comment for more details. 152*795d594fSAndroid Build Coastguard Worker template <typename ReturnType = void, typename... Fn> SwitchReceive(Fn...case_fn)153*795d594fSAndroid Build Coastguard Worker ReturnType SwitchReceive(Fn... case_fn) { 154*795d594fSAndroid Build Coastguard Worker struct Matcher : Fn... { 155*795d594fSAndroid Build Coastguard Worker using Fn::operator()...; 156*795d594fSAndroid Build Coastguard Worker } matcher{case_fn...}; 157*795d594fSAndroid Build Coastguard Worker return std::visit(matcher, ReceiveMessage()); 158*795d594fSAndroid Build Coastguard Worker } 159*795d594fSAndroid Build Coastguard Worker 160*795d594fSAndroid Build Coastguard Worker private: 161*795d594fSAndroid Build Coastguard Worker Mutex mutex_{"MessageQueue Mutex"}; 162*795d594fSAndroid Build Coastguard Worker ConditionVariable cv_{"MessageQueue ConditionVariable", mutex_}; 163*795d594fSAndroid Build Coastguard Worker 164*795d594fSAndroid Build Coastguard Worker std::deque<Message> messages_ GUARDED_BY(mutex_); 165*795d594fSAndroid Build Coastguard Worker std::optional<uint64_t> deadline_milliseconds_ GUARDED_BY(mutex_); 166*795d594fSAndroid Build Coastguard Worker }; 167*795d594fSAndroid Build Coastguard Worker 168*795d594fSAndroid Build Coastguard Worker } // namespace art 169*795d594fSAndroid Build Coastguard Worker 170*795d594fSAndroid Build Coastguard Worker #pragma clang diagnostic pop // -Wconversion 171*795d594fSAndroid Build Coastguard Worker 172*795d594fSAndroid Build Coastguard Worker #endif // ART_RUNTIME_BASE_MESSAGE_QUEUE_H_ 173