Skip to content
This repository was archived by the owner on Apr 12, 2020. It is now read-only.

Commit 58fd90d

Browse files
committed
Avoid multiple calls to initialize of AutoFlushHadler and extracted common config part MQTT pipeline
1 parent 8f7b7df commit 58fd90d

File tree

2 files changed

+65
-98
lines changed

2 files changed

+65
-98
lines changed

broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java

Lines changed: 56 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,10 @@ private abstract static class PipelineInitializer {
9595

9696
private static final Logger LOG = LoggerFactory.getLogger(NewNettyAcceptor.class);
9797

98-
EventLoopGroup m_bossGroup;
99-
EventLoopGroup m_workerGroup;
100-
BytesMetricsCollector m_bytesMetricsCollector = new BytesMetricsCollector();
101-
MessageMetricsCollector m_metricsCollector = new MessageMetricsCollector();
98+
private EventLoopGroup bossGroup;
99+
private EventLoopGroup workerGroup;
100+
private BytesMetricsCollector bytesMetricsCollector = new BytesMetricsCollector();
101+
private MessageMetricsCollector metricsCollector = new MessageMetricsCollector();
102102
private Optional<? extends ChannelInboundHandler> metrics;
103103
private Optional<? extends ChannelInboundHandler> errorsCather;
104104

@@ -125,13 +125,13 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte
125125
boolean epoll = props.boolProp(BrokerConstants.NETTY_EPOLL_PROPERTY_NAME, false);
126126
if (epoll) {
127127
LOG.info("Netty is using Epoll");
128-
m_bossGroup = new EpollEventLoopGroup();
129-
m_workerGroup = new EpollEventLoopGroup();
128+
bossGroup = new EpollEventLoopGroup();
129+
workerGroup = new EpollEventLoopGroup();
130130
channelClass = EpollServerSocketChannel.class;
131131
} else {
132132
LOG.info("Netty is using NIO");
133-
m_bossGroup = new NioEventLoopGroup();
134-
m_workerGroup = new NioEventLoopGroup();
133+
bossGroup = new NioEventLoopGroup();
134+
workerGroup = new NioEventLoopGroup();
135135
channelClass = NioServerSocketChannel.class;
136136
}
137137

@@ -154,9 +154,7 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte
154154
}
155155
initializePlainTCPTransport(mqttHandler, props);
156156
initializeWebSocketTransport(mqttHandler, props);
157-
String sslTcpPortProp = props.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME);
158-
String wssPortProp = props.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME);
159-
if (sslTcpPortProp != null || wssPortProp != null) {
157+
if (securityPortsConfigured(props)) {
160158
SslContext sslContext = sslCtxCreator.initSSLContext();
161159
if (sslContext == null) {
162160
LOG.error("Can't initialize SSLHandler layer! Exiting, check your configuration of jks");
@@ -167,15 +165,21 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte
167165
}
168166
}
169167

170-
private void initFactory(String host, int port, String protocol, final PipelineInitializer pipeliner) {
168+
private boolean securityPortsConfigured(IConfig props) {
169+
String sslTcpPortProp = props.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME);
170+
String wssPortProp = props.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME);
171+
return sslTcpPortProp != null || wssPortProp != null;
172+
}
173+
174+
private void initFactory(String host, int port, String protocol, final PipelineInitializer pipelieInitializer) {
171175
LOG.debug("Initializing server. Protocol={}", protocol);
172176
ServerBootstrap b = new ServerBootstrap();
173-
b.group(m_bossGroup, m_workerGroup).channel(channelClass)
177+
b.group(bossGroup, workerGroup).channel(channelClass)
174178
.childHandler(new ChannelInitializer<SocketChannel>() {
175179

176180
@Override
177181
public void initChannel(SocketChannel ch) throws Exception {
178-
pipeliner.init(ch);
182+
pipelieInitializer.init(ch);
179183
}
180184
})
181185
.option(ChannelOption.SO_BACKLOG, nettySoBacklog)
@@ -200,7 +204,7 @@ private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig pr
200204
String tcpPortProp = props.getProperty(PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
201205
if (DISABLED_PORT_BIND.equals(tcpPortProp)) {
202206
LOG.info("Property {} has been set to {}. TCP MQTT will be disabled", BrokerConstants.PORT_PROPERTY_NAME,
203-
DISABLED_PORT_BIND);
207+
DISABLED_PORT_BIND);
204208
return;
205209
}
206210
int port = Integer.parseInt(tcpPortProp);
@@ -209,33 +213,38 @@ private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig pr
209213
@Override
210214
void init(SocketChannel channel) {
211215
ChannelPipeline pipeline = channel.pipeline();
212-
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
213-
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
214-
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
215-
if (errorsCather.isPresent()) {
216-
pipeline.addLast("bugsnagCatcher", errorsCather.get());
217-
}
218-
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
219-
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
220-
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
221-
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
222-
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
223-
pipeline.addLast("messageLogger", new MQTTMessageLogger());
224-
if (metrics.isPresent()) {
225-
pipeline.addLast("wizardMetrics", metrics.get());
226-
}
227-
pipeline.addLast("handler", handler);
216+
configureMQTTPipeline(pipeline, timeoutHandler, handler);
228217
}
229218
});
230219
}
231220

