Skip to content

Commit a9d7610

Browse files
committed
Replaced some synchronized sections with use of java.util.concurrent structures.
1 parent cca0c67 commit a9d7610

File tree

6 files changed

+48
-69
lines changed

6 files changed

+48
-69
lines changed

core/src/main/java/com/glines/socketio/client/jre/SocketIOConnectionXHRBase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
package com.glines.socketio.client.jre;
2525

2626
import java.io.IOException;
27+
import java.util.Collection;
2728
import java.util.LinkedList;
29+
import java.util.concurrent.BlockingQueue;
2830
import java.util.concurrent.Executors;
2931
import java.util.concurrent.Future;
32+
import java.util.concurrent.LinkedBlockingQueue;
3033
import java.util.concurrent.ScheduledExecutorService;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,7 +63,7 @@ public class SocketIOConnectionXHRBase implements SocketIOConnection {
6063
protected HttpExchange currentGet;
6164
protected HttpExchange currentPost;
6265
protected final boolean isConnectionPersistent;
63-
protected LinkedList<SocketIOFrame> queue = new LinkedList<SocketIOFrame>();
66+
protected BlockingQueue<SocketIOFrame> queue = new LinkedBlockingQueue<SocketIOFrame>();
6467
protected AtomicBoolean doingSend = new AtomicBoolean(false);
6568
protected AtomicLong messageId = new AtomicLong(0);
6669
protected String closeId = null;
@@ -180,21 +183,18 @@ public void sendMessage(int messageType, String message) throws SocketIOExceptio
180183

181184
protected void sendFrame(SocketIOFrame frame) {
182185
messageId.incrementAndGet();
183-
synchronized (queue) {
184-
queue.add(frame);
185-
}
186+
queue.offer(frame);
186187
checkSend();
187188
}
188189

189190
protected void checkSend() {
190191
if (doingSend.compareAndSet(false, true)) {
191192
StringBuilder str = new StringBuilder();
192-
synchronized(queue) {
193-
for (SocketIOFrame frame: queue) {
193+
Collection<SocketIOFrame> elements = new LinkedList<SocketIOFrame>();
194+
queue.drainTo(elements);
195+
for (SocketIOFrame frame: elements) {
194196
str.append(frame.encode());
195197
}
196-
queue.clear();
197-
}
198198
doSend(str.toString());
199199
}
200200
}

core/src/main/java/com/glines/socketio/server/SocketIOServlet.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,8 @@
2929
import java.util.HashMap;
3030
import java.util.Map;
3131

32-
import javax.servlet.GenericServlet;
3332
import javax.servlet.ServletException;
34-
import javax.servlet.ServletRequest;
35-
import javax.servlet.ServletResponse;
36-
import javax.servlet.http.Cookie;
33+
import javax.servlet.http.HttpServlet;
3734
import javax.servlet.http.HttpServletRequest;
3835
import javax.servlet.http.HttpServletResponse;
3936

@@ -48,7 +45,7 @@
4845

4946
/**
5047
*/
51-
public abstract class SocketIOServlet extends GenericServlet {
48+
public abstract class SocketIOServlet extends HttpServlet {
5249
public static final String BUFFER_SIZE_INIT_PARAM = "bufferSize";
5350
public static final String MAX_IDLE_TIME_INIT_PARAM = "maxIdleTime";
5451
public static final int BUFFER_SIZE_DEFAULT = 8192;
@@ -84,19 +81,17 @@ public void init() throws ServletException {
8481
}
8582
}
8683

84+
@Override
85+
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
86+
serve(req, resp);
87+
}
8788

8889
@Override
89-
public void service(ServletRequest request, ServletResponse response)
90-
throws ServletException, IOException {
91-
service((HttpServletRequest)request, (HttpServletResponse)response);
90+
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
91+
serve(req, resp);
9292
}
9393

94-
protected void service(HttpServletRequest request, HttpServletResponse response)
95-
throws ServletException, IOException {
96-
if ("OPTIONS".equals(request.getMethod())) {
97-
// TODO: process and reply to CORS preflight request.
98-
return;
99-
}
94+
private void serve(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
10095

10196
String path = request.getPathInfo();
10297
if (path == null || path.length() == 0 || "/".equals(path)) {
@@ -142,6 +137,5 @@ public void destroy() {
142137
* Returns an instance of SocketIOInbound or null if the connection is to be denied.
143138
* The value of cookies and protocols may be null.
144139
*/
145-
protected abstract SocketIOInbound doSocketIOConnect(
146-
HttpServletRequest request, String[] protocols);
140+
protected abstract SocketIOInbound doSocketIOConnect(HttpServletRequest request, String[] protocols);
147141
}

samples/broadcast/src/main/java/com/glines/socketio/sample/broadcast/BroadcastSocketServlet.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@
2424
package com.glines.socketio.sample.broadcast;
2525

2626
import java.io.IOException;
27+
import java.util.Collection;
2728
import java.util.HashSet;
29+
import java.util.LinkedList;
30+
import java.util.Queue;
2831
import java.util.Set;
32+
import java.util.concurrent.BlockingQueue;
33+
import java.util.concurrent.ConcurrentLinkedQueue;
34+
import java.util.concurrent.LinkedBlockingQueue;
2935

3036
import javax.servlet.http.Cookie;
3137
import javax.servlet.http.HttpServletRequest;
@@ -38,34 +44,27 @@
3844

3945
public class BroadcastSocketServlet extends SocketIOServlet {
4046
private static final long serialVersionUID = 1L;
41-
private Set<BroadcastConnection> connections = new HashSet<BroadcastConnection>();
47+
private Queue<BroadcastConnection> connections = new ConcurrentLinkedQueue<BroadcastConnection>();
4248

4349
private class BroadcastConnection implements SocketIOInbound {
44-
private SocketIOOutbound outbound = null;
50+
private volatile SocketIOOutbound outbound = null;
4551

4652
@Override
4753
public String getProtocol() {
48-
// TODO Auto-generated method stub
4954
return null;
5055
}
5156

5257
@Override
5358
public void onConnect(SocketIOOutbound outbound) {
5459
this.outbound = outbound;
55-
synchronized (connections) {
56-
connections.add(this);
57-
}
60+
connections.offer(this);
5861
}
5962

6063
@Override
6164
public void onDisconnect(DisconnectReason reason, String errorMessage) {
62-
synchronized(this) {
6365
this.outbound = null;
64-
}
65-
synchronized (connections) {
6666
connections.remove(this);
6767
}
68-
}
6968

7069
@Override
7170
public void onMessage(int messageType, String message) {
@@ -74,7 +73,6 @@ public void onMessage(int messageType, String message) {
7473

7574
private void broadcast(int messageType, String message) {
7675
Log.debug("Broadcasting: " + message);
77-
synchronized (connections) {
7876
for(BroadcastConnection c: connections) {
7977
if (c != this) {
8078
try {
@@ -86,7 +84,6 @@ private void broadcast(int messageType, String message) {
8684
}
8785
}
8886
}
89-
}
9087

9188
@Override
9289
protected SocketIOInbound doSocketIOConnect(HttpServletRequest request, String[] protocols) {

samples/chat-gwt/src/main/java/com/glines/socketio/sample/gwtchat/GWTChatSocketServlet.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,15 @@
2424
package com.glines.socketio.sample.gwtchat;
2525

2626
import java.io.IOException;
27+
import java.util.Collection;
2728
import java.util.Collections;
2829
import java.util.HashSet;
30+
import java.util.LinkedList;
31+
import java.util.Queue;
2932
import java.util.Set;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.ConcurrentLinkedQueue;
35+
import java.util.concurrent.LinkedBlockingQueue;
3036
import java.util.concurrent.atomic.AtomicInteger;
3137

3238
import javax.servlet.http.Cookie;
@@ -44,10 +50,10 @@
4450
public class GWTChatSocketServlet extends SocketIOServlet {
4551
private static final long serialVersionUID = 1L;
4652
private AtomicInteger ids = new AtomicInteger(1);
47-
private Set<GWTChatConnection> connections = new HashSet<GWTChatConnection>();
53+
private Queue<GWTChatConnection> connections = new ConcurrentLinkedQueue<GWTChatConnection>();
4854

4955
private class GWTChatConnection implements SocketIOInbound {
50-
private SocketIOOutbound outbound = null;
56+
private volatile SocketIOOutbound outbound = null;
5157
private Integer sessionId = ids.getAndIncrement();
5258

5359
@Override
@@ -58,9 +64,7 @@ public String getProtocol() {
5864
@Override
5965
public void onConnect(SocketIOOutbound outbound) {
6066
this.outbound = outbound;
61-
synchronized (connections) {
62-
connections.add(this);
63-
}
67+
connections.offer(this);
6468
try {
6569
outbound.sendMessage(SocketIOFrame.JSON_MESSAGE_TYPE, JSON.toString(
6670
Collections.singletonMap("welcome", "Welcome to GWT Chat!")));
@@ -73,12 +77,8 @@ public void onConnect(SocketIOOutbound outbound) {
7377

7478
@Override
7579
public void onDisconnect(DisconnectReason reason, String errorMessage) {
76-
synchronized(this) {
7780
this.outbound = null;
78-
}
79-
synchronized (connections) {
8081
connections.remove(this);
81-
}
8282
broadcast(SocketIOFrame.JSON_MESSAGE_TYPE, JSON.toString(
8383
Collections.singletonMap("announcement", sessionId + " disconnected")));
8484
}
@@ -116,7 +116,6 @@ public void onMessage(int messageType, String message) {
116116

117117
private void broadcast(int messageType, String message) {
118118
Log.debug("Broadcasting: " + message);
119-
synchronized (connections) {
120119
for(GWTChatConnection c: connections) {
121120
if (c != this) {
122121
try {
@@ -128,7 +127,6 @@ private void broadcast(int messageType, String message) {
128127
}
129128
}
130129
}
131-
}
132130

133131
@Override
134132
protected SocketIOInbound doSocketIOConnect(HttpServletRequest request, String[] protocols) {

samples/chat/src/main/java/com/glines/socketio/sample/chat/ChatSocketServlet.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,15 @@
2424
package com.glines.socketio.sample.chat;
2525

2626
import java.io.IOException;
27+
import java.util.Collection;
2728
import java.util.Collections;
2829
import java.util.HashSet;
30+
import java.util.LinkedList;
31+
import java.util.Queue;
2932
import java.util.Set;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.ConcurrentLinkedQueue;
35+
import java.util.concurrent.LinkedBlockingQueue;
3036
import java.util.concurrent.atomic.AtomicInteger;
3137

3238
import javax.servlet.http.HttpServletRequest;
@@ -43,10 +49,10 @@
4349
public class ChatSocketServlet extends SocketIOServlet {
4450
private static final long serialVersionUID = 1L;
4551
private AtomicInteger ids = new AtomicInteger(1);
46-
private Set<ChatConnection> connections = new HashSet<ChatConnection>();
52+
private Queue<ChatConnection> connections = new ConcurrentLinkedQueue<ChatConnection>();
4753

4854
private class ChatConnection implements SocketIOInbound {
49-
private SocketIOOutbound outbound = null;
55+
private volatile SocketIOOutbound outbound = null;
5056
private Integer sessionId = ids.getAndIncrement();
5157

5258
@Override
@@ -58,9 +64,7 @@ public String getProtocol() {
5864
@Override
5965
public void onConnect(SocketIOOutbound outbound) {
6066
this.outbound = outbound;
61-
synchronized (connections) {
62-
connections.add(this);
63-
}
67+
connections.offer(this);
6468
try {
6569
outbound.sendMessage(SocketIOFrame.JSON_MESSAGE_TYPE, JSON.toString(
6670
Collections.singletonMap("welcome", "Welcome to Socket.IO Chat!")));
@@ -73,12 +77,8 @@ public void onConnect(SocketIOOutbound outbound) {
7377

7478
@Override
7579
public void onDisconnect(DisconnectReason reason, String errorMessage) {
76-
synchronized(this) {
7780
this.outbound = null;
78-
}
79-
synchronized (connections) {
8081
connections.remove(this);
81-
}
8282
broadcast(SocketIOFrame.JSON_MESSAGE_TYPE, JSON.toString(
8383
Collections.singletonMap("announcement", sessionId + " disconnected")));
8484
}
@@ -150,7 +150,6 @@ public void onMessage(int messageType, String message) {
150150

151151
private void broadcast(int messageType, String message) {
152152
Log.debug("Broadcasting: " + message);
153-
synchronized (connections) {
154153
for(ChatConnection c: connections) {
155154
if (c != this) {
156155
try {
@@ -162,7 +161,6 @@ private void broadcast(int messageType, String message) {
162161
}
163162
}
164163
}
165-
}
166164

167165
@Override
168166
protected SocketIOInbound doSocketIOConnect(HttpServletRequest request, String[] protocols) {

samples/echo/src/main/java/com/glines/socketio/sample/echo/EchoSocketServlet.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.io.IOException;
2727
import java.util.HashSet;
2828
import java.util.Set;
29+
import java.util.concurrent.BlockingQueue;
30+
import java.util.concurrent.LinkedBlockingQueue;
2931

3032
import javax.servlet.http.Cookie;
3133
import javax.servlet.http.HttpServletRequest;
@@ -36,34 +38,24 @@
3638

3739
public class EchoSocketServlet extends SocketIOServlet {
3840
private static final long serialVersionUID = 1L;
39-
private Set<EchoConnection> connections = new HashSet<EchoConnection>();
4041

4142
private class EchoConnection implements SocketIOInbound {
42-
private SocketIOOutbound outbound = null;
43+
private volatile SocketIOOutbound outbound = null;
4344

4445
@Override
4546
public String getProtocol() {
46-
// TODO Auto-generated method stub
4747
return null;
4848
}
4949

5050
@Override
5151
public void onConnect(SocketIOOutbound outbound) {
5252
this.outbound = outbound;
53-
synchronized (connections) {
54-
connections.add(this);
55-
}
5653
}
5754

5855
@Override
5956
public void onDisconnect(DisconnectReason reason, String errorMessage) {
60-
synchronized(this) {
6157
this.outbound = null;
6258
}
63-
synchronized (connections) {
64-
connections.remove(this);
65-
}
66-
}
6759

6860
@Override
6961
public void onMessage(int messageType, String message) {

0 commit comments

Comments
 (0)