Skip to content

Commit f2bd3ad

Browse files
committed
Merge pull request #1 from cubejs/master
rebasing
2 parents ad631cf + d820198 commit f2bd3ad

File tree

6 files changed

+124
-7
lines changed

6 files changed

+124
-7
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ logs
1212
cluster-cache-domain
1313
cluster-cache-persist
1414
ports
15+
/coverage/
16+
/lib-cov/
17+
/log/

lib/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ Cluster.prototype.listen = function(createApp, cb) {
8282
connThreshold: options.connThreshold || 10000, // recycle workers after this many connections
8383
uptimeThreshold: options.uptimeThreshold || 3600 * 24, // 24 hours (uptimeThreshold is in seconds)
8484
heartbeatInterval: options.heartbeatInterval,
85+
maxHeartbeatDelay: options.maxHeartbeatDelay,
8586
emitter: self
8687
});
8788

lib/process.js

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ var Process = module.exports = function Process(options) {
4949
workersKilled: 0
5050
};
5151

52+
this.options.maxHeartbeatDelay = options.maxHeartbeatDelay || 3*60000; //default 3 mins
53+
5254
this._heartbeats = [];
5355

5456
this.killall = function(signal) {
@@ -150,9 +152,12 @@ var Process = module.exports = function Process(options) {
150152
self.emitter.emit('listening', message.pid);
151153
}
152154
if(message.type === 'heartbeat'){
153-
154155
if(message.pid != process.pid){
155156
self._heartbeats.push(message);//must append to the tail
157+
// update the last heartbeat time for the worker
158+
var workerStats = self.stats.workers[message.pid];
159+
//console.log('heartbeat ' + process.pid);
160+
workerStats.lastHeartbeatAt= Date.now();
156161
}
157162

158163
self._heartbeatScheduler = self._heartbeatScheduler || setInterval(function () {
@@ -194,6 +199,23 @@ var Process = module.exports = function Process(options) {
194199
});
195200

196201
self.lastTime = Date.now();
202+
203+
// Check the last heartbeat time of all the workers
204+
_.each(self.stats.workers, function (workerStats, pid) {
205+
var now = Date.now();
206+
if (now - workerStats.lastHeartbeatAt> self.options.maxHeartbeatDelay) {
207+
// this worker has not sent a heartbeat for longer than maxHeartbeatDelay (milliseconds)
208+
log(util.format('[Cluster2] Detected worker%d is not responsive for %d', pid, now - workerStats.lastHeartbeatAt));
209+
var deathQueue = require('./misc').deathQueue;
210+
deathQueue(self.workers[pid], self.emitter, function () {
211+
// create a successor
212+
var successor = self.createWorker();
213+
self.workers[successor.pid] = successor;
214+
log(util.format('[Cluster2] Created a new worker with pid %d', successor.pid));
215+
return successor;
216+
});
217+
}
218+
});
197219

198220
}, self.options.heartbeatInterval || 60000);
199221
}
@@ -385,17 +407,19 @@ Process.prototype.listen = function() {
385407
self.emitter.emit('died', worker.pid);
386408
self.stats.workersKilled++;
387409
self.stats.noWorkers--;
410+
delete self.workers[worker.pid + ''];
411+
delete self.stats.workers[worker.pid];
388412
return;
389413
}
390414

391415
self.emitter.emit('died', worker.pid);
392416
self.stats.workersKilled++;
393417
self.stats.noWorkers--;
418+
delete self.workers[worker.pid + ''];
419+
delete self.stats.workers[worker.pid];
394420
//bugfix by huzhou@ebay.com, worker & replacement name collision
395421
var replacement = self.createWorker();
396422
self.workers[replacement.pid + ''] = replacement;
397-
delete self.workers[worker.pid + ''];
398-
delete self.stats.workers[worker.pid];
399423

400424
log('[cluster2] updated worker list:' + _.keys(self.workers));
401425
};
@@ -456,6 +480,8 @@ Process.prototype.listen = function() {
456480
}
457481
});
458482