221+
private void configureMQTTPipeline(ChannelPipeline pipeline, MoquetteIdleTimeoutHandler timeoutHandler,
222+
NewNettyMQTTHandler handler) {
223+
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
224+
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
225+
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
226+
if (errorsCather.isPresent()) {
227+
pipeline.addLast("bugsnagCatcher", errorsCather.get());
228+
}
229+
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(bytesMetricsCollector));
230+
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
231+
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
232+
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
233+
pipeline.addLast("metrics", new MessageMetricsHandler(metricsCollector));
234+
pipeline.addLast("messageLogger", new MQTTMessageLogger());
235+
if (metrics.isPresent()) {
236+
pipeline.addLast("wizardMetrics", metrics.get());
237+
}
238+
pipeline.addLast("handler", handler);
239+
}
240+
232241
private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, IConfig props) {
233242
LOG.debug("Configuring Websocket MQTT transport");
234243
String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
235244
if (DISABLED_PORT_BIND.equals(webSocketPortProp)) {
236245
// Do nothing no WebSocket configured
237246
LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled",
238-
BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
247+
BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
239248
return;
240249
}
241250
int port = Integer.parseInt(webSocketPortProp);
@@ -254,15 +263,7 @@ void init(SocketChannel channel) {
254263
new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
255264
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
256265
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
257-
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
258-
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
259-
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
260-
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
261-
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
262-
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
263-
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
264-
pipeline.addLast("messageLogger", new MQTTMessageLogger());
265-
pipeline.addLast("handler", handler);
266+
configureMQTTPipeline(pipeline, timeoutHandler, handler);
266267
}
267268
});
268269
}
@@ -273,7 +274,7 @@ private void initializeSSLTCPTransport(NewNettyMQTTHandler handler, IConfig prop
273274
if (DISABLED_PORT_BIND.equals(sslPortProp)) {
274275
// Do nothing no SSL configured
275276
LOG.info("Property {} has been set to {}. SSL MQTT will be disabled",
276-
BrokerConstants.SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
277+
BrokerConstants.SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
277278
return;
278279
}
279280

@@ -290,16 +291,7 @@ private void initializeSSLTCPTransport(NewNettyMQTTHandler handler, IConfig prop
290291
void init(SocketChannel channel) throws Exception {
291292
ChannelPipeline pipeline = channel.pipeline();
292293
pipeline.addLast("ssl", createSslHandler(channel, sslContext, needsClientAuth));
293-
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
294-
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
295-
// pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
296-
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
297-
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
298-
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
299-
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
300-
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
301-
pipeline.addLast("messageLogger", new MQTTMessageLogger());
302-
pipeline.addLast("handler", handler);
294+
configureMQTTPipeline(pipeline, timeoutHandler, handler);
303295
}
304296
});
305297
}
@@ -331,28 +323,21 @@ void init(SocketChannel channel) throws Exception {
331323
new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
332324
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
333325
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
334-
pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0));
335-
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
336-
pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
337-
pipeline.addLast("autoflush", new AutoFlushHandler(1, TimeUnit.SECONDS));
338-
pipeline.addLast("decoder", new MqttDecoder(maxBytesInMessage));
339-
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
340-
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
341-
pipeline.addLast("messageLogger", new MQTTMessageLogger());
342-
pipeline.addLast("handler", handler);
326+
327+
configureMQTTPipeline(pipeline, timeoutHandler, handler);
343328
}
344329
});
345330
}
346331

