Skip to content

Commit ed0a9f3

Browse files
netty: support listening on multiple ports
1 parent 4ba168f commit ed0a9f3

File tree

15 files changed

+180
-80
lines changed

15 files changed

+180
-80
lines changed

core/src/main/java/io/grpc/InternalChannelz.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public static final class Builder {
311311
private long callsSucceeded;
312312
private long callsFailed;
313313
private long lastCallStartedNanos;
314-
public List<InternalInstrumented<SocketStats>> listenSockets = Collections.emptyList();
314+
public List<InternalInstrumented<SocketStats>> listenSockets = new ArrayList<>();
315315

316316
public Builder setCallsStarted(long callsStarted) {
317317
this.callsStarted = callsStarted;
@@ -334,10 +334,11 @@ public Builder setLastCallStartedNanos(long lastCallStartedNanos) {
334334
}
335335

336336
/** Sets the listen sockets. */
337-
public Builder setListenSockets(List<InternalInstrumented<SocketStats>> listenSockets) {
338-
checkNotNull(listenSockets);
339-
this.listenSockets = Collections.unmodifiableList(
340-
new ArrayList<InternalInstrumented<SocketStats>>(listenSockets));
337+
public Builder addListenSockets(List<InternalInstrumented<SocketStats>> listenSockets) {
338+
checkNotNull(listenSockets, "listenSockets");
339+
for (InternalInstrumented<SocketStats> ss : listenSockets) {
340+
this.listenSockets.add(checkNotNull(ss, "null listen socket"));
341+
}
341342
return this;
342343
}
343344

core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.internal.ObjectPool;
2828
import io.grpc.internal.SharedResourcePool;
2929
import java.io.File;
30+
import java.util.Collections;
3031
import java.util.List;
3132
import java.util.UUID;
3233
import java.util.concurrent.ScheduledExecutorService;
@@ -145,9 +146,9 @@ public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
145146
}
146147

147148
@Override
148-
protected InProcessServer buildTransportServer(
149+
protected List<InProcessServer> buildTransportServers(
149150
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
150-
return new InProcessServer(this, streamTracerFactories);
151+
return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
151152
}
152153

153154
@Override

core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ protected void setTracingEnabled(boolean value) {
220220
public final Server build() {
221221
ServerImpl server = new ServerImpl(
222222
this,
223-
buildTransportServer(getTracerFactories()),
223+
buildTransportServers(getTracerFactories()),
224224
Context.ROOT);
225225
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
226226
notifyTarget.notifyOnBuild(server);
@@ -266,7 +266,7 @@ protected final TransportTracer.Factory getTransportTracerFactory() {
266266
*
267267
* @param streamTracerFactories an immutable list of stream tracer factories
268268
*/
269-
protected abstract io.grpc.internal.InternalServer buildTransportServer(
269+
protected abstract List<? extends io.grpc.internal.InternalServer> buildTransportServers(
270270
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
271271

272272
private T thisT() {

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.Collections;
5757
import java.util.HashSet;
5858
import java.util.List;
59+
import java.util.Set;
5960
import java.util.concurrent.Executor;
6061
import java.util.concurrent.Future;
6162
import java.util.concurrent.FutureTask;
@@ -101,12 +102,12 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
101102
@GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
102103
@GuardedBy("lock") private boolean terminated;
103104
/** Service encapsulating something similar to an accept() socket. */
104-
private final InternalServer transportServer;
105+
private final List<? extends InternalServer> transportServers;
105106
private final Object lock = new Object();
106-
@GuardedBy("lock") private boolean transportServerTerminated;
107+
@GuardedBy("lock") private boolean transportServersTerminated;
107108
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
108-
@GuardedBy("lock") private final Collection<ServerTransport> transports =
109-
new HashSet<ServerTransport>();
109+
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
110+
@GuardedBy("lock") private int activeTransportServers;
110111

111112
private final Context rootContext;
112113

@@ -126,14 +127,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
126127
*/
127128
ServerImpl(
128129
AbstractServerImplBuilder<?> builder,
129-
InternalServer transportServer,
130+
List<? extends InternalServer> transportServers,
130131
Context rootContext) {
131132
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
132133
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
133134
this.fallbackRegistry =
134135
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
135-
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
136-
this.logId = InternalLogId.allocate("Server", String.valueOf(transportServer.getPort()));
136+
Preconditions.checkNotNull(transportServers, "transportServers");
137+
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
138+
this.transportServers = new ArrayList<>(transportServers);
139+
// TODO(notcarl): concatenate all listening ports in the Log Id.
140+
this.logId =
141+
InternalLogId.allocate("Server", String.valueOf(transportServers.get(0).getPort()));
137142
// Fork from the passed in context so that it does not propagate cancellation, it only
138143
// inherits values.
139144
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
@@ -163,8 +168,13 @@ public ServerImpl start() throws IOException {
163168
synchronized (lock) {
164169
checkState(!started, "Already started");
165170
checkState(!shutdown, "Shutting down");
166-
// Start and wait for any port to actually be bound.
167-
transportServer.start(new ServerListenerImpl());
171+
// Start and wait for any ports to actually be bound.
172+
173+
ServerListenerImpl listener = new ServerListenerImpl();
174+
for (InternalServer ts : transportServers) {
175+
ts.start(listener);
176+
activeTransportServers++;
177+
}
168178
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
169179
started = true;
170180
return this;
@@ -176,7 +186,13 @@ public int getPort() {
176186
synchronized (lock) {
177187
checkState(started, "Not started");
178188
checkState(!terminated, "Already terminated");
179-
return transportServer.getPort();
189+
for (InternalServer ts : transportServers) {
190+
int port = ts.getPort();
191+
if (port != -1) {
192+
return port;
193+
}
194+
}
195+
return -1;
180196
}
181197
}
182198

@@ -211,20 +227,22 @@ public List<ServerServiceDefinition> getMutableServices() {
211227
*/
212228
@Override
213229
public ServerImpl shutdown() {
214-
boolean shutdownTransportServer;
230+
boolean shutdownTransportServers;
215231
synchronized (lock) {
216232
if (shutdown) {
217233
return this;
218234
}
219235
shutdown = true;
220-
shutdownTransportServer = started;
221-
if (!shutdownTransportServer) {
222-
transportServerTerminated = true;
236+
shutdownTransportServers = started;
237+
if (!shutdownTransportServers) {
238+
transportServersTerminated = true;
223239
checkForTermination();
224240
}
225241
}
226-
if (shutdownTransportServer) {
227-
transportServer.shutdown();
242+
if (shutdownTransportServers) {
243+
for (InternalServer ts : transportServers) {
244+
ts.shutdown();
245+
}
228246
}
229247
return this;
230248
}
@@ -311,7 +329,7 @@ private void transportClosed(ServerTransport transport) {
311329
/** Notify of complete shutdown if necessary. */
312330
private void checkForTermination() {
313331
synchronized (lock) {
314-
if (shutdown && transports.isEmpty() && transportServerTerminated) {
332+
if (shutdown && transports.isEmpty() && transportServersTerminated) {
315333
if (terminated) {
316334
throw new AssertionError("Server already terminated");
317335
}
@@ -320,13 +338,13 @@ private void checkForTermination() {
320338
if (executor != null) {
321339
executor = executorPool.returnObject(executor);
322340
}
323-
// TODO(carl-mastrangelo): move this outside the synchronized block.
324341
lock.notifyAll();
325342
}
326343
}
327344
}
328345

329346
private final class ServerListenerImpl implements ServerListener {
347+
330348
@Override
331349
public ServerTransportListener transportCreated(ServerTransport transport) {
332350
synchronized (lock) {
@@ -342,6 +360,11 @@ public void serverShutdown() {
342360
ArrayList<ServerTransport> copiedTransports;
343361
Status shutdownNowStatusCopy;
344362
synchronized (lock) {
363+
activeTransportServers--;
364+
if (activeTransportServers != 0) {
365+
return;
366+
}
367+
345368
// transports collection can be modified during shutdown(), even if we hold the lock, due
346369
// to reentrancy.
347370
copiedTransports = new ArrayList<>(transports);
@@ -356,7 +379,7 @@ public void serverShutdown() {
356379
}
357380
}
358381
synchronized (lock) {
359-
transportServerTerminated = true;
382+
transportServersTerminated = true;
360383
checkForTermination();
361384
}
362385
}
@@ -577,9 +600,10 @@ public InternalLogId getLogId() {
577600

578601
@Override
579602
public ListenableFuture<ServerStats> getStats() {
580-
ServerStats.Builder builder
581-
= new ServerStats.Builder()
582-
.setListenSockets(transportServer.getListenSockets());
603+
ServerStats.Builder builder = new ServerStats.Builder();
604+
for (InternalServer ts : transportServers) {
605+
builder.addListenSockets(ts.getListenSockets());
606+
}
583607
serverCallTracer.updateBuilder(builder);
584608
SettableFuture<ServerStats> ret = SettableFuture.create();
585609
ret.set(builder.build());
@@ -590,7 +614,7 @@ public ListenableFuture<ServerStats> getStats() {
590614
public String toString() {
591615
return MoreObjects.toStringHelper(this)
592616
.add("logId", logId.getId())
593-
.add("transportServer", transportServer)
617+
.add("transportServers", transportServers)
594618
.toString();
595619
}
596620

core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertSame;
2424

25+
import com.google.common.collect.Iterables;
2526
import io.grpc.ServerStreamTracer.Factory;
2627
import io.grpc.internal.FakeClock;
2728
import io.grpc.internal.ObjectPool;
@@ -54,7 +55,8 @@ public void generateName() {
5455
@Test
5556
public void scheduledExecutorService_default() {
5657
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
57-
InProcessServer server = builder.buildTransportServer(new ArrayList<Factory>());
58+
InProcessServer server =
59+
Iterables.getOnlyElement(builder.buildTransportServers(new ArrayList<Factory>()));
5860

5961
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
6062
server.getScheduledExecutorServicePool();
@@ -78,7 +80,8 @@ public void scheduledExecutorService_custom() {
7880
InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
7981
assertSame(builder, builder1);
8082

81-
InProcessServer server = builder1.buildTransportServer(new ArrayList<Factory>());
83+
InProcessServer server =
84+
Iterables.getOnlyElement(builder1.buildTransportServers(new ArrayList<Factory>()));
8285
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
8386
server.getScheduledExecutorServicePool();
8487

core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.grpc.internal.InternalServer;
2222
import io.grpc.internal.ManagedClientTransport;
2323
import io.grpc.internal.testing.AbstractTransportTest;
24+
import java.util.Collections;
2425
import java.util.List;
2526
import org.junit.Ignore;
2627
import org.junit.Test;
@@ -35,16 +36,17 @@ public class InProcessTransportTest extends AbstractTransportTest {
3536
private static final String USER_AGENT = "a-testing-user-agent";
3637

3738
@Override
38-
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
39+
protected List<? extends InternalServer> newServer(
40+
List<ServerStreamTracer.Factory> streamTracerFactories) {
3941
InProcessServerBuilder builder = InProcessServerBuilder
4042
.forName(TRANSPORT_NAME)
4143
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
42-
return new InProcessServer(builder, streamTracerFactories);
44+
return Collections.singletonList(new InProcessServer(builder, streamTracerFactories));
4345
}
4446

4547
@Override
46-
protected InternalServer newServer(
47-
InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
48+
protected List<? extends InternalServer> newServer(
49+
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
4850
return newServer(streamTracerFactories);
4951
}
5052

core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ static class Builder extends AbstractServerImplBuilder<Builder> {
101101
}
102102

103103
@Override
104-
protected io.grpc.internal.InternalServer buildTransportServer(
104+
protected List<io.grpc.internal.InternalServer> buildTransportServers(
105105
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
106106
throw new UnsupportedOperationException();
107107
}

core/src/test/java/io/grpc/internal/ServerImplTest.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import static org.mockito.Mockito.verifyNoMoreInteractions;
4545
import static org.mockito.Mockito.when;
4646

47+
import com.google.common.collect.ImmutableList;
4748
import com.google.common.util.concurrent.ListenableFuture;
4849
import com.google.common.util.concurrent.MoreExecutors;
4950
import com.google.common.util.concurrent.SettableFuture;
@@ -87,6 +88,7 @@
8788
import java.util.Collections;
8889
import java.util.LinkedList;
8990
import java.util.List;
91+
import java.util.concurrent.CountDownLatch;
9092
import java.util.concurrent.CyclicBarrier;
9193
import java.util.concurrent.Executor;
9294
import java.util.concurrent.ScheduledExecutorService;
@@ -210,6 +212,42 @@ public void noPendingTasks() {
210212
assertEquals(0, timer.numPendingTasks());
211213
}
212214

215+
@Test
216+
public void multiport() throws Exception {
217+
final CountDownLatch starts = new CountDownLatch(2);
218+
final CountDownLatch shutdowns = new CountDownLatch(2);
219+
220+
final class Serv extends SimpleServer {
221+
@Override
222+
public void start(ServerListener listener) throws IOException {
223+
super.start(listener);
224+
starts.countDown();
225+
}
226+
227+
@Override
228+
public void shutdown() {
229+
super.shutdown();
230+
shutdowns.countDown();
231+
}
232+
}
233+
234+
SimpleServer transportServer1 = new Serv();
235+
SimpleServer transportServer2 = new Serv();
236+
assertNull(server);
237+
builder.fallbackHandlerRegistry(fallbackRegistry);
238+
builder.executorPool = executorPool;
239+
server = new ServerImpl(
240+
builder, ImmutableList.of(transportServer1, transportServer2), SERVER_CONTEXT);
241+
242+
server.start();
243+
assertTrue(starts.await(1, TimeUnit.SECONDS));
244+
assertEquals(2, shutdowns.getCount());
245+
246+
server.shutdown();
247+
assertTrue(shutdowns.await(1, TimeUnit.SECONDS));
248+
assertTrue(server.awaitTermination(1, TimeUnit.SECONDS));
249+
}
250+
213251
@Test
214252
public void startStopImmediate() throws IOException {
215253
transportServer = new SimpleServer() {
@@ -1326,7 +1364,7 @@ private void createServer() {
13261364

13271365
builder.fallbackHandlerRegistry(fallbackRegistry);
13281366
builder.executorPool = executorPool;
1329-
server = new ServerImpl(builder, transportServer, SERVER_CONTEXT);
1367+
server = new ServerImpl(builder, Collections.singletonList(transportServer), SERVER_CONTEXT);
13301368
}
13311369

13321370
private void verifyExecutorsAcquired() {
@@ -1411,7 +1449,7 @@ public ListenableFuture<SocketStats> getStats() {
14111449
}
14121450

14131451
private static class Builder extends AbstractServerImplBuilder<Builder> {
1414-
@Override protected InternalServer buildTransportServer(
1452+
@Override protected List<InternalServer> buildTransportServers(
14151453
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
14161454
throw new UnsupportedOperationException();
14171455
}

0 commit comments

Comments
 (0)