Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue1212 #1213

Merged
merged 3 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.dromara.soul.admin.listener.websocket;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.soul.admin.service.SyncDataService;
import org.dromara.soul.admin.spring.SpringBeanUtils;
Expand All @@ -31,6 +32,7 @@
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

Expand All @@ -42,7 +44,7 @@
* @since 2.0.0
*/
@Slf4j
@ServerEndpoint("/websocket")
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
Expand All @@ -56,10 +58,22 @@ public class WebsocketCollector {
*/
@OnOpen
public void onOpen(final Session session) {
log.info("websocket on open successful....");
log.info("websocket on client[{}] open successful....", getClientIp(session));
SESSION_SET.add(session);
}

private static String getClientIp(final Session session) {
Map<String, Object> userProperties = session.getUserProperties();
if (MapUtils.isEmpty(userProperties)) {
return StringUtils.EMPTY;
}
Object ipObject = userProperties.get(WebsocketListener.CLIENT_IP_NAME);
if (null == ipObject) {
return StringUtils.EMPTY;
}
return ipObject.toString();
}

/**
* On message.
*
Expand Down Expand Up @@ -87,6 +101,7 @@ public void onMessage(final String message, final Session session) {
public void onClose(final Session session) {
SESSION_SET.remove(session);
ThreadLocalUtil.clear();
log.warn("websocket close on client[{}]", getClientIp(session));
}

/**
Expand All @@ -99,7 +114,7 @@ public void onClose(final Session session) {
public void onError(final Session session, final Throwable error) {
SESSION_SET.remove(session);
ThreadLocalUtil.clear();
log.error("websocket collection error: ", error);
log.error("websocket collection on client[{}] error: ", getClientIp(session), error);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.dromara.soul.admin.listener.websocket;

import lombok.extern.slf4j.Slf4j;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;

/**
* The Websocket configurator.
*
* @author xiaoyu(Myth)
* @author huangxiaofeng
* @since 2.0.0
*/
@Slf4j
public class WebsocketConfigurator extends ServerEndpointConfig.Configurator {

@Override
public void modifyHandshake(final ServerEndpointConfig sec, final HandshakeRequest request, final HandshakeResponse response) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
// 把HttpSession中保存的ClientIP放到ServerEndpointConfig中,关键字可以跟之前不同
sec.getUserProperties().put(WebsocketListener.CLIENT_IP_NAME, httpSession.getAttribute(WebsocketListener.CLIENT_IP_NAME));
super.modifyHandshake(sec, request, response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.dromara.soul.admin.listener.websocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.annotation.WebListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
* The Websocket listener.
*
* @author xiaoyu(Myth)
* @author huangxiaofeng
* @since 2.0.0
*/
@Slf4j
@WebListener
@Configuration
public class WebsocketListener implements ServletRequestListener {

public static final String CLIENT_IP_NAME = "ClientIP";

@Override
public void requestDestroyed(final ServletRequestEvent sre) {
try {
HttpServletRequest request = (HttpServletRequest) sre.getServletRequest();
if (null != request && null != request.getSession()) {
HttpSession session = request.getSession();
request.removeAttribute(CLIENT_IP_NAME);
session.removeAttribute(CLIENT_IP_NAME);
}
} catch (Exception e) {
log.error("", e);
}
}

@Override
public void requestInitialized(final ServletRequestEvent sre) {
try {
HttpServletRequest request = (HttpServletRequest) sre.getServletRequest();
if (null != request && null != request.getSession()) {
HttpSession session = request.getSession();
request.setAttribute(CLIENT_IP_NAME, sre.getServletRequest().getRemoteAddr());
session.setAttribute(CLIENT_IP_NAME, sre.getServletRequest().getRemoteAddr());
}
} catch (Exception e) {
log.error("", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.dromara.soul.plugin.sync.data.websocket;

import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.soul.common.concurrent.SoulThreadFactory;
Expand All @@ -33,6 +32,7 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -47,7 +47,7 @@ public class WebsocketSyncDataService implements SyncDataService, AutoCloseable
private final List<WebSocketClient> clients = new ArrayList<>();

private final ScheduledThreadPoolExecutor executor;

/**
* Instantiates a new Websocket sync cache.
*
Expand All @@ -71,7 +71,7 @@ public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
boolean success = client.connectBlocking(3, TimeUnit.SECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
Expand All @@ -82,23 +82,26 @@ public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
} else {
log.error("websocket reconnection is error.....");
log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
}
} else {
client.sendPing();
log.info("websocket send to [{}] ping message successful", client.getURI().toString());
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}, 10, 10, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}

}

@Override
public void close() {
for (WebSocketClient client : clients) {
Expand Down