Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Jan 12, 2021
2 parents 94c9b0e + cf9787f commit 7c22ccd
Showing 1 changed file with 28 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,38 +158,38 @@ private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> hand
MqttConnection connection) {
return Mono
.fromCallable(() -> {
String deviceId = device.getDeviceId();
if (resp.isSuccess()) {
counter.increment();
DeviceSession session = sessionManager.getSession(deviceId);
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, gatewayMonitor);
if (null == session) {
sessionManager.register(newSession);
} else if (session instanceof ReplaceableDeviceSession) {
((ReplaceableDeviceSession) session).replaceWith(newSession);
}
gatewayMonitor.connected();
gatewayMonitor.totalConnection(counter.sum());
//监听断开连接
connection.onClose(conn -> {
counter.decrement();
DeviceSession _tmp = sessionManager.getSession(newSession.getId());

if (newSession == _tmp || _tmp == null) {
sessionManager.unregister(deviceId);
try {
String deviceId = device.getDeviceId();
if (resp.isSuccess()) {
counter.increment();
DeviceSession session = sessionManager.getSession(deviceId);
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, gatewayMonitor);
if (null == session) {
sessionManager.register(newSession);
} else if (session instanceof ReplaceableDeviceSession) {
((ReplaceableDeviceSession) session).replaceWith(newSession);
}
gatewayMonitor.disconnected();
gatewayMonitor.connected();
gatewayMonitor.totalConnection(counter.sum());
});
try {
//监听断开连接
connection.onClose(conn -> {
counter.decrement();
DeviceSession _tmp = sessionManager.getSession(newSession.getId());

if (newSession == _tmp || _tmp == null) {
sessionManager.unregister(deviceId);
}
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
return Tuples.of(connection.accept(), device, newSession);
} catch (IllegalStateException ignore) {

} else {
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
gatewayMonitor.rejected();
}
} else {
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
gatewayMonitor.rejected();
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
} catch (IllegalStateException ignore) {

}
return null;
})
Expand Down

0 comments on commit 7c22ccd

Please sign in to comment.