Skip to content

Commit 82b7630

Browse files
netty: support listening on multiple port
1 parent bcd2372 commit 82b7630

File tree

8 files changed

+208
-116
lines changed

8 files changed

+208
-116
lines changed

core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracer
4444

4545
@Override
4646
protected InternalServer newServer(
47-
InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
47+
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
4848
return newServer(streamTracerFactories);
4949
}
5050

netty/src/main/java/io/grpc/netty/NettyServer.java

Lines changed: 134 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import com.google.common.base.MoreObjects;
2525
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.ImmutableList;
2726
import com.google.common.util.concurrent.ListenableFuture;
2827
import com.google.common.util.concurrent.SettableFuture;
2928
import io.grpc.InternalChannelz;
@@ -54,9 +53,14 @@
5453
import java.io.IOException;
5554
import java.net.InetSocketAddress;
5655
import java.net.SocketAddress;
56+
import java.util.ArrayList;
57+
import java.util.Collections;
5758
import java.util.HashMap;
5859
import java.util.List;
5960
import java.util.Map;
61+
import java.util.concurrent.CopyOnWriteArrayList;
62+
import java.util.concurrent.atomic.AtomicInteger;
63+
import java.util.concurrent.atomic.AtomicReference;
6064
import java.util.logging.Level;
6165
import java.util.logging.Logger;
6266
import javax.annotation.Nullable;
@@ -68,7 +72,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
6872
private static final Logger log = Logger.getLogger(InternalServer.class.getName());
6973