483+
// put the emitter in the process
484+
process.emitter = self.emitter;
459485
self.emitter.emit(signal, {
460486
pid: process.pid,
461487
type: 'worker'
@@ -551,7 +577,7 @@ Process.prototype.listen = function() {
551577
// we'd like to have the threshold randomized in between [1, 1.5) of the given threshold
552578
// so that the workers do not all die around the same time. This is particularly important for boxes with a small number of CPU cores
553579
var connThreshold = self.options.connThreshold,
554-
uptimeThreshod = self.options.uptimeThreshod;
580+
uptimeThreshold = self.options.uptimeThreshold;
555581

556582
var recycle = setInterval(function() {
557583

@@ -637,6 +663,8 @@ Process.prototype.listen = function() {
637663
});
638664

639665
}, self.options.heartbeatInterval || 60000);
666+
// put the heartbeat interval id in the process context
667+
process.heartbeat = heartbeat;
640668

641669
_.each(apps, function(app){
642670

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"email": "subbu@ebaysf.com"
66
}],
77
"name": "cluster2",
8-
"version": "0.4.15",
8+
"version": "0.4.19",
99
"repository": {
1010
"type": "git",
1111
"url": "https://github.com/ql-io/cluster2"

test/lib/server.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ server.get('/', function(req, res) {
2626
}
2727
});
2828

29+
// test nanny feature
30+
server.get('/nanny-feature-test', function simulateRunawayWorker(req, res) {
    // Route used by the nanny feature test: the worker that serves this
    // request replies with its own pid and then, after a short delay, stops
    // its heartbeat timer (the interval id stored on `process.heartbeat`) so
    // that it looks unresponsive from then on.
    var workerPid = process.pid;
    var runAwayDelayMs = 2000;

    function stopHeartbeat() {
        console.log('worker ' + workerPid + ' runs away');
        clearInterval(process.heartbeat);
    }

    // Reply with the pid as a string so the caller can tell which worker
    // is about to go silent.
    res.send(String(workerPid));
    console.log('worker ' + workerPid + ' will run away after 2s');
    setTimeout(stopHeartbeat, runAwayDelayMs);
});
39+
2940
server.on('close', function() {
3041
serving = false;
3142
})
@@ -40,12 +51,14 @@ var c = new Cluster({
4051
ecv: {
4152
control: true
4253
},
43-
heartbeatInterval: process.env["heartbeatInterval"] || 1000
54+
heartbeatInterval: process.env["heartbeatInterval"] || 1000,
55+
maxHeartbeatDelay: process.env["maxHeartbeatDelay"] || 3000
4456
});
4557

4658
c.on('died', function(pid) {
47-
console.log('Worker ' + pid + ' died');
59+
//console.log('Worker ' + pid + ' died');
4860
process.send({
61+
pid: pid,
4962
dead: true
5063
})
5164
});

test/nanny-feature-test.js

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
'use strict';
2+
3+
var should = require('should'),
4+
request = require('request'),
5+
_ = require('underscore'),
6+
spawn = require('child_process').spawn,
7+
nodeunit = require('nodeunit'),
8+
EventEmitter = require('events').EventEmitter,
9+
emitter = new EventEmitter(),
10+
childProc;
11+
12+
/**
 * Nodeunit suite for the "nanny" feature: a worker that stops sending
 * heartbeats must be reported dead and a replacement worker must be created.
 *
 * setUp spawns test/lib/server.js as a child process with an IPC channel and
 * waits for its `ready` message; tearDown kills that child process.
 */
exports['Nanny Feature Test'] = {
    setUp: function (callback) {
        // Inherit the current environment and override the cluster settings.
        // The key must be spelled `heartbeatInterval`, otherwise the server
        // never sees it and silently falls back to its own default.
        var env = _.extend({}, process.env, {
            host: '127.0.0.1',
            port: 3000,
            monPort: 3001,
            noWorkers: 2,
            heartbeatInterval: 500,
            maxHeartbeatDelay: 1500
        });

        childProc = spawn('node', ['test/lib/server.js'], {
            env: env,
            stdio: ['pipe', 1, 2, 'ipc']
        });

        // Keep listening until `ready` actually arrives: with `once`, any
        // other message delivered first would consume the listener and the
        // suite would hang in setUp forever.
        childProc.on('message', function onReady(msg) {
            if (msg.ready) {
                childProc.removeListener('message', onReady);
                callback();
            }
        });
    },

    tearDown: function (callback) {
        childProc.kill('SIGKILL');
        callback();
    },

    'worker runs away and gets killed & replacement has been created': function (test) {
        var pid;

        function onMessage(msg) {
            if (!msg.dead) {
                return;
            }
            // React to the first death notification only, so that
            // test.done() cannot be called more than once.
            childProc.removeListener('message', onMessage);

            console.log(msg.pid);
            test.strictEqual(msg.pid, pid);

            request.get('http://127.0.0.1:3001/ComponentStatus?component=worker', function (err, res, body) {
                if (err || !body) {
                    // Fail the test cleanly instead of throwing on an
                    // undefined body inside the callback.
                    test.ok(false, 'ComponentStatus request failed: ' + err);
                    test.done();
                    return;
                }
                console.log(body);
                // The body is a bracketed, comma-separated pid list.
                var pids = body.substring(1, body.length - 1).split(',');
                test.strictEqual(pids.length, 3); // master & 2 workers
                test.done();
            });
        }

        // Register the listener before issuing the request so that no
        // message from the child can be missed.
        childProc.on('message', onMessage);

        request.get('http://127.0.0.1:3000/nanny-feature-test', function (err, res, body) {
            test.equal(err, null);
            test.ok(body);
            pid = parseInt(body, 10);
            console.log(pid);
        });
    }
};

0 commit comments

Comments
 (0)