Skip to content
This repository was archived by the owner on Apr 12, 2020. It is now read-only.

Commit ea06315

Browse files
zhangliangboandsel
authored andcommitted
1 parent 1de5322 commit ea06315

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

broker/src/main/java/io/moquette/broker/MQTTConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ void processConnect(MqttConnectMessage msg) {
171171

172172
NettyUtils.clientID(channel, clientId);
173173
LOG.trace("CONNACK sent, channel: {}", channel);
174+
postOffice.dispatchConnection(msg);
175+
LOG.trace("dispatch connection: {}", msg.toString());
174176
} catch (SessionCorruptedException scex) {
175177
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
176178
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
@@ -257,6 +259,10 @@ void handleConnectionLost() {
257259
sessionRegistry.disconnect(clientID);
258260
}
259261
connected = false;
262+
//dispatch connection lost to intercept.
263+
String userName= NettyUtils.userName(channel);
264+
postOffice.dispatchConnectionLost(clientID,userName);
265+
LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
260266
}
261267

262268
void sendConnAck(boolean isSessionAlreadyPresent) {
@@ -284,6 +290,9 @@ void processDisconnect(MqttMessage msg) {
284290
connected = false;
285291
channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
286292
LOG.trace("Processed DISCONNECT CId={}, channel: {}", clientID, channel);
293+
String userName=NettyUtils.userName(channel);
294+
postOffice.dispatchDisconnection(clientID,userName);
295+
LOG.trace("dispatch disconnection: clientId={}, userName={}", clientID, userName);
287296
}
288297

289298
void processSubscribe(MqttSubscribeMessage msg) {

broker/src/main/java/io/moquette/broker/PostOffice.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,4 +301,20 @@ public void internalPublish(MqttPublishMessage msg) {
301301
}
302302
retainedRepository.retain(topic, msg);
303303
}
304+
305+
/**
306+
* notify MqttConnectMessage after connection established (already pass login).
307+
* @param msg
308+
*/
309+
void dispatchConnection(MqttConnectMessage msg){
310+
interceptor.notifyClientConnected(msg);
311+
}
312+
313+
void dispatchDisconnection(String clientId,String userName){
314+
interceptor.notifyClientDisconnected(clientId,userName);
315+
}
316+
317+
void dispatchConnectionLost(String clientId,String userName){
318+
interceptor.notifyClientConnectionLost(clientId,userName);
319+
}
304320
}

broker/src/main/java/io/moquette/broker/security/DenyAllAuthorizatorPolicy.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
public class DenyAllAuthorizatorPolicy implements IAuthorizatorPolicy {
2222

2323
@Override
24-
public boolean canWrite(Topic topic, String user, String client) {
25-
return true;
24+
public boolean canRead(Topic topic, String user, String client) {
25+
return false;
2626
}
2727

2828
@Override
29-
public boolean canRead(Topic topic, String user, String client) {
30-
return true;
29+
public boolean canWrite(Topic topic, String user, String client) {
30+
return false;
3131
}
3232
}

0 commit comments

Comments
 (0)