Skip to content
This repository was archived by the owner on Apr 12, 2020. It is now read-only.

Commit c248096

Browse files
komamitsuandsel
authored andcommitted
Flush written data in MQTTConnection#sendIfWritableElseDrop (moquette-io#454)
* Flush written data in MQTTConnection#sendIfWritableElseDrop * Add a new configuration `immediateBufferFlush` * Add BrokerConfigurationTest * Fix NPE in SessionTest
1 parent e59217b commit c248096

12 files changed

+116
-14
lines changed

broker/src/main/java/io/moquette/BrokerConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public final class BrokerConstants {
6868
public static final String NETTY_EPOLL_PROPERTY_NAME = "netty.epoll";
6969
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size";
7070
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;
71+
public static final String IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME = "immediate_buffer_flush";
7172
public static final String METRICS_ENABLE_PROPERTY_NAME = "use_metrics";
7273
public static final String METRICS_LIBRATO_EMAIL_PROPERTY_NAME = "metrics.librato.email";
7374
public static final String METRICS_LIBRATO_TOKEN_PROPERTY_NAME = "metrics.librato.token";

broker/src/main/java/io/moquette/broker/BrokerConfiguration.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,21 @@ class BrokerConfiguration {
2323
private final boolean allowAnonymous;
2424
private final boolean allowZeroByteClientId;
2525
private final boolean reauthorizeSubscriptionsOnConnect;
26+
private final boolean immediateBufferFlush;
2627

2728
BrokerConfiguration(IConfig props) {
2829
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
2930
allowZeroByteClientId = props.boolProp(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, false);
3031
reauthorizeSubscriptionsOnConnect = props.boolProp(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, false);
32+
immediateBufferFlush = props.boolProp(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, false);
3133
}
3234

3335
public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
34-
boolean reauthorizeSubscriptionsOnConnect) {
36+
boolean reauthorizeSubscriptionsOnConnect, boolean immediateBufferFlush) {
3537
this.allowAnonymous = allowAnonymous;
3638
this.allowZeroByteClientId = allowZeroByteClientId;
3739
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
40+
this.immediateBufferFlush = immediateBufferFlush;
3841
}
3942

4043
public boolean isAllowAnonymous() {
@@ -48,4 +51,8 @@ public boolean isAllowZeroByteClientId() {
4851
public boolean isReauthorizeSubscriptionsOnConnect() {
4952
return reauthorizeSubscriptionsOnConnect;
5053
}
54+
55+
public boolean isImmediateBufferFlush() {
56+
return immediateBufferFlush;
57+
}
5158
}

broker/src/main/java/io/moquette/broker/MQTTConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.moquette.broker.security.IAuthenticator;
2020
import io.netty.buffer.ByteBuf;
2121
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelFuture;
2223
import io.netty.channel.ChannelPipeline;
2324
import io.netty.handler.codec.mqtt.*;
2425
import io.netty.handler.timeout.IdleStateHandler;
@@ -402,7 +403,14 @@ void sendIfWritableElseDrop(MqttMessage msg) {
402403
LOG.debug("OUT {} on channel {}", msg.fixedHeader().messageType(), channel);
403404
}
404405
if (channel.isWritable()) {
405-
channel.write(msg).addListener(FIRE_EXCEPTION_ON_FAILURE);
406+
ChannelFuture channelFuture;
407+
if (brokerConfig.isImmediateBufferFlush()) {
408+
channelFuture = channel.writeAndFlush(msg);
409+
}
410+
else {
411+
channelFuture = channel.write(msg);
412+
}
413+
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
406414
}
407415
}
408416

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (c) 2012-2018 The original author or authors
3+
* ------------------------------------------------------
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v1.0
6+
* and Apache License v2.0 which accompanies this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* http://www.eclipse.org/legal/epl-v10.html
10+
*
11+
* The Apache License v2.0 is available at
12+
* http://www.opensource.org/licenses/apache2.0.php
13+
*
14+
* You may elect to redistribute this code under either of these licenses.
15+
*/
16+
package io.moquette.broker;
17+
18+
import io.moquette.BrokerConstants;
19+
import io.moquette.broker.config.MemoryConfig;
20+
import org.junit.Test;
21+
22+
import java.util.Properties;
23+
24+
import static org.junit.Assert.*;
25+
26+
public class BrokerConfigurationTest {
27+
28+
@Test
29+
public void defaultConfig() {
30+
MemoryConfig config = new MemoryConfig(new Properties());
31+
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
32+
assertTrue(brokerConfiguration.isAllowAnonymous());
33+
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
34+
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
35+
assertFalse(brokerConfiguration.isImmediateBufferFlush());
36+
}
37+
38+
@Test
39+
public void configureAllowAnonymous() {
40+
Properties properties = new Properties();
41+
properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "false");
42+
MemoryConfig config = new MemoryConfig(properties);
43+
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
44+
assertFalse(brokerConfiguration.isAllowAnonymous());
45+
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
46+
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
47+
assertFalse(brokerConfiguration.isImmediateBufferFlush());
48+
}
49+
50+
@Test
51+
public void configureAllowZeroByteClientId() {
52+
Properties properties = new Properties();
53+
properties.put(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, "true");
54+
MemoryConfig config = new MemoryConfig(properties);
55+
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
56+
assertTrue(brokerConfiguration.isAllowAnonymous());
57+
assertTrue(brokerConfiguration.isAllowZeroByteClientId());
58+
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
59+
assertFalse(brokerConfiguration.isImmediateBufferFlush());
60+
}
61+
62+
@Test
63+
public void configureReauthorizeSubscriptionsOnConnect() {
64+
Properties properties = new Properties();
65+
properties.put(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, "true");
66+
MemoryConfig config = new MemoryConfig(properties);
67+
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
68+
assertTrue(brokerConfiguration.isAllowAnonymous());
69+
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
70+
assertTrue(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
71+
assertFalse(brokerConfiguration.isImmediateBufferFlush());
72+
}
73+
74+
@Test
75+
public void configureImmediateBufferFlush() {
76+
Properties properties = new Properties();
77+
properties.put(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, "true");
78+
MemoryConfig config = new MemoryConfig(properties);
79+
BrokerConfiguration brokerConfiguration = new BrokerConfiguration(config);
80+
assertTrue(brokerConfiguration.isAllowAnonymous());
81+
assertFalse(brokerConfiguration.isAllowZeroByteClientId());
82+
assertFalse(brokerConfiguration.isReauthorizeSubscriptionsOnConnect());
83+
assertTrue(brokerConfiguration.isImmediateBufferFlush());
84+
}
85+
}

broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class MQTTConnectionConnectTest {
4949
private EmbeddedChannel channel;
5050
private SessionRegistry sessionRegistry;
5151
private MqttMessageBuilders.ConnectBuilder connMsg;
52-
private static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false);
52+
private static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false, false);
5353
private IAuthenticator mockAuthenticator;
5454
private PostOffice postOffice;
5555
private MemoryQueueRepository queueRepository;
@@ -205,7 +205,7 @@ public void noPasswdAuthentication() {
205205
@Test
206206
public void prohibitAnonymousClient() {
207207
MqttConnectMessage msg = connMsg.clientId(FAKE_CLIENT_ID).build();
208-
BrokerConfiguration config = new BrokerConfiguration(false, true, false);
208+
BrokerConfiguration config = new BrokerConfiguration(false, true, false, false);
209209

210210
sut = createMQTTConnection(config);
211211
channel = (EmbeddedChannel) sut.channel;
@@ -223,7 +223,7 @@ public void prohibitAnonymousClient_providingUsername() {
223223
MqttConnectMessage msg = connMsg.clientId(FAKE_CLIENT_ID)
224224
.username(TEST_USER + "_fake")
225225
.build();
226-
BrokerConfiguration config = new BrokerConfiguration(false, true, false);
226+
BrokerConfiguration config = new BrokerConfiguration(false, true, false, false);
227227

228228
createMQTTConnection(config);
229229

@@ -237,7 +237,7 @@ public void prohibitAnonymousClient_providingUsername() {
237237

238238
@Test
239239
public void testZeroByteClientIdNotAllowed() {
240-
BrokerConfiguration config = new BrokerConfiguration(false, false, false);
240+
BrokerConfiguration config = new BrokerConfiguration(false, false, false, false);
241241

242242
sut = createMQTTConnection(config);
243243
channel = (EmbeddedChannel) sut.channel;
@@ -288,7 +288,7 @@ public void testBindWithSameClientIDBadCredentialsDoesntDropExistingClient() {
288288
EmbeddedChannel evilChannel = new EmbeddedChannel();
289289

290290
// Exercise
291-
BrokerConfiguration config = new BrokerConfiguration(true, true, false);
291+
BrokerConfiguration config = new BrokerConfiguration(true, true, false, false);
292292
final MQTTConnection evilConnection = createMQTTConnection(config, evilChannel, postOffice);
293293
evilConnection.processConnect(evilClientConnMsg);
294294

broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class MQTTConnectionPublishTest {
5151
public void setUp() {
5252
connMsg = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_3_1).cleanSession(true);
5353

54-
BrokerConfiguration config = new BrokerConfiguration(true, true, false);
54+
BrokerConfiguration config = new BrokerConfiguration(true, true, false, false);
5555

5656
createMQTTConnection(config);
5757
}

broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class PostOfficeInternalPublishTest {
5252
private SessionRegistry sessionRegistry;
5353
private MockAuthenticator mockAuthenticator;
5454
private static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID =
55-
new BrokerConfiguration(true, true, false);
55+
new BrokerConfiguration(true, true, false, false);
5656
private MemoryRetainedRepository retainedRepository;
5757
private MemoryQueueRepository queueRepository;
5858

broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class PostOfficePublishTest {
6262
private SessionRegistry sessionRegistry;
6363
private MockAuthenticator mockAuthenticator;
6464
static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID =
65-
new BrokerConfiguration(true, true, false);
65+
new BrokerConfiguration(true, true, false, false);
6666
private MemoryRetainedRepository retainedRepository;
6767
private MemoryQueueRepository queueRepository;
6868

broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class PostOfficeSubscribeTest {
6464
private MqttConnectMessage connectMessage;
6565
private IAuthenticator mockAuthenticator;
6666
private SessionRegistry sessionRegistry;
67-
public static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false);
67+
public static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false, false);
6868
private MemoryQueueRepository queueRepository;
6969

7070
@Before

broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class PostOfficeUnsubscribeTest {
5555
private MqttConnectMessage connectMessage;
5656
private IAuthenticator mockAuthenticator;
5757
private SessionRegistry sessionRegistry;
58-
public static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false);
58+
public static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false, false);
5959
private MemoryQueueRepository queueRepository;
6060

6161
@Before

0 commit comments

Comments
 (0)