1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef IPC_IPC_CHANNEL_PROXY_H_
6 #define IPC_IPC_CHANNEL_PROXY_H_
7
8 #include <stdint.h>
9
10 #include <map>
11 #include <memory>
12 #include <string>
13 #include <vector>
14
15 #include "base/component_export.h"
16 #include "base/functional/bind.h"
17 #include "base/functional/callback.h"
18 #include "base/memory/raw_ptr.h"
19 #include "base/memory/ref_counted.h"
20 #include "base/sequence_checker.h"
21 #include "base/synchronization/lock.h"
22 #include "build/build_config.h"
23 #include "ipc/ipc.mojom.h"
24 #include "ipc/ipc_channel.h"
25 #include "ipc/ipc_channel_handle.h"
26 #include "ipc/ipc_listener.h"
27 #include "ipc/ipc_sender.h"
28 #include "mojo/public/cpp/bindings/associated_remote.h"
29 #include "mojo/public/cpp/bindings/generic_pending_associated_receiver.h"
30 #include "mojo/public/cpp/bindings/pending_associated_receiver.h"
31 #include "mojo/public/cpp/bindings/pending_associated_remote.h"
32 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
33 #include "mojo/public/cpp/bindings/shared_associated_remote.h"
34
35 namespace base {
36 class SingleThreadTaskRunner;
37 }
38
39 namespace IPC {
40
41 class ChannelFactory;
42 class MessageFilter;
43 class MessageFilterRouter;
44 class UrgentMessageObserver;
45
46 //-----------------------------------------------------------------------------
47 // IPC::ChannelProxy
48 //
49 // This class is a helper class that is useful when you wish to run an IPC
50 // channel on a background thread. It provides you with the option of either
51 // handling IPC messages on that background thread or having them dispatched to
52 // your main thread (the thread on which the IPC::ChannelProxy is created).
53 //
54 // The API for an IPC::ChannelProxy is very similar to that of an IPC::Channel.
55 // When you send a message to an IPC::ChannelProxy, the message is routed to
56 // the background thread, where it is then passed to the IPC::Channel's Send
57 // method. This means that you can send a message from your thread and your
58 // message will be sent over the IPC channel when possible instead of being
59 // delayed until your thread returns to its message loop. (Often IPC messages
60 // will queue up on the IPC::Channel when there is a lot of traffic, and the
61 // channel will not get cycles to flush its message queue until the thread, on
62 // which it is running, returns to its message loop.)
63 //
64 // An IPC::ChannelProxy can have a MessageFilter associated with it, which will
65 // be notified of incoming messages on the IPC::Channel's thread. This gives
66 // the consumer of IPC::ChannelProxy the ability to respond to incoming
67 // messages on this background thread instead of on their own thread, which may
68 // be bogged down with other processing. The result can be greatly improved
69 // latency for messages that can be handled on a background thread.
70 //
71 // The consumer of IPC::ChannelProxy is responsible for allocating the Thread
72 // instance where the IPC::Channel will be created and operated.
73 //
74 // Thread-safe send
75 //
76 // If a particular |Channel| implementation has a thread-safe |Send()| operation
77 // then ChannelProxy skips the inter-thread hop and calls |Send()| directly. In
78 // this case the |channel_| variable is touched by multiple threads so
79 // |channel_lifetime_lock_| is used to protect it. The locking overhead is only
80 // paid if the underlying channel supports thread-safe |Send|.
81 //
COMPONENT_EXPORT(IPC)82 class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
83 public:
84 #if defined(ENABLE_IPC_FUZZER)
85 // Interface for a filter to be imposed on outgoing messages which can
86 // re-write the message. Used for testing.
87 class OutgoingMessageFilter {
88 public:
89 virtual Message* Rewrite(Message* message) = 0;
90 };
91 #endif
92
93 // Initializes a channel proxy. The channel_handle and mode parameters are
94 // passed directly to the underlying IPC::Channel. The listener is called on
95 // the thread that creates the ChannelProxy. The filter's OnMessageReceived
96 // method is called on the thread where the IPC::Channel is running. The
97 // filter may be null if the consumer is not interested in handling messages
98 // on the background thread. Any message not handled by the filter will be
99 // dispatched to the listener. The given task runner correspond to a thread
100 // on which IPC::Channel is created and used (e.g. IO thread).
101 static std::unique_ptr<ChannelProxy> Create(
102 const IPC::ChannelHandle& channel_handle,
103 Channel::Mode mode,
104 Listener* listener,
105 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
106 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
107
108 static std::unique_ptr<ChannelProxy> Create(
109 std::unique_ptr<ChannelFactory> factory,
110 Listener* listener,
111 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
112 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
113
114 // Constructs a ChannelProxy without initializing it.
115 ChannelProxy(
116 Listener* listener,
117 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
118 const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
119
120 ~ChannelProxy() override;
121
122 // Initializes the channel proxy. Only call this once to initialize a channel
123 // proxy that was not initialized in its constructor. If |create_pipe_now| is
124 // true, the pipe is created synchronously. Otherwise it's created on the IO
125 // thread.
126 void Init(const IPC::ChannelHandle& channel_handle,
127 Channel::Mode mode,
128 bool create_pipe_now);
129 void Init(std::unique_ptr<ChannelFactory> factory,
130 bool create_pipe_now);
131
132 // Pause the channel. Subsequent calls to Send() will be internally queued
133 // until Unpause() is called. Queued messages will not be sent until the
134 // channel is flushed.
135 void Pause();
136
137 // Unpause the channel. If |flush| is true the channel will be flushed as soon
138 // as it's unpaused (see Flush() below.) Otherwise you must explicitly call
139 // Flush() to flush messages which were queued while the channel was paused.
140 void Unpause(bool flush);
141
142 // Flush the channel. This sends any messages which were queued before calling
143 // Connect. Only useful if Unpause(false) was called previously.
144 void Flush();
145
146 // Close the IPC::Channel. This operation completes asynchronously, once the
147 // background thread processes the command to close the channel. It is ok to
148 // call this method multiple times. Redundant calls are ignored.
149 //
150 // WARNING: MessageFilter objects held by the ChannelProxy is also
151 // released asynchronously, and it may in fact have its final reference
152 // released on the background thread. The caller should be careful to deal
153 // with / allow for this possibility.
154 void Close();
155
156 // Send a message asynchronously. The message is routed to the background
157 // thread where it is passed to the IPC::Channel's Send method.
158 bool Send(Message* message) override;
159
160 // Used to intercept messages as they are received on the background thread.
161 //
162 // Ordinarily, messages sent to the ChannelProxy are routed to the matching
163 // listener on the worker thread. This API allows code to intercept messages
164 // before they are sent to the worker thread.
165 // If you call this before the target process is launched, then you're
166 // guaranteed to not miss any messages. But if you call this anytime after,
167 // then some messages might be missed since the filter is added internally on
168 // the IO thread.
169 void AddFilter(MessageFilter* filter);
170 void RemoveFilter(MessageFilter* filter);
171
172 // Set the `UrgentMessageObserver` for the channel. Must be called on the
173 // proxy thread before initialization.
174 void SetUrgentMessageObserver(UrgentMessageObserver* observer);
175
176 using GenericAssociatedInterfaceFactory =
177 base::RepeatingCallback<void(mojo::ScopedInterfaceEndpointHandle)>;
178
179 // Adds a generic associated interface factory to bind incoming interface
180 // requests directly on the IO thread. MUST be called either before Init() or
181 // before the remote end of the Channel is able to send messages (e.g. before
182 // its process is launched.)
183 void AddGenericAssociatedInterfaceForIOThread(
184 const std::string& name,
185 const GenericAssociatedInterfaceFactory& factory);
186
187 template <typename Interface>
188 using AssociatedInterfaceFactory =
189 base::RepeatingCallback<void(mojo::PendingAssociatedReceiver<Interface>)>;
190
191 // Helper to bind an IO-thread associated interface factory, inferring the
192 // interface name from the callback argument's type. MUST be called before
193 // Init().
194 template <typename Interface>
195 void AddAssociatedInterfaceForIOThread(
196 const AssociatedInterfaceFactory<Interface>& factory) {
197 AddGenericAssociatedInterfaceForIOThread(
198 Interface::Name_,
199 base::BindRepeating(
200 &ChannelProxy::BindPendingAssociatedReceiver<Interface>, factory));
201 }
202
203 // Requests an associated interface from the remote endpoint.
204 void GetRemoteAssociatedInterface(
205 mojo::GenericPendingAssociatedReceiver receiver);
206
207 // Template helper to receive associated interfaces from the remote endpoint.
208 template <typename Interface>
209 void GetRemoteAssociatedInterface(mojo::AssociatedRemote<Interface>* proxy) {
210 GetRemoteAssociatedInterface(proxy->BindNewEndpointAndPassReceiver());
211 }
212
213 #if defined(ENABLE_IPC_FUZZER)
214 void set_outgoing_message_filter(OutgoingMessageFilter* filter) {
215 outgoing_message_filter_ = filter;
216 }
217 #endif
218
219 // Creates a SharedAssociatedRemote for |Interface|. This object may be used
220 // to send messages on the interface from any thread and those messages will
221 // remain ordered with respect to other messages sent on the same thread over
222 // other SharedAssociatedRemotes associated with the same Channel.
223 template <typename Interface>
224 void GetThreadSafeRemoteAssociatedInterface(
225 scoped_refptr<mojo::SharedAssociatedRemote<Interface>>* out_remote) {
226 mojo::PendingAssociatedRemote<Interface> pending_remote;
227 auto receiver = pending_remote.InitWithNewEndpointAndPassReceiver();
228 GetGenericRemoteAssociatedInterface(Interface::Name_,
229 receiver.PassHandle());
230 *out_remote = mojo::SharedAssociatedRemote<Interface>::Create(
231 std::move(pending_remote), ipc_task_runner());
232 }
233
234 base::SingleThreadTaskRunner* ipc_task_runner() const {
235 return context_->ipc_task_runner();
236 }
237
238 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner_refptr()
239 const {
240 return context_->ipc_task_runner_refptr();
241 }
242
243 // Called to clear the pointer to the IPC task runner when it's going away.
244 void ClearIPCTaskRunner();
245
246 protected:
247 class Context;
248 // A subclass uses this constructor if it needs to add more information
249 // to the internal state.
250 explicit ChannelProxy(Context* context);
251
252 // Used internally to hold state that is referenced on the IPC thread.
253 class Context : public base::RefCountedThreadSafe<Context>,
254 public Listener {
255 public:
256 Context(Listener* listener,
257 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
258 const scoped_refptr<base::SingleThreadTaskRunner>&
259 listener_task_runner);
260 void ClearIPCTaskRunner();
261 base::SingleThreadTaskRunner* ipc_task_runner() const {
262 return ipc_task_runner_.get();
263 }
264 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner_refptr()
265 const {
266 return ipc_task_runner_;
267 }
268
269 scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner() {
270 return default_listener_task_runner_;
271 }
272
273 // Dispatches a message on the listener thread.
274 void OnDispatchMessage(const Message& message);
275
276 // Sends |message| from appropriate thread.
277 void Send(Message* message);
278
279 // Adds |task_runner| for the task to be executed later.
280 void AddListenerTaskRunner(
281 int32_t routing_id,
282 scoped_refptr<base::SingleThreadTaskRunner> task_runner);
283
284 // Removes task runner for |routing_id|.
285 void RemoveListenerTaskRunner(int32_t routing_id);
286
287 // Called on the IPC::Channel thread.
288 // Returns the task runner associated with |routing_id|.
289 scoped_refptr<base::SingleThreadTaskRunner> GetTaskRunner(
290 int32_t routing_id);
291
292 protected:
293 friend class base::RefCountedThreadSafe<Context>;
294 ~Context() override;
295
296 // IPC::Listener methods:
297 bool OnMessageReceived(const Message& message) override;
298 void OnChannelConnected(int32_t peer_pid) override;
299 void OnChannelError() override;
300 void OnAssociatedInterfaceRequest(
301 const std::string& interface_name,
302 mojo::ScopedInterfaceEndpointHandle handle) override;
303
304 // Like OnMessageReceived but doesn't try the filters.
305 bool OnMessageReceivedNoFilter(const Message& message);
306
307 // Gives the filters a chance at processing |message|.
308 // Returns true if the message was processed, false otherwise.
309 bool TryFilters(const Message& message);
310
311 void PauseChannel();
312 void UnpauseChannel(bool flush);
313 void FlushChannel();
314
315 // Like Open and Close, but called on the IPC thread.
316 virtual void OnChannelOpened();
317 virtual void OnChannelClosed();
318
319 // Called on the consumers thread when the ChannelProxy is closed. At that
320 // point the consumer is telling us that they don't want to receive any
321 // more messages, so we honor that wish by forgetting them!
322 virtual void Clear();
323
324 private:
325 friend class ChannelProxy;
326 friend class IpcSecurityTestUtil;
327
328 // Create the Channel
329 void CreateChannel(std::unique_ptr<ChannelFactory> factory);
330
331 // Methods called on the IO thread.
332 void OnSendMessage(std::unique_ptr<Message> message_ptr);
333 void OnAddFilter();
334 void OnRemoveFilter(MessageFilter* filter);
335
336 // Methods called on the listener thread.
337 void AddFilter(MessageFilter* filter);
338 void OnDispatchConnected();
339 void OnDispatchError();
340 void OnDispatchBadMessage(const Message& message);
341 void OnDispatchAssociatedInterfaceRequest(
342 const std::string& interface_name,
343 mojo::ScopedInterfaceEndpointHandle handle);
344 void SetUrgentMessageObserver(UrgentMessageObserver* observer);
345
346 void ClearChannel();
347
348 mojom::Channel& thread_safe_channel() {
349 return thread_safe_channel_->proxy();
350 }
351
352 void AddGenericAssociatedInterfaceForIOThread(
353 const std::string& name,
354 const GenericAssociatedInterfaceFactory& factory);
355
356 base::Lock listener_thread_task_runners_lock_;
357 // Map of routing_id and listener's thread task runner.
358 std::map<int32_t, scoped_refptr<base::SingleThreadTaskRunner>>
359 listener_thread_task_runners_
360 GUARDED_BY(listener_thread_task_runners_lock_);
361
362 scoped_refptr<base::SingleThreadTaskRunner> default_listener_task_runner_;
363 raw_ptr<Listener> listener_;
364
365 // List of filters. This is only accessed on the IPC thread.
366 std::vector<scoped_refptr<MessageFilter> > filters_;
367 scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
368
369 // Note, channel_ may be set on the Listener thread or the IPC thread.
370 // But once it has been set, it must only be read or cleared on the IPC
371 // thread.
372 // One exception is the thread-safe send. See the class comment.
373 std::unique_ptr<Channel> channel_;
374 bool channel_connected_called_;
375
376 // Lock for |channel_| value. This is only relevant in the context of
377 // thread-safe send.
378 base::Lock channel_lifetime_lock_;
379
380 // Routes a given message to a proper subset of |filters_|, depending
381 // on which message classes a filter might support.
382 std::unique_ptr<MessageFilterRouter> message_filter_router_;
383
384 // Holds filters between the AddFilter call on the listerner thread and the
385 // IPC thread when they're added to filters_.
386 std::vector<scoped_refptr<MessageFilter> > pending_filters_;
387 // Lock for pending_filters_.
388 base::Lock pending_filters_lock_;
389
390 // Cached copy of the peer process ID. Set on IPC but read on both IPC and
391 // listener threads.
392 base::ProcessId peer_pid_;
393 base::Lock peer_pid_lock_;
394
395 // A thread-safe mojom::Channel interface we use to make remote interface
396 // requests from the proxy thread.
397 std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
398 thread_safe_channel_;
399
400 // Holds associated interface binders added by
401 // AddGenericAssociatedInterfaceForIOThread until the underlying channel has
402 // been initialized.
403 base::Lock pending_io_thread_interfaces_lock_;
404 std::vector<std::pair<std::string, GenericAssociatedInterfaceFactory>>
405 pending_io_thread_interfaces_;
406 raw_ptr<UrgentMessageObserver> urgent_message_observer_ = nullptr;
407 };
408
409 Context* context() { return context_.get(); }
410
411 #if defined(ENABLE_IPC_FUZZER)
412 OutgoingMessageFilter* outgoing_message_filter() const {
413 return outgoing_message_filter_;
414 }
415 #endif
416
417 bool did_init() const { return did_init_; }
418
419 // A Send() which doesn't DCHECK if the message is synchronous.
420 void SendInternal(Message* message);
421
422 private:
423 friend class IpcSecurityTestUtil;
424
425 template <typename Interface>
426 static void BindPendingAssociatedReceiver(
427 const AssociatedInterfaceFactory<Interface>& factory,
428 mojo::ScopedInterfaceEndpointHandle handle) {
429 factory.Run(mojo::PendingAssociatedReceiver<Interface>(std::move(handle)));
430 }
431
432 // Always called once immediately after Init.
433 virtual void OnChannelInit();
434
435 // By maintaining this indirection (ref-counted) to our internal state, we
436 // can safely be destroyed while the background thread continues to do stuff
437 // that involves this data.
438 scoped_refptr<Context> context_;
439
440 // Whether the channel has been initialized.
441 bool did_init_ = false;
442
443 #if defined(ENABLE_IPC_FUZZER)
444 raw_ptr<OutgoingMessageFilter> outgoing_message_filter_ = nullptr;
445 #endif
446
447 SEQUENCE_CHECKER(sequence_checker_);
448 };
449
450 } // namespace IPC
451
452 #endif // IPC_IPC_CHANNEL_PROXY_H_
453