Skip to content

Commit dcb9aef

Browse files
netty: support listening on multiple port
1 parent ea9bdab commit dcb9aef

File tree

8 files changed

+208
-115
lines changed

8 files changed

+208
-115
lines changed

core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracer
4444

4545
@Override
4646
protected InternalServer newServer(
47-
InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
47+
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
4848
return newServer(streamTracerFactories);
4949
}
5050

netty/src/main/java/io/grpc/netty/NettyServer.java

Lines changed: 134 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import com.google.common.base.MoreObjects;
2525
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.ImmutableList;
2726
import com.google.common.util.concurrent.ListenableFuture;
2827
import com.google.common.util.concurrent.SettableFuture;
2928
import io.grpc.InternalChannelz;
@@ -54,9 +53,14 @@
5453
import java.io.IOException;
5554
import java.net.InetSocketAddress;
5655
import java.net.SocketAddress;
56+
import java.util.ArrayList;
57+
import java.util.Collections;
5758
import java.util.HashMap;
5859
import java.util.List;
5960
import java.util.Map;
61+
import java.util.concurrent.CopyOnWriteArrayList;
62+
import java.util.concurrent.atomic.AtomicInteger;
63+
import java.util.concurrent.atomic.AtomicReference;
6064
import java.util.logging.Level;
6165
import java.util.logging.Logger;
6266
import javax.annotation.Nullable;
@@ -68,7 +72,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
6872
private static final Logger log = Logger.getLogger(InternalServer.class.getName());
6973

