Skip to content

Commit 8a99c94

Browse files
committed
Merge pull request #60 from nodejitsu/v0.7.x
v0.7.x
2 parents 8bc5ffb + b133b74 commit 8a99c94

File tree

14 files changed

+95
-70
lines changed

14 files changed

+95
-70
lines changed

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,11 @@ Here is a simple example of a [Reactor](#reactors) server that will send an emai
5151
})
5252
],
5353
//
54-
// Add Reconnect logic that uses node-backoff
54+
// Add Reconnect logic that uses `back`
5555
//
5656
reconnect: {
57-
type: 'exponential',
58-
maxTries: 2,
59-
initialDelay: 100,
57+
retries: 2,
58+
minDelay: 100,
6059
maxDelay: 300
6160
}
6261
}).connect(1337);

lib/godot/net/client.js

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
var dgram = require('dgram'),
99
net = require('net'),
10-
util = require('util'),
11-
backoff = require('backoff'),
12-
EventEmitter = require('events').EventEmitter;
10+
utile = require('utile'),
11+
clone = utile.clone,
12+
back = require('back'),
13+
EventEmitter = require('events').EventEmitter,
14+
noop = function () {};
1315

1416
//
1517
// ### function Server (options)
@@ -29,13 +31,19 @@ var Client = module.exports = function Client(options) {
2931
throw new Error('Cannot create client without type: udp, tcp, unix');
3032
}
3133

34+
if(typeof options.reconnect !== 'undefined'
35+
&& typeof options.reconnect!== 'object') {
36+
throw new Error('Reconnect must be a defined object if used')
37+
}
38+
3239
var self = this;
3340

