1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.binder.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static com.google.common.util.concurrent.Futures.immediateFuture; 22 23 import android.content.Context; 24 import android.os.Binder; 25 import android.os.DeadObjectException; 26 import android.os.IBinder; 27 import android.os.Parcel; 28 import android.os.Process; 29 import android.os.RemoteException; 30 import android.os.TransactionTooLargeException; 31 import com.google.common.annotations.VisibleForTesting; 32 import com.google.common.base.Ticker; 33 import com.google.common.util.concurrent.ListenableFuture; 34 import io.grpc.Attributes; 35 import io.grpc.CallOptions; 36 import io.grpc.ClientStreamTracer; 37 import io.grpc.Grpc; 38 import io.grpc.Internal; 39 import io.grpc.InternalChannelz.SocketStats; 40 import io.grpc.InternalLogId; 41 import io.grpc.Metadata; 42 import io.grpc.MethodDescriptor; 43 import io.grpc.SecurityLevel; 44 import io.grpc.ServerStreamTracer; 45 import io.grpc.Status; 46 import io.grpc.StatusException; 47 import io.grpc.binder.AndroidComponentAddress; 48 import io.grpc.binder.BindServiceFlags; 49 import io.grpc.binder.InboundParcelablePolicy; 50 import io.grpc.binder.SecurityPolicy; 51 import io.grpc.internal.ClientStream; 52 import io.grpc.internal.ConnectionClientTransport; 53 import io.grpc.internal.FailingClientStream; 54 import io.grpc.internal.GrpcAttributes; 55 import io.grpc.internal.GrpcUtil; 56 import io.grpc.internal.ManagedClientTransport; 57 import io.grpc.internal.ObjectPool; 58 import io.grpc.internal.ServerStream; 59 import io.grpc.internal.ServerTransport; 60 import io.grpc.internal.ServerTransportListener; 61 import io.grpc.internal.StatsTraceContext; 62 import java.util.ArrayList; 63 import java.util.Iterator; 64 import java.util.LinkedHashSet; 65 import java.util.List; 66 import java.util.Map; 67 import java.util.NoSuchElementException; 68 import java.util.concurrent.ConcurrentHashMap; 69 import java.util.concurrent.Executor; 70 import java.util.concurrent.ScheduledExecutorService; 71 import java.util.concurrent.atomic.AtomicInteger; 72 import java.util.concurrent.atomic.AtomicLong; 73 import java.util.logging.Level; 74 import java.util.logging.Logger; 75 import javax.annotation.CheckReturnValue; 76 import javax.annotation.Nullable; 77 import javax.annotation.concurrent.GuardedBy; 78 import javax.annotation.concurrent.ThreadSafe; 79 80 /** 81 * Base class for binder-based gRPC transport. 82 * 83 * <p>This is used on both the client and service sides of the transport. 84 * 85 * <p>A note on synchronization. The nature of this class's interaction with each stream 86 * (bi-directional communication between gRPC calls and binder transactions) means that acquiring 87 * multiple locks in two different orders can happen easily. E.g. binder transactions will arrive in 88 * this class, and need to passed to a stream instance, whereas gRPC calls on a stream instance will 89 * need to call into this class to send a transaction (possibly waiting for the transport to become 90 * ready). 91 * 92 * <p>The split between Outbound & Inbound helps reduce this risk, but not entirely remove it. 93 * 94 * <p>For this reason, while most state within this class is guarded by this instance, methods 95 * exposed to individual stream instances need to use atomic or volatile types, since those calls 96 * will already be synchronized on the individual RPC objects. 97 * 98 * <p><b>IMPORTANT</b>: This implementation must comply with this published wire format. 99 * https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md 100 */ 101 @ThreadSafe 102 public abstract class BinderTransport 103 implements LeakSafeOneWayBinder.TransactionHandler, IBinder.DeathRecipient { 104 105 private static final Logger logger = Logger.getLogger(BinderTransport.class.getName()); 106 107 /** 108 * Attribute used to store the Android UID of the remote app. This is guaranteed to be set on any 109 * active transport. 110 */ 111 @Internal 112 public static final Attributes.Key<Integer> REMOTE_UID = 113 Attributes.Key.create("internal:remote-uid"); 114 115 /** The authority of the server. */ 116 @Internal 117 public static final Attributes.Key<String> SERVER_AUTHORITY = 118 Attributes.Key.create("internal:server-authority"); 119 120 /** A transport attribute to hold the {@link InboundParcelablePolicy}. */ 121 @Internal 122 public static final Attributes.Key<InboundParcelablePolicy> INBOUND_PARCELABLE_POLICY = 123 Attributes.Key.create("internal:inbound-parcelable-policy"); 124 125 /** 126 * Version code for this wire format. 127 * 128 * <p>Should this change, we should still endeavor to support earlier wire-format versions. If 129 * that's not possible, {@link EARLIEST_SUPPORTED_WIRE_FORMAT_VERSION} should be updated below. 130 */ 131 @Internal 132 public static final int WIRE_FORMAT_VERSION = 1; 133 134 /** The version code of the earliest wire format we support. */ 135 @Internal 136 public static final int EARLIEST_SUPPORTED_WIRE_FORMAT_VERSION = 1; 137 138 /** The max number of "in-flight" bytes before we start buffering transactions. */ 139 private static final int TRANSACTION_BYTES_WINDOW = 128 * 1024; 140 141 /** The number of in-flight bytes we should receive between sendings acks to our peer. */ 142 private static final int TRANSACTION_BYTES_WINDOW_FORCE_ACK = 16 * 1024; 143 144 /** 145 * Sent from the client to host service binder to initiate a new transport, and from the host to 146 * the binder. and from the host s Followed by: int wire_protocol_version IBinder 147 * client_transports_callback_binder 148 */ 149 @Internal 150 public static final int SETUP_TRANSPORT = IBinder.FIRST_CALL_TRANSACTION; 151 152 /** Send to shutdown the transport from either end. */ 153 @Internal 154 public static final int SHUTDOWN_TRANSPORT = IBinder.FIRST_CALL_TRANSACTION + 1; 155 156 /** Send to acknowledge receipt of rpc bytes, for flow control. */ 157 static final int ACKNOWLEDGE_BYTES = IBinder.FIRST_CALL_TRANSACTION + 2; 158 159 /** A ping request. */ 160 private static final int PING = IBinder.FIRST_CALL_TRANSACTION + 3; 161 162 /** A response to a ping. */ 163 private static final int PING_RESPONSE = IBinder.FIRST_CALL_TRANSACTION + 4; 164 165 /** Reserved transaction IDs for any special events we might need. */ 166 private static final int RESERVED_TRANSACTIONS = 1000; 167 168 /** The first call ID we can use. */ 169 private static final int FIRST_CALL_ID = IBinder.FIRST_CALL_TRANSACTION + RESERVED_TRANSACTIONS; 170 171 /** The last call ID we can use. */ 172 private static final int LAST_CALL_ID = IBinder.LAST_CALL_TRANSACTION; 173 174 /** The states of this transport. */ 175 protected enum TransportState { 176 NOT_STARTED, // We haven't been started yet. 177 SETUP, // We're setting up the connection. 178 READY, // The transport is ready. 179 SHUTDOWN, // We've been shutdown and won't accept any additional calls (thought existing calls 180 // may continue). 181 SHUTDOWN_TERMINATED // We've been shutdown completely (or we failed to start). We can't send or 182 // receive any data. 183 } 184 185 private final ObjectPool<ScheduledExecutorService> executorServicePool; 186 private final ScheduledExecutorService scheduledExecutorService; 187 private final InternalLogId logId; 188 private final LeakSafeOneWayBinder incomingBinder; 189 190 protected final ConcurrentHashMap<Integer, Inbound<?>> ongoingCalls; 191 192 @GuardedBy("this") 193 private final LinkedHashSet<Integer> callIdsToNotifyWhenReady = new LinkedHashSet<>(); 194 195 @GuardedBy("this") 196 protected Attributes attributes; 197 198 @GuardedBy("this") 199 private TransportState transportState = TransportState.NOT_STARTED; 200 201 @GuardedBy("this") 202 @Nullable 203 protected Status shutdownStatus; 204 205 @Nullable private OneWayBinderProxy outgoingBinder; 206 207 private final FlowController flowController; 208 209 /** The number of incoming bytes we've received. */ 210 private final AtomicLong numIncomingBytes; 211 212 /** The number of incoming bytes we've told our peer we've received. */ 213 private long acknowledgedIncomingBytes; 214 BinderTransport( ObjectPool<ScheduledExecutorService> executorServicePool, Attributes attributes, InternalLogId logId)215 private BinderTransport( 216 ObjectPool<ScheduledExecutorService> executorServicePool, 217 Attributes attributes, 218 InternalLogId logId) { 219 this.executorServicePool = executorServicePool; 220 this.attributes = attributes; 221 this.logId = logId; 222 scheduledExecutorService = executorServicePool.getObject(); 223 incomingBinder = new LeakSafeOneWayBinder(this); 224 ongoingCalls = new ConcurrentHashMap<>(); 225 flowController = new FlowController(TRANSACTION_BYTES_WINDOW); 226 numIncomingBytes = new AtomicLong(); 227 } 228 229 // Override in child class. getScheduledExecutorService()230 public final ScheduledExecutorService getScheduledExecutorService() { 231 return scheduledExecutorService; 232 } 233 234 // Override in child class. getStats()235 public final ListenableFuture<SocketStats> getStats() { 236 return immediateFuture(null); 237 } 238 239 // Override in child class. getLogId()240 public final InternalLogId getLogId() { 241 return logId; 242 } 243 244 // Override in child class. getAttributes()245 public final synchronized Attributes getAttributes() { 246 return attributes; 247 } 248 249 /** 250 * Returns whether this transport is able to send rpc transactions. Intentionally unsynchronized 251 * since this will be called while Outbound is held. 252 */ isReady()253 final boolean isReady() { 254 return !flowController.isTransmitWindowFull(); 255 } 256 notifyShutdown(Status shutdownStatus)257 abstract void notifyShutdown(Status shutdownStatus); 258 notifyTerminated()259 abstract void notifyTerminated(); 260 releaseExecutors()261 void releaseExecutors() { 262 executorServicePool.returnObject(scheduledExecutorService); 263 } 264 265 @GuardedBy("this") inState(TransportState transportState)266 boolean inState(TransportState transportState) { 267 return this.transportState == transportState; 268 } 269 270 @GuardedBy("this") isShutdown()271 boolean isShutdown() { 272 return inState(TransportState.SHUTDOWN) || inState(TransportState.SHUTDOWN_TERMINATED); 273 } 274 275 @GuardedBy("this") setState(TransportState newState)276 final void setState(TransportState newState) { 277 checkTransition(transportState, newState); 278 transportState = newState; 279 } 280 281 @GuardedBy("this") setOutgoingBinder(OneWayBinderProxy binder)282 protected boolean setOutgoingBinder(OneWayBinderProxy binder) { 283 this.outgoingBinder = binder; 284 try { 285 binder.getDelegate().linkToDeath(this, 0); 286 return true; 287 } catch (RemoteException re) { 288 return false; 289 } 290 } 291 292 @Override binderDied()293 public synchronized void binderDied() { 294 shutdownInternal(Status.UNAVAILABLE.withDescription("binderDied"), true); 295 } 296 297 @GuardedBy("this") shutdownInternal(Status shutdownStatus, boolean forceTerminate)298 final void shutdownInternal(Status shutdownStatus, boolean forceTerminate) { 299 if (!isShutdown()) { 300 this.shutdownStatus = shutdownStatus; 301 setState(TransportState.SHUTDOWN); 302 notifyShutdown(shutdownStatus); 303 } 304 if (!inState(TransportState.SHUTDOWN_TERMINATED) 305 && (forceTerminate || ongoingCalls.isEmpty())) { 306 incomingBinder.detach(); 307 setState(TransportState.SHUTDOWN_TERMINATED); 308 sendShutdownTransaction(); 309 ArrayList<Inbound<?>> calls = new ArrayList<>(ongoingCalls.values()); 310 ongoingCalls.clear(); 311 scheduledExecutorService.execute( 312 () -> { 313 for (Inbound<?> inbound : calls) { 314 synchronized (inbound) { 315 inbound.closeAbnormal(shutdownStatus); 316 } 317 } 318 notifyTerminated(); 319 releaseExecutors(); 320 }); 321 } 322 } 323 324 @GuardedBy("this") sendSetupTransaction()325 final void sendSetupTransaction() { 326 sendSetupTransaction(checkNotNull(outgoingBinder)); 327 } 328 329 @GuardedBy("this") sendSetupTransaction(OneWayBinderProxy iBinder)330 final void sendSetupTransaction(OneWayBinderProxy iBinder) { 331 try (ParcelHolder parcel = ParcelHolder.obtain()) { 332 parcel.get().writeInt(WIRE_FORMAT_VERSION); 333 parcel.get().writeStrongBinder(incomingBinder); 334 iBinder.transact(SETUP_TRANSPORT, parcel); 335 } catch (RemoteException re) { 336 shutdownInternal(statusFromRemoteException(re), true); 337 } 338 } 339 340 @GuardedBy("this") sendShutdownTransaction()341 private final void sendShutdownTransaction() { 342 if (outgoingBinder != null) { 343 try { 344 outgoingBinder.getDelegate().unlinkToDeath(this, 0); 345 } catch (NoSuchElementException e) { 346 // Ignore. 347 } 348 try (ParcelHolder parcel = ParcelHolder.obtain()) { 349 // Send empty flags to avoid a memory leak linked to empty parcels (b/207778694). 350 parcel.get().writeInt(0); 351 outgoingBinder.transact(SHUTDOWN_TRANSPORT, parcel); 352 } catch (RemoteException re) { 353 // Ignore. 354 } 355 } 356 } 357 sendPing(int id)358 protected synchronized void sendPing(int id) throws StatusException { 359 if (inState(TransportState.SHUTDOWN_TERMINATED)) { 360 throw shutdownStatus.asException(); 361 } else if (outgoingBinder == null) { 362 throw Status.FAILED_PRECONDITION.withDescription("Transport not ready.").asException(); 363 } else { 364 try (ParcelHolder parcel = ParcelHolder.obtain()) { 365 parcel.get().writeInt(id); 366 outgoingBinder.transact(PING, parcel); 367 } catch (RemoteException re) { 368 throw statusFromRemoteException(re).asException(); 369 } 370 } 371 } 372 unregisterInbound(Inbound<?> inbound)373 protected void unregisterInbound(Inbound<?> inbound) { 374 unregisterCall(inbound.callId); 375 } 376 unregisterCall(int callId)377 final void unregisterCall(int callId) { 378 boolean removed = (ongoingCalls.remove(callId) != null); 379 if (removed && ongoingCalls.isEmpty()) { 380 // Possibly shutdown (not synchronously, since inbound is held). 381 scheduledExecutorService.execute( 382 () -> { 383 synchronized (this) { 384 if (inState(TransportState.SHUTDOWN)) { 385 // No more ongoing calls, and we're shutdown. Finish the shutdown. 386 shutdownInternal(shutdownStatus, true); 387 } 388 } 389 }); 390 } 391 } 392 sendTransaction(int callId, ParcelHolder parcel)393 final void sendTransaction(int callId, ParcelHolder parcel) throws StatusException { 394 int dataSize = parcel.get().dataSize(); 395 try { 396 outgoingBinder.transact(callId, parcel); 397 } catch (RemoteException re) { 398 throw statusFromRemoteException(re).asException(); 399 } 400 if (flowController.notifyBytesSent(dataSize)) { 401 logger.log(Level.FINE, "transmit window now full " + this); 402 } 403 } 404 sendOutOfBandClose(int callId, Status status)405 final void sendOutOfBandClose(int callId, Status status) { 406 try (ParcelHolder parcel = ParcelHolder.obtain()) { 407 parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below. 408 int flags = TransactionUtils.writeStatus(parcel.get(), status); 409 TransactionUtils.fillInFlags(parcel.get(), flags | TransactionUtils.FLAG_OUT_OF_BAND_CLOSE); 410 sendTransaction(callId, parcel); 411 } catch (StatusException e) { 412 logger.log(Level.WARNING, "Failed sending oob close transaction", e); 413 } 414 } 415 416 @Override handleTransaction(int code, Parcel parcel)417 public final boolean handleTransaction(int code, Parcel parcel) { 418 try { 419 return handleTransactionInternal(code, parcel); 420 } catch (RuntimeException e) { 421 logger.log(Level.SEVERE, 422 "Terminating transport for uncaught Exception in transaction " + code, e); 423 synchronized (this) { 424 // This unhandled exception may have put us in an inconsistent state. Force terminate the 425 // whole transport so our peer knows something is wrong and so that clients can retry with 426 // a fresh transport instance on both sides. 427 shutdownInternal(Status.INTERNAL.withCause(e), true); 428 return false; 429 } 430 } 431 } 432 handleTransactionInternal(int code, Parcel parcel)433 private boolean handleTransactionInternal(int code, Parcel parcel) { 434 if (code < FIRST_CALL_ID) { 435 synchronized (this) { 436 switch (code) { 437 case ACKNOWLEDGE_BYTES: 438 handleAcknowledgedBytes(parcel.readLong()); 439 break; 440 case SHUTDOWN_TRANSPORT: 441 shutdownInternal( 442 Status.UNAVAILABLE.withDescription("transport shutdown by peer"), true); 443 break; 444 case SETUP_TRANSPORT: 445 handleSetupTransport(parcel); 446 break; 447 case PING: 448 handlePing(parcel); 449 break; 450 case PING_RESPONSE: 451 handlePingResponse(parcel); 452 break; 453 default: 454 return false; 455 } 456 return true; 457 } 458 } else { 459 int size = parcel.dataSize(); 460 Inbound<?> inbound = ongoingCalls.get(code); 461 if (inbound == null) { 462 synchronized (this) { 463 if (!isShutdown()) { 464 // Create a new inbound. Strictly speaking we could end up doing this twice on 465 // two threads, hence the need to use putIfAbsent, and check its result. 466 inbound = createInbound(code); 467 if (inbound != null) { 468 Inbound<?> inbound2 = ongoingCalls.putIfAbsent(code, inbound); 469 if (inbound2 != null) { 470 inbound = inbound2; 471 } 472 } 473 } 474 } 475 } 476 if (inbound != null) { 477 inbound.handleTransaction(parcel); 478 } 479 long nib = numIncomingBytes.addAndGet(size); 480 if ((nib - acknowledgedIncomingBytes) > TRANSACTION_BYTES_WINDOW_FORCE_ACK) { 481 synchronized (this) { 482 sendAcknowledgeBytes(checkNotNull(outgoingBinder)); 483 } 484 } 485 return true; 486 } 487 } 488 489 @Nullable 490 @GuardedBy("this") createInbound(int callId)491 protected Inbound<?> createInbound(int callId) { 492 return null; 493 } 494 495 @GuardedBy("this") handleSetupTransport(Parcel parcel)496 protected void handleSetupTransport(Parcel parcel) {} 497 498 @GuardedBy("this") handlePing(Parcel requestParcel)499 private final void handlePing(Parcel requestParcel) { 500 int id = requestParcel.readInt(); 501 if (transportState == TransportState.READY) { 502 try (ParcelHolder replyParcel = ParcelHolder.obtain()) { 503 replyParcel.get().writeInt(id); 504 outgoingBinder.transact(PING_RESPONSE, replyParcel); 505 } catch (RemoteException re) { 506 // Ignore. 507 } 508 } 509 } 510 511 @GuardedBy("this") handlePingResponse(Parcel parcel)512 protected void handlePingResponse(Parcel parcel) {} 513 514 @GuardedBy("this") sendAcknowledgeBytes(OneWayBinderProxy iBinder)515 private void sendAcknowledgeBytes(OneWayBinderProxy iBinder) { 516 // Send a transaction to acknowledge reception of incoming data. 517 long n = numIncomingBytes.get(); 518 acknowledgedIncomingBytes = n; 519 try (ParcelHolder parcel = ParcelHolder.obtain()) { 520 parcel.get().writeLong(n); 521 iBinder.transact(ACKNOWLEDGE_BYTES, parcel); 522 } catch (RemoteException re) { 523 shutdownInternal(statusFromRemoteException(re), true); 524 } 525 } 526 527 @GuardedBy("this") handleAcknowledgedBytes(long numBytes)528 final void handleAcknowledgedBytes(long numBytes) { 529 if (flowController.handleAcknowledgedBytes(numBytes)) { 530 logger.log( 531 Level.FINE, 532 "handleAcknowledgedBytes: Transmit Window No-Longer Full. Unblock calls: " + this); 533 534 // The LinkedHashSet contract guarantees that an id already present in this collection will 535 // not lose its priority if we re-insert it here. 536 callIdsToNotifyWhenReady.addAll(ongoingCalls.keySet()); 537 538 Iterator<Integer> i = callIdsToNotifyWhenReady.iterator(); 539 while (isReady() && i.hasNext()) { 540 Inbound<?> inbound = ongoingCalls.get(i.next()); 541 i.remove(); 542 if (inbound != null) { // Calls can be removed out from under us. 543 inbound.onTransportReady(); 544 } 545 } 546 } 547 } 548 549 /** Concrete client-side transport implementation. */ 550 @ThreadSafe 551 @Internal 552 public static final class BinderClientTransport extends BinderTransport 553 implements ConnectionClientTransport, Bindable.Observer { 554 555 private final ObjectPool<? extends Executor> offloadExecutorPool; 556 private final Executor offloadExecutor; 557 private final SecurityPolicy securityPolicy; 558 private final Bindable serviceBinding; 559 /** Number of ongoing calls which keep this transport "in-use". */ 560 private final AtomicInteger numInUseStreams; 561 562 private final PingTracker pingTracker; 563 564 @Nullable private ManagedClientTransport.Listener clientTransportListener; 565 566 @GuardedBy("this") 567 private int latestCallId = FIRST_CALL_ID; 568 BinderClientTransport( Context sourceContext, AndroidComponentAddress targetAddress, BindServiceFlags bindServiceFlags, Executor mainThreadExecutor, ObjectPool<ScheduledExecutorService> executorServicePool, ObjectPool<? extends Executor> offloadExecutorPool, SecurityPolicy securityPolicy, InboundParcelablePolicy inboundParcelablePolicy, Attributes eagAttrs)569 public BinderClientTransport( 570 Context sourceContext, 571 AndroidComponentAddress targetAddress, 572 BindServiceFlags bindServiceFlags, 573 Executor mainThreadExecutor, 574 ObjectPool<ScheduledExecutorService> executorServicePool, 575 ObjectPool<? extends Executor> offloadExecutorPool, 576 SecurityPolicy securityPolicy, 577 InboundParcelablePolicy inboundParcelablePolicy, 578 Attributes eagAttrs) { 579 super( 580 executorServicePool, 581 buildClientAttributes(eagAttrs, sourceContext, targetAddress, inboundParcelablePolicy), 582 buildLogId(sourceContext, targetAddress)); 583 this.offloadExecutorPool = offloadExecutorPool; 584 this.securityPolicy = securityPolicy; 585 this.offloadExecutor = offloadExecutorPool.getObject(); 586 numInUseStreams = new AtomicInteger(); 587 pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); 588 589 serviceBinding = 590 new ServiceBinding( 591 mainThreadExecutor, 592 sourceContext, 593 targetAddress.asBindIntent(), 594 bindServiceFlags.toInteger(), 595 this); 596 } 597 598 @Override releaseExecutors()599 void releaseExecutors() { 600 super.releaseExecutors(); 601 offloadExecutorPool.returnObject(offloadExecutor); 602 } 603 604 @Override onBound(IBinder binder)605 public synchronized void onBound(IBinder binder) { 606 sendSetupTransaction(OneWayBinderProxy.wrap(binder, offloadExecutor)); 607 } 608 609 @Override onUnbound(Status reason)610 public synchronized void onUnbound(Status reason) { 611 shutdownInternal(reason, true); 612 } 613 614 @CheckReturnValue 615 @Override start(ManagedClientTransport.Listener clientTransportListener)616 public synchronized Runnable start(ManagedClientTransport.Listener clientTransportListener) { 617 this.clientTransportListener = checkNotNull(clientTransportListener); 618 return () -> { 619 synchronized (BinderClientTransport.this) { 620 if (inState(TransportState.NOT_STARTED)) { 621 setState(TransportState.SETUP); 622 serviceBinding.bind(); 623 } 624 } 625 }; 626 } 627 628 @Override 629 public synchronized ClientStream newStream( 630 final MethodDescriptor<?, ?> method, 631 final Metadata headers, 632 final CallOptions callOptions, 633 ClientStreamTracer[] tracers) { 634 if (!inState(TransportState.READY)) { 635 return newFailingClientStream( 636 isShutdown() 637 ? shutdownStatus 638 : Status.INTERNAL.withDescription("newStream() before transportReady()"), 639 attributes, 640 headers, 641 tracers); 642 } 643 644 int callId = latestCallId++; 645 if (latestCallId == LAST_CALL_ID) { 646 latestCallId = FIRST_CALL_ID; 647 } 648 StatsTraceContext statsTraceContext = 649 StatsTraceContext.newClientContext(tracers, attributes, headers); 650 Inbound.ClientInbound inbound = 651 new Inbound.ClientInbound( 652 this, attributes, callId, GrpcUtil.shouldBeCountedForInUse(callOptions)); 653 if (ongoingCalls.putIfAbsent(callId, inbound) != null) { 654 Status failure = Status.INTERNAL.withDescription("Clashing call IDs"); 655 shutdownInternal(failure, true); 656 return newFailingClientStream(failure, attributes, headers, tracers); 657 } else { 658 if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { 659 clientTransportListener.transportInUse(true); 660 } 661 Outbound.ClientOutbound outbound = 662 new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); 663 if (method.getType().clientSendsOneMessage()) { 664 return new SingleMessageClientStream(inbound, outbound, attributes); 665 } else { 666 return new MultiMessageClientStream(inbound, outbound, attributes); 667 } 668 } 669 } 670 671 @Override 672 protected void unregisterInbound(Inbound<?> inbound) { 673 if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { 674 clientTransportListener.transportInUse(false); 675 } 676 super.unregisterInbound(inbound); 677 } 678 679 @Override 680 public void ping(final PingCallback callback, Executor executor) { 681 pingTracker.startPing(callback, executor); 682 } 683 684 @Override 685 public synchronized void shutdown(Status reason) { 686 checkNotNull(reason, "reason"); 687 shutdownInternal(reason, false); 688 } 689 690 @Override 691 public synchronized void shutdownNow(Status reason) { 692 checkNotNull(reason, "reason"); 693 shutdownInternal(reason, true); 694 } 695 696 @Override 697 @GuardedBy("this") 698 public void notifyShutdown(Status status) { 699 clientTransportListener.transportShutdown(status); 700 } 701 702 @Override 703 @GuardedBy("this") 704 public void notifyTerminated() { 705 if (numInUseStreams.getAndSet(0) > 0) { 706 clientTransportListener.transportInUse(false); 707 } 708 serviceBinding.unbind(); 709 clientTransportListener.transportTerminated(); 710 } 711 712 @Override 713 @GuardedBy("this") 714 protected void handleSetupTransport(Parcel parcel) { 715 // Add the remote uid to our attributes. 716 attributes = setSecurityAttrs(attributes, Binder.getCallingUid()); 717 if (inState(TransportState.SETUP)) { 718 int version = parcel.readInt(); 719 IBinder binder = parcel.readStrongBinder(); 720 if (version != WIRE_FORMAT_VERSION) { 721 shutdownInternal( 722 Status.UNAVAILABLE.withDescription("Wire format version mismatch"), true); 723 } else if (binder == null) { 724 shutdownInternal( 725 Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true); 726 } else { 727 offloadExecutor.execute(() -> checkSecurityPolicy(binder)); 728 } 729 } 730 } 731 732 private void checkSecurityPolicy(IBinder binder) { 733 Status authorization; 734 Integer remoteUid; 735 synchronized (this) { 736 remoteUid = attributes.get(REMOTE_UID); 737 } 738 if (remoteUid == null) { 739 authorization = Status.UNAUTHENTICATED.withDescription("No remote UID available"); 740 } else { 741 authorization = securityPolicy.checkAuthorization(remoteUid); 742 } 743 synchronized (this) { 744 if (inState(TransportState.SETUP)) { 745 if (!authorization.isOk()) { 746 shutdownInternal(authorization, true); 747 } else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) { 748 shutdownInternal( 749 Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true); 750 } else { 751 // Check state again, since a failure inside setOutgoingBinder (or a callback it 752 // triggers), could have shut us down. 753 if (!isShutdown()) { 754 setState(TransportState.READY); 755 clientTransportListener.transportReady(); 756 } 757 } 758 } 759 } 760 } 761 762 @GuardedBy("this") 763 @Override 764 protected void handlePingResponse(Parcel parcel) { 765 pingTracker.onPingResponse(parcel.readInt()); 766 } 767 768 private static ClientStream newFailingClientStream( 769 Status failure, Attributes attributes, Metadata headers, 770 ClientStreamTracer[] tracers) { 771 StatsTraceContext statsTraceContext = 772 StatsTraceContext.newClientContext(tracers, attributes, headers); 773 statsTraceContext.clientOutboundHeaders(); 774 return new FailingClientStream(failure, tracers); 775 } 776 777 private static InternalLogId buildLogId( 778 Context sourceContext, AndroidComponentAddress targetAddress) { 779 return InternalLogId.allocate( 780 BinderClientTransport.class, 781 sourceContext.getClass().getSimpleName() 782 + "->" 783 + targetAddress.getComponent().toShortString()); 784 } 785 786 private static Attributes buildClientAttributes( 787 Attributes eagAttrs, 788 Context sourceContext, 789 AndroidComponentAddress targetAddress, 790 InboundParcelablePolicy inboundParcelablePolicy) { 791 return Attributes.newBuilder() 792 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) // Trust noone for now. 793 .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) 794 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, AndroidComponentAddress.forContext(sourceContext)) 795 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, targetAddress) 796 .set(INBOUND_PARCELABLE_POLICY, inboundParcelablePolicy) 797 .build(); 798 } 799 800 private static Attributes setSecurityAttrs(Attributes attributes, int uid) { 801 return attributes.toBuilder() 802 .set(REMOTE_UID, uid) 803 .set( 804 GrpcAttributes.ATTR_SECURITY_LEVEL, 805 uid == Process.myUid() 806 ? SecurityLevel.PRIVACY_AND_INTEGRITY 807 : SecurityLevel.INTEGRITY) // TODO: Have the SecrityPolicy decide this. 808 .build(); 809 } 810 } 811 812 /** Concrete server-side transport implementation. */ 813 @Internal 814 public static final class BinderServerTransport extends BinderTransport implements ServerTransport { 815 816 private final List<ServerStreamTracer.Factory> streamTracerFactories; 817 @Nullable private ServerTransportListener serverTransportListener; 818 819 public BinderServerTransport( 820 ObjectPool<ScheduledExecutorService> executorServicePool, 821 Attributes attributes, 822 List<ServerStreamTracer.Factory> streamTracerFactories, 823 IBinder callbackBinder) { 824 super(executorServicePool, attributes, buildLogId(attributes)); 825 this.streamTracerFactories = streamTracerFactories; 826 // TODO(jdcormie): Plumb in the Server's executor() and use it here instead. 827 setOutgoingBinder(OneWayBinderProxy.wrap(callbackBinder, getScheduledExecutorService())); 828 } 829 830 public synchronized void setServerTransportListener(ServerTransportListener serverTransportListener) { 831 this.serverTransportListener = serverTransportListener; 832 if (isShutdown()) { 833 setState(TransportState.SHUTDOWN_TERMINATED); 834 notifyTerminated(); 835 releaseExecutors(); 836 } else { 837 sendSetupTransaction(); 838 // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a 839 // callback it triggers), could have shut us down. 840 if (!isShutdown()) { 841 setState(TransportState.READY); 842 attributes = serverTransportListener.transportReady(attributes); 843 } 844 } 845 } 846 847 StatsTraceContext createStatsTraceContext(String methodName, Metadata headers) { 848 return StatsTraceContext.newServerContext(streamTracerFactories, methodName, headers); 849 } 850 851 synchronized Status startStream(ServerStream stream, String methodName, Metadata headers) { 852 if (isShutdown()) { 853 return Status.UNAVAILABLE.withDescription("transport is shutdown"); 854 } else { 855 serverTransportListener.streamCreated(stream, methodName, headers); 856 return Status.OK; 857 } 858 } 859 860 @Override 861 @GuardedBy("this") 862 public void notifyShutdown(Status status) { 863 // Nothing to do. 864 } 865 866 @Override 867 @GuardedBy("this") 868 public void notifyTerminated() { 869 if (serverTransportListener != null) { 870 serverTransportListener.transportTerminated(); 871 } 872 } 873 874 @Override 875 public synchronized void shutdown() { 876 shutdownInternal(Status.OK, false); 877 } 878 879 @Override 880 public synchronized void shutdownNow(Status reason) { 881 shutdownInternal(reason, true); 882 } 883 884 @Override 885 @Nullable 886 @GuardedBy("this") 887 protected Inbound<?> createInbound(int callId) { 888 return new Inbound.ServerInbound(this, attributes, callId); 889 } 890 891 private static InternalLogId buildLogId(Attributes attributes) { 892 return InternalLogId.allocate( 893 BinderServerTransport.class, "from " + attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); 894 } 895 } 896 897 private static void checkTransition(TransportState current, TransportState next) { 898 switch (next) { 899 case SETUP: 900 checkState(current == TransportState.NOT_STARTED); 901 break; 902 case READY: 903 checkState(current == TransportState.NOT_STARTED || current == TransportState.SETUP); 904 break; 905 case SHUTDOWN: 906 checkState( 907 current == TransportState.NOT_STARTED 908 || current == TransportState.SETUP 909 || current == TransportState.READY); 910 break; 911 case SHUTDOWN_TERMINATED: 912 checkState(current == TransportState.SHUTDOWN); 913 break; 914 default: 915 throw new AssertionError(); 916 } 917 } 918 919 @VisibleForTesting 920 Map<Integer, Inbound<?>> getOngoingCalls() { 921 return ongoingCalls; 922 } 923 924 private static Status statusFromRemoteException(RemoteException e) { 925 if (e instanceof DeadObjectException || e instanceof TransactionTooLargeException) { 926 // These are to be expected from time to time and can simply be retried. 927 return Status.UNAVAILABLE.withCause(e); 928 } 929 // Otherwise, this exception from transact is unexpected. 930 return Status.INTERNAL.withCause(e); 931 } 932 } 933 934