Skip to content

Commit 38f3a4e

Browse files
committed
Added close on idle timeout
1 parent 16dfb1c commit 38f3a4e

File tree

6 files changed

+135
-31
lines changed

6 files changed

+135
-31
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<artifactId>websocket</artifactId>
1010
<packaging>jar</packaging>
1111
<name>websocket</name>
12-
<version>1.16.2</version>
12+
<version>1.16.6</version>
1313
<description>Red5 WebSocket plugin</description>
1414
<url>https://github.com/Red5/red5-websocket</url>
1515
<organization>

src/main/java/org/red5/net/websocket/SecureWebSocketConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private SSLContext getSslContext() {
195195
log.debug("No ciphers");
196196
}
197197
// http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SNIExamples
198+
// https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SNIExtension
198199
log.debug("SNI extension enabled: {}", sniEnabled);
199200
List<SNIServerName> serverNames = params.getServerNames();
200201
if (serverNames != null) {

src/main/java/org/red5/net/websocket/WebSocketHandler.java

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818

1919
package org.red5.net.websocket;
2020

21+
import org.apache.mina.core.future.CloseFuture;
22+
import org.apache.mina.core.future.IoFutureListener;
2123
import org.apache.mina.core.service.IoHandlerAdapter;
24+
import org.apache.mina.core.session.IdleStatus;
2225
import org.apache.mina.core.session.IoSession;
26+
import org.apache.mina.core.write.WriteRequestQueue;
2327
import org.red5.net.websocket.model.WSMessage;
2428
import org.red5.server.plugin.PluginRegistry;
2529
import org.slf4j.Logger;
@@ -39,35 +43,67 @@ public class WebSocketHandler extends IoHandlerAdapter {
3943

4044
private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
4145

42-
/**
43-
* {@inheritDoc}
44-
*/
46+
/** {@inheritDoc} */
4547
@Override
4648
public void messageReceived(IoSession session, Object message) throws Exception {
47-
log.trace("Message received on session: {} {}", session.getId(), message);
49+
if (log.isTraceEnabled()) {
50+
log.trace("Message received (session: {}) {}", session.getId(), message);
51+
}
4852
if (message instanceof WSMessage) {
4953
WebSocketConnection conn = (WebSocketConnection) session.getAttribute(Constants.CONNECTION);
5054
if (conn != null) {
5155
conn.receive((WSMessage) message);
5256
}
57+
} else {
58+
log.trace("Non-WSMessage received {}", message);
5359
}
5460
}
5561

56-
/**
57-
* {@inheritDoc}
58-
*/
62+
/** {@inheritDoc} */
5963
@Override
6064
public void messageSent(IoSession session, Object message) throws Exception {
61-
log.trace("Message sent on session: {} {}", session.getId(), String.valueOf(message));
62-
log.trace("Session read: {} write: {}", session.getReadBytes(), session.getWrittenBytes());
65+
if (log.isTraceEnabled()) {
66+
log.trace("Message sent (session: {}) read: {} write: {}\n{}", session.getId(), session.getReadBytes(), session.getWrittenBytes(), String.valueOf(message));
67+
}
6368
}
6469

65-
/**
66-
* {@inheritDoc}
67-
*/
70+
/** {@inheritDoc} */
71+
@Override
72+
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
73+
//if (log.isTraceEnabled()) {
74+
log.info("Idle (session: {}) local: {} remote: {}\nread: {} write: {}", session.getId(), session.getLocalAddress(), session.getRemoteAddress(), session.getReadBytes(), session.getWrittenBytes());
75+
//}
76+
// get the existing reference to a ws connection
77+
WebSocketConnection conn = (WebSocketConnection) session.getAttribute(Constants.CONNECTION);
78+
if (conn != null) {
79+
// close the idle socket
80+
conn.close();
81+
} else {
82+
log.debug("WebSocketConnection attribute was empty, force closing idle session: {}", session);
83+
// clear write queue
84+
WriteRequestQueue writeQueue = session.getWriteRequestQueue();
85+
if (!writeQueue.isEmpty(session)) {
86+
writeQueue.clear(session);
87+
}
88+
// force close the session
89+
final CloseFuture future = session.closeNow();
90+
IoFutureListener<CloseFuture> listener = new IoFutureListener<CloseFuture>() {
91+
92+
public void operationComplete(CloseFuture future) {
93+
// now connection should be closed
94+
log.debug("Close operation completed {}: {}", session.getId(), future.isClosed());
95+
future.removeListener(this);
96+
}
97+
98+
};
99+
future.addListener(listener);
100+
}
101+
}
102+
103+
/** {@inheritDoc} */
68104
@Override
69105
public void sessionClosed(IoSession session) throws Exception {
70-
log.trace("Session closed");
106+
log.trace("Session {} closed", session.getId());
71107
// remove connection from scope
72108
WebSocketConnection conn = (WebSocketConnection) session.removeAttribute(Constants.CONNECTION);
73109
if (conn != null) {
@@ -97,12 +133,16 @@ public void sessionClosed(IoSession session) throws Exception {
97133
super.sessionClosed(session);
98134
}
99135

100-
/**
101-
* {@inheritDoc}
102-
*/
136+
/** {@inheritDoc} */
103137
@Override
104138
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
105-
log.error("exception", cause);
139+
log.warn("Exception (session: {})", session.getId(), cause);
140+
// get the existing reference to a ws connection
141+
WebSocketConnection conn = (WebSocketConnection) session.getAttribute(Constants.CONNECTION);
142+
if (conn != null) {
143+
// close the socket
144+
conn.close();
145+
}
106146
}
107147

108148
}

src/main/java/org/red5/net/websocket/WebSocketTransport.java

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727

2828
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
2929
import org.apache.mina.core.service.IoHandlerAdapter;
30+
import org.apache.mina.core.service.IoService;
31+
import org.apache.mina.core.service.IoServiceListener;
32+
import org.apache.mina.core.session.IdleStatus;
33+
import org.apache.mina.core.session.IoSession;
3034
import org.apache.mina.filter.codec.ProtocolCodecFilter;
3135
import org.apache.mina.filter.logging.LoggingFilter;
3236
import org.apache.mina.filter.ssl.SslFilter;
@@ -59,6 +63,10 @@ public class WebSocketTransport implements InitializingBean, DisposableBean {
5963

6064
private Set<String> addresses = new HashSet<>();
6165

66+
private int writeTimeout = 30;
67+
68+
private int idleTimeout = 60;
69+
6270
private IoHandlerAdapter ioHandler;
6371

6472
private SocketAcceptor acceptor;
@@ -82,13 +90,53 @@ public class WebSocketTransport implements InitializingBean, DisposableBean {
8290
@Override
8391
public void afterPropertiesSet() throws Exception {
8492
// create the nio acceptor
85-
acceptor = new NioSocketAcceptor();
93+
acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() * 4);
94+
acceptor.addListener(new IoServiceListener() {
95+
96+
@Override
97+
public void serviceActivated(IoService service) throws Exception {
98+
//log.debug("serviceActivated: {}", service);
99+
}
100+
101+
@Override
102+
public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
103+
//logger.debug("serviceIdle: {} status: {}", service, idleStatus);
104+
}
105+
106+
@Override
107+
public void serviceDeactivated(IoService service) throws Exception {
108+
//log.debug("serviceDeactivated: {}", service);
109+
}
110+
111+
@Override
112+
public void sessionCreated(IoSession session) throws Exception {
113+
log.info("sessionCreated: {}", session);
114+
//log.trace("Acceptor sessions: {}", acceptor.getManagedSessions());
115+
}
116+
117+
@Override
118+
public void sessionClosed(IoSession session) throws Exception {
119+
log.info("sessionClosed: {}", session);
120+
}
121+
122+
@Override
123+
public void sessionDestroyed(IoSession session) throws Exception {
124+
//log.debug("sessionDestroyed: {}", session);
125+
}
126+
127+
});
86128
// configure the acceptor
87129
SocketSessionConfig sessionConf = acceptor.getSessionConfig();
88130
sessionConf.setReuseAddress(true);
89131
sessionConf.setTcpNoDelay(true);
90132
sessionConf.setSendBufferSize(sendBufferSize);
91133
sessionConf.setReadBufferSize(receiveBufferSize);
134+
// prevent the background blocking queue
135+
sessionConf.setUseReadOperation(false);
136+
// seconds
137+
sessionConf.setWriteTimeout(writeTimeout);
138+
// set an idle time of 30s
139+
sessionConf.setIdleTime(IdleStatus.BOTH_IDLE, idleTimeout);
92140
// close sessions when the acceptor is stopped
93141
acceptor.setCloseOnDeactivation(true);
94142
// requested maximum length of the queue of incoming connections
@@ -143,7 +191,7 @@ public void afterPropertiesSet() throws Exception {
143191
log.debug("Binding to {}", socketAddresses.toString());
144192
acceptor.bind(socketAddresses);
145193
} catch (Exception e) {
146-
log.error("Exception occurred during resolve / bind", e);
194+
log.warn("Exception occurred during resolve / bind", e);
147195
}
148196
}
149197
log.info("started {} websocket transport", (isSecure() ? "secure" : ""));
@@ -168,29 +216,44 @@ public void setAddresses(List<String> addrs) {
168216
}
169217

170218
/**
171-
* @param port
172-
* the port to set
219+
* @param port the port to set
173220
*/
174221
public void setPort(int port) {
175222
this.port = port;
176223
}
177224

178225
/**
179-
* @param sendBufferSize
180-
* the sendBufferSize to set
226+
* @param sendBufferSize the sendBufferSize to set
181227
*/
182228
public void setSendBufferSize(int sendBufferSize) {
183229
this.sendBufferSize = sendBufferSize;
184230
}
185231

186232
/**
187-
* @param receiveBufferSize
188-
* the receiveBufferSize to set
233+
* @param receiveBufferSize the receiveBufferSize to set
189234
*/
190235
public void setReceiveBufferSize(int receiveBufferSize) {
191236
this.receiveBufferSize = receiveBufferSize;
192237
}
193238

239+
/**
240+
* Write timeout.
241+
*
242+
* @param writeTimeout
243+
*/
244+
public void setWriteTimeout(int writeTimeout) {
245+
this.writeTimeout = writeTimeout;
246+
}
247+
248+
/**
249+
* Idle timeout.
250+
*
251+
* @param idleTimeout
252+
*/
253+
public void setIdleTimeout(int idleTimeout) {
254+
this.idleTimeout = idleTimeout;
255+
}
256+
194257
/**
195258
* @param connectionThreads the connectionThreads to set
196259
*/

src/main/java/org/red5/net/websocket/codec/WebSocketDecoder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,9 @@ protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput
138138
}
139139
} else {
140140
// session is known to be from a native socket. So simply wrap and pass through
141-
resultBuffer = IoBuffer.wrap(in.array(), 0, in.limit());
142-
in.position(in.limit());
143-
out.write(resultBuffer);
141+
byte[] arr = new byte[in.remaining()];
142+
in.get(arr);
143+
out.write(IoBuffer.wrap(arr));
144144
}
145145
return true;
146146
}
@@ -167,7 +167,7 @@ private boolean doHandShake(IoSession session, IoBuffer in) {
167167
// size for incoming bytes
168168
data = new byte[in.remaining()];
169169
// get incoming bytes
170-
in.get(data, 0, data.length);
170+
in.get(data);
171171
}
172172
// ensure the incoming data is complete (ends with crlfcrlf)
173173
byte[] tail = Arrays.copyOfRange(data, data.length - 4, data.length);

src/main/java/org/red5/net/websocket/model/WSMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void setPayload(IoBuffer payload) {
9494
*/
9595
public void addPayload(IoBuffer additionalPayload) {
9696
if (payload == null) {
97-
payload = IoBuffer.allocate(additionalPayload.limit());
97+
payload = IoBuffer.allocate(additionalPayload.remaining());
9898
payload.setAutoExpand(true);
9999
}
100100
this.payload.put(additionalPayload);

0 commit comments

Comments
 (0)