Skip to content

Commit 2db792f

Browse files
committed
Initialize project
0 parents  commit 2db792f

11 files changed

Lines changed: 533 additions & 0 deletions

File tree

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/target/
2+
/.settings/
3+
/.classpath
4+
/.project

pom.xml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<groupId>org.codenergic.simcat</groupId>
5+
<artifactId>eventbus-java-client</artifactId>
6+
<version>0.0.1-SNAPSHOT</version>
7+
<name>Vert.x EventBus Client</name>
8+
<description>Java port of vertx3-eventbus-client.js</description>
9+
10+
<dependencies>
11+
<dependency>
12+
<groupId>junit</groupId>
13+
<artifactId>junit</artifactId>
14+
<version>4.12</version>
15+
<scope>test</scope>
16+
</dependency>
17+
<dependency>
18+
<groupId>com.neovisionaries</groupId>
19+
<artifactId>nv-websocket-client</artifactId>
20+
<version>1.31</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>com.fasterxml.jackson.core</groupId>
24+
<artifactId>jackson-databind</artifactId>
25+
<version>2.8.6</version>
26+
</dependency>
27+
</dependencies>
28+
29+
<build>
30+
<plugins>
31+
<plugin>
32+
<groupId>org.apache.maven.plugins</groupId>
33+
<artifactId>maven-compiler-plugin</artifactId>
34+
<version>3.6.1</version>
35+
<configuration>
36+
<source>1.6</source>
37+
<target>1.6</target>
38+
</configuration>
39+
</plugin>
40+
</plugins>
41+
</build>
42+
</project>
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
2+
package org.codenergic.eventbus;
3+
4+
import java.util.Map;
5+
6+
import org.codenergic.eventbus.handler.ConnectionHandler;
7+
import org.codenergic.eventbus.handler.MessageHandler;
8+
import org.codenergic.eventbus.handler.ReplyHandler;
9+
10+
public interface EventBus {
11+
static int CONNECTING = 0;
12+
static int OPEN = 1;
13+
static int CLOSING = 2;
14+
static int CLOSED = 3;
15+
16+
void onOpen(ConnectionHandler connectionHandler);
17+
18+
void onClose(ConnectionHandler connectionHandler);
19+
20+
void send(String address, String message);
21+
22+
void send(String address, String message, Map<String, Object> headers);
23+
24+
void send(String address, String message, Map<String, Object> headers, ReplyHandler replyHandler);
25+
26+
void publish(String address, String message, Map<String, Object> headers);
27+
28+
void registerHandler(String address, MessageHandler handler);
29+
30+
void registerHandler(String address, Map<String, Object> headers, MessageHandler handler);
31+
32+
void unregisterHandler(String address, MessageHandler handler);
33+
34+
void unregisterHandler(String address, Map<String, Object> headers, MessageHandler handler);
35+
36+
EventBus open() throws Exception;
37+
38+
void close();
39+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package org.codenergic.eventbus;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Timer;
9+
import java.util.TimerTask;
10+
import java.util.UUID;
11+
12+
import org.codenergic.eventbus.handler.ConnectionHandler;
13+
import org.codenergic.eventbus.handler.Message;
14+
import org.codenergic.eventbus.handler.MessageHandler;
15+
import org.codenergic.eventbus.handler.ReplyHandler;
16+
17+
import com.fasterxml.jackson.core.JsonProcessingException;
18+
import com.fasterxml.jackson.databind.JsonNode;
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.neovisionaries.ws.client.WebSocket;
21+
import com.neovisionaries.ws.client.WebSocketAdapter;
22+
import com.neovisionaries.ws.client.WebSocketException;
23+
import com.neovisionaries.ws.client.WebSocketFactory;
24+
import com.neovisionaries.ws.client.WebSocketFrame;
25+
26+
public final class EventBusAdapter implements EventBus {
27+
private Map<String, List<MessageHandler>> messageHandlers = new HashMap<String, List<MessageHandler>>();
28+
private Map<String, ReplyHandler> replyHandlers = new HashMap<String, ReplyHandler>();
29+
30+
private ObjectMapper objectMapper;
31+
private ConnectionHandler onOpenHandler;
32+
private ConnectionHandler onCloseHandler;
33+
34+
private Timer timer = new Timer();
35+
private int pingInterval;
36+
37+
private WebSocket webSocket;
38+
private int state;
39+
40+
private EventBusAdapter(String address, EventBusOptions options) throws IOException {
41+
this.objectMapper = options.getObjectMapper();
42+
this.pingInterval = options.getPingInterval();
43+
this.webSocket = new WebSocketFactory()
44+
.createSocket(address);
45+
46+
this.state = EventBus.CONNECTING;
47+
48+
this.webSocket.addListener(new WebSocketAdapter() {
49+
@Override
50+
public void onError(WebSocket websocket, WebSocketException cause) throws Exception {
51+
super.onError(websocket, cause);
52+
}
53+
54+
@Override
55+
public void onConnected(final WebSocket websocket, Map<String, List<String>> headers) throws Exception {
56+
state = EventBus.OPEN;
57+
if (onOpenHandler != null) {
58+
onOpenHandler.handle();
59+
}
60+
onOpen();
61+
}
62+
63+
@Override
64+
public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
65+
state = EventBus.CLOSED;
66+
if (onCloseHandler != null) {
67+
onCloseHandler.handle();
68+
}
69+
onClose();
70+
}
71+
72+
@Override
73+
public void onBinaryMessage(WebSocket websocket, byte[] binary) throws Exception {
74+
onMessage(websocket, binary);
75+
}
76+
});
77+
}
78+
79+
public static EventBus connect(String address, EventBusOptions options) throws IOException {
80+
return new EventBusAdapter(address, options);
81+
}
82+
83+
public static EventBus connect(String address) throws IOException {
84+
return EventBusAdapter.connect(address, new EventBusOptions());
85+
}
86+
87+
@Override
88+
public EventBus open() throws Exception {
89+
webSocket.connect();
90+
return this;
91+
}
92+
93+
@Override
94+
public void onOpen(ConnectionHandler connectionHandler) {
95+
this.onOpenHandler = connectionHandler;
96+
}
97+
98+
@Override
99+
public void onClose(ConnectionHandler connectionHandler) {
100+
this.onCloseHandler = connectionHandler;
101+
}
102+
103+
@Override
104+
public void send(String address, String message) {
105+
send(address, message, null, null);
106+
}
107+
108+
@Override
109+
public void send(String address, String message, Map<String, Object> headers) {
110+
send(address, message, headers, null);
111+
}
112+
113+
@Override
114+
public void send(String address, String message, Map<String, Object> headers, ReplyHandler replyHandler) {
115+
sendMessage(Message.TYPE_SEND, address, message, headers, replyHandler);
116+
}
117+
118+
@Override
119+
public void publish(String address, String message, Map<String, Object> headers) {
120+
sendMessage(Message.TYPE_PUBLISH, address, message, headers, null);
121+
}
122+
123+
@Override
124+
public void registerHandler(String address, MessageHandler handler) {
125+
registerHandler(address, null, handler);
126+
}
127+
128+
@Override
129+
public void registerHandler(String address, Map<String, Object> headers, MessageHandler handler) {
130+
if (!messageHandlers.containsKey(address)) {
131+
messageHandlers.put(address, new ArrayList<MessageHandler>());
132+
}
133+
134+
sendMessage(Message.TYPE_REGISTER, address, null, headers, null);
135+
messageHandlers.get(address).add(handler);
136+
}
137+
138+
@Override
139+
public void unregisterHandler(String address, MessageHandler handler) {
140+
unregisterHandler(address, null, handler);
141+
}
142+
143+
@Override
144+
public void unregisterHandler(String address, Map<String, Object> headers, MessageHandler handler) {
145+
List<MessageHandler> handlers = messageHandlers.get(address);
146+
if (handlers == null || handlers.isEmpty()) {
147+
return;
148+
}
149+
150+
sendMessage(Message.TYPE_UNREGISTER, address, null, headers, null);
151+
handlers.remove(handler);
152+
}
153+
154+
@Override
155+
public void close() {
156+
state = EventBus.CLOSING;
157+
webSocket.sendClose();
158+
}
159+
160+
private void sendMessage(String type, String address, String message, Map<String, Object> headers,
161+
ReplyHandler replyHandler) {
162+
if (state != EventBus.OPEN) {
163+
throw new RuntimeException("Invalid state");
164+
}
165+
166+
Message msg = null;
167+
if (Message.TYPE_SEND.equals(type) && replyHandler != null) {
168+
String replyAddress = UUID.randomUUID().toString();
169+
msg = new Message(Message.TYPE_SEND, address, headers, message, replyAddress);
170+
replyHandlers.put(replyAddress, replyHandler);
171+
} else {
172+
msg = new Message(type, address, headers, message);
173+
}
174+
175+
try {
176+
webSocket.sendBinary(objectMapper.writeValueAsBytes(msg));
177+
} catch (JsonProcessingException e) {
178+
e.printStackTrace();
179+
}
180+
}
181+
182+
private void onOpen() {
183+
timer.schedule(new TimerTask() {
184+
@Override
185+
public void run() {
186+
// send ping
187+
webSocket.sendBinary("{\"type\":\"ping\"}".getBytes());
188+
}
189+
}, 0, pingInterval);
190+
}
191+
192+
private void onClose() {
193+
timer.cancel();
194+
}
195+
196+
private void onMessage(WebSocket ws, byte[] body) throws JsonProcessingException, IOException {
197+
JsonNode json = objectMapper.readTree(body);
198+
199+
if (json.has("replyAddress")) {
200+
// currently not supported
201+
}
202+
203+
String address = json.get("address").textValue();
204+
Message message = objectMapper.treeToValue(json, Message.class);
205+
206+
if (messageHandlers.containsKey(address)) {
207+
List<MessageHandler> handlers = messageHandlers.get(address);
208+
for (MessageHandler handler : handlers) {
209+
handler.handle(message);
210+
}
211+
} else if (replyHandlers.containsKey(address)) {
212+
ReplyHandler handler = replyHandlers.get(address);
213+
replyHandlers.remove(address);
214+
handler.handle(message);
215+
} else {
216+
217+
}
218+
}
219+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.codenergic.eventbus;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
5+
public class EventBusOptions {
6+
private int pingInterval = 5000;
7+
private ObjectMapper objectMapper = new ObjectMapper();
8+
9+
protected int getPingInterval() {
10+
return pingInterval;
11+
}
12+
13+
public EventBusOptions setPingInterval(int pingInterval) {
14+
this.pingInterval = pingInterval;
15+
return this;
16+
}
17+
18+
protected ObjectMapper getObjectMapper() {
19+
return objectMapper;
20+
}
21+
22+
public EventBusOptions setObjectMapper(ObjectMapper objectMapper) {
23+
this.objectMapper = objectMapper;
24+
return this;
25+
}
26+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.codenergic.eventbus.handler;
2+
3+
public interface ConnectionHandler {
4+
void handle();
5+
}

0 commit comments

Comments
 (0)