xref: /aosp_15_r20/external/grpc-grpc-java/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.binder.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.util.concurrent.Futures.immediateFuture;
22 
23 import android.content.Context;
24 import android.os.Binder;
25 import android.os.DeadObjectException;
26 import android.os.IBinder;
27 import android.os.Parcel;
28 import android.os.Process;
29 import android.os.RemoteException;
30 import android.os.TransactionTooLargeException;
31 import com.google.common.annotations.VisibleForTesting;
32 import com.google.common.base.Ticker;
33 import com.google.common.util.concurrent.ListenableFuture;
34 import io.grpc.Attributes;
35 import io.grpc.CallOptions;
36 import io.grpc.ClientStreamTracer;
37 import io.grpc.Grpc;
38 import io.grpc.Internal;
39 import io.grpc.InternalChannelz.SocketStats;
40 import io.grpc.InternalLogId;
41 import io.grpc.Metadata;
42 import io.grpc.MethodDescriptor;
43 import io.grpc.SecurityLevel;
44 import io.grpc.ServerStreamTracer;
45 import io.grpc.Status;
46 import io.grpc.StatusException;
47 import io.grpc.binder.AndroidComponentAddress;
48 import io.grpc.binder.BindServiceFlags;
49 import io.grpc.binder.InboundParcelablePolicy;
50 import io.grpc.binder.SecurityPolicy;
51 import io.grpc.internal.ClientStream;
52 import io.grpc.internal.ConnectionClientTransport;
53 import io.grpc.internal.FailingClientStream;
54 import io.grpc.internal.GrpcAttributes;
55 import io.grpc.internal.GrpcUtil;
56 import io.grpc.internal.ManagedClientTransport;
57 import io.grpc.internal.ObjectPool;
58 import io.grpc.internal.ServerStream;
59 import io.grpc.internal.ServerTransport;
60 import io.grpc.internal.ServerTransportListener;
61 import io.grpc.internal.StatsTraceContext;
62 import java.util.ArrayList;
63 import java.util.Iterator;
64 import java.util.LinkedHashSet;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.NoSuchElementException;
68 import java.util.concurrent.ConcurrentHashMap;
69 import java.util.concurrent.Executor;
70 import java.util.concurrent.ScheduledExecutorService;
71 import java.util.concurrent.atomic.AtomicInteger;
72 import java.util.concurrent.atomic.AtomicLong;
73 import java.util.logging.Level;
74 import java.util.logging.Logger;
75 import javax.annotation.CheckReturnValue;
76 import javax.annotation.Nullable;
77 import javax.annotation.concurrent.GuardedBy;
78 import javax.annotation.concurrent.ThreadSafe;
79 
80 /**
81  * Base class for binder-based gRPC transport.
82  *
83  * <p>This is used on both the client and service sides of the transport.
84  *
85  * <p>A note on synchronization. The nature of this class's interaction with each stream
86  * (bi-directional communication between gRPC calls and binder transactions) means that acquiring
87  * multiple locks in two different orders can happen easily. E.g. binder transactions will arrive in
88  * this class, and need to be passed to a stream instance, whereas gRPC calls on a stream instance will
89  * need to call into this class to send a transaction (possibly waiting for the transport to become
90  * ready).
91  *
92  * <p>The split between Outbound &amp; Inbound helps reduce this risk, but not entirely remove it.
93  *
94  * <p>For this reason, while most state within this class is guarded by this instance, methods
95  * exposed to individual stream instances need to use atomic or volatile types, since those calls
96  * will already be synchronized on the individual RPC objects.
97  *
98  * <p><b>IMPORTANT</b>: This implementation must comply with this published wire format.
99  * https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md
100  */
101 @ThreadSafe
102 public abstract class BinderTransport
103     implements LeakSafeOneWayBinder.TransactionHandler, IBinder.DeathRecipient {
104 
105   private static final Logger logger = Logger.getLogger(BinderTransport.class.getName());
106 
107   /**
108    * Attribute used to store the Android UID of the remote app. This is guaranteed to be set on any
109    * active transport.
110    */
111   @Internal
112   public static final Attributes.Key<Integer> REMOTE_UID =
113       Attributes.Key.create("internal:remote-uid");
114 
115   /** The authority of the server. */
116   @Internal
117   public static final Attributes.Key<String> SERVER_AUTHORITY =
118       Attributes.Key.create("internal:server-authority");
119 
120   /** A transport attribute to hold the {@link InboundParcelablePolicy}. */
121   @Internal
122   public static final Attributes.Key<InboundParcelablePolicy> INBOUND_PARCELABLE_POLICY =
123       Attributes.Key.create("internal:inbound-parcelable-policy");
124 
125   /**
126    * Version code for this wire format.
127    *
128    * <p>Should this change, we should still endeavor to support earlier wire-format versions. If
129    * that's not possible, {@link #EARLIEST_SUPPORTED_WIRE_FORMAT_VERSION} should be updated below.
130    */
131   @Internal
132   public static final int WIRE_FORMAT_VERSION = 1;
133 
134   /** The version code of the earliest wire format we support. */
135   @Internal
136   public static final int EARLIEST_SUPPORTED_WIRE_FORMAT_VERSION = 1;
137 
138   /** The max number of "in-flight" bytes before we start buffering transactions. */
139   private static final int TRANSACTION_BYTES_WINDOW = 128 * 1024;
140 
141   /** The number of in-flight bytes we should receive between sending acks to our peer. */
142   private static final int TRANSACTION_BYTES_WINDOW_FORCE_ACK = 16 * 1024;
143 
144   /**
145    * Sent from the client to the host service binder to initiate a new transport, and from the
146    * host to the client's binder. Followed by: int wire_protocol_version, IBinder
147    * client_transports_callback_binder.
148    */
149   @Internal
150   public static final int SETUP_TRANSPORT = IBinder.FIRST_CALL_TRANSACTION;
151 
152   /** Send to shutdown the transport from either end. */
153   @Internal
154   public static final int SHUTDOWN_TRANSPORT = IBinder.FIRST_CALL_TRANSACTION + 1;
155 
156   /** Send to acknowledge receipt of rpc bytes, for flow control. */
157   static final int ACKNOWLEDGE_BYTES = IBinder.FIRST_CALL_TRANSACTION + 2;
158 
159   /** A ping request. */
160   private static final int PING = IBinder.FIRST_CALL_TRANSACTION + 3;
161 
162   /** A response to a ping. */
163   private static final int PING_RESPONSE = IBinder.FIRST_CALL_TRANSACTION + 4;
164 
165   /** Reserved transaction IDs for any special events we might need. */
166   private static final int RESERVED_TRANSACTIONS = 1000;
167 
168   /** The first call ID we can use. */
169   private static final int FIRST_CALL_ID = IBinder.FIRST_CALL_TRANSACTION + RESERVED_TRANSACTIONS;
170 
171   /** The last call ID we can use. */
172   private static final int LAST_CALL_ID = IBinder.LAST_CALL_TRANSACTION;
173 
174   /** The states of this transport. */
175   protected enum TransportState {
176     NOT_STARTED, // We haven't been started yet.
177     SETUP, // We're setting up the connection.
178     READY, // The transport is ready.
179     SHUTDOWN, // We've been shutdown and won't accept any additional calls (thought existing calls
180     // may continue).
181     SHUTDOWN_TERMINATED // We've been shutdown completely (or we failed to start). We can't send or
182     // receive any data.
183   }
184 
185   private final ObjectPool<ScheduledExecutorService> executorServicePool;
186   private final ScheduledExecutorService scheduledExecutorService;
187   private final InternalLogId logId;
188   private final LeakSafeOneWayBinder incomingBinder;
189 
190   protected final ConcurrentHashMap<Integer, Inbound<?>> ongoingCalls;
191 
192   @GuardedBy("this")
193   private final LinkedHashSet<Integer> callIdsToNotifyWhenReady = new LinkedHashSet<>();
194 
195   @GuardedBy("this")
196   protected Attributes attributes;
197 
198   @GuardedBy("this")
199   private TransportState transportState = TransportState.NOT_STARTED;
200 
201   @GuardedBy("this")
202   @Nullable
203   protected Status shutdownStatus;
204 
205   @Nullable private OneWayBinderProxy outgoingBinder;
206 
207   private final FlowController flowController;
208 
209   /** The number of incoming bytes we've received. */
210   private final AtomicLong numIncomingBytes;
211 
212   /** The number of incoming bytes we've told our peer we've received. */
213   private long acknowledgedIncomingBytes;
214 
/**
 * Initializes the state shared by both ends of the transport.
 *
 * <p>The constructor is private, so only classes nested in this file can extend this class.
 *
 * @param executorServicePool pool the scheduled executor is borrowed from; the executor is handed
 *     back by {@link #releaseExecutors()}
 * @param attributes the initial transport attributes
 * @param logId the identifier reported by {@link #getLogId()}
 */
private BinderTransport(
    ObjectPool<ScheduledExecutorService> executorServicePool,
    Attributes attributes,
    InternalLogId logId) {
  this.logId = logId;
  this.attributes = attributes;
  this.executorServicePool = executorServicePool;
  this.scheduledExecutorService = executorServicePool.getObject();
  // Incoming transactions on this binder are routed back to this transport instance.
  this.incomingBinder = new LeakSafeOneWayBinder(this);
  this.ongoingCalls = new ConcurrentHashMap<>();
  this.flowController = new FlowController(TRANSACTION_BYTES_WINDOW);
  this.numIncomingBytes = new AtomicLong();
}
228 
229   // Override in child class.
getScheduledExecutorService()230   public final ScheduledExecutorService getScheduledExecutorService() {
231     return scheduledExecutorService;
232   }
233 
234   // Override in child class.
getStats()235   public final ListenableFuture<SocketStats> getStats() {
236     return immediateFuture(null);
237   }
238 
239   // Override in child class.
getLogId()240   public final InternalLogId getLogId() {
241     return logId;
242   }
243 
244   // Override in child class.
getAttributes()245   public final synchronized Attributes getAttributes() {
246     return attributes;
247   }
248 
249   /**
250    * Returns whether this transport is able to send rpc transactions. Intentionally unsynchronized
251    * since this will be called while Outbound is held.
252    */
isReady()253   final boolean isReady() {
254     return !flowController.isTransmitWindowFull();
255   }
256 
notifyShutdown(Status shutdownStatus)257   abstract void notifyShutdown(Status shutdownStatus);
258 
notifyTerminated()259   abstract void notifyTerminated();
260 
releaseExecutors()261   void releaseExecutors() {
262     executorServicePool.returnObject(scheduledExecutorService);
263   }
264 
265   @GuardedBy("this")
inState(TransportState transportState)266   boolean inState(TransportState transportState) {
267     return this.transportState == transportState;
268   }
269 
270   @GuardedBy("this")
isShutdown()271   boolean isShutdown() {
272     return inState(TransportState.SHUTDOWN) || inState(TransportState.SHUTDOWN_TERMINATED);
273   }
274 
275   @GuardedBy("this")
setState(TransportState newState)276   final void setState(TransportState newState) {
277     checkTransition(transportState, newState);
278     transportState = newState;
279   }
280 
281   @GuardedBy("this")
setOutgoingBinder(OneWayBinderProxy binder)282   protected boolean setOutgoingBinder(OneWayBinderProxy binder) {
283     this.outgoingBinder = binder;
284     try {
285       binder.getDelegate().linkToDeath(this, 0);
286       return true;
287     } catch (RemoteException re) {
288       return false;
289     }
290   }
291 
292   @Override
binderDied()293   public synchronized void binderDied() {
294     shutdownInternal(Status.UNAVAILABLE.withDescription("binderDied"), true);
295   }
296 
297   @GuardedBy("this")
shutdownInternal(Status shutdownStatus, boolean forceTerminate)298   final void shutdownInternal(Status shutdownStatus, boolean forceTerminate) {
299     if (!isShutdown()) {
300       this.shutdownStatus = shutdownStatus;
301       setState(TransportState.SHUTDOWN);
302       notifyShutdown(shutdownStatus);
303     }
304     if (!inState(TransportState.SHUTDOWN_TERMINATED)
305         && (forceTerminate || ongoingCalls.isEmpty())) {
306       incomingBinder.detach();
307       setState(TransportState.SHUTDOWN_TERMINATED);
308       sendShutdownTransaction();
309       ArrayList<Inbound<?>> calls = new ArrayList<>(ongoingCalls.values());
310       ongoingCalls.clear();
311       scheduledExecutorService.execute(
312           () -> {
313             for (Inbound<?> inbound : calls) {
314               synchronized (inbound) {
315                 inbound.closeAbnormal(shutdownStatus);
316               }
317             }
318             notifyTerminated();
319             releaseExecutors();
320           });
321     }
322   }
323 
324   @GuardedBy("this")
sendSetupTransaction()325   final void sendSetupTransaction() {
326     sendSetupTransaction(checkNotNull(outgoingBinder));
327   }
328 
329   @GuardedBy("this")
sendSetupTransaction(OneWayBinderProxy iBinder)330   final void sendSetupTransaction(OneWayBinderProxy iBinder) {
331     try (ParcelHolder parcel = ParcelHolder.obtain()) {
332       parcel.get().writeInt(WIRE_FORMAT_VERSION);
333       parcel.get().writeStrongBinder(incomingBinder);
334       iBinder.transact(SETUP_TRANSPORT, parcel);
335     } catch (RemoteException re) {
336       shutdownInternal(statusFromRemoteException(re), true);
337     }
338   }
339 
340   @GuardedBy("this")
sendShutdownTransaction()341   private final void sendShutdownTransaction() {
342     if (outgoingBinder != null) {
343       try {
344         outgoingBinder.getDelegate().unlinkToDeath(this, 0);
345       } catch (NoSuchElementException e) {
346         // Ignore.
347       }
348       try (ParcelHolder parcel = ParcelHolder.obtain()) {
349         // Send empty flags to avoid a memory leak linked to empty parcels (b/207778694).
350         parcel.get().writeInt(0);
351         outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel);
352       } catch (RemoteException re) {
353         // Ignore.
354       }
355     }
356   }
357 
sendPing(int id)358   protected synchronized void sendPing(int id) throws StatusException {
359     if (inState(TransportState.SHUTDOWN_TERMINATED)) {
360       throw shutdownStatus.asException();
361     } else if (outgoingBinder == null) {
362       throw Status.FAILED_PRECONDITION.withDescription("Transport not ready.").asException();
363     } else {
364       try (ParcelHolder parcel = ParcelHolder.obtain()) {
365         parcel.get().writeInt(id);
366         outgoingBinder.transact(PING, parcel);
367       } catch (RemoteException re) {
368         throw statusFromRemoteException(re).asException();
369       }
370     }
371   }
372 
unregisterInbound(Inbound<?> inbound)373   protected void unregisterInbound(Inbound<?> inbound) {
374     unregisterCall(inbound.callId);
375   }
376 
unregisterCall(int callId)377   final void unregisterCall(int callId) {
378     boolean removed = (ongoingCalls.remove(callId) != null);
379     if (removed && ongoingCalls.isEmpty()) {
380       // Possibly shutdown (not synchronously, since inbound is held).
381       scheduledExecutorService.execute(
382           () -> {
383             synchronized (this) {
384               if (inState(TransportState.SHUTDOWN)) {
385                 // No more ongoing calls, and we're shutdown. Finish the shutdown.
386                 shutdownInternal(shutdownStatus, true);
387               }
388             }
389           });
390     }
391   }
392 
sendTransaction(int callId, ParcelHolder parcel)393   final void sendTransaction(int callId, ParcelHolder parcel) throws StatusException {
394     int dataSize = parcel.get().dataSize();
395     try {
396       outgoingBinder.transact(callId, parcel);
397     } catch (RemoteException re) {
398       throw statusFromRemoteException(re).asException();
399     }
400     if (flowController.notifyBytesSent(dataSize)) {
401       logger.log(Level.FINE, "transmit window now full " + this);
402     }
403   }
404 
sendOutOfBandClose(int callId, Status status)405   final void sendOutOfBandClose(int callId, Status status) {
406     try (ParcelHolder parcel = ParcelHolder.obtain()) {
407       parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below.
408       int flags = TransactionUtils.writeStatus(parcel.get(), status);
409       TransactionUtils.fillInFlags(parcel.get(), flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE);
410       sendTransaction(callId, parcel);
411     } catch (StatusException e) {
412       logger.log(Level.WARNING, "Failed sending oob close transaction", e);
413     }
414   }
415 
416   @Override
handleTransaction(int code, Parcel parcel)417   public final boolean handleTransaction(int code, Parcel parcel) {
418     try {
419       return handleTransactionInternal(code, parcel);
420     } catch (RuntimeException e) {
421       logger.log(Level.SEVERE,
422           "Terminating transport for uncaught Exception in transaction " + code, e);
423       synchronized (this) {
424         // This unhandled exception may have put us in an inconsistent state. Force terminate the
425         // whole transport so our peer knows something is wrong and so that clients can retry with
426         // a fresh transport instance on both sides.
427         shutdownInternal(Status.INTERNAL.withCause(e), true);
428         return false;
429       }
430     }
431   }
432 
handleTransactionInternal(int code, Parcel parcel)433   private boolean handleTransactionInternal(int code, Parcel parcel) {
434     if (code < FIRST_CALL_ID) {
435       synchronized (this) {
436         switch (code) {
437           case ACKNOWLEDGE_BYTES:
438             handleAcknowledgedBytes(parcel.readLong());
439             break;
440           case SHUTDOWN_TRANSPORT:
441             shutdownInternal(
442                 Status.UNAVAILABLE.withDescription("transport shutdown by peer"), true);
443             break;
444           case SETUP_TRANSPORT:
445             handleSetupTransport(parcel);
446             break;
447           case PING:
448             handlePing(parcel);
449             break;
450           case PING_RESPONSE:
451             handlePingResponse(parcel);
452             break;
453           default:
454             return false;
455         }
456         return true;
457       }
458     } else {
459       int size = parcel.dataSize();
460       Inbound<?> inbound = ongoingCalls.get(code);
461       if (inbound == null) {
462         synchronized (this) {
463           if (!isShutdown()) {
464             // Create a new inbound. Strictly speaking we could end up doing this twice on
465             // two threads, hence the need to use putIfAbsent, and check its result.
466             inbound = createInbound(code);
467             if (inbound != null) {
468               Inbound<?> inbound2 = ongoingCalls.putIfAbsent(code, inbound);
469               if (inbound2 != null) {
470                 inbound = inbound2;
471               }
472             }
473           }
474         }
475       }
476       if (inbound != null) {
477         inbound.handleTransaction(parcel);
478       }
479       long nib = numIncomingBytes.addAndGet(size);
480       if ((nib - acknowledgedIncomingBytes) > TRANSACTION_BYTES_WINDOW_FORCE_ACK) {
481         synchronized (this) {
482           sendAcknowledgeBytes(checkNotNull(outgoingBinder));
483         }
484       }
485       return true;
486     }
487   }
488 
489   @Nullable
490   @GuardedBy("this")
createInbound(int callId)491   protected Inbound<?> createInbound(int callId) {
492     return null;
493   }
494 
495   @GuardedBy("this")
handleSetupTransport(Parcel parcel)496   protected void handleSetupTransport(Parcel parcel) {}
497 
498   @GuardedBy("this")
handlePing(Parcel requestParcel)499   private final void handlePing(Parcel requestParcel) {
500     int id = requestParcel.readInt();
501     if (transportState == TransportState.READY) {
502       try (ParcelHolder replyParcel = ParcelHolder.obtain()) {
503         replyParcel.get().writeInt(id);
504         outgoingBinder.transact(PING_RESPONSE, replyParcel);
505       } catch (RemoteException re) {
506         // Ignore.
507       }
508     }
509   }
510 
511   @GuardedBy("this")
handlePingResponse(Parcel parcel)512   protected void handlePingResponse(Parcel parcel) {}
513 
514   @GuardedBy("this")
sendAcknowledgeBytes(OneWayBinderProxy iBinder)515   private void sendAcknowledgeBytes(OneWayBinderProxy iBinder) {
516     // Send a transaction to acknowledge reception of incoming data.
517     long n = numIncomingBytes.get();
518     acknowledgedIncomingBytes = n;
519     try (ParcelHolder parcel = ParcelHolder.obtain()) {
520       parcel.get().writeLong(n);
521       iBinder.transact(ACKNOWLEDGE_BYTES, parcel);
522     } catch (RemoteException re) {
523       shutdownInternal(statusFromRemoteException(re), true);
524     }
525   }
526 
527   @GuardedBy("this")
handleAcknowledgedBytes(long numBytes)528   final void handleAcknowledgedBytes(long numBytes) {
529     if (flowController.handleAcknowledgedBytes(numBytes)) {
530       logger.log(
531           Level.FINE,
532           "handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this);
533 
534       // The LinkedHashSet contract guarantees that an id already present in this collection will
535       // not lose its priority if we re-insert it here.
536       callIdsToNotifyWhenReady.addAll(ongoingCalls.keySet());
537 
538       Iterator<Integer> i = callIdsToNotifyWhenReady.iterator();
539       while (isReady() && i.hasNext()) {
540         Inbound<?> inbound = ongoingCalls.get(i.next());
541         i.remove();
542         if (inbound != null) { // Calls can be removed out from under us.
543           inbound.onTransportReady();
544         }
545       }
546     }
547   }
548 
549   /** Concrete client-side transport implementation. */
550   @ThreadSafe
551   @Internal
552   public static final class BinderClientTransport extends BinderTransport
553       implements ConnectionClientTransport, Bindable.Observer {
554 
555     private final ObjectPool<? extends Executor> offloadExecutorPool;
556     private final Executor offloadExecutor;
557     private final SecurityPolicy securityPolicy;
558     private final Bindable serviceBinding;
559     /** Number of ongoing calls which keep this transport "in-use". */
560     private final AtomicInteger numInUseStreams;
561 
562     private final PingTracker pingTracker;
563 
564     @Nullable private ManagedClientTransport.Listener clientTransportListener;
565 
566     @GuardedBy("this")
567     private int latestCallId = FIRST_CALL_ID;
568 
BinderClientTransport( Context sourceContext, AndroidComponentAddress targetAddress, BindServiceFlags bindServiceFlags, Executor mainThreadExecutor, ObjectPool<ScheduledExecutorService> executorServicePool, ObjectPool<? extends Executor> offloadExecutorPool, SecurityPolicy securityPolicy, InboundParcelablePolicy inboundParcelablePolicy, Attributes eagAttrs)569     public BinderClientTransport(
570         Context sourceContext,
571         AndroidComponentAddress targetAddress,
572         BindServiceFlags bindServiceFlags,
573         Executor mainThreadExecutor,
574         ObjectPool<ScheduledExecutorService> executorServicePool,
575         ObjectPool<? extends Executor> offloadExecutorPool,
576         SecurityPolicy securityPolicy,
577         InboundParcelablePolicy inboundParcelablePolicy,
578         Attributes eagAttrs) {
579       super(
580           executorServicePool,
581           buildClientAttributes(eagAttrs, sourceContext, targetAddress, inboundParcelablePolicy),
582           buildLogId(sourceContext, targetAddress));
583       this.offloadExecutorPool = offloadExecutorPool;
584       this.securityPolicy = securityPolicy;
585       this.offloadExecutor = offloadExecutorPool.getObject();
586       numInUseStreams = new AtomicInteger();
587       pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id));
588 
589       serviceBinding =
590           new ServiceBinding(
591               mainThreadExecutor,
592               sourceContext,
593               targetAddress.asBindIntent(),
594               bindServiceFlags.toInteger(),
595               this);
596     }
597 
598     @Override
releaseExecutors()599     void releaseExecutors() {
600       super.releaseExecutors();
601       offloadExecutorPool.returnObject(offloadExecutor);
602     }
603 
604     @Override
onBound(IBinder binder)605     public synchronized void onBound(IBinder binder) {
606       sendSetupTransaction(OneWayBinderProxy.wrap(binder, offloadExecutor));
607     }
608 
609     @Override
onUnbound(Status reason)610     public synchronized void onUnbound(Status reason) {
611       shutdownInternal(reason, true);
612     }
613 
614     @CheckReturnValue
615     @Override
start(ManagedClientTransport.Listener clientTransportListener)616     public synchronized Runnable start(ManagedClientTransport.Listener clientTransportListener) {
617       this.clientTransportListener = checkNotNull(clientTransportListener);
618       return () -> {
619         synchronized (BinderClientTransport.this) {
620           if (inState(TransportState.NOT_STARTED)) {
621             setState(TransportState.SETUP);
622             serviceBinding.bind();
623           }
624         }
625       };
626     }
627 
628     @Override
629     public synchronized ClientStream newStream(
630         final MethodDescriptor<?, ?> method,
631         final Metadata headers,
632         final CallOptions callOptions,
633         ClientStreamTracer[] tracers) {
634       if (!inState(TransportState.READY)) {
635         return newFailingClientStream(
636             isShutdown()
637                 ? shutdownStatus
638                 : Status.INTERNAL.withDescription("newStream() before transportReady()"),
639             attributes,
640             headers,
641             tracers);
642       }
643 
644       int callId = latestCallId++;
645       if (latestCallId == LAST_CALL_ID) {
646         latestCallId = FIRST_CALL_ID;
647       }
648       StatsTraceContext statsTraceContext =
649           StatsTraceContext.newClientContext(tracers, attributes, headers);
650       Inbound.ClientInbound inbound =
651           new Inbound.ClientInbound(
652               this, attributes, callId, GrpcUtil.shouldBeCountedForInUse(callOptions));
653       if (ongoingCalls.putIfAbsent(callId, inbound) != null) {
654         Status failure = Status.INTERNAL.withDescription("Clashing call IDs");
655         shutdownInternal(failure, true);
656         return newFailingClientStream(failure, attributes, headers, tracers);
657       } else {
658         if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) {
659           clientTransportListener.transportInUse(true);
660         }
661         Outbound.ClientOutbound outbound =
662             new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext);
663         if (method.getType().clientSendsOneMessage()) {
664           return new SingleMessageClientStream(inbound, outbound, attributes);
665         } else {
666           return new MultiMessageClientStream(inbound, outbound, attributes);
667         }
668       }
669     }
670 
671     @Override
672     protected void unregisterInbound(Inbound<?> inbound) {
673       if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) {
674         clientTransportListener.transportInUse(false);
675       }
676       super.unregisterInbound(inbound);
677     }
678 
679     @Override
680     public void ping(final PingCallback callback, Executor executor) {
681       pingTracker.startPing(callback, executor);
682     }
683 
684     @Override
685     public synchronized void shutdown(Status reason) {
686       checkNotNull(reason, "reason");
687       shutdownInternal(reason, false);
688     }
689 
690     @Override
691     public synchronized void shutdownNow(Status reason) {
692       checkNotNull(reason, "reason");
693       shutdownInternal(reason, true);
694     }
695 
696     @Override
697     @GuardedBy("this")
698     public void notifyShutdown(Status status) {
699       clientTransportListener.transportShutdown(status);
700     }
701 
702     @Override
703     @GuardedBy("this")
704     public void notifyTerminated() {
705       if (numInUseStreams.getAndSet(0) > 0) {
706         clientTransportListener.transportInUse(false);
707       }
708       serviceBinding.unbind();
709       clientTransportListener.transportTerminated();
710     }
711 
712     @Override
713     @GuardedBy("this")
714     protected void handleSetupTransport(Parcel parcel) {
715       // Add the remote uid to our attributes.
716       attributes = setSecurityAttrs(attributes, Binder.getCallingUid());
717       if (inState(TransportState.SETUP)) {
718         int version = parcel.readInt();
719         IBinder binder = parcel.readStrongBinder();
720         if (version != WIRE_FORMAT_VERSION) {
721           shutdownInternal(
722               Status.UNAVAILABLE.withDescription("Wire format version mismatch"), true);
723         } else if (binder == null) {
724           shutdownInternal(
725               Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true);
726         } else {
727           offloadExecutor.execute(() -> checkSecurityPolicy(binder));
728         }
729       }
730     }
731 
732     private void checkSecurityPolicy(IBinder binder) {
733       Status authorization;
734       Integer remoteUid;
735       synchronized (this) {
736         remoteUid = attributes.get(REMOTE_UID);
737       }
738       if (remoteUid == null) {
739         authorization = Status.UNAUTHENTICATED.withDescription("No remote UID available");
740       } else {
741         authorization = securityPolicy.checkAuthorization(remoteUid);
742       }
743       synchronized (this) {
744         if (inState(TransportState.SETUP)) {
745           if (!authorization.isOk()) {
746             shutdownInternal(authorization, true);
747           } else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) {
748             shutdownInternal(
749                 Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true);
750           } else {
751             // Check state again, since a failure inside setOutgoingBinder (or a callback it
752             // triggers), could have shut us down.
753             if (!isShutdown()) {
754               setState(TransportState.READY);
755               clientTransportListener.transportReady();
756             }
757           }
758         }
759       }
760     }
761 
762     @GuardedBy("this")
763     @Override
764     protected void handlePingResponse(Parcel parcel) {
765       pingTracker.onPingResponse(parcel.readInt());
766     }
767 
768     private static ClientStream newFailingClientStream(
769         Status failure, Attributes attributes, Metadata headers,
770         ClientStreamTracer[] tracers) {
771       StatsTraceContext statsTraceContext =
772           StatsTraceContext.newClientContext(tracers, attributes, headers);
773       statsTraceContext.clientOutboundHeaders();
774       return new FailingClientStream(failure, tracers);
775     }
776 
777     private static InternalLogId buildLogId(
778         Context sourceContext, AndroidComponentAddress targetAddress) {
779       return InternalLogId.allocate(
780           BinderClientTransport.class,
781           sourceContext.getClass().getSimpleName()
782               + "->"
783               + targetAddress.getComponent().toShortString());
784     }
785 
786     private static Attributes buildClientAttributes(
787         Attributes eagAttrs,
788         Context sourceContext,
789         AndroidComponentAddress targetAddress,
790         InboundParcelablePolicy inboundParcelablePolicy) {
791       return Attributes.newBuilder()
792           .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) // Trust noone for now.
793           .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
794           .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, AndroidComponentAddress.forContext(sourceContext))
795           .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, targetAddress)
796           .set(INBOUND_PARCELABLE_POLICY, inboundParcelablePolicy)
797           .build();
798     }
799 
800     private static Attributes setSecurityAttrs(Attributes attributes, int uid) {
801       return attributes.toBuilder()
802           .set(REMOTE_UID, uid)
803           .set(
804               GrpcAttributes.ATTR_SECURITY_LEVEL,
805               uid == Process.myUid()
806                   ? SecurityLevel.PRIVACY_AND_INTEGRITY
807                   : SecurityLevel.INTEGRITY) // TODO: Have the SecrityPolicy decide this.
808           .build();
809     }
810   }
811 
812   /** Concrete server-side transport implementation. */
813   @Internal
814   public static final class BinderServerTransport extends BinderTransport implements ServerTransport {
815 
816     private final List<ServerStreamTracer.Factory> streamTracerFactories;
817     @Nullable private ServerTransportListener serverTransportListener;
818 
819     public BinderServerTransport(
820         ObjectPool<ScheduledExecutorService> executorServicePool,
821         Attributes attributes,
822         List<ServerStreamTracer.Factory> streamTracerFactories,
823         IBinder callbackBinder) {
824       super(executorServicePool, attributes, buildLogId(attributes));
825       this.streamTracerFactories = streamTracerFactories;
826       // TODO(jdcormie): Plumb in the Server's executor() and use it here instead.
827       setOutgoingBinder(OneWayBinderProxy.wrap(callbackBinder, getScheduledExecutorService()));
828     }
829 
830     public synchronized void setServerTransportListener(ServerTransportListener serverTransportListener) {
831       this.serverTransportListener = serverTransportListener;
832       if (isShutdown()) {
833         setState(TransportState.SHUTDOWN_TERMINATED);
834         notifyTerminated();
835         releaseExecutors();
836       } else {
837         sendSetupTransaction();
838         // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a
839         // callback it triggers), could have shut us down.
840         if (!isShutdown()) {
841           setState(TransportState.READY);
842           attributes = serverTransportListener.transportReady(attributes);
843         }
844       }
845     }
846 
847     StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) {
848       return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers);
849     }
850 
851     synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) {
852       if (isShutdown()) {
853         return Status.UNAVAILABLE.withDescription("transport is shutdown");
854       } else {
855         serverTransportListener.streamCreated(stream, methodName, headers);
856         return Status.OK;
857       }
858     }
859 
860     @Override
861     @GuardedBy("this")
862     public void notifyShutdown(Status status) {
863       // Nothing to do.
864     }
865 
866     @Override
867     @GuardedBy("this")
868     public void notifyTerminated() {
869       if (serverTransportListener != null) {
870         serverTransportListener.transportTerminated();
871       }
872     }
873 
874     @Override
875     public synchronized void shutdown() {
876       shutdownInternal(Status.OK, false);
877     }
878 
879     @Override
880     public synchronized void shutdownNow(Status reason) {
881       shutdownInternal(reason, true);
882     }
883 
884     @Override
885     @Nullable
886     @GuardedBy("this")
887     protected Inbound<?> createInbound(int callId) {
888       return new Inbound.ServerInbound(this, attributes, callId);
889     }
890 
891     private static InternalLogId buildLogId(Attributes attributes) {
892       return InternalLogId.allocate(
893           BinderServerTransport.class, "from " + attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
894     }
895   }
896 
897   private static void checkTransition(TransportState current, TransportState next) {
898     switch (next) {
899       case SETUP:
900         checkState(current == TransportState.NOT_STARTED);
901         break;
902       case READY:
903         checkState(current == TransportState.NOT_STARTED || current == TransportState.SETUP);
904         break;
905       case SHUTDOWN:
906         checkState(
907             current == TransportState.NOT_STARTED
908                 || current == TransportState.SETUP
909                 || current == TransportState.READY);
910         break;
911       case SHUTDOWN_TERMINATED:
912         checkState(current == TransportState.SHUTDOWN);
913         break;
914       default:
915         throw new AssertionError();
916     }
917   }
918 
919   @VisibleForTesting
920   Map<Integer, Inbound<?>> getOngoingCalls() {
921     return ongoingCalls;
922   }
923 
924   private static Status statusFromRemoteException(RemoteException e) {
925     if (e instanceof DeadObjectException || e instanceof TransactionTooLargeException) {
926       // These are to be expected from time to time and can simply be retried.
927       return Status.UNAVAILABLE.withCause(e);
928     }
929     // Otherwise, this exception from transact is unexpected.
930     return Status.INTERNAL.withCause(e);
931   }
932 }
933 
934