Skip to content

Commit

Permalink
NatsClient refactor (eugenp#3914)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Apr 11, 2018
1 parent 5da2ea9 commit df42f24
Showing 1 changed file with 26 additions and 54 deletions.
80 changes: 26 additions & 54 deletions libraries/src/main/java/com/baeldung/jnats/NatsClient.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
package com.baeldung.jnats;

import io.nats.client.*;

import java.io.IOException;
import java.util.*;

import io.nats.client.AsyncSubscription;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.client.SyncSubscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NatsClient {
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public final class NatsClient {

private String serverURI;
private final String serverURI;

private Connection natsConnection;
private final Connection natsConnection;

private Map<String, Subscription> subscriptions = new HashMap<>();
private final Map<String, Subscription> subscriptions = new HashMap<>();

private final static Logger log = LoggerFactory.getLogger(NatsClient.class);

public NatsClient() {
NatsClient() {
this.serverURI = "jnats://localhost:4222";
natsConnection = initConnection(serverURI);
}


public NatsClient(String serverURI) {

if ((serverURI != null) && (!serverURI.isEmpty())) {
this.serverURI = serverURI;
} else {
Expand All @@ -40,59 +44,33 @@ public void closeConnection() {
natsConnection.close();
}


private Connection initConnection(String uri) {
try {

Options options = new Options.Builder()
.errorCb(new ExceptionHandler() {
@Override
public void onException(NATSException ex) {
log.error("Connection Exception: ", ex);
}
})
.disconnectedCb(new DisconnectedCallback() {
@Override
public void onDisconnect(ConnectionEvent event) {
log.error("Channel disconnected: {}", event.getConnection());
}
})
.reconnectedCb(new ReconnectedCallback() {
@Override
public void onReconnect(ConnectionEvent event) {
log.error("Reconnected to server: {}", event.getConnection());
}
})
.errorCb(ex -> log.error("Connection Exception: ", ex))
.disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
.reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
.build();

return Nats.connect(uri, options);

} catch (IOException ioe) {
log.error("Error connecting to NATs! ", ioe);
return null;
}
}


public void publishMessage(String topic, String replyTo, String message) {
void publishMessage(String topic, String replyTo, String message) {
try {
// Simple Publisher
natsConnection.publish(topic, replyTo, message.getBytes());
} catch (IOException ioe) {
log.error("Error publishing message: {} to {} ", message, topic, ioe);
}
}


public void subscribeAsync(String topic) {

// Simple Async Subscriber
AsyncSubscription subscription = natsConnection.subscribe(topic, new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});
AsyncSubscription subscription = natsConnection.subscribe(
topic, msg -> log.info("Received message on {}", msg.getSubject()));

if (subscription == null) {
log.error("Error subscribing to {}", topic);
Expand All @@ -101,13 +79,11 @@ public void onMessage(Message msg) {
}
}

public SyncSubscription subscribeSync(String topic) {
// Simple Sync Subscriber
SyncSubscription subscribeSync(String topic) {
return natsConnection.subscribe(topic);
}

public void unsubscribe(String topic) {

try {
Subscription subscription = subscriptions.get(topic);

Expand All @@ -116,15 +92,12 @@ public void unsubscribe(String topic) {
} else {
log.error("{} not found. Unable to unsubscribe.", topic);
}

} catch (IOException ioe) {
log.error("Error unsubscribing from {} ", topic, ioe);
}
}


public Message makeRequest(String topic, String request) {

Message makeRequest(String topic, String request) {
try {
return natsConnection.request(topic, request.getBytes(), 100);
} catch (IOException | InterruptedException ioe) {
Expand All @@ -133,7 +106,7 @@ public Message makeRequest(String topic, String request) {
}
}

public void installReply(String topic, String reply) {
void installReply(String topic, String reply) {
natsConnection.subscribe(topic, message -> {
try {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
Expand All @@ -143,8 +116,7 @@ public void installReply(String topic, String reply) {
});
}

public SyncSubscription joinQueueGroup(String topic, String queue) {
SyncSubscription joinQueueGroup(String topic, String queue) {
return natsConnection.subscribe(topic, queue);
}

}

0 comments on commit df42f24

Please sign in to comment.