7074
private final InternalLogId logId;
71-
private final SocketAddress address;
75+
private final List<SocketAddress> requestedListenAddresses;
7276
private final Class<? extends ServerChannel> channelType;
7377
private final Map<ChannelOption<?>, ?> channelOptions;
7478
private final ProtocolNegotiator protocolNegotiator;
@@ -78,7 +82,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
7882
private EventLoopGroup bossGroup;
7983
private EventLoopGroup workerGroup;
8084
private ServerListener listener;
81-
private Channel channel;
85+
private List<Channel> parentChannels = new ArrayList<>();
8286
private final int flowControlWindow;
8387
private final int maxMessageSize;
8488
private final int maxHeaderListSize;
@@ -93,13 +97,13 @@ class NettyServer implements InternalServer, InternalWithLogId {
9397
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
9498
private final TransportTracer.Factory transportTracerFactory;
9599
private final InternalChannelz channelz;
96-
// Only modified in event loop but safe to read any time. Set at startup and unset at shutdown.
100+
// Only modified in event loop but safe to read any time. Set at startup and cleared at shutdown.
97101
// In the future we may have >1 listen socket.
98-
private volatile ImmutableList<InternalInstrumented<SocketStats>> listenSockets
99-
= ImmutableList.of();
102+
private final List<ListenSocket> listenSocketStats = new CopyOnWriteArrayList<>();
103+
private final List<SocketAddress> boundListenAddresses = new CopyOnWriteArrayList<>();
100104

101105
NettyServer(
102-
SocketAddress address, Class<? extends ServerChannel> channelType,
106+
List<SocketAddress> listenAddresses, Class<? extends ServerChannel> channelType,
103107
Map<ChannelOption<?>, ?> channelOptions,
104108
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
105109
ProtocolNegotiator protocolNegotiator,
@@ -111,7 +115,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
111115
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
112116
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
113117
InternalChannelz channelz) {
114-
this.address = address;
118+
this.requestedListenAddresses =
119+
Collections.unmodifiableList(checkNotNull(listenAddresses, "listenAddresses"));
115120
this.channelType = checkNotNull(channelType, "channelType");
116121
checkNotNull(channelOptions, "channelOptions");
117122
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
@@ -134,63 +139,51 @@ class NettyServer implements InternalServer, InternalWithLogId {
134139
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
135140
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
136141
this.channelz = Preconditions.checkNotNull(channelz);
137-
this.logId =
138-
InternalLogId.allocate(getClass(), address != null ? address.toString() : "No address");
142+
this.logId = InternalLogId.allocate(getClass(), requestedListenAddresses.toString());
139143
}
140144

141145
@Override
142146
public int getPort() {
143-
if (channel == null) {
144-
return -1;
145-
}
146-
SocketAddress localAddr = channel.localAddress();
147-
if (!(localAddr instanceof InetSocketAddress)) {
148-
return -1;
147+
for (SocketAddress boundAddress : boundListenAddresses) {
148+
if (boundAddress instanceof InetSocketAddress) {
149+
return ((InetSocketAddress) boundAddress).getPort();
150+
}
149151
}
150-
return ((InetSocketAddress) localAddr).getPort();
152+
return -1;
153+
}
154+
155+
public List<SocketAddress> getListenAddresses() {
156+
return Collections.unmodifiableList(new ArrayList<>(boundListenAddresses));
151157
}
152158

153159
@Override
154160
public List<InternalInstrumented<SocketStats>> getListenSockets() {
155-
return listenSockets;
161+
return Collections.unmodifiableList(
162+
new ArrayList<InternalInstrumented<SocketStats>>(listenSocketStats));
156163
}
157164

158165
@Override
159166
public void start(ServerListener serverListener) throws IOException {
160167
listener = checkNotNull(serverListener, "serverListener");
161-
162168
// If using the shared groups, get references to them.
163169
allocateSharedGroups();
164170

165-
ServerBootstrap b = new ServerBootstrap();
166-
b.group(bossGroup, workerGroup);
167-
b.channel(channelType);
168-
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
169-
b.option(SO_BACKLOG, 128);
170-
b.childOption(SO_KEEPALIVE, true);
171-
}
171+
final class ServerInit extends ChannelInitializer<Channel> {
172+
private final AtomicReference<Channel> parentChannelRef;
172173

173-
if (channelOptions != null) {
174-
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
175-
@SuppressWarnings("unchecked")
176-
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
177-
b.childOption(key, entry.getValue());
174+
ServerInit(AtomicReference<Channel> parentChannelRef) {
175+
this.parentChannelRef = parentChannelRef;
178176
}
179-
}
180177

181-
b.childHandler(new ChannelInitializer<Channel>() {
182178
@Override
183-
public void initChannel(Channel ch) throws Exception {
184-
179+
protected void initChannel(Channel ch) throws Exception {
185180
ChannelPromise channelDone = ch.newPromise();
186-
187181
long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
188182
if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
189183
// apply a random jitter of +/-10% to max connection age
190184
maxConnectionAgeInNanos =
191185
(long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
192186
}
193-
194187
NettyServerTransport transport =
195188
new NettyServerTransport(
196189
ch,
@@ -209,10 +202,12 @@ public void initChannel(Channel ch) throws Exception {
209202
maxConnectionAgeGraceInNanos,
210203
permitKeepAliveWithoutCalls,
211204
permitKeepAliveTimeInNanos);
205+
212206
ServerTransportListener transportListener;
213207
// This is to order callbacks on the listener, not to guard access to channel.
214208
synchronized (NettyServer.this) {
215-
if (channel != null && !channel.isOpen()) {
209+
Channel localParentChannel = parentChannelRef.get();
210+
if (localParentChannel == null || !localParentChannel.isOpen()) {
216211
// Server already shutdown.
217212
ch.close();
218213
return;
@@ -223,76 +218,120 @@ public void initChannel(Channel ch) throws Exception {
223218
transportListener = listener.transportCreated(transport);
224219
}
225220

226-
/**
227-
* Releases the event loop if the channel is "done", possibly due to the channel closing.
228-
*/
229-
final class LoopReleaser implements ChannelFutureListener {
230-
boolean done;
231-
232-
@Override
233-
public void operationComplete(ChannelFuture future) throws Exception {
234-
if (!done) {
235-
done = true;
236-
eventLoopReferenceCounter.release();
237-
}
238-
}
239-
}
240-
241221
transport.start(transportListener);
242222
ChannelFutureListener loopReleaser = new LoopReleaser();
243223
channelDone.addListener(loopReleaser);
244224
ch.closeFuture().addListener(loopReleaser);
245225
}
246-
});
247-
// Bind and start to accept incoming connections.
248-
ChannelFuture future = b.bind(address);
249-
try {
250-
future.await();
251-
} catch (InterruptedException ex) {
252-
Thread.currentThread().interrupt();
253-
throw new RuntimeException("Interrupted waiting for bind");
254-
}
255-
if (!future.isSuccess()) {
256-
throw new IOException("Failed to bind", future.cause());
226+
227+
/**
228+
* Releases the event loop if the channel is "done", possibly due to the channel closing.
229+
*/
230+
final class LoopReleaser implements ChannelFutureListener {
231+
boolean done;
232+
233+
@Override
234+
public void operationComplete(ChannelFuture future) throws Exception {
235+
if (!done) {
236+
done = true;
237+
eventLoopReferenceCounter.release();
238+
}
239+
}
240+
}
257241
}
258-
channel = future.channel();
259-
Future<?> channelzFuture = channel.eventLoop().submit(new Runnable() {
260-
@Override
261-
public void run() {
262-
InternalInstrumented<SocketStats> listenSocket = new ListenSocket(channel);
263-
listenSockets = ImmutableList.of(listenSocket);
264-
channelz.addListenSocket(listenSocket);
242+
243+
for (SocketAddress requestedListenAddress : requestedListenAddresses) {
244+
ServerBootstrap b = new ServerBootstrap();
245+
b.group(bossGroup, workerGroup);
246+
b.channel(channelType);
247+
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
248+
b.option(SO_BACKLOG, 128);
249+
b.childOption(SO_KEEPALIVE, true);
250+
}
251+
if (channelOptions != null) {
252+
for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
253+
@SuppressWarnings("unchecked")
254+
ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
255+
b.childOption(key, entry.getValue());
256+
}
257+
}
258+
AtomicReference<Channel> parentChannelRef = new AtomicReference<>();
259+
b.childHandler(new ServerInit(parentChannelRef));
260+
// Bind and start to accept incoming connections.
261+
ChannelFuture future = b.bind(requestedListenAddress);
262+
try {
263+
future.await();
264+
} catch (InterruptedException ex) {
265+
Thread.currentThread().interrupt();
266+
throw new RuntimeException("Interrupted waiting for bind", ex);
267+
}
268+
if (!future.isSuccess()) {
269+
throw new IOException("Failed to bind", future.cause());
270+
}
271+
final Channel channel = future.channel();
272+
parentChannelRef.set(channel);
273+
parentChannels.add(channel);
274+
boundListenAddresses.add(channel.localAddress());
275+
Future<?> channelzFuture = channel.eventLoop().submit(new Runnable() {
276+
@Override
277+
public void run() {
278+
ListenSocket listenSocket = new ListenSocket(channel);
279+
listenSocketStats.add(listenSocket);
280+
channelz.addListenSocket(listenSocket);
281+
}
282+
});
283+
try {
284+
channelzFuture.await();
285+
} catch (InterruptedException ex) {
286+
throw new RuntimeException("Interrupted while registering listen socket to channelz", ex);
265287
}
266-
});
267-
try {
268-
channelzFuture.await();
269-
} catch (InterruptedException ex) {
270-
throw new RuntimeException("Interrupted while registering listen socket to channelz", ex);
271288
}
272289
}
273290

274291
@Override
275292
public void shutdown() {
276-
if (channel == null || !channel.isOpen()) {
277-
// Already closed.
293+
List<Channel> localParentChannels = parentChannels;
294+
final AtomicInteger remaining = new AtomicInteger(localParentChannels.size());
295+
parentChannels = Collections.emptyList();
296+
if (remaining.get() == 0) {
297+
// Degenerate case, no sockets.
298+
synchronized (NettyServer.this) {
299+
listener.serverShutdown();
300+
}
278301
return;
279302
}
280-
channel.close().addListener(new ChannelFutureListener() {
281-
@Override
282-
public void operationComplete(ChannelFuture future) throws Exception {
283-
if (!future.isSuccess()) {
284-
log.log(Level.WARNING, "Error shutting down server", future.cause());
285-
}
286-
for (InternalInstrumented<SocketStats> listenSocket : listenSockets) {
287-
channelz.removeListenSocket(listenSocket);
288-
}
289-
listenSockets = null;
290-
synchronized (NettyServer.this) {
291-
listener.serverShutdown();
292-
}
293-
eventLoopReferenceCounter.release();
303+
for (final Channel localParentChannel : localParentChannels) {
304+
if (!localParentChannel.isOpen()) {
305+
// Already closed.
306+
continue;
294307
}
295-
});
308+
final SocketAddress boundListenAddress = localParentChannel.localAddress();
309+
localParentChannel.close().addListener(new ChannelFutureListener() {
310+
@Override
311+
public void operationComplete(ChannelFuture future) {
312+
if (!future.isSuccess()) {
313+
log.log(Level.WARNING, "Error shutting down server", future.cause());
314+
}
315+
boundListenAddresses.remove(boundListenAddress);
316+
// This ends up being quadratic, but presumably the number of listen sockets is small.
317+
// Also, it's probably not worth maintaining another map of channel to listenSocket.
318+
for (ListenSocket listenSocket : listenSocketStats) {
319+
// compare to localParentChannel instead of future.channel(), which may be null.
320+
if (listenSocket.ch == localParentChannel) {
321+
channelz.removeListenSocket(listenSocket);
322+
break;
323+
}
324+
}
325+
if (remaining.decrementAndGet() == 0) {
326+
synchronized (NettyServer.this) {
327+
listenSocketStats.clear();
328+
listener.serverShutdown();
329+
}
330+
}
331+
}
332+
});
333+
}
334+
eventLoopReferenceCounter.release();
296335
}
297336

298337
private void allocateSharedGroups() {
@@ -313,11 +352,11 @@ public InternalLogId getLogId() {
313352
public String toString() {
314353
return MoreObjects.toStringHelper(this)
315354
.add("logId", logId.getId())
316-
.add("address", address)
355+
.add("addresses", boundListenAddresses)
317356
.toString();
318357
}
319358

320-
class EventLoopReferenceCounter extends AbstractReferenceCounted {
359+
final class EventLoopReferenceCounter extends AbstractReferenceCounted {
321360
@Override
322361
protected void deallocate() {
323362
try {

0 commit comments

Comments
 (0)