Skip to content

Commit

Permalink
update all transporters
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed May 3, 2017
1 parent 08af862 commit e685052
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 52 deletions.
2 changes: 1 addition & 1 deletion examples/multi-server/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ let chalk = require("chalk");

let { STRATEGY_ROUND_ROBIN, STRATEGY_RANDOM } = require("../../src/constants");
let ServiceBroker = require("../../src/service-broker");
let NatsTransporter = require("../../src/transporters/nats");
let NatsTransporter = require("../../src/transporters/mqtt");

// Create broker
let broker = new ServiceBroker({
Expand Down
2 changes: 1 addition & 1 deletion examples/multi-server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
let _ = require("lodash");
let ServiceBroker = require("../../src/service-broker");
let { CustomError } = require("../../src/errors");
let NatsTransporter = require("../../src/transporters/nats");
let NatsTransporter = require("../../src/transporters/mqtt");

// Create broker
let broker = new ServiceBroker({
Expand Down
2 changes: 1 addition & 1 deletion src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Transit {
}

/**
* Connect with transporter
* Connect with transporter. If failed, try again after 5 sec.
*
* @memberOf Transit
*/
Expand Down
3 changes: 1 addition & 2 deletions src/transporters/fake.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class FakeTransporter extends Transporter {
* @memberOf FakeTransporter
*/
connect() {
this.connected = true;
return Promise.resolve();
return this.onConnected();
}

/**
Expand Down
22 changes: 11 additions & 11 deletions src/transporters/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,36 +40,36 @@ class MqttTransporter extends Transporter {
*/
connect() {
return new Promise((resolve, reject) => {
let mqtt = require("mqtt");
this.client = mqtt.connect(this.opts.mqtt);
const mqtt = require("mqtt");
const client = mqtt.connect(this.opts.mqtt);

this.client.on("connect", () => {
client.on("connect", () => {
this.client = client;
this.logger.info("MQTT connected!");
this.connected = true;

resolve();
this.onConnected().then(resolve);
});

/* istanbul ignore next */
this.client.on("error", (e) => {
this.logger.error("MQTT error", e);
client.on("error", (e) => {
this.logger.error("MQTT error!", e.message);

if (!this.client.connected)
if (!client.connected)
reject(e);
});

/* istanbul ignore next */
this.client.on("reconnect", () => {
client.on("reconnect", () => {
this.logger.warn("MQTT reconnecting...");
});

this.client.on("message", (topic, msg) => {
client.on("message", (topic, msg) => {
const cmd = topic.split(".")[1];
this.messageHandler(cmd, msg);
});

/* istanbul ignore next */
this.client.on("close", () => {
client.on("close", () => {
this.connected = true;
this.logger.warn("MQTT disconnected!");
});
Expand Down
23 changes: 11 additions & 12 deletions src/transporters/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,45 +43,44 @@ class NatsTransporter extends Transporter {
connect() {
return new Promise((resolve, reject) => {
let Nats = require("nats");
this.client = Nats.connect(this.opts.nats);
const client = Nats.connect(this.opts.nats);

this.client.on("connect", () => {
client.on("connect", () => {
this.client = client;
this.logger.info("NATS connected!");

this.onConnected().then(resolve);
});

this.client.on("reconnect", () => {
/* istanbul ignore next */
client.on("reconnect", () => {
this.logger.info("NATS reconnected!");
this.onConnected(true);
});

/* istanbul ignore next */
this.client.on("reconnecting", () => {
client.on("reconnecting", () => {
this.logger.warn("NATS reconnecting...");
});

/* istanbul ignore next */
this.client.on("disconnect", () => {
client.on("disconnect", () => {
if (this.connected) {
this.logger.warn("NATS disconnected!");
this.connected = false;
}
});

/* istanbul ignore next */
this.client.on("error", e => {
client.on("error", e => {
this.logger.error("NATS error!", e.message);
/*if (e.toString().indexOf("ECONNREFUSED") != -1) {
this.reconnectAfterTime();
return;
}*/

if (!this.client.connected)
if (!client.connected)
reject(e);
});

/* istanbul ignore next */
this.client.on("close", () => {
client.on("close", () => {
this.connected = false;
this.logger.warn("NATS connection closed!");
});
Expand Down
51 changes: 27 additions & 24 deletions src/transporters/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,51 +41,54 @@ class RedisTransporter extends Transporter {
*/
connect() {
return new Promise((resolve, reject) => {
let Redis = require("ioredis");
this.clientSub = new Redis(this.opts.redis);
this.clientPub = new Redis(this.opts.redis);
const Redis = require("ioredis");
const clientSub = new Redis(this.opts.redis);

this.clientSub.on("connect", () => {
clientSub.on("connect", () => {
this.logger.info("Redis-sub connected!");

this.clientPub.on("connect", () => {
const clientPub = new Redis(this.opts.redis);

clientPub.on("connect", () => {
this.clientSub = clientSub;
this.clientPub = clientPub;

this.logger.info("Redis-pub connected!");

this.connected = true;
this.onConnected().then(resolve);
});

resolve();
/* istanbul ignore next */
clientPub.on("error", (e) => {
this.logger.error("Redis-pub error", e);

if (!this.connected)
reject(e);
});

/* istanbul ignore next */
clientPub.on("close", () => {
this.connected = true;
this.logger.warn("Redis-pub disconnected!");
});
});

this.clientSub.on("message", (topic, msg) => {
clientSub.on("message", (topic, msg) => {
const cmd = topic.split(".")[1];
this.messageHandler(cmd, msg);
});

/* istanbul ignore next */
this.clientPub.on("error", (e) => {
this.logger.error("Redis-pub error", e);

if (!this.client.connected)
reject(e);
});

/* istanbul ignore next */
this.clientSub.on("error", (e) => {
clientSub.on("error", (e) => {
this.logger.error("Redis-sub error", e);
});

/* istanbul ignore next */
this.clientSub.on("close", () => {
clientSub.on("close", () => {
this.connected = true;
this.logger.warn("Redis-sub disconnected!");
});

/* istanbul ignore next */
this.clientPub.on("close", () => {
this.connected = true;
this.logger.warn("Redis-pub disconnected!");
});

});
}

Expand Down

0 comments on commit e685052

Please sign in to comment.