// xref: /aosp_15_r20/external/grpc-grpc-java/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.cronet;
18 
19 import com.google.common.base.Preconditions;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import io.grpc.Attributes;
23 import io.grpc.CallOptions;
24 import io.grpc.ClientStreamTracer;
25 import io.grpc.InternalChannelz.SocketStats;
26 import io.grpc.InternalLogId;
27 import io.grpc.Metadata;
28 import io.grpc.MethodDescriptor;
29 import io.grpc.SecurityLevel;
30 import io.grpc.Status;
31 import io.grpc.Status.Code;
32 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
33 import io.grpc.internal.ConnectionClientTransport;
34 import io.grpc.internal.GrpcAttributes;
35 import io.grpc.internal.GrpcUtil;
36 import io.grpc.internal.StatsTraceContext;
37 import io.grpc.internal.TransportTracer;
38 import java.net.InetSocketAddress;
39 import java.util.ArrayList;
40 import java.util.Collections;
41 import java.util.IdentityHashMap;
42 import java.util.Set;
43 import java.util.concurrent.Executor;
44 import javax.annotation.Nullable;
45 import javax.annotation.concurrent.GuardedBy;
46 
47 /**
48  * A cronet-based {@link ConnectionClientTransport} implementation.
49  */
50 class CronetClientTransport implements ConnectionClientTransport {
51   private final InternalLogId logId;
52   private final InetSocketAddress address;
53   private final String authority;
54   private final String userAgent;
55   private Listener listener;
56   private final Object lock = new Object();
57   @GuardedBy("lock")
58   private final Set<CronetClientStream> streams = Collections.newSetFromMap(
59           new IdentityHashMap<CronetClientStream, Boolean>());
60   private final Executor executor;
61   private final int maxMessageSize;
62   private final boolean alwaysUsePut;
63   private final TransportTracer transportTracer;
64   private final Attributes attrs;
65   private final boolean useGetForSafeMethods;
66   private final boolean usePutForIdempotentMethods;
67   // Indicates the transport is in go-away state: no new streams will be processed,
68   // but existing streams may continue.
69   @GuardedBy("lock")
70   private boolean goAway;
71   // Used to indicate the special phase while we are going to enter go-away state but before
72   // goAway is turned to true, see the comment at where this is set about why it is needed.
73   @GuardedBy("lock")
74   private boolean startedGoAway;
75   @GuardedBy("lock")
76   private Status goAwayStatus;
77   @GuardedBy("lock")
78   private boolean stopped;
79   @GuardedBy("lock")
80   // Whether this transport has started.
81   private boolean started;
82   private StreamBuilderFactory streamFactory;
83 
CronetClientTransport( StreamBuilderFactory streamFactory, InetSocketAddress address, String authority, @Nullable String userAgent, Attributes eagAttrs, Executor executor, int maxMessageSize, boolean alwaysUsePut, TransportTracer transportTracer, boolean useGetForSafeMethods, boolean usePutForIdempotentMethods)84   CronetClientTransport(
85       StreamBuilderFactory streamFactory,
86       InetSocketAddress address,
87       String authority,
88       @Nullable String userAgent,
89       Attributes eagAttrs,
90       Executor executor,
91       int maxMessageSize,
92       boolean alwaysUsePut,
93       TransportTracer transportTracer,
94       boolean useGetForSafeMethods,
95       boolean usePutForIdempotentMethods) {
96     this.address = Preconditions.checkNotNull(address, "address");
97     this.logId = InternalLogId.allocate(getClass(), address.toString());
98     this.authority = authority;
99     this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent);
100     this.maxMessageSize = maxMessageSize;
101     this.alwaysUsePut = alwaysUsePut;
102     this.executor = Preconditions.checkNotNull(executor, "executor");
103     this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
104     this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
105     this.attrs = Attributes.newBuilder()
106         .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
107         .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
108         .build();
109     this.useGetForSafeMethods = useGetForSafeMethods;
110     this.usePutForIdempotentMethods = usePutForIdempotentMethods;
111   }
112 
113   @Override
getStats()114   public ListenableFuture<SocketStats> getStats() {
115     SettableFuture<SocketStats> f = SettableFuture.create();
116     f.set(null);
117     return f;
118   }
119 
120   @Override
newStream(final MethodDescriptor<?, ?> method, final Metadata headers, final CallOptions callOptions, ClientStreamTracer[] tracers)121   public CronetClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers,
122       final CallOptions callOptions, ClientStreamTracer[] tracers) {
123     Preconditions.checkNotNull(method, "method");
124     Preconditions.checkNotNull(headers, "headers");
125 
126     final String defaultPath = "/" + method.getFullMethodName();
127     final String url = "https://" + authority + defaultPath;
128 
129     final StatsTraceContext statsTraceCtx =
130         StatsTraceContext.newClientContext(tracers, attrs, headers);
131     class StartCallback implements Runnable {
132       final CronetClientStream clientStream = new CronetClientStream(
133           url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
134           alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods,
135           usePutForIdempotentMethods);
136 
137       @Override
138       public void run() {
139         synchronized (lock) {
140           if (goAway) {
141             clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata());
142           } else if (started) {
143             startStream(clientStream);
144           } else {
145             throw new AssertionError("Transport is not started");
146           }
147         }
148       }
149     }
150 
151     return new StartCallback().clientStream;
152   }
153 
154   @SuppressWarnings("GuardedBy")
155   @GuardedBy("lock")
startStream(CronetClientStream stream)156   private void startStream(CronetClientStream stream) {
157     streams.add(stream);
158     // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
159     // found: 'this.lock'
160     stream.transportState().start(streamFactory);
161   }
162 
163   @Override
start(Listener listener)164   public Runnable start(Listener listener) {
165     this.listener = Preconditions.checkNotNull(listener, "listener");
166     synchronized (lock) {
167       started = true;
168     }
169     return new Runnable() {
170       @Override
171       public void run() {
172         // Listener callbacks should not be called simultaneously
173         CronetClientTransport.this.listener.transportReady();
174       }
175     };
176   }
177 
178   @Override
179   public String toString() {
180     return super.toString() + "(" + address + ")";
181   }
182 
183   public void shutdown() {
184     shutdown(Status.UNAVAILABLE.withDescription("Transport stopped"));
185   }
186 
187   @Override
188   public void shutdown(Status status) {
189     synchronized (lock) {
190       if (goAway) {
191         return;
192       }
193     }
194 
195     startGoAway(status);
196   }
197 
198   @Override
199   public void shutdownNow(Status status) {
200     shutdown(status);
201     ArrayList<CronetClientStream> streamsCopy;
202     synchronized (lock) {
203       // A copy is always necessary since cancel() can call finishStream() which calls
204       // streams.remove()
205       streamsCopy = new ArrayList<>(streams);
206     }
207     for (int i = 0; i < streamsCopy.size(); i++) {
208       // Avoid deadlock by calling into stream without lock held
209       streamsCopy.get(i).cancel(status);
210     }
211     stopIfNecessary();
212   }
213 
214   @Override
215   public Attributes getAttributes() {
216     return attrs;
217   }
218 
219   private void startGoAway(Status status) {
220     synchronized (lock) {
221       if (startedGoAway) {
222         // Another go-away is in progress, ignore this one.
223         return;
224       }
225       // We use startedGoAway here instead of goAway, because once the goAway becomes true, other
226       // thread in stopIfNecessary() may stop the transport and cause the
227       // listener.transportTerminated() be called before listener.transportShutdown().
228       startedGoAway = true;
229     }
230 
231     listener.transportShutdown(status);
232 
233     synchronized (lock) {
234       goAway = true;
235       goAwayStatus = status;
236     }
237 
238     stopIfNecessary();
239   }
240 
241   @Override
242   public void ping(final PingCallback callback, Executor executor) {
243     // TODO(ericgribkoff): depend on cronet implemenetation
244     throw new UnsupportedOperationException();
245   }
246 
247   @Override
248   public InternalLogId getLogId() {
249     return logId;
250   }
251 
252   /**
253    * When the transport is in goAway state, we should stop it once all active streams finish.
254    */
255   void stopIfNecessary() {
256     synchronized (lock) {
257       if (goAway && !stopped && streams.size() == 0) {
258         stopped = true;
259       } else {
260         return;
261       }
262     }
263     listener.transportTerminated();
264   }
265 
266   void finishStream(CronetClientStream stream, Status status) {
267     synchronized (lock) {
268       if (streams.remove(stream)) {
269         boolean isCancelled = (status.getCode() == Code.CANCELLED
270             || status.getCode() == Code.DEADLINE_EXCEEDED);
271         stream.transportState().transportReportStatus(status, isCancelled, new Metadata());
272       } else {
273         return;
274       }
275     }
276     stopIfNecessary();
277   }
278 }
279