1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.cronet; 18 19 import com.google.common.base.Preconditions; 20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.SettableFuture; 22 import io.grpc.Attributes; 23 import io.grpc.CallOptions; 24 import io.grpc.ClientStreamTracer; 25 import io.grpc.InternalChannelz.SocketStats; 26 import io.grpc.InternalLogId; 27 import io.grpc.Metadata; 28 import io.grpc.MethodDescriptor; 29 import io.grpc.SecurityLevel; 30 import io.grpc.Status; 31 import io.grpc.Status.Code; 32 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory; 33 import io.grpc.internal.ConnectionClientTransport; 34 import io.grpc.internal.GrpcAttributes; 35 import io.grpc.internal.GrpcUtil; 36 import io.grpc.internal.StatsTraceContext; 37 import io.grpc.internal.TransportTracer; 38 import java.net.InetSocketAddress; 39 import java.util.ArrayList; 40 import java.util.Collections; 41 import java.util.IdentityHashMap; 42 import java.util.Set; 43 import java.util.concurrent.Executor; 44 import javax.annotation.Nullable; 45 import javax.annotation.concurrent.GuardedBy; 46 47 /** 48 * A cronet-based {@link ConnectionClientTransport} implementation. 49 */ 50 class CronetClientTransport implements ConnectionClientTransport { 51 private final InternalLogId logId; 52 private final InetSocketAddress address; 53 private final String authority; 54 private final String userAgent; 55 private Listener listener; 56 private final Object lock = new Object(); 57 @GuardedBy("lock") 58 private final Set<CronetClientStream> streams = Collections.newSetFromMap( 59 new IdentityHashMap<CronetClientStream, Boolean>()); 60 private final Executor executor; 61 private final int maxMessageSize; 62 private final boolean alwaysUsePut; 63 private final TransportTracer transportTracer; 64 private final Attributes attrs; 65 private final boolean useGetForSafeMethods; 66 private final boolean usePutForIdempotentMethods; 67 // Indicates the transport is in go-away state: no new streams will be processed, 68 // but existing streams may continue. 69 @GuardedBy("lock") 70 private boolean goAway; 71 // Used to indicate the special phase while we are going to enter go-away state but before 72 // goAway is turned to true, see the comment at where this is set about why it is needed. 73 @GuardedBy("lock") 74 private boolean startedGoAway; 75 @GuardedBy("lock") 76 private Status goAwayStatus; 77 @GuardedBy("lock") 78 private boolean stopped; 79 @GuardedBy("lock") 80 // Whether this transport has started. 81 private boolean started; 82 private StreamBuilderFactory streamFactory; 83 CronetClientTransport( StreamBuilderFactory streamFactory, InetSocketAddress address, String authority, @Nullable String userAgent, Attributes eagAttrs, Executor executor, int maxMessageSize, boolean alwaysUsePut, TransportTracer transportTracer, boolean useGetForSafeMethods, boolean usePutForIdempotentMethods)84 CronetClientTransport( 85 StreamBuilderFactory streamFactory, 86 InetSocketAddress address, 87 String authority, 88 @Nullable String userAgent, 89 Attributes eagAttrs, 90 Executor executor, 91 int maxMessageSize, 92 boolean alwaysUsePut, 93 TransportTracer transportTracer, 94 boolean useGetForSafeMethods, 95 boolean usePutForIdempotentMethods) { 96 this.address = Preconditions.checkNotNull(address, "address"); 97 this.logId = InternalLogId.allocate(getClass(), address.toString()); 98 this.authority = authority; 99 this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent); 100 this.maxMessageSize = maxMessageSize; 101 this.alwaysUsePut = alwaysUsePut; 102 this.executor = Preconditions.checkNotNull(executor, "executor"); 103 this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory"); 104 this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); 105 this.attrs = Attributes.newBuilder() 106 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) 107 .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) 108 .build(); 109 this.useGetForSafeMethods = useGetForSafeMethods; 110 this.usePutForIdempotentMethods = usePutForIdempotentMethods; 111 } 112 113 @Override getStats()114 public ListenableFuture<SocketStats> getStats() { 115 SettableFuture<SocketStats> f = SettableFuture.create(); 116 f.set(null); 117 return f; 118 } 119 120 @Override newStream(final MethodDescriptor<?, ?> method, final Metadata headers, final CallOptions callOptions, ClientStreamTracer[] tracers)121 public CronetClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers, 122 final CallOptions callOptions, ClientStreamTracer[] tracers) { 123 Preconditions.checkNotNull(method, "method"); 124 Preconditions.checkNotNull(headers, "headers"); 125 126 final String defaultPath = "/" + method.getFullMethodName(); 127 final String url = "https://" + authority + defaultPath; 128 129 final StatsTraceContext statsTraceCtx = 130 StatsTraceContext.newClientContext(tracers, attrs, headers); 131 class StartCallback implements Runnable { 132 final CronetClientStream clientStream = new CronetClientStream( 133 url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize, 134 alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods, 135 usePutForIdempotentMethods); 136 137 @Override 138 public void run() { 139 synchronized (lock) { 140 if (goAway) { 141 clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata()); 142 } else if (started) { 143 startStream(clientStream); 144 } else { 145 throw new AssertionError("Transport is not started"); 146 } 147 } 148 } 149 } 150 151 return new StartCallback().clientStream; 152 } 153 154 @SuppressWarnings("GuardedBy") 155 @GuardedBy("lock") startStream(CronetClientStream stream)156 private void startStream(CronetClientStream stream) { 157 streams.add(stream); 158 // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead 159 // found: 'this.lock' 160 stream.transportState().start(streamFactory); 161 } 162 163 @Override start(Listener listener)164 public Runnable start(Listener listener) { 165 this.listener = Preconditions.checkNotNull(listener, "listener"); 166 synchronized (lock) { 167 started = true; 168 } 169 return new Runnable() { 170 @Override 171 public void run() { 172 // Listener callbacks should not be called simultaneously 173 CronetClientTransport.this.listener.transportReady(); 174 } 175 }; 176 } 177 178 @Override 179 public String toString() { 180 return super.toString() + "(" + address + ")"; 181 } 182 183 public void shutdown() { 184 shutdown(Status.UNAVAILABLE.withDescription("Transport stopped")); 185 } 186 187 @Override 188 public void shutdown(Status status) { 189 synchronized (lock) { 190 if (goAway) { 191 return; 192 } 193 } 194 195 startGoAway(status); 196 } 197 198 @Override 199 public void shutdownNow(Status status) { 200 shutdown(status); 201 ArrayList<CronetClientStream> streamsCopy; 202 synchronized (lock) { 203 // A copy is always necessary since cancel() can call finishStream() which calls 204 // streams.remove() 205 streamsCopy = new ArrayList<>(streams); 206 } 207 for (int i = 0; i < streamsCopy.size(); i++) { 208 // Avoid deadlock by calling into stream without lock held 209 streamsCopy.get(i).cancel(status); 210 } 211 stopIfNecessary(); 212 } 213 214 @Override 215 public Attributes getAttributes() { 216 return attrs; 217 } 218 219 private void startGoAway(Status status) { 220 synchronized (lock) { 221 if (startedGoAway) { 222 // Another go-away is in progress, ignore this one. 223 return; 224 } 225 // We use startedGoAway here instead of goAway, because once the goAway becomes true, other 226 // thread in stopIfNecessary() may stop the transport and cause the 227 // listener.transportTerminated() be called before listener.transportShutdown(). 228 startedGoAway = true; 229 } 230 231 listener.transportShutdown(status); 232 233 synchronized (lock) { 234 goAway = true; 235 goAwayStatus = status; 236 } 237 238 stopIfNecessary(); 239 } 240 241 @Override 242 public void ping(final PingCallback callback, Executor executor) { 243 // TODO(ericgribkoff): depend on cronet implemenetation 244 throw new UnsupportedOperationException(); 245 } 246 247 @Override 248 public InternalLogId getLogId() { 249 return logId; 250 } 251 252 /** 253 * When the transport is in goAway state, we should stop it once all active streams finish. 254 */ 255 void stopIfNecessary() { 256 synchronized (lock) { 257 if (goAway && !stopped && streams.size() == 0) { 258 stopped = true; 259 } else { 260 return; 261 } 262 } 263 listener.transportTerminated(); 264 } 265 266 void finishStream(CronetClientStream stream, Status status) { 267 synchronized (lock) { 268 if (streams.remove(stream)) { 269 boolean isCancelled = (status.getCode() == Code.CANCELLED 270 || status.getCode() == Code.DEADLINE_EXCEEDED); 271 stream.transportState().transportReportStatus(status, isCancelled, new Metadata()); 272 } else { 273 return; 274 } 275 } 276 stopIfNecessary(); 277 } 278 } 279