1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_kqueue.h"
6
7 #include <mach/mach.h>
8 #include <mach/message.h>
9
10 #include <utility>
11
12 #include "base/functional/bind.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/memory/raw_ptr.h"
15 #include "base/memory/scoped_refptr.h"
16 #include "base/run_loop.h"
17 #include "base/task/single_thread_task_executor.h"
18 #include "base/task/single_thread_task_runner.h"
19 #include "testing/gtest/include/gtest/gtest.h"
20
21 namespace base {
22 namespace {
23
24 class MessagePumpKqueueTest : public testing::Test {
25 public:
MessagePumpKqueueTest()26 MessagePumpKqueueTest()
27 : pump_(new MessagePumpKqueue()), executor_(WrapUnique(pump_.get())) {}
28
pump()29 MessagePumpKqueue* pump() { return pump_; }
30
CreatePortPair(apple::ScopedMachReceiveRight * receive,apple::ScopedMachSendRight * send)31 static void CreatePortPair(apple::ScopedMachReceiveRight* receive,
32 apple::ScopedMachSendRight* send) {
33 mach_port_options_t options{};
34 options.flags = MPO_INSERT_SEND_RIGHT;
35 apple::ScopedMachReceiveRight port;
36 kern_return_t kr = mach_port_construct(
37 mach_task_self(), &options, 0,
38 apple::ScopedMachReceiveRight::Receiver(*receive).get());
39 ASSERT_EQ(kr, KERN_SUCCESS);
40 *send = apple::ScopedMachSendRight(receive->get());
41 }
42
SendEmptyMessage(mach_port_t remote_port,mach_msg_id_t msgid)43 static mach_msg_return_t SendEmptyMessage(mach_port_t remote_port,
44 mach_msg_id_t msgid) {
45 mach_msg_empty_send_t message{};
46 message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
47 message.header.msgh_size = sizeof(message);
48 message.header.msgh_remote_port = remote_port;
49 message.header.msgh_id = msgid;
50 return mach_msg_send(&message.header);
51 }
52
53 private:
54 raw_ptr<MessagePumpKqueue, DanglingUntriaged>
55 pump_; // Weak, owned by |executor_|.
56 SingleThreadTaskExecutor executor_;
57 };
58
59 class PortWatcher : public MessagePumpKqueue::MachPortWatcher {
60 public:
PortWatcher(RepeatingClosure callback)61 PortWatcher(RepeatingClosure callback) : callback_(std::move(callback)) {}
~PortWatcher()62 ~PortWatcher() override {}
63
OnMachMessageReceived(mach_port_t port)64 void OnMachMessageReceived(mach_port_t port) override {
65 mach_msg_empty_rcv_t message{};
66 kern_return_t kr = mach_msg(&message.header, MACH_RCV_MSG, 0,
67 sizeof(message), port, 0, MACH_PORT_NULL);
68 ASSERT_EQ(kr, KERN_SUCCESS);
69
70 messages_.push_back(message.header);
71
72 callback_.Run();
73 }
74
75 std::vector<mach_msg_header_t> messages_;
76
77 private:
78 RepeatingClosure callback_;
79 };
80
// A watcher registered on a receive right must be notified when a message is
// sent to that port, and the message it dequeues must carry the sent id.
TEST_F(MessagePumpKqueueTest, MachPortBasicWatch) {
  apple::ScopedMachReceiveRight receive_right;
  apple::ScopedMachSendRight send_right;
  CreatePortPair(&receive_right, &send_right);

  mach_msg_id_t expected_id = 'helo';

  RunLoop run_loop;
  // The watcher quits the loop as soon as it has received a message.
  PortWatcher watcher(run_loop.QuitClosure());
  MessagePumpKqueue::MachPortWatchController controller(FROM_HERE);

  // Queue a task that sends the message. If the send itself fails, quit the
  // loop so the test does not wait for a notification that never arrives.
  SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      BindOnce(
          [](mach_port_t destination, mach_msg_id_t id, RunLoop* loop) {
            mach_msg_return_t send_result = SendEmptyMessage(destination, id);
            EXPECT_EQ(send_result, KERN_SUCCESS);
            if (send_result != KERN_SUCCESS) {
              loop->Quit();
            }
          },
          receive_right.get(), expected_id, Unretained(&run_loop)));

  pump()->WatchMachReceivePort(receive_right.get(), &controller, &watcher);

  run_loop.Run();

  // Exactly one message, addressed to our port, with the id we sent.
  ASSERT_EQ(1u, watcher.messages_.size());
  const mach_msg_header_t& received = watcher.messages_[0];
  EXPECT_EQ(receive_right.get(), received.msgh_local_port);
  EXPECT_EQ(expected_id, received.msgh_id);
}
111
// Once StopWatchingMachPort() has been called on the controller, a message
// sent to the port afterwards must not reach the watcher.
TEST_F(MessagePumpKqueueTest, MachPortStopWatching) {
  apple::ScopedMachReceiveRight receive_right;
  apple::ScopedMachSendRight send_right;
  CreatePortPair(&receive_right, &send_right);

  RunLoop run_loop;
  PortWatcher watcher(run_loop.QuitClosure());
  MessagePumpKqueue::MachPortWatchController controller(FROM_HERE);

  pump()->WatchMachReceivePort(receive_right.get(), &controller, &watcher);

  // First posted task: cancel the watch.
  SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      BindOnce(
          [](MessagePumpKqueue::MachPortWatchController* watch_controller) {
            watch_controller->StopWatchingMachPort();
          },
          Unretained(&controller)));

  // Second posted task: send a message to the port that is no longer watched.
  SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, BindOnce(
                     [](mach_port_t destination) {
                       EXPECT_EQ(KERN_SUCCESS,
                                 SendEmptyMessage(destination, 100));
                     },
                     receive_right.get()));

  run_loop.RunUntilIdle();

  // The watcher must not have observed the message.
  EXPECT_EQ(0u, watcher.messages_.size());
}
142
// Two ports are watched simultaneously and bounce messages between each
// other; every watcher must see exactly the messages addressed to its port,
// in order.
TEST_F(MessagePumpKqueueTest, MultipleMachWatchers) {
  apple::ScopedMachReceiveRight port1, port2;
  apple::ScopedMachSendRight send_right1, send_right2;
  CreatePortPair(&port1, &send_right1);
  CreatePortPair(&port2, &send_right2);

  RunLoop run_loop;

  // Number of messages sent so far to port1 / port2 by the watcher callbacks
  // (the initial kick-off message is not counted).
  int port1_count = 0, port2_count = 0;

  // Whenever port1 receives a message, it will send to port2.
  // Whenever port2 receives a message, it will send to port1.
  // When port2 has sent 3 messages to port1, its next notification quits the
  // loop instead of replying.
  //
  // Message ids encode the destination port in the upper 16 bits and the
  // per-destination sequence number in the lower bits.

  PortWatcher watcher1(BindRepeating(
      [](mach_port_t port2, int* port2_count, RunLoop* loop) {
        mach_msg_id_t id = (0x2 << 16) | ++(*port2_count);
        mach_msg_return_t kr = SendEmptyMessage(port2, id);
        EXPECT_EQ(kr, KERN_SUCCESS);
        if (kr != KERN_SUCCESS) {
          loop->Quit();
        }
      },
      port2.get(), &port2_count, &run_loop));
  MessagePumpKqueue::MachPortWatchController controller1(FROM_HERE);

  PortWatcher watcher2(BindRepeating(
      [](mach_port_t port1, int* port1_count, RunLoop* loop) {
        if (*port1_count == 3) {
          loop->Quit();
          return;
        }
        mach_msg_id_t id = (0x1 << 16) | ++(*port1_count);
        mach_msg_return_t kr = SendEmptyMessage(port1, id);
        EXPECT_EQ(kr, KERN_SUCCESS);
        if (kr != KERN_SUCCESS) {
          loop->Quit();
        }
      },
      port1.get(), &port1_count, &run_loop));
  MessagePumpKqueue::MachPortWatchController controller2(FROM_HERE);

  pump()->WatchMachReceivePort(port1.get(), &controller1, &watcher1);
  pump()->WatchMachReceivePort(port2.get(), &controller2, &watcher2);

  // Start ping-ponging by sending the first message to port1. If this send
  // fails, no watcher will ever fire, so quit the loop explicitly rather than
  // letting run_loop.Run() block until the test times out.
  SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, BindOnce(
                     [](mach_port_t port1, RunLoop* loop) {
                       mach_msg_return_t kr =
                           SendEmptyMessage(port1, 0xf000f);
                       EXPECT_EQ(kr, KERN_SUCCESS);
                       if (kr != KERN_SUCCESS) {
                         loop->Quit();
                       }
                     },
                     port1.get(), Unretained(&run_loop)));

  run_loop.Run();

  ASSERT_EQ(4u, watcher1.messages_.size());
  ASSERT_EQ(4u, watcher2.messages_.size());

  // port1 sees the kick-off message followed by the three replies from the
  // port2 watcher.
  EXPECT_EQ(0xf000f, watcher1.messages_[0].msgh_id);
  EXPECT_EQ(0x10001, watcher1.messages_[1].msgh_id);
  EXPECT_EQ(0x10002, watcher1.messages_[2].msgh_id);
  EXPECT_EQ(0x10003, watcher1.messages_[3].msgh_id);

  // port2 sees one message for each message port1 received.
  EXPECT_EQ(0x20001, watcher2.messages_[0].msgh_id);
  EXPECT_EQ(0x20002, watcher2.messages_[1].msgh_id);
  EXPECT_EQ(0x20003, watcher2.messages_[2].msgh_id);
  EXPECT_EQ(0x20004, watcher2.messages_[3].msgh_id);
}
212
213 } // namespace
214 } // namespace base
215