xref: /aosp_15_r20/external/cronet/ipc/ipc_channel_proxy.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef IPC_IPC_CHANNEL_PROXY_H_
6 #define IPC_IPC_CHANNEL_PROXY_H_
7 
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <string>
13 #include <vector>
14 
15 #include "base/component_export.h"
16 #include "base/functional/bind.h"
17 #include "base/functional/callback.h"
18 #include "base/memory/raw_ptr.h"
19 #include "base/memory/ref_counted.h"
20 #include "base/sequence_checker.h"
21 #include "base/synchronization/lock.h"
22 #include "build/build_config.h"
23 #include "ipc/ipc.mojom.h"
24 #include "ipc/ipc_channel.h"
25 #include "ipc/ipc_channel_handle.h"
26 #include "ipc/ipc_listener.h"
27 #include "ipc/ipc_sender.h"
28 #include "mojo/public/cpp/bindings/associated_remote.h"
29 #include "mojo/public/cpp/bindings/generic_pending_associated_receiver.h"
30 #include "mojo/public/cpp/bindings/pending_associated_receiver.h"
31 #include "mojo/public/cpp/bindings/pending_associated_remote.h"
32 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
33 #include "mojo/public/cpp/bindings/shared_associated_remote.h"
34 
35 namespace base {
36 class SingleThreadTaskRunner;
37 }
38 
39 namespace IPC {
40 
41 class ChannelFactory;
42 class MessageFilter;
43 class MessageFilterRouter;
44 class UrgentMessageObserver;
45 
46 //-----------------------------------------------------------------------------
47 // IPC::ChannelProxy
48 //
49 // This class is a helper class that is useful when you wish to run an IPC
50 // channel on a background thread.  It provides you with the option of either
51 // handling IPC messages on that background thread or having them dispatched to
52 // your main thread (the thread on which the IPC::ChannelProxy is created).
53 //
54 // The API for an IPC::ChannelProxy is very similar to that of an IPC::Channel.
55 // When you send a message to an IPC::ChannelProxy, the message is routed to
56 // the background thread, where it is then passed to the IPC::Channel's Send
57 // method.  This means that you can send a message from your thread and your
58 // message will be sent over the IPC channel when possible instead of being
59 // delayed until your thread returns to its message loop.  (Often IPC messages
60 // will queue up on the IPC::Channel when there is a lot of traffic, and the
61 // channel will not get cycles to flush its message queue until the thread, on
62 // which it is running, returns to its message loop.)
63 //
64 // An IPC::ChannelProxy can have a MessageFilter associated with it, which will
65 // be notified of incoming messages on the IPC::Channel's thread.  This gives
66 // the consumer of IPC::ChannelProxy the ability to respond to incoming
67 // messages on this background thread instead of on their own thread, which may
68 // be bogged down with other processing.  The result can be greatly improved
69 // latency for messages that can be handled on a background thread.
70 //
71 // The consumer of IPC::ChannelProxy is responsible for allocating the Thread
72 // instance where the IPC::Channel will be created and operated.
73 //
74 // Thread-safe send
75 //
76 // If a particular |Channel| implementation has a thread-safe |Send()| operation
77 // then ChannelProxy skips the inter-thread hop and calls |Send()| directly. In
78 // this case the |channel_| variable is touched by multiple threads so
79 // |channel_lifetime_lock_| is used to protect it. The locking overhead is only
80 // paid if the underlying channel supports thread-safe |Send|.
81 //
COMPONENT_EXPORT(IPC)82 class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
83  public:
84 #if defined(ENABLE_IPC_FUZZER)
85   // Interface for a filter to be imposed on outgoing messages which can
86   // re-write the message. Used for testing.
87   class OutgoingMessageFilter {
88    public:
89     virtual Message* Rewrite(Message* message) = 0;
90   };
91 #endif
92 
93   // Initializes a channel proxy.  The channel_handle and mode parameters are
94   // passed directly to the underlying IPC::Channel.  The listener is called on
95   // the thread that creates the ChannelProxy.  The filter's OnMessageReceived
96   // method is called on the thread where the IPC::Channel is running.  The
97   // filter may be null if the consumer is not interested in handling messages
98   // on the background thread.  Any message not handled by the filter will be
99   // dispatched to the listener.  The given task runner correspond to a thread
100   // on which IPC::Channel is created and used (e.g. IO thread).
101   static std::unique_ptr<ChannelProxy> Create(
102       const IPC::ChannelHandle& channel_handle,
103       Channel::Mode mode,
104       Listener* listener,
105       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
106       const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
107 
108   static std::unique_ptr<ChannelProxy> Create(
109       std::unique_ptr<ChannelFactory> factory,
110       Listener* listener,
111       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
112       const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
113 
114   // Constructs a ChannelProxy without initializing it.
115   ChannelProxy(
116       Listener* listener,
117       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
118       const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
119 
120   ~ChannelProxy() override;
121 
122   // Initializes the channel proxy. Only call this once to initialize a channel
123   // proxy that was not initialized in its constructor. If |create_pipe_now| is
124   // true, the pipe is created synchronously. Otherwise it's created on the IO
125   // thread.
126   void Init(const IPC::ChannelHandle& channel_handle,
127             Channel::Mode mode,
128             bool create_pipe_now);
129   void Init(std::unique_ptr<ChannelFactory> factory,
130             bool create_pipe_now);
131 
132   // Pause the channel. Subsequent calls to Send() will be internally queued
133   // until Unpause() is called. Queued messages will not be sent until the
134   // channel is flushed.
135   void Pause();
136 
137   // Unpause the channel. If |flush| is true the channel will be flushed as soon
138   // as it's unpaused (see Flush() below.) Otherwise you must explicitly call
139   // Flush() to flush messages which were queued while the channel was paused.
140   void Unpause(bool flush);
141 
142   // Flush the channel. This sends any messages which were queued before calling
143   // Connect. Only useful if Unpause(false) was called previously.
144   void Flush();
145 
146   // Close the IPC::Channel.  This operation completes asynchronously, once the
147   // background thread processes the command to close the channel.  It is ok to
148   // call this method multiple times.  Redundant calls are ignored.
149   //
150   // WARNING: MessageFilter objects held by the ChannelProxy is also
151   // released asynchronously, and it may in fact have its final reference
152   // released on the background thread.  The caller should be careful to deal
153   // with / allow for this possibility.
154   void Close();
155 
156   // Send a message asynchronously.  The message is routed to the background
157   // thread where it is passed to the IPC::Channel's Send method.
158   bool Send(Message* message) override;
159 
160   // Used to intercept messages as they are received on the background thread.
161   //
162   // Ordinarily, messages sent to the ChannelProxy are routed to the matching
163   // listener on the worker thread.  This API allows code to intercept messages
164   // before they are sent to the worker thread.
165   // If you call this before the target process is launched, then you're
166   // guaranteed to not miss any messages.  But if you call this anytime after,
167   // then some messages might be missed since the filter is added internally on
168   // the IO thread.
169   void AddFilter(MessageFilter* filter);
170   void RemoveFilter(MessageFilter* filter);
171 
172   // Set the `UrgentMessageObserver` for the channel. Must be called on the
173   // proxy thread before initialization.
174   void SetUrgentMessageObserver(UrgentMessageObserver* observer);
175 
176   using GenericAssociatedInterfaceFactory =
177       base::RepeatingCallback<void(mojo::ScopedInterfaceEndpointHandle)>;
178 
179   // Adds a generic associated interface factory to bind incoming interface
180   // requests directly on the IO thread. MUST be called either before Init() or
181   // before the remote end of the Channel is able to send messages (e.g. before
182   // its process is launched.)
183   void AddGenericAssociatedInterfaceForIOThread(
184       const std::string& name,
185       const GenericAssociatedInterfaceFactory& factory);
186 
187   template <typename Interface>
188   using AssociatedInterfaceFactory =
189       base::RepeatingCallback<void(mojo::PendingAssociatedReceiver<Interface>)>;
190 
191   // Helper to bind an IO-thread associated interface factory, inferring the
192   // interface name from the callback argument's type. MUST be called before
193   // Init().
194   template <typename Interface>
195   void AddAssociatedInterfaceForIOThread(
196       const AssociatedInterfaceFactory<Interface>& factory) {
197     AddGenericAssociatedInterfaceForIOThread(
198         Interface::Name_,
199         base::BindRepeating(
200             &ChannelProxy::BindPendingAssociatedReceiver<Interface>, factory));
201   }
202 
203   // Requests an associated interface from the remote endpoint.
204   void GetRemoteAssociatedInterface(
205       mojo::GenericPendingAssociatedReceiver receiver);
206 
207   // Template helper to receive associated interfaces from the remote endpoint.
208   template <typename Interface>
209   void GetRemoteAssociatedInterface(mojo::AssociatedRemote<Interface>* proxy) {
210     GetRemoteAssociatedInterface(proxy->BindNewEndpointAndPassReceiver());
211   }
212 
213 #if defined(ENABLE_IPC_FUZZER)
214   void set_outgoing_message_filter(OutgoingMessageFilter* filter) {
215     outgoing_message_filter_ = filter;
216   }
217 #endif
218 
219   // Creates a SharedAssociatedRemote for |Interface|. This object may be used
220   // to send messages on the interface from any thread and those messages will
221   // remain ordered with respect to other messages sent on the same thread over
222   // other SharedAssociatedRemotes associated with the same Channel.
223   template <typename Interface>
224   void GetThreadSafeRemoteAssociatedInterface(
225       scoped_refptr<mojo::SharedAssociatedRemote<Interface>>* out_remote) {
226     mojo::PendingAssociatedRemote<Interface> pending_remote;
227     auto receiver = pending_remote.InitWithNewEndpointAndPassReceiver();
228     GetGenericRemoteAssociatedInterface(Interface::Name_,
229                                         receiver.PassHandle());
230     *out_remote = mojo::SharedAssociatedRemote<Interface>::Create(
231         std::move(pending_remote), ipc_task_runner());
232   }
233 
234   base::SingleThreadTaskRunner* ipc_task_runner() const {
235     return context_->ipc_task_runner();
236   }
237 
238   const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner_refptr()
239       const {
240     return context_->ipc_task_runner_refptr();
241   }
242 
243   // Called to clear the pointer to the IPC task runner when it's going away.
244   void ClearIPCTaskRunner();
245 
246  protected:
247   class Context;
248   // A subclass uses this constructor if it needs to add more information
249   // to the internal state.
250   explicit ChannelProxy(Context* context);
251 
252   // Used internally to hold state that is referenced on the IPC thread.
253   class Context : public base::RefCountedThreadSafe<Context>,
254                   public Listener {
255    public:
256     Context(Listener* listener,
257             const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
258             const scoped_refptr<base::SingleThreadTaskRunner>&
259                 listener_task_runner);
260     void ClearIPCTaskRunner();
261     base::SingleThreadTaskRunner* ipc_task_runner() const {
262       return ipc_task_runner_.get();
263     }
264     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner_refptr()
265         const {
266       return ipc_task_runner_;
267     }
268 
269     scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner() {
270       return default_listener_task_runner_;
271     }
272 
273     // Dispatches a message on the listener thread.
274     void OnDispatchMessage(const Message& message);
275 
276     // Sends |message| from appropriate thread.
277     void Send(Message* message);
278 
279     // Adds |task_runner| for the task to be executed later.
280     void AddListenerTaskRunner(
281         int32_t routing_id,
282         scoped_refptr<base::SingleThreadTaskRunner> task_runner);
283 
284     // Removes task runner for |routing_id|.
285     void RemoveListenerTaskRunner(int32_t routing_id);
286 
287     // Called on the IPC::Channel thread.
288     // Returns the task runner associated with |routing_id|.
289     scoped_refptr<base::SingleThreadTaskRunner> GetTaskRunner(
290         int32_t routing_id);
291 
292    protected:
293     friend class base::RefCountedThreadSafe<Context>;
294     ~Context() override;
295 
296     // IPC::Listener methods:
297     bool OnMessageReceived(const Message& message) override;
298     void OnChannelConnected(int32_t peer_pid) override;
299     void OnChannelError() override;
300     void OnAssociatedInterfaceRequest(
301         const std::string& interface_name,
302         mojo::ScopedInterfaceEndpointHandle handle) override;
303 
304     // Like OnMessageReceived but doesn't try the filters.
305     bool OnMessageReceivedNoFilter(const Message& message);
306 
307     // Gives the filters a chance at processing |message|.
308     // Returns true if the message was processed, false otherwise.
309     bool TryFilters(const Message& message);
310 
311     void PauseChannel();
312     void UnpauseChannel(bool flush);
313     void FlushChannel();
314 
315     // Like Open and Close, but called on the IPC thread.
316     virtual void OnChannelOpened();
317     virtual void OnChannelClosed();
318 
319     // Called on the consumers thread when the ChannelProxy is closed.  At that
320     // point the consumer is telling us that they don't want to receive any
321     // more messages, so we honor that wish by forgetting them!
322     virtual void Clear();
323 
324    private:
325     friend class ChannelProxy;
326     friend class IpcSecurityTestUtil;
327 
328     // Create the Channel
329     void CreateChannel(std::unique_ptr<ChannelFactory> factory);
330 
331     // Methods called on the IO thread.
332     void OnSendMessage(std::unique_ptr<Message> message_ptr);
333     void OnAddFilter();
334     void OnRemoveFilter(MessageFilter* filter);
335 
336     // Methods called on the listener thread.
337     void AddFilter(MessageFilter* filter);
338     void OnDispatchConnected();
339     void OnDispatchError();
340     void OnDispatchBadMessage(const Message& message);
341     void OnDispatchAssociatedInterfaceRequest(
342         const std::string& interface_name,
343         mojo::ScopedInterfaceEndpointHandle handle);
344     void SetUrgentMessageObserver(UrgentMessageObserver* observer);
345 
346     void ClearChannel();
347 
348     mojom::Channel& thread_safe_channel() {
349       return thread_safe_channel_->proxy();
350     }
351 
352     void AddGenericAssociatedInterfaceForIOThread(
353         const std::string& name,
354         const GenericAssociatedInterfaceFactory& factory);
355 
356     base::Lock listener_thread_task_runners_lock_;
357     // Map of routing_id and listener's thread task runner.
358     std::map<int32_t, scoped_refptr<base::SingleThreadTaskRunner>>
359         listener_thread_task_runners_
360             GUARDED_BY(listener_thread_task_runners_lock_);
361 
362     scoped_refptr<base::SingleThreadTaskRunner> default_listener_task_runner_;
363     raw_ptr<Listener> listener_;
364 
365     // List of filters.  This is only accessed on the IPC thread.
366     std::vector<scoped_refptr<MessageFilter> > filters_;
367     scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
368 
369     // Note, channel_ may be set on the Listener thread or the IPC thread.
370     // But once it has been set, it must only be read or cleared on the IPC
371     // thread.
372     // One exception is the thread-safe send. See the class comment.
373     std::unique_ptr<Channel> channel_;
374     bool channel_connected_called_;
375 
376     // Lock for |channel_| value. This is only relevant in the context of
377     // thread-safe send.
378     base::Lock channel_lifetime_lock_;
379 
380     // Routes a given message to a proper subset of |filters_|, depending
381     // on which message classes a filter might support.
382     std::unique_ptr<MessageFilterRouter> message_filter_router_;
383 
384     // Holds filters between the AddFilter call on the listerner thread and the
385     // IPC thread when they're added to filters_.
386     std::vector<scoped_refptr<MessageFilter> > pending_filters_;
387     // Lock for pending_filters_.
388     base::Lock pending_filters_lock_;
389 
390     // Cached copy of the peer process ID. Set on IPC but read on both IPC and
391     // listener threads.
392     base::ProcessId peer_pid_;
393     base::Lock peer_pid_lock_;
394 
395     // A thread-safe mojom::Channel interface we use to make remote interface
396     // requests from the proxy thread.
397     std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
398         thread_safe_channel_;
399 
400     // Holds associated interface binders added by
401     // AddGenericAssociatedInterfaceForIOThread until the underlying channel has
402     // been initialized.
403     base::Lock pending_io_thread_interfaces_lock_;
404     std::vector<std::pair<std::string, GenericAssociatedInterfaceFactory>>
405         pending_io_thread_interfaces_;
406     raw_ptr<UrgentMessageObserver> urgent_message_observer_ = nullptr;
407   };
408 
409   Context* context() { return context_.get(); }
410 
411 #if defined(ENABLE_IPC_FUZZER)
412   OutgoingMessageFilter* outgoing_message_filter() const {
413     return outgoing_message_filter_;
414   }
415 #endif
416 
417   bool did_init() const { return did_init_; }
418 
419   // A Send() which doesn't DCHECK if the message is synchronous.
420   void SendInternal(Message* message);
421 
422  private:
423   friend class IpcSecurityTestUtil;
424 
425   template <typename Interface>
426   static void BindPendingAssociatedReceiver(
427       const AssociatedInterfaceFactory<Interface>& factory,
428       mojo::ScopedInterfaceEndpointHandle handle) {
429     factory.Run(mojo::PendingAssociatedReceiver<Interface>(std::move(handle)));
430   }
431 
432   // Always called once immediately after Init.
433   virtual void OnChannelInit();
434 
435   // By maintaining this indirection (ref-counted) to our internal state, we
436   // can safely be destroyed while the background thread continues to do stuff
437   // that involves this data.
438   scoped_refptr<Context> context_;
439 
440   // Whether the channel has been initialized.
441   bool did_init_ = false;
442 
443 #if defined(ENABLE_IPC_FUZZER)
444   raw_ptr<OutgoingMessageFilter> outgoing_message_filter_ = nullptr;
445 #endif
446 
447   SEQUENCE_CHECKER(sequence_checker_);
448 };
449 
450 }  // namespace IPC
451 
452 #endif  // IPC_IPC_CHANNEL_PROXY_H_
453