Skip to content

Commit 6b1e974

Browse files
netty: support listening on multiple port
1 parent 3a38e59 commit 6b1e974

File tree

14 files changed

+117
-79
lines changed

14 files changed

+117
-79
lines changed

core/src/main/java/io/grpc/InternalChannelz.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public static final class Builder {
311311
private long callsSucceeded;
312312
private long callsFailed;
313313
private long lastCallStartedNanos;
314-
public List<InternalInstrumented<SocketStats>> listenSockets = Collections.emptyList();
314+
public List<InternalInstrumented<SocketStats>> listenSockets = new ArrayList<>();
315315

316316
public Builder setCallsStarted(long callsStarted) {
317317
this.callsStarted = callsStarted;
@@ -334,10 +334,11 @@ public Builder setLastCallStartedNanos(long lastCallStartedNanos) {
334334
}
335335

336336
/** Sets the listen sockets. */
337-
public Builder setListenSockets(List<InternalInstrumented<SocketStats>> listenSockets) {
338-
checkNotNull(listenSockets);
339-
this.listenSockets = Collections.unmodifiableList(
340-
new ArrayList<InternalInstrumented<SocketStats>>(listenSockets));
337+
public Builder addListenSockets(List<InternalInstrumented<SocketStats>> listenSockets) {
338+
checkNotNull(listenSockets, "listenSockets");
339+
for (InternalInstrumented<SocketStats> ss : listenSockets) {
340+
this.listenSockets.add(checkNotNull(ss, "null listen socket"));
341+
}
341342
return this;
342343
}
343344

core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.internal.ObjectPool;
2828
import io.grpc.internal.SharedResourcePool;
2929
import java.io.File;
30+
import java.util.Collections;
3031
import java.util.List;
3132
import java.util.UUID;
3233
import java.util.concurrent.ScheduledExecutorService;
@@ -145,9 +146,9 @@ public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
145146
}
146147

147148
@Override
148-
protected InProcessServer buildTransportServer(
149+
protected List<InProcessServer> buildTransportServers(
149150
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
150-
return new InProcessServer(this, streamTracerFactories);
151+
return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
151152
}
152153

153154
@Override

core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ protected void setTracingEnabled(boolean value) {
220220
public final Server build() {
221221
ServerImpl server = new ServerImpl(
222222
this,
223-
buildTransportServer(getTracerFactories()),
223+
buildTransportServers(getTracerFactories()),
224224
Context.ROOT);
225225
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
226226
notifyTarget.notifyOnBuild(server);
@@ -266,7 +266,7 @@ protected final TransportTracer.Factory getTransportTracerFactory() {
266266
*
267267
* @param streamTracerFactories an immutable list of stream tracer factories
268268
*/
269-
protected abstract io.grpc.internal.InternalServer buildTransportServer(
269+
protected abstract List<? extends io.grpc.internal.InternalServer> buildTransportServers(
270270
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
271271

272272
private T thisT() {

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
101101
@GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
102102
@GuardedBy("lock") private boolean terminated;
103103
/** Service encapsulating something similar to an accept() socket. */
104-
private final InternalServer transportServer;
104+
private final List<? extends InternalServer> transportServers;
105105
private final Object lock = new Object();
106-
@GuardedBy("lock") private boolean transportServerTerminated;
106+
@GuardedBy("lock") private boolean transportServersTerminated;
107107
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
108-
@GuardedBy("lock") private final Collection<ServerTransport> transports =
109-
new HashSet<ServerTransport>();
108+
@GuardedBy("lock") private final Collection<ServerTransport> transports = new HashSet<>();
110109

111110
private final Context rootContext;
112111

@@ -126,14 +125,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
126125
*/
127126
ServerImpl(
128127
AbstractServerImplBuilder<?> builder,
129-
InternalServer transportServer,
128+
List<? extends InternalServer> transportServers,
130129
Context rootContext) {
131130
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
132131
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
133132
this.fallbackRegistry =
134133
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
135-
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
136-
this.logId = InternalLogId.allocate("Server", String.valueOf(transportServer.getPort()));
134+
Preconditions.checkNotNull(transportServers, "transportServers");
135+
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
136+
this.transportServers = new ArrayList<>(transportServers);
137+
// TODO(notcarl): concatenate all listening ports in the Log Id.
138+
this.logId =
139+
InternalLogId.allocate("Server", String.valueOf(transportServers.get(0).getPort()));
137140
// Fork from the passed in context so that it does not propagate cancellation, it only
138141
// inherits values.
139142
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
@@ -163,8 +166,12 @@ public ServerImpl start() throws IOException {
163166
synchronized (lock) {
164167
checkState(!started, "Already started");
165168
checkState(!shutdown, "Shutting down");
166-
// Start and wait for any port to actually be bound.
167-
transportServer.start(new ServerListenerImpl());
169+
// Start and wait for any ports to actually be bound.
170+
171+
for (InternalServer ts : transportServers) {
172+
ServerListenerImpl listener = new ServerListenerImpl(ts);
173+
ts.start(listener);
174+
}
168175
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
169176
started = true;
170177
return this;
@@ -176,7 +183,13 @@ public int getPort() {
176183
synchronized (lock) {
177184
checkState(started, "Not started");
178185
checkState(!terminated, "Already terminated");
179-
return transportServer.getPort();
186+
for (InternalServer ts : transportServers) {
187+
int port = ts.getPort();
188+
if (port != -1) {
189+
return port;
190+
}
191+
}
192+
return -1;
180193
}
181194
}
182195

@@ -211,20 +224,22 @@ public List<ServerServiceDefinition> getMutableServices() {
211224
*/
212225
@Override
213226
public ServerImpl shutdown() {
214-
boolean shutdownTransportServer;
227+
boolean shutdownTransportServers;
215228
synchronized (lock) {
216229
if (shutdown) {
217230
return this;
218231
}
219232
shutdown = true;
220-
shutdownTransportServer = started;
221-
if (!shutdownTransportServer) {
222-
transportServerTerminated = true;
233+
shutdownTransportServers = started;
234+
if (!shutdownTransportServers) {
235+
transportServersTerminated = true;
223236
checkForTermination();
224237
}
225238
}
226-
if (shutdownTransportServer) {
227-
transportServer.shutdown();
239+
if (shutdownTransportServers) {
240+
for (InternalServer ts : transportServers) {
241+
ts.shutdown();
242+
}
228243
}
229244
return this;
230245
}
@@ -311,7 +326,7 @@ private void transportClosed(ServerTransport transport) {
311326
/** Notify of complete shutdown if necessary. */
312327
private void checkForTermination() {
313328
synchronized (lock) {
314-
if (shutdown && transports.isEmpty() && transportServerTerminated) {
329+
if (shutdown && transports.isEmpty() && transportServersTerminated) {
315330
if (terminated) {
316331
throw new AssertionError("Server already terminated");
317332
}
@@ -327,6 +342,11 @@ private void checkForTermination() {
327342
}
328343

329344
private final class ServerListenerImpl implements ServerListener {
345+
346+
ServerListenerImpl(InternalServer transportServer) {
347+
348+
}
349+
330350
@Override
331351
public ServerTransportListener transportCreated(ServerTransport transport) {
332352
synchronized (lock) {
@@ -356,7 +376,7 @@ public void serverShutdown() {
356376
}
357377
}
358378
synchronized (lock) {
359-
transportServerTerminated = true;
379+
transportServersTerminated = true;
360380
checkForTermination();
361381
}
362382
}
@@ -577,9 +597,10 @@ public InternalLogId getLogId() {
577597

578598
@Override
579599
public ListenableFuture<ServerStats> getStats() {
580-
ServerStats.Builder builder
581-
= new ServerStats.Builder()
582-
.setListenSockets(transportServer.getListenSockets());
600+
ServerStats.Builder builder = new ServerStats.Builder();
601+
for (InternalServer ts : transportServers) {
602+
builder.addListenSockets(ts.getListenSockets());
603+
}
583604
serverCallTracer.updateBuilder(builder);
584605
SettableFuture<ServerStats> ret = SettableFuture.create();
585606
ret.set(builder.build());
@@ -590,7 +611,7 @@ public ListenableFuture<ServerStats> getStats() {
590611
public String toString() {
591612
return MoreObjects.toStringHelper(this)
592613
.add("logId", logId.getId())
593-
.add("transportServer", transportServer)
614+
.add("transportServers", transportServers)
594615
.toString();
595616
}
596617

core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertSame;
2424

25+
import com.google.common.collect.Iterables;
2526
import io.grpc.ServerStreamTracer.Factory;
2627
import io.grpc.internal.FakeClock;
2728
import io.grpc.internal.ObjectPool;
@@ -54,7 +55,8 @@ public void generateName() {
5455
@Test
5556
public void scheduledExecutorService_default() {
5657
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
57-
InProcessServer server = builder.buildTransportServer(new ArrayList<Factory>());
58+
InProcessServer server =
59+
Iterables.getOnlyElement(builder.buildTransportServers(new ArrayList<Factory>()));
5860

5961
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
6062
server.getScheduledExecutorServicePool();
@@ -78,7 +80,8 @@ public void scheduledExecutorService_custom() {
7880
InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
7981
assertSame(builder, builder1);
8082

81-
InProcessServer server = builder1.buildTransportServer(new ArrayList<Factory>());
83+
InProcessServer server =
84+
Iterables.getOnlyElement(builder1.buildTransportServers(new ArrayList<Factory>()));
8285
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
8386
server.getScheduledExecutorServicePool();
8487

core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.grpc.internal.InternalServer;
2222
import io.grpc.internal.ManagedClientTransport;
2323
import io.grpc.internal.testing.AbstractTransportTest;
24+
import java.util.Collections;
2425
import java.util.List;
2526
import org.junit.Ignore;
2627
import org.junit.Test;
@@ -35,16 +36,17 @@ public class InProcessTransportTest extends AbstractTransportTest {
3536
private static final String USER_AGENT = "a-testing-user-agent";
3637

3738
@Override
38-
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
39+
protected List<? extends InternalServer> newServer(
40+
List<ServerStreamTracer.Factory> streamTracerFactories) {
3941
InProcessServerBuilder builder = InProcessServerBuilder
4042
.forName(TRANSPORT_NAME)
4143
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
42-
return new InProcessServer(builder, streamTracerFactories);
44+
return Collections.singletonList(new InProcessServer(builder, streamTracerFactories));
4345
}
4446

4547
@Override
46-
protected InternalServer newServer(
47-
InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
48+
protected List<? extends InternalServer> newServer(
49+
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
4850
return newServer(streamTracerFactories);
4951
}
5052

core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ static class Builder extends AbstractServerImplBuilder<Builder> {
101101
}
102102

103103
@Override
104-
protected io.grpc.internal.InternalServer buildTransportServer(
104+
protected List<io.grpc.internal.InternalServer> buildTransportServers(
105105
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
106106
throw new UnsupportedOperationException();
107107
}

core/src/test/java/io/grpc/internal/ServerImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,7 +1326,7 @@ private void createServer() {
13261326

13271327
builder.fallbackHandlerRegistry(fallbackRegistry);
13281328
builder.executorPool = executorPool;
1329-
server = new ServerImpl(builder, transportServer, SERVER_CONTEXT);
1329+
server = new ServerImpl(builder, Collections.singletonList(transportServer), SERVER_CONTEXT);
13301330
}
13311331

13321332
private void verifyExecutorsAcquired() {
@@ -1411,7 +1411,7 @@ public ListenableFuture<SocketStats> getStats() {
14111411
}
14121412

14131413
private static class Builder extends AbstractServerImplBuilder<Builder> {
1414-
@Override protected InternalServer buildTransportServer(
1414+
@Override protected List<InternalServer> buildTransportServers(
14151415
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
14161416
throw new UnsupportedOperationException();
14171417
}

netty/src/main/java/io/grpc/netty/NettyServerBuilder.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.io.InputStream;
4040
import java.net.InetSocketAddress;
4141
import java.net.SocketAddress;
42+
import java.util.ArrayList;
43+
import java.util.Collections;
4244
import java.util.HashMap;
4345
import java.util.List;
4446
import java.util.Map;
@@ -65,7 +67,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
6567
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);
6668
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
6769

68-
private final SocketAddress address;
70+
private final List<SocketAddress> listenAddresses = new ArrayList<>();
6971
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
7072
private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
7173
@Nullable
@@ -110,12 +112,12 @@ public static NettyServerBuilder forAddress(SocketAddress address) {
110112

111113
@CheckReturnValue
112114
private NettyServerBuilder(int port) {
113-
this.address = new InetSocketAddress(port);
115+
this.listenAddresses.add(new InetSocketAddress(port));
114116
}
115117

116118
@CheckReturnValue
117119
private NettyServerBuilder(SocketAddress address) {
118-
this.address = address;
120+
this.listenAddresses.add(address);
119121
}
120122

121123
/**
@@ -436,22 +438,26 @@ public NettyServerBuilder permitKeepAliveWithoutCalls(boolean permit) {
436438

437439
@Override
438440
@CheckReturnValue
439-
protected NettyServer buildTransportServer(
441+
protected List<NettyServer> buildTransportServers(
440442
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
441443
ProtocolNegotiator negotiator = protocolNegotiator;
442444
if (negotiator == null) {
443445
negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
444446
ProtocolNegotiators.serverPlaintext();
445447
}
446-
447-
return new NettyServer(
448-
address, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup,
449-
negotiator, streamTracerFactories, getTransportTracerFactory(),
450-
maxConcurrentCallsPerConnection, flowControlWindow,
451-
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
452-
maxConnectionIdleInNanos,
453-
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
454-
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
448+
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
449+
for (SocketAddress listenAddress : listenAddresses) {
450+
NettyServer transportServer = new NettyServer(
451+
listenAddress, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup,
452+
negotiator, streamTracerFactories, getTransportTracerFactory(),
453+
maxConcurrentCallsPerConnection, flowControlWindow,
454+
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
455+
maxConnectionIdleInNanos,
456+
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
457+
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
458+
transportServers.add(transportServer);
459+
}
460+
return Collections.unmodifiableList(transportServers);
455461
}
456462

457463
@Override

netty/src/test/java/io/grpc/netty/NettyTransportTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,23 +55,23 @@ public void releaseClientFactory() {
5555
}
5656

5757
@Override
58-
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
58+
protected List<? extends InternalServer> newServer(
59+
List<ServerStreamTracer.Factory> streamTracerFactories) {
5960
return NettyServerBuilder
6061
.forPort(0)
6162
.flowControlWindow(65 * 1024)
6263
.setTransportTracerFactory(fakeClockTransportTracer)
63-
.buildTransportServer(streamTracerFactories);
64+
.buildTransportServers(streamTracerFactories);
6465
}
6566

6667
@Override
67-
protected InternalServer newServer(
68-
InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
69-
int port = server.getPort();
68+
protected List<? extends InternalServer> newServer(
69+
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
7070
return NettyServerBuilder
7171
.forPort(port)
7272
.flowControlWindow(65 * 1024)
7373
.setTransportTracerFactory(fakeClockTransportTracer)
74-
.buildTransportServer(streamTracerFactories);
74+
.buildTransportServers(streamTracerFactories);
7575
}
7676

7777
@Override

0 commit comments

Comments
 (0)