3441
this.type = options.type;
3542
this.host = options.host;
3643
this.port = options.port;
3744
this.path = options.path;
3845
this.reconnect = options.reconnect;
46+
this.attempt = null;
3947
this.producers = {};
4048
this.handlers = {
4149
data: {},
@@ -52,7 +60,7 @@ var Client = module.exports = function Client(options) {
5260
//
5361
// Inherit from EventEmitter
5462
//
55-
util.inherits(Client, EventEmitter);
63+
utile.inherits(Client, EventEmitter);
5664

5765
//
5866
// ### function add (producer)
@@ -126,28 +134,7 @@ Client.prototype.write = function (data) {
126134
// Opens the underlying network connection for this client.
127135
//
128136
Client.prototype.connect = function (port, host, callback) {
129-
var self = this,
130-
connectBackoff, backoffType;
131-
132-
if (this.reconnect) {
133-
if (typeof this.reconnect === 'object') {
134-
backoffType = this.reconnect.type || 'exponential';
135-
connectBackoff = backoff[backoffType](this.reconnect);
136-
connectBackoff.failAfter(this.reconnect.maxTries || 10);
137-
}
138-
else {
139-
connectBackoff = backoff.exponential();
140-
connectBackoff.failAfter(10);
141-
}
142-
143-
connectBackoff.on('fail', function (err) {
144-
self.emit('error', err);
145-
});
146-
147-
connectBackoff.on('ready', function () {
148-
connect();
149-
});
150-
}
137+
var self = this;
151138

152139
//
153140
// Do some fancy arguments parsing to support everything
@@ -173,8 +160,41 @@ Client.prototype.connect = function (port, host, callback) {
173160
: self.emit('error', err) ;
174161
}
175162

163+
function reconnect(err) {
164+
self.attempt = self.attempt || clone(self.reconnect);
165+
//
166+
// Remark: Terminate the backoff when we have hit our fail condition with
167+
// a noop to avoid an if statement
168+
//
169+
// TODO: Make this less coupled (I feel like i want this contained in
170+
// `back` but eh)
171+
//
172+
return self.terminate
173+
? noop()
174+
: back(function (fail, backoff) {
175+
//
176+
// Remark: We are done here, emit error and set termination
177+
//
178+
if (fail) {
179+
self.terminate = true;
180+
self.attempt = null;
181+
return self.emit('error', err);
182+
}
183+
//
184+
// So we can listen on when reconnect events are about to fire
185+
//
186+
self.emit('reconnect');
187+
//
188+
// Attempt a CONNECT!
189+
//
190+
return connect();
191+
}, self.attempt);
192+
}
193+
176194
function onError(err) {
177-
return connectBackoff ? connectBackoff.backoff(err) : self.emit('error', err);
195+
return self.reconnect
196+
? reconnect(err)
197+
: self.emit('error', err);
178198
}
179199

180200
function connect() {
@@ -193,9 +213,10 @@ Client.prototype.connect = function (port, host, callback) {
193213

194214
self.socket.on('error', onError);
195215
self.socket.on('connect', function () {
196-
if (connectBackoff) {
197-
connectBackoff.reset();
198-
}
216+
//
217+
// Remark: We have successfully connected so reset the terminate variable
218+
//
219+
self.terminate = false;
199220
self.emit('connect');
200221
});
201222
}

lib/godot/net/server.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ utile.inherits(Server, events.EventEmitter);
7171
Server.prototype.add = function (reactor) {
7272
this.emit('add', reactor);
7373
reactor.on('error', this.emit.bind(this, 'error'));
74+
reactor.on('reactor:error', this.emit.bind(this, 'reactor:error'));
7475
this.reactors[reactor.id] = reactor;
7576
};
7677

lib/godot/producer/producer.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
var stream = require('stream'),
99
ip = require('ip'),
1010
utile = require('utile'),
11-
uuid = require('node-uuid');
11+
uuid = require('node-uuid'),
12+
tick = typeof setImmediate == 'undefined'
13+
? process.nextTick
14+
: setImmediate;
1215

13-
//
14-
// ### function Producer (options)
1516
// #### @options {Object} Options for this producer.
1617
// Constructor function for the Producer object responsible
1718
// for creating events to process.
@@ -107,7 +108,7 @@ Object.keys(Producer.prototype.types).forEach(function (key) {
107108
//
108109
if (value === 0) {
109110
return (function tickProduce() {
110-
process.nextTick(function () {
111+
tick(function () {
111112
self.produce();
112113
tickProduce();
113114
});

lib/godot/reactor/email.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ Email.prototype.write = function (data) {
7676
self._last = new Date();
7777

7878
return err
79-
? self.emit('error', err)
79+
? self.emit('reactor:error', err)
8080
: self.emit('data', data);
8181
});
8282
};

lib/godot/reactor/graphite.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Graphite.prototype.write = function (data) {
8282
metrics[metricName] = this.meta ? data.meta[this.meta] : data.metric;
8383
this.client.write(metrics, data.time, function (err) {
8484
self._last = now;
85-
if (err) { return self.emit('error', err) }
85+
if (err) { return self.emit('reactor:error', err) }
8686
});
8787

8888
self.emit('data', data);

lib/godot/reactor/map.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ Map.prototype.write = function (data) {
5555
if (!this.passThrough) {
5656
return this.mapFn(data, function (err, data) {
5757
return err
58-
? self.emit('error', err)
58+
? self.emit('reactor:error', err)
5959
: self.emit('data', data);
6060
});
6161
}
6262

6363
this.mapFn(data, function (err) {
64-
if (err) { self.emit('error', err) }
64+
if (err) { self.emit('reactor:error', err) }
6565
});
6666

6767
this.emit('data', data);

lib/godot/reactor/reactor.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ Reactor.prototype.createStream = function (source) {
7575
return this.reactors.reduce(function (last, nextOptions) {
7676
var stream = wrapStream(nextOptions.Factory, nextOptions.args || []);
7777
stream.on('error', self.emit.bind(self, 'error'));
78+
stream.on('reactor:error', self.emit.bind(self, 'reactor:error'));
7879
return last.pipe(stream);
7980
}, source);
8081
};

lib/godot/reactor/redis.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ var Redis = module.exports = function Redis(options, redisFn) {
3434
if (!this.client) {
3535
this.client = redis.createClient(this.port, this.host, this.redisOptions);
3636

37-
this.client.on('error', this.emit.bind(this, 'error'));
37+
this.client.on('error', this.emit.bind(this, 'reactor:error'));
3838
if (this.password) {
3939
this.client.auth(this.password, function () {
4040
// Remark: What if data is sent before we are authenticated?
@@ -56,7 +56,7 @@ utile.inherits(Redis, ReadWriteStream);
5656
Redis.prototype.write = function (data) {
5757
var self = this;
5858
this.redisFn(this.client, data, function (err, data) {
59-
if (err) { return self.emit('error', err) }
59+
if (err) { return self.emit('reactor:error', err) }
6060
});
6161

6262
this.emit('data', data);

lib/godot/reactor/sms.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Sms.prototype.write = function (data) {
8080
self._last = new Date();
8181

8282
return err
83-
? self.emit('error', err)
83+
? self.emit('reactor:error', err)
8484
: self.emit('data', data);
8585
});
86-
};
86+
};

0 commit comments

Comments
 (0)