1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS; 23 import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.errorprone.annotations.DoNotCall; 27 import io.grpc.Attributes; 28 import io.grpc.ExperimentalApi; 29 import io.grpc.ForwardingServerBuilder; 30 import io.grpc.Internal; 31 import io.grpc.Server; 32 import io.grpc.ServerBuilder; 33 import io.grpc.ServerCredentials; 34 import io.grpc.netty.InternalNettyServerBuilder; 35 import io.grpc.netty.InternalNettyServerCredentials; 36 import io.grpc.netty.InternalProtocolNegotiator; 37 import io.grpc.netty.NettyServerBuilder; 38 import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingNegotiatorServerFactory; 39 import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; 40 import java.util.Map; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.atomic.AtomicBoolean; 43 import java.util.logging.Logger; 44 45 /** 46 * A version of {@link ServerBuilder} to create xDS managed servers. 47 */ 48 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/7514") 49 public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBuilder> { 50 private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000); 51 52 private final NettyServerBuilder delegate; 53 private final int port; 54 private XdsServingStatusListener xdsServingStatusListener; 55 private AtomicBoolean isServerBuilt = new AtomicBoolean(false); 56 private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); 57 private XdsClientPoolFactory xdsClientPoolFactory = 58 SharedXdsClientPoolProvider.getDefaultProvider(); 59 private long drainGraceTime = 10; 60 private TimeUnit drainGraceTimeUnit = TimeUnit.MINUTES; 61 XdsServerBuilder(NettyServerBuilder nettyDelegate, int port)62 private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) { 63 this.delegate = nettyDelegate; 64 this.port = port; 65 xdsServingStatusListener = new DefaultListener("port:" + port); 66 } 67 68 @Override 69 @Internal delegate()70 protected ServerBuilder<?> delegate() { 71 checkState(!isServerBuilt.get(), "Server already built!"); 72 return delegate; 73 } 74 75 /** Set the {@link XdsServingStatusListener} to receive "serving" and "not serving" states. */ xdsServingStatusListener( XdsServingStatusListener xdsServingStatusListener)76 public XdsServerBuilder xdsServingStatusListener( 77 XdsServingStatusListener xdsServingStatusListener) { 78 this.xdsServingStatusListener = 79 checkNotNull(xdsServingStatusListener, "xdsServingStatusListener"); 80 return this; 81 } 82 83 /** 84 * Sets the grace time when draining connections with outdated configuration. When an xDS config 85 * update changes connection configuration, pre-existing connections stop accepting new RPCs to be 86 * replaced by new connections. RPCs on those pre-existing connections have the grace time to 87 * complete. RPCs that do not complete in time will be cancelled, allowing the connection to 88 * terminate. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value are considered 89 * infinite. The default is 10 minutes. 90 */ drainGraceTime(long drainGraceTime, TimeUnit drainGraceTimeUnit)91 public XdsServerBuilder drainGraceTime(long drainGraceTime, TimeUnit drainGraceTimeUnit) { 92 checkArgument(drainGraceTime >= 0, "drain grace time must be non-negative: %s", 93 drainGraceTime); 94 checkNotNull(drainGraceTimeUnit, "drainGraceTimeUnit"); 95 if (drainGraceTimeUnit.toNanos(drainGraceTime) >= AS_LARGE_AS_INFINITE) { 96 drainGraceTimeUnit = null; 97 } 98 this.drainGraceTime = drainGraceTime; 99 this.drainGraceTimeUnit = drainGraceTimeUnit; 100 return this; 101 } 102 103 @DoNotCall("Unsupported. Use forPort(int, ServerCredentials) instead") forPort(int port)104 public static ServerBuilder<?> forPort(int port) { 105 throw new UnsupportedOperationException( 106 "Unsupported call - use forPort(int, ServerCredentials)"); 107 } 108 109 /** Creates a gRPC server builder for the given port. */ forPort(int port, ServerCredentials serverCredentials)110 public static XdsServerBuilder forPort(int port, ServerCredentials serverCredentials) { 111 checkNotNull(serverCredentials, "serverCredentials"); 112 InternalProtocolNegotiator.ServerFactory originalNegotiatorFactory = 113 InternalNettyServerCredentials.toNegotiator(serverCredentials); 114 ServerCredentials wrappedCredentials = InternalNettyServerCredentials.create( 115 new FilterChainMatchingNegotiatorServerFactory(originalNegotiatorFactory)); 116 NettyServerBuilder nettyDelegate = NettyServerBuilder.forPort(port, wrappedCredentials); 117 return new XdsServerBuilder(nettyDelegate, port); 118 } 119 120 @Override build()121 public Server build() { 122 checkState(isServerBuilt.compareAndSet(false, true), "Server already built!"); 123 FilterChainSelectorManager filterChainSelectorManager = new FilterChainSelectorManager(); 124 Attributes.Builder builder = Attributes.newBuilder() 125 .set(ATTR_FILTER_CHAIN_SELECTOR_MANAGER, filterChainSelectorManager); 126 if (drainGraceTimeUnit != null) { 127 builder.set(ATTR_DRAIN_GRACE_NANOS, drainGraceTimeUnit.toNanos(drainGraceTime)); 128 } 129 InternalNettyServerBuilder.eagAttributes(delegate, builder.build()); 130 return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener, 131 filterChainSelectorManager, xdsClientPoolFactory, filterRegistry); 132 } 133 134 @VisibleForTesting xdsClientPoolFactory(XdsClientPoolFactory xdsClientPoolFactory)135 XdsServerBuilder xdsClientPoolFactory(XdsClientPoolFactory xdsClientPoolFactory) { 136 this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); 137 return this; 138 } 139 140 /** 141 * Allows providing bootstrap override, useful for testing. 142 */ overrideBootstrapForTest(Map<String, ?> bootstrapOverride)143 public XdsServerBuilder overrideBootstrapForTest(Map<String, ?> bootstrapOverride) { 144 checkNotNull(bootstrapOverride, "bootstrapOverride"); 145 if (this.xdsClientPoolFactory == SharedXdsClientPoolProvider.getDefaultProvider()) { 146 this.xdsClientPoolFactory = new SharedXdsClientPoolProvider(); 147 } 148 this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride); 149 return this; 150 } 151 152 /** 153 * Returns the delegate {@link NettyServerBuilder} to allow experimental level 154 * transport-specific configuration. Note this API will always be experimental. 155 */ transportBuilder()156 public ServerBuilder<?> transportBuilder() { 157 return delegate; 158 } 159 160 /** 161 * Applications can register this listener to receive "serving" and "not serving" states of 162 * the server using {@link #xdsServingStatusListener(XdsServingStatusListener)}. 163 */ 164 public interface XdsServingStatusListener { 165 166 /** Callback invoked when server begins serving. */ onServing()167 void onServing(); 168 169 /** Callback invoked when server is forced to be "not serving" due to an error. 170 * @param throwable cause of the error 171 */ onNotServing(Throwable throwable)172 void onNotServing(Throwable throwable); 173 } 174 175 /** Default implementation of {@link XdsServingStatusListener} that logs at WARNING level. */ 176 private static class DefaultListener implements XdsServingStatusListener { 177 private final Logger logger; 178 private final String prefix; 179 boolean notServingDueToError; 180 DefaultListener(String prefix)181 DefaultListener(String prefix) { 182 logger = Logger.getLogger(DefaultListener.class.getName()); 183 this.prefix = prefix; 184 } 185 186 /** Log calls to onServing() following a call to onNotServing() at WARNING level. */ 187 @Override onServing()188 public void onServing() { 189 if (notServingDueToError) { 190 notServingDueToError = false; 191 logger.warning("[" + prefix + "] Entering serving state."); 192 } 193 } 194 195 @Override onNotServing(Throwable throwable)196 public void onNotServing(Throwable throwable) { 197 logger.warning("[" + prefix + "] " + throwable.getMessage()); 198 notServingDueToError = true; 199 } 200 } 201 } 202