7074
private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
71-
private final SocketAddress address;
75+
private final List<SocketAddress> requestedListenAddresses;
7276
private final Class<? extends ServerChannel> channelType;
7377
private final Map<ChannelOption<?>, ?> channelOptions;
7478
private final ProtocolNegotiator protocolNegotiator;
@@ -78,7 +82,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
7882
private EventLoopGroup bossGroup;
7983
private EventLoopGroup workerGroup;
8084
private ServerListener listener;
81-
private Channel channel;
85+
private List<Channel> parentChannels = new ArrayList<>();
8286
private final int flowControlWindow;
8387
private final int maxMessageSize;
8488
private final int maxHeaderListSize;
@@ -93,13 +97,13 @@ class NettyServer implements InternalServer, InternalWithLogId {
9397
private final List<ServerStreamTracer.Factory> streamTracerFactories;
9498
private final TransportTracer.Factory transportTracerFactory;
9599
private final InternalChannelz channelz;
96-
// Only modified in event loop but safe to read any time. Set at startup and unset at shutdown.
100+
// Only modified in event loop but safe to read any time. Set at startup and cleared at shutdown.
97101
// In the future we may have >1 listen socket.
98-
private volatile ImmutableList<InternalInstrumented<SocketStats>> listenSockets
99-
= ImmutableList.of();
102+
private final List<ListenSocket> listenSocketStats = new CopyOnWriteArrayList<>();
103+
private final List<SocketAddress> boundListenAddresses = new CopyOnWriteArrayList<>();
100104

101105
NettyServer(
102-
SocketAddress address, Class<? extends ServerChannel> channelType,
106+
List<SocketAddress> listenAddresses, Class<? extends ServerChannel> channelType,
103107
Map<ChannelOption<?>, ?> channelOptions,
104108
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
105109
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
@@ -110,7 +114,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
110114
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
111115
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
112116
InternalChannelz channelz) {
113-
this.address = address;
117+
this.requestedListenAddresses =
118+
Collections.unmodifiableList(checkNotNull(listenAddresses, "listenAddresses"));
114119
this.channelType = checkNotNull(channelType, "channelType");
115120
checkNotNull(channelOptions, "channelOptions");
116121
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
@@ -137,57 +142,46 @@ class NettyServer implements InternalServer, InternalWithLogId {
137142

138143
@Override
139144
public int getPort() {
140-
if (channel == null) {
141-
return -1;
142-
}
143-
SocketAddress localAddr = channel.localAddress();
144-
if (!(localAddr instanceof InetSocketAddress)) {
145-
return -1;
145+
for (SocketAddress boundAddress : boundListenAddresses) {
146+
if (boundAddress instanceof InetSocketAddress) {
147+
return ((InetSocketAddress) boundAddress).getPort();
148+
}
146149
}
147-
return ((InetSocketAddress) localAddr).getPort();
150+
return -1;
151+
}
152+
153+
public List<SocketAddress> getListenAddresses() {
154+
return Collections.unmodifiableList(new ArrayList<>(boundListenAddresses));
148155
}
149156

150157
@Override
151158
public List<InternalInstrumented<SocketStats>> getListenSockets() {
152-
return listenSockets;
159+
return Collections.unmodifiableList(
160+
new ArrayList<InternalInstrumented<SocketStats>>(listenSocketStats));
153161
}
154162

155163
@Override
156164
public void start(ServerListener serverListener) throws IOException {
157165
listener = checkNotNull(serverListener, "serverListener");
158-
159166
// If using the shared groups, get references to them.
160167
allocateSharedGroups();
161168

162-
ServerBootstrap b = new ServerBootstrap();
163-
b.group(bossGroup, workerGroup);
164-
b.channel(channelType);
165-
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
166-
b.option(SO_BACKLOG, 128);
167-
b.childOption(SO_KEEPALIVE, true);
168-
}
169+
final class ServerInit extends ChannelInitializer<Channel> {
170+
private final AtomicReference<Channel> parentChannelRef;
169171

170-
if (channelOptions != null) {
171-
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
172-
@SuppressWarnings("unchecked")
173-
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
174-
b.childOption(key, entry.getValue());
172+
ServerInit(AtomicReference<Channel> parentChannelRef) {
173+
this.parentChannelRef = parentChannelRef;
175174
}
176-
}
177175

178-
b.childHandler(new ChannelInitializer<Channel>() {
179176
@Override
180-
public void initChannel(Channel ch) throws Exception {
181-
177+
protected void initChannel(Channel ch) throws Exception {
182178
ChannelPromise channelDone = ch.newPromise();
183-
184179
long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
185180
if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
186181
// apply a random jitter of +/-10% to max connection age
187182
maxConnectionAgeInNanos =
188183
(long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
189184
}
190-
191185
NettyServerTransport transport =
192186
new NettyServerTransport(
193187
ch,
@@ -206,10 +200,12 @@ public void initChannel(Channel ch) throws Exception {
206200
maxConnectionAgeGraceInNanos,
207201
permitKeepAliveWithoutCalls,
208202
permitKeepAliveTimeInNanos);
203+
209204
ServerTransportListener transportListener;
210205
// This is to order callbacks on the listener, not to guard access to channel.
211206
synchronized (NettyServer.this) {
212-
if (channel != null && !channel.isOpen()) {
207+
Channel localParentChannel = parentChannelRef.get();
208+
if (localParentChannel == null || !localParentChannel.isOpen()) {
213209
// Server already shutdown.
214210
ch.close();
215211
return;
@@ -220,76 +216,120 @@ public void initChannel(Channel ch) throws Exception {
220216
transportListener = listener.transportCreated(transport);
221217
}
222218

223-
/**
224-
* Releases the event loop if the channel is "done", possibly due to the channel closing.
225-
*/
226-
final class LoopReleaser implements ChannelFutureListener {
227-
boolean done;
228-
229-
@Override
230-
public void operationComplete(ChannelFuture future) throws Exception {
231-
if (!done) {
232-
done = true;
233-
eventLoopReferenceCounter.release();
234-
}
235-
}
236-
}
237-
238219
transport.start(transportListener);
239220
ChannelFutureListener loopReleaser = new LoopReleaser();
240221
channelDone.addListener(loopReleaser);
241222
ch.closeFuture().addListener(loopReleaser);
242223
}
243-
});
244-
// Bind and start to accept incoming connections.
245-
ChannelFuture future = b.bind(address);
246-
try {
247-
future.await();
248-
} catch (InterruptedException ex) {
249-
Thread.currentThread().interrupt();
250-
throw new RuntimeException("Interrupted waiting for bind");
251-
}
252-
if (!future.isSuccess()) {
253-
throw new IOException("Failed to bind", future.cause());
224+
225+
/**
226+
* Releases the event loop if the channel is "done", possibly due to the channel closing.
227+
*/
228+
final class LoopReleaser implements ChannelFutureListener {
229+
boolean done;
230+
231+
@Override
232+
public void operationComplete(ChannelFuture future) throws Exception {
233+
if (!done) {
234+
done = true;
235+
eventLoopReferenceCounter.release();
236+
}
237+
}
238+
}
254239
}
255-
channel = future.channel();
256-
Future<?> channelzFuture = channel.eventLoop().submit(new Runnable() {
257-
@Override
258-
public void run() {
259-
InternalInstrumented<SocketStats> listenSocket = new ListenSocket(channel);
260-
listenSockets = ImmutableList.of(listenSocket);
261-
channelz.addListenSocket(listenSocket);
240+
241+
for (SocketAddress requestedListenAddress : requestedListenAddresses) {
242+
ServerBootstrap b = new ServerBootstrap();
243+
b.group(bossGroup, workerGroup);
244+
b.channel(channelType);
245+
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
246+
b.option(SO_BACKLOG, 128);
247+
b.childOption(SO_KEEPALIVE, true);
248+
}
249+
if (channelOptions != null) {
250+
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
251+
@SuppressWarnings("unchecked")
252+
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
253+
b.childOption(key, entry.getValue());
254+
}
255+
}
256+
AtomicReference<Channel> parentChannelRef = new AtomicReference<>();
257+
b.childHandler(new ServerInit(parentChannelRef));
258+
// Bind and start to accept incoming connections.
259+
ChannelFuture future = b.bind(requestedListenAddress);
260+
try {
261+
future.await();
262+
} catch (InterruptedException ex) {
263+
Thread.currentThread().interrupt();
264+
throw new RuntimeException("Interrupted waiting for bind", ex);
265+
}
266+
if (!future.isSuccess()) {
267+
throw new IOException("Failed to bind", future.cause());
268+
}
269+
final Channel channel = future.channel();
270+
parentChannelRef.set(channel);
271+
parentChannels.add(channel);
272+
boundListenAddresses.add(channel.localAddress());
273+
Future<?> channelzFuture = channel.eventLoop().submit(new Runnable() {
274+
@Override
275+
public void run() {
276+
ListenSocket listenSocket = new ListenSocket(channel);
277+
listenSocketStats.add(listenSocket);
278+
channelz.addListenSocket(listenSocket);
279+
}
280+
});
281+
try {
282+
channelzFuture.await();
283+
} catch (InterruptedException ex) {
284+
throw new RuntimeException("Interrupted while registering listen socket to channelz", ex);
262285
}
263-
});
264-
try {
265-
channelzFuture.await();
266-
} catch (InterruptedException ex) {
267-
throw new RuntimeException("Interrupted while registering listen socket to channelz", ex);
268286
}
269287
}
270288

271289
@Override
272290
public void shutdown() {
273-
if (channel == null || !channel.isOpen()) {
274-
// Already closed.
291+
List<Channel> localParentChannels = parentChannels;
292+
final AtomicInteger remaining = new AtomicInteger(localParentChannels.size());
293+
parentChannels = Collections.emptyList();
294+
if (remaining.get() == 0) {
295+
// Degenerate case, no sockets.
296+
synchronized (NettyServer.this) {
297+
listener.serverShutdown();
298+
}
275299
return;
276300
}
277-
channel.close().addListener(new ChannelFutureListener() {
278-
@Override
279-
public void operationComplete(ChannelFuture future) throws Exception {
280-
if (!future.isSuccess()) {
281-
log.log(Level.WARNING, "Error shutting down server", future.cause());
282-
}
283-
for (InternalInstrumented<SocketStats> listenSocket : listenSockets) {
284-
channelz.removeListenSocket(listenSocket);
285-
}
286-
listenSockets = null;
287-
synchronized (NettyServer.this) {
288-
listener.serverShutdown();
289-
}
290-
eventLoopReferenceCounter.release();
301+
for (final Channel localParentChannel : localParentChannels) {
302+
if (!localParentChannel.isOpen()) {
303+
// Already closed.
304+
continue;
291305
}
292-
});
306+
final SocketAddress boundListenAddress = localParentChannel.localAddress();
307+
localParentChannel.close().addListener(new ChannelFutureListener() {
308+
@Override
309+
public void operationComplete(ChannelFuture future) {
310+
if (!future.isSuccess()) {
311+
log.log(Level.WARNING, "Error shutting down server", future.cause());
312+
}
313+
boundListenAddresses.remove(boundListenAddress);
314+
// This ends up being quadratic, but presumably the number of listen sockets is small.
315+
// Also, it's probably not worth maintaining another map of channel to listenSocket.
316+
for (ListenSocket listenSocket : listenSocketStats) {
317+
// compare to localParentChannel instead of future.channel(), which may be null.
318+
if (listenSocket.ch == localParentChannel) {
319+
channelz.removeListenSocket(listenSocket);
320+
break;
321+
}
322+
}
323+
if (remaining.decrementAndGet() == 0) {
324+
synchronized (NettyServer.this) {
325+
listenSocketStats.clear();
326+
listener.serverShutdown();
327+
}
328+
}
329+
}
330+
});
331+
}
332+
eventLoopReferenceCounter.release();
293333
}
294334

295335
private void allocateSharedGroups() {
@@ -310,11 +350,11 @@ public InternalLogId getLogId() {
310350
public String toString() {
311351
return MoreObjects.toStringHelper(this)
312352
.add("logId", logId.getId())
313-
.add("address", address)
353+
.add("addresses", boundListenAddresses)
314354
.toString();
315355
}
316356

317-
class EventLoopReferenceCounter extends AbstractReferenceCounted {
357+
final class EventLoopReferenceCounter extends AbstractReferenceCounted {
318358
@Override
319359
protected void deallocate() {
320360
try {
@@ -344,7 +384,7 @@ public ReferenceCounted touch(Object hint) {
344384
*/
345385
private static final class ListenSocket implements InternalInstrumented<SocketStats> {
346386
private final InternalLogId id = InternalLogId.allocate(getClass().getName());
347-
private final Channel ch;
387+
final Channel ch;
348388

349389
ListenSocket(Channel ch) {
350390
this.ch = ch;

0 commit comments

Comments
 (0)