Skip to content

Commit c597268

Browse files
author
zhuwang
committed
Used deathQueue to kill the unresponsive worker and create a new successor
Changed the name convention Added test case and modified test/lib/server.js to support the test case
1 parent c583155 commit c597268

File tree

5 files changed

+88
-51
lines changed

5 files changed

+88
-51
lines changed

examples/test-samples/nanny-feature-test.js

Lines changed: 0 additions & 32 deletions
This file was deleted.

lib/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Cluster.prototype.listen = function(createApp, cb) {
8282
connThreshold: options.connThreshold || 10000, // recycle workers after this many connections
8383
uptimeThreshold: options.uptimeThreshold || 3600 * 24, // 24 hours (uptimeThreshold is in seconds)
8484
heartbeatInterval: options.heartbeatInterval,
85-
maxHbInterval: options.maxHbInterval,
85+
maxHeartbeatDelay: options.maxHeartbeatDelay,
8686
emitter: self
8787
});
8888

lib/process.js

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var Process = module.exports = function Process(options) {
4949
workersKilled: 0
5050
};
5151

52-
this.options.maxHbInterval = options.maxHbInterval || 3*60000; //default 3 mins
52+
this.options.maxHeartbeatDelay = options.maxHeartbeatDelay || 3*60000; //default 3 mins
5353

5454
this._heartbeats = [];
5555

@@ -157,7 +157,7 @@ var Process = module.exports = function Process(options) {
157157
// update the last heartbeat time for the worker
158158
var workerStats = self.stats.workers[message.pid];
159159
//console.log('heartbeat ' + process.pid);
160-
workerStats.lastHbTime = Date.now();
160+
workerStats.lastHeartbeatAt= Date.now();
161161
}
162162

163163
self._heartbeatScheduler = self._heartbeatScheduler || setInterval(function () {
@@ -203,20 +203,16 @@ var Process = module.exports = function Process(options) {
203203
// Check the last heartbeat time of all the workers
204204
_.each(self.stats.workers, function (workerStats, pid) {
205205
var now = Date.now();
206-
//console.log(workerStats.lastHbTime + ', ' + self.options.maxHbInterval);
207-
if (now - workerStats.lastHbTime > self.options.maxHbInterval) {
208-
// this worker hasn't been sending heartbeat for maxHbInterval
209-
console.log(util.format('[Cluster2] Detected worker%d is not responsive for %d', pid, now - workerStats.lastHbTime));
210-
// create a replacement immediatly
211-
var replacement = self.createWorker();
212-
self.workers[replacement.pid] = replacement;
213-
console.log(util.format('[Cluster2] Created a new worker with pid %d', replacement.pid));
214-
// kill the dead worker
215-
var workerFileName = util.format('%s/worker.%d.pid', self.options.pids, pid);
216-
console.log(util.format('[Cluster2] Kill the orignial worker %s', workerFileName));
217-
self.kill(workerFileName, 'SIGINT', function deleteEntry() {
218-
delete self.workers[pid];
219-
delete self.stats.workers[pid];
206+
if (now - workerStats.lastHeartbeatAt> self.options.maxHeartbeatDelay) {
207+
// this worker hasn't been sending heartbeat for maxHeartbeatDelay
208+
log(util.format('[Cluster2] Detected worker%d is not responsive for %d', pid, now - workerStats.lastHeartbeatAt));
209+
var deathQueue = require('./misc').deathQueue;
210+
deathQueue(self.workers[pid], self.emitter, function () {
211+
// create a successor
212+
var successor = self.createWorker();
213+
self.workers[successor.pid] = successor;
214+
log(util.format('[Cluster2] Created a new worker with pid %d', successor.pid));
215+
return successor;
220216
});
221217
}
222218
});

test/lib/server.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ server.get('/', function(req, res) {
2626
}
2727
});
2828

29+
// test nanny feature
30+
server.get('/nanny-feature-test', function (req, res) {
31+
// the first worker that handles the request will run away
32+
res.send(process.pid + '');
33+
console.log('worker ' + process.pid + ' will run away after 2s');
34+
setTimeout(function () {
35+
console.log('worker ' + process.pid + ' runs away');
36+
clearInterval(process.heartbeat);
37+
}, 2000);
38+
});
39+
2940
server.on('close', function() {
3041
serving = false;
3142
})
@@ -40,12 +51,14 @@ var c = new Cluster({
4051
ecv: {
4152
control: true
4253
},
43-
heartbeatInterval: process.env["heartbeatInterval"] || 1000
54+
heartbeatInterval: process.env["heartbeatInterval"] || 1000,
55+
maxHeartbeatDelay: process.env["maxHeartbeatDelay"] || 3000
4456
});
4557

4658
c.on('died', function(pid) {
47-
console.log('Worker ' + pid + ' died');
59+
//console.log('Worker ' + pid + ' died');
4860
process.send({
61+
pid: pid,
4962
dead: true
5063
})
5164
});

test/nanny-feature-test.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
'use strict';
2+
3+
var should = require('should'),
4+
request = require('request'),
5+
_ = require('underscore'),
6+
spawn = require('child_process').spawn,
7+
EventEmitter = require('events').EventEmitter,
8+
emitter = new EventEmitter();
9+
10+
describe('Test Cluster2 Nanny Feature', function () {
11+
12+
var childProc;
13+
14+
before(function (done) {
15+
this.timeout(10000);
16+
var env = {};
17+
_.extend(env, process.env);
18+
_.extend(env, {
19+
host: '127.0.0.1',
20+
port: 3000,
21+
noWorkers: 2,
22+
heartbeatInterval: 500,
23+
maxHeartbeatDelay: 1500
24+
});
25+
26+
childProc = spawn('node', ['test/lib/server.js'], {
27+
env: env,
28+
stdio: ['pipe', 1, 2, 'ipc']
29+
});
30+
31+
childProc.once('message', function (msg) {
32+
if (msg.ready) {
33+
return done();
34+
}
35+
});
36+
//setTimeout(done, 5000);
37+
});
38+
39+
it('Should get the run away worker id and get notified when the worker died', function (done) {
40+
var pid;
41+
this.timeout(10000);
42+
request.get('http://127.0.0.1:3000/nanny-feature-test', function (err, res, body) {
43+
should.not.exist(err);
44+
body.should.be.ok;
45+
pid = parseInt(body);
46+
//console.log('body: ' + pid);
47+
});
48+
childProc.on('message', function (msg) {
49+
if (msg.dead) {
50+
msg.pid.should.equal(pid);
51+
return done();
52+
}
53+
});
54+
});
55+
56+
after(function (done) {
57+
childProc.kill('SIGKILL');
58+
done();
59+
});
60+
});

0 commit comments

Comments
 (0)