1 /*
2  * Copyright 2016 Google LLC
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google LLC nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 package com.google.api.gax.grpc.testing;
31 
32 import com.google.api.core.BetaApi;
33 import com.google.api.gax.grpc.GrpcHeaderInterceptor;
34 import com.google.api.gax.grpc.GrpcTransportChannel;
35 import com.google.api.gax.rpc.FixedHeaderProvider;
36 import com.google.api.gax.rpc.HeaderProvider;
37 import com.google.api.gax.rpc.TransportChannel;
38 import com.google.api.gax.rpc.TransportChannelProvider;
39 import com.google.auth.Credentials;
40 import io.grpc.CallOptions;
41 import io.grpc.Channel;
42 import io.grpc.ClientCall;
43 import io.grpc.ClientInterceptor;
44 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
45 import io.grpc.ManagedChannelBuilder;
46 import io.grpc.Metadata;
47 import io.grpc.MethodDescriptor;
48 import io.grpc.inprocess.InProcessChannelBuilder;
49 import java.io.IOException;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.concurrent.CopyOnWriteArrayList;
53 import java.util.concurrent.Executor;
54 import java.util.concurrent.ScheduledExecutorService;
55 import java.util.regex.Pattern;
56 
57 /** LocalChannelProvider creates channels for in-memory gRPC services. */
58 @BetaApi
59 public class LocalChannelProvider implements TransportChannelProvider {
60 
61   private final List<LocalHeaderInterceptor> interceptors;
62   private final String address;
63 
64   private volatile HeaderProvider headerProvider;
65 
LocalChannelProvider(String address, HeaderProvider headerProvider)66   private LocalChannelProvider(String address, HeaderProvider headerProvider) {
67     this.interceptors = new CopyOnWriteArrayList<>();
68     this.address = address;
69     this.headerProvider = headerProvider;
70   }
71 
72   @Override
shouldAutoClose()73   public boolean shouldAutoClose() {
74     return true;
75   }
76 
77   @Deprecated
78   @Override
needsExecutor()79   public boolean needsExecutor() {
80     return false;
81   }
82 
83   @Deprecated
84   @Override
withExecutor(ScheduledExecutorService executor)85   public TransportChannelProvider withExecutor(ScheduledExecutorService executor) {
86     return withExecutor((Executor) executor);
87   }
88 
89   @Override
withExecutor(Executor executor)90   public TransportChannelProvider withExecutor(Executor executor) {
91     throw new UnsupportedOperationException("LocalChannelProvider doesn't need an executor");
92   }
93 
94   @Override
needsHeaders()95   public boolean needsHeaders() {
96     return headerProvider == null;
97   }
98 
99   @Override
needsEndpoint()100   public boolean needsEndpoint() {
101     return false;
102   }
103 
104   @Override
withEndpoint(String endpoint)105   public TransportChannelProvider withEndpoint(String endpoint) {
106     throw new UnsupportedOperationException("LocalChannelProvider doesn't need an endpoint");
107   }
108 
109   @Override
110   @BetaApi("The surface for customizing pool size is not stable yet and may change in the future.")
acceptsPoolSize()111   public boolean acceptsPoolSize() {
112     return false;
113   }
114 
115   @Override
116   @BetaApi("The surface for customizing pool size is not stable yet and may change in the future.")
withPoolSize(int size)117   public TransportChannelProvider withPoolSize(int size) {
118     throw new UnsupportedOperationException(
119         "LocalChannelProvider doesn't allow pool size customization");
120   }
121 
122   @Override
withHeaders(Map<String, String> headers)123   public TransportChannelProvider withHeaders(Map<String, String> headers) {
124     this.headerProvider = FixedHeaderProvider.create(headers);
125     return this;
126   }
127 
128   @Override
getTransportChannel()129   public TransportChannel getTransportChannel() throws IOException {
130     ManagedChannelBuilder channelBuilder = InProcessChannelBuilder.forName(address).usePlaintext();
131     if (headerProvider != null) {
132       GrpcHeaderInterceptor interceptor = new GrpcHeaderInterceptor(headerProvider.getHeaders());
133       LocalHeaderInterceptor localHeaderInterceptor = new LocalHeaderInterceptor(interceptor);
134       interceptors.add(localHeaderInterceptor);
135       channelBuilder.intercept(localHeaderInterceptor).userAgent(interceptor.getUserAgentHeader());
136     }
137     return GrpcTransportChannel.newBuilder().setManagedChannel(channelBuilder.build()).build();
138   }
139 
140   @Override
getTransportName()141   public String getTransportName() {
142     return GrpcTransportChannel.getGrpcTransportName();
143   }
144 
145   @Override
needsCredentials()146   public boolean needsCredentials() {
147     return false;
148   }
149 
150   @Override
withCredentials(Credentials credentials)151   public TransportChannelProvider withCredentials(Credentials credentials) {
152     throw new UnsupportedOperationException("LocalChannelProvider doesn't need credentials");
153   }
154 
155   /** Creates a LocalChannelProvider. */
create(String addressString)156   public static LocalChannelProvider create(String addressString) {
157     return new LocalChannelProvider(addressString, null);
158   }
159 
isHeaderSent(String headerKey, Pattern headerPattern)160   public boolean isHeaderSent(String headerKey, Pattern headerPattern) {
161     Metadata.Key<String> key = Metadata.Key.of(headerKey, Metadata.ASCII_STRING_MARSHALLER);
162 
163     if (interceptors.isEmpty()) {
164       return false;
165     }
166     for (LocalHeaderInterceptor interceptor : interceptors) {
167       if (interceptor.getSubmittedHeaders().isEmpty()) {
168         return false;
169       }
170       for (Metadata submittedHeaders : interceptor.getSubmittedHeaders()) {
171         String headerValue = submittedHeaders.get(key);
172         if (headerValue == null || !headerPattern.matcher(headerValue).matches()) {
173           return false;
174         }
175       }
176     }
177     return true;
178   }
179 
180   private static class LocalHeaderInterceptor implements ClientInterceptor {
181     private final ClientInterceptor innerInterceptor;
182     private final List<Metadata> submittedHeaders;
183 
LocalHeaderInterceptor(ClientInterceptor innerInterceptor)184     private LocalHeaderInterceptor(ClientInterceptor innerInterceptor) {
185       this.innerInterceptor = innerInterceptor;
186       this.submittedHeaders = new CopyOnWriteArrayList<>();
187     }
188 
189     @Override
interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)190     public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
191         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
192       ClientCall<ReqT, RespT> call = innerInterceptor.interceptCall(method, callOptions, next);
193       return new SimpleForwardingClientCall<ReqT, RespT>(call) {
194         @Override
195         public void start(Listener<RespT> responseListener, Metadata headers) {
196           super.start(responseListener, headers);
197           submittedHeaders.add(headers);
198         }
199       };
200     }
201 
getSubmittedHeaders()202     List<Metadata> getSubmittedHeaders() {
203       return submittedHeaders;
204     }
205   }
206 }
207