347332
@SuppressWarnings("FutureReturnValueIgnored")
348333
public void close() {
349334
LOG.debug("Closing Netty acceptor...");
350-
if (m_workerGroup == null || m_bossGroup == null) {
335+
if (workerGroup == null || bossGroup == null) {
351336
LOG.error("Netty acceptor is not initialized");
352337
throw new IllegalStateException("Invoked close on an Acceptor that wasn't initialized");
353338
}
354-
Future<?> workerWaiter = m_workerGroup.shutdownGracefully();
355-
Future<?> bossWaiter = m_bossGroup.shutdownGracefully();
339+
Future<?> workerWaiter = workerGroup.shutdownGracefully();
340+
Future<?> bossWaiter = bossGroup.shutdownGracefully();
356341

357342
/*
358343
* We shouldn't raise an IllegalStateException if we are interrupted. If we did so, the
@@ -366,20 +351,20 @@ public void close() {
366351
LOG.warn("An InterruptedException was caught while waiting for event loops to terminate...");
367352
}
368353

369-
if (!m_workerGroup.isTerminated()) {
354+
if (!workerGroup.isTerminated()) {
370355
LOG.warn("Forcing shutdown of worker event loop...");
371-
m_workerGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
356+
workerGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
372357
}
373358

374-
if (!m_bossGroup.isTerminated()) {
359+
if (!bossGroup.isTerminated()) {
375360
LOG.warn("Forcing shutdown of boss event loop...");
376-
m_bossGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
361+
bossGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
377362
}
378363

379-
MessageMetrics metrics = m_metricsCollector.computeMetrics();
380-
BytesMetrics bytesMetrics = m_bytesMetricsCollector.computeMetrics();
364+
MessageMetrics metrics = metricsCollector.computeMetrics();
365+
BytesMetrics bytesMetrics = bytesMetricsCollector.computeMetrics();
381366
LOG.info("Metrics messages[read={}, write={}] bytes[read={}, write={}]", metrics.messagesRead(),
382-
metrics.messagesWrote(), bytesMetrics.readBytes(), bytesMetrics.wroteBytes());
367+
metrics.messagesWrote(), bytesMetrics.readBytes(), bytesMetrics.wroteBytes());
383368
}
384369

385370
private ChannelHandler createSslHandler(SocketChannel channel, SslContext sslContext, boolean needsClientAuth) {

broker/src/main/java/io/moquette/server/netty/AutoFlushHandler.java

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.util.concurrent.EventExecutor;
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
24+
2425
import java.util.concurrent.ScheduledFuture;
2526
import java.util.concurrent.TimeUnit;
2627

@@ -49,7 +50,7 @@ public AutoFlushHandler(long writerIdleTime, TimeUnit unit) {
4950
}
5051

5152
@Override
52-
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
53+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
5354
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
5455
// channelActive() event has been fired already, which means this.channelActive() will
5556
// not be invoked. We have to initialize here instead.
@@ -68,9 +69,9 @@ public void handlerRemoved(ChannelHandlerContext ctx) {
6869
@Override
6970
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
7071
// Initialize early if channel is active already.
71-
if (ctx.channel().isActive()) {
72-
initialize(ctx);
73-
}
72+
// if (ctx.channel().isActive()) {
73+
// initialize(ctx);
74+
// }
7475
super.channelRegistered(ctx);
7576
}
7677

@@ -79,7 +80,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
7980
// This method will be invoked only if this handler was added
8081
// before channelActive() event is fired. If a user adds this handler
8182
// after the channelActive() event, initialize() will be called by beforeAdd().
82-
initialize(ctx);
83+
// initialize(ctx);
8384
super.channelActive(ctx);
8485
}
8586

@@ -89,24 +90,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
8990
super.channelInactive(ctx);
9091
}
9192

92-
// @Override
93-
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
94-
// if (writerrIdleTimeNanos > 0) {
95-
// reading = true;
96-
// firstReaderIdleEvent = true;
97-
// }
98-
// ctx.fireChannelRead(msg);
99-
// }
100-
//
101-
// @Override
102-
// public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
103-
// if (writerrIdleTimeNanos > 0) {
104-
// lastReadTime = System.nanoTime();
105-
// reading = false;
106-
// }
107-
// ctx.fireChannelReadComplete();
108-
// }
109-
11093
private void initialize(ChannelHandlerContext ctx) {
11194
// Avoid the case where destroy() is called before scheduling timeouts.
11295
// See: https://github.com/netty/netty/issues/143
@@ -140,13 +123,12 @@ private void destroy() {
140123
/**
141124
* Is called when the write timeout expire.
142125
*
143-
* @param ctx
144-
* the channel context.
126+
* @param ctx the channel context.
145127
*/
146128
private void channelIdle(ChannelHandlerContext ctx) {
147129
// ctx.fireUserEventTriggered(evt);
148-
if (LOG.isDebugEnabled()) {
149-
LOG.debug("Flushing idle Netty channel {} Cid: {}", ctx.channel(), NettyUtils.clientID(ctx.channel()));
130+
if (LOG.isTraceEnabled()) {
131+
LOG.trace("Flushing idle Netty channel {} Cid: {}", ctx.channel(), NettyUtils.clientID(ctx.channel()));
150132
}
151133
ctx.channel().flush();
152134
}

0 commit comments

Comments
 (0)