Skip to content

Commit a2e9656

Browse files
author
huzhou
committed
~ 0.4.23 much more rigorous deathQueue
1 parent 064f460 commit a2e9656

File tree

2 files changed

+158
-7
lines changed

2 files changed

+158
-7
lines changed

lib/misc.js

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
* limitations under the License.
1515
*/
1616

17-
var fs = require('fs'),
18-
assert = require('assert'),
17+
var _ = require('underscore'),
18+
fs = require('fs'),
19+
util = require('util'),
1920
when = require('when'),
20-
timeout = require('when/timeout');
21+
timeout = require('when/timeout'),
22+
assert = require('assert');
2123

2224
// Utility to ensure that certain directories exist
2325
exports.ensureDir = function(dir, clean) {
@@ -53,7 +55,7 @@ exports.forMemoryNum = function(memory) {
5355
};
5456

5557

56-
var tillPrevDeath = null;
58+
/*var tillPrevDeath = null;
5759
5860
exports.deathQueue = function deathQueue(worker, emitter, success){
5961
@@ -105,4 +107,144 @@ exports.deathQueue = function deathQueue(worker, emitter, success){
105107
//some one in the queue already, wait till prev death and then start `die`
106108
afterDeath = tillPrevDeath = timeout(60000, tillPrevDeath.ensure(die));
107109
}
110+
};*/
111+
112+
exports.safeKill = function(pid, signal, logger){
113+
114+
try{
115+
process.kill(pid, signal);
116+
return false;
117+
}
118+
catch(e){
119+
//verify error is Error: ESRCH
120+
logger.debug('[shutdown] safeKill received:%j', e);
121+
return e.errno === 'ESRCH'; //no such process
122+
}
108123
};
124+
125+
exports.deathQueueGenerator = function(options){
126+
127+
options = options || {};
128+
129+
var tillPrevDeath = null,
130+
queue = options.queue || [],
131+
wait = options.timeout || 60000,
132+
retry = options.retry || 3,
133+
logger = options.logger || {
134+
'debug' : function(){
135+
console.log.apply(console, arguments);
136+
}
137+
};
138+
139+
return function deathQueue(worker, emitter, success){
140+
141+
assert.ok(worker);
142+
assert.ok(emitter);
143+
assert.ok(success);
144+
145+
var pid = worker.pid,
146+
death = util.format('worker-%d-died', pid);
147+
148+
if(!_.contains(queue, pid)){
149+
150+
queue.push(pid);
151+
152+
var tillDeath = when.defer(),
153+
afterDeath = null,
154+
die = function die(retry){
155+
156+
if(!retry){
157+
if(tillPrevDeath){
158+
tillPrevDeath.reject(new Error('[deathQueue] failed after retries'));
159+
}
160+
tillPrevDeath = null;//reset
161+
}
162+
163+
var successor = success(),
164+
successorPid = successor.process.pid,
165+
successorGuard = setTimeout(function onSuccessorTimeout(){
166+
//handle error case of successor not 'listening' after started
167+
exports.safeKill(pid, 'SIGTERM', logger);
168+
169+
logger.debug('[deathQueue] successor:%d did not start listening, kill by SIGTERM', successorPid);
170+
//cancel onListening event handler of the dead successor
171+
emitter.removeListener('listening', onSuccessorListening);
172+
//retry of the 'die' process
173+
die(retry - 1);
174+
175+
}, wait);
176+
177+
//when successor is in place, the old worker could be discontinued finally
178+
emitter.on('listening', function onSuccessorListening(onboard){
179+
180+
if(successorPid !== onboard){
181+
return; //noop
182+
}
183+
else{
184+
emitter.removeListener('listening', onSuccessorListening);
185+
}
186+
187+
clearTimeout(successorGuard);
188+
logger.debug('[deathQueue] successor:%d of %d is ready, wait for %s and timeout in:%dms', successorPid, pid, death, wait);
189+
190+
var deathGuard = setTimeout(function(){
191+
192+
if(!exports.safeKill(pid, 'SIGTERM', logger)){
193+
//worker still there, should emit 'exit' eventually
194+
logger.debug('[deathQueue] worker:%d did not report death by:%d, kill by SIGTERM', pid, wait);
195+
//remove the redundant exit listener
196+
emitter.removeListener('died', onDeath);
197+
}
198+
else{//suicide or accident already happended, process has run away
199+
//we emit this from master on behalf of the run away process.
200+
logger.debug('[deathQueue] worker:%d probably ran away, emit:%s on behalf', death);
201+
//immediately report death to the master
202+
emitter.emit('died', pid);
203+
}
204+
205+
}, wait);
206+
207+
worker.kill('SIGINT');
208+
209+
emitter.on('died', function onDeath(dismiss){
210+
211+
if(pid !== dismiss){
212+
return;
213+
}
214+
else{
215+
emitter.removeListener('died', onDeath);
216+
}
217+
218+
logger.debug('[deathQueue] %d died', pid);
219+
220+
clearTimeout(deathGuard);//release the deathGuard
221+
222+
tillDeath.resolve(pid);
223+
224+
if(tillPrevDeath === afterDeath){//last of dyingQueue resolved, clean up the dyingQueue
225+
226+
logger.debug('[deathQueue] death queue cleaned up');
227+
228+
tillPrevDeath = null;
229+
230+
queue.splice(0, queue.length);
231+
}
232+
});
233+
});
234+
};
235+
236+
if(!tillPrevDeath){//1st in the dying queue,
237+
afterDeath = tillPrevDeath = tillDeath.promise;//1 min
238+
die(retry);
239+
}
240+
else{
241+
afterDeath = tillPrevDeath = tillPrevDeath.ensure(_.bind(die, null, retry));
242+
}
243+
}
244+
};
245+
246+
};
247+
248+
exports.deathQueue = exports.deathQueueGenerator({
249+
'timeout': 60000
250+
});

package.json

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
"contributors": [{
44
"name": "Subbu Allamaraju",
55
"email": "subbu@ebaysf.com"
6+
},
7+
{
8+
"name": "Roy Zhou",
9+
"email": "huzhou@ebay.com"
610
}],
711
"name": "cluster2",
8-
"version": "0.4.17",
12+
"version": "0.4.23",
913
"repository": {
1014
"type": "git",
11-
"url": "https://github.com/ql-io/cluster2"
15+
"url": "https://github.com/cubejs/cluster2"
1216
},
1317
"engines": {
1418
"node": ">= 0.8.0"
@@ -34,5 +38,10 @@
3438
"scripts": {
3539
"test": "nodeunit test"
3640
},
37-
"optionalDependencies": {}
41+
"optionalDependencies": {
42+
43+
},
44+
"publishConfig": {
45+
"registry": "https://registry.npmjs.org"
46+
}
3847
}

0 commit comments

Comments
 (0)