Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit dd01c45

Browse files
theanarkhguangwong
authored andcommittedOct 10, 2022
cluster: send connection to other server when worker drop it
PR-URL: nodejs/node#43747 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Paolo Insogna <paolo@cowtech.it>
1 parent 7e919db commit dd01c45

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed
 

‎lib/internal/cluster/child.js

+8-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,14 @@ function rr(message, { indexesKey, index }, cb) {
208208
function onconnection(message, handle) {
209209
const key = message.key;
210210
const server = handles.get(key);
211-
const accepted = server !== undefined;
211+
let accepted = server !== undefined;
212+
213+
if (accepted && server[owner_symbol]) {
214+
const self = server[owner_symbol];
215+
if (self.maxConnections && self._connections >= self.maxConnections) {
216+
accepted = false;
217+
}
218+
}
212219

213220
send({ ack: message.seq, accepted });
214221

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const net = require('net');
5+
const cluster = require('cluster');
6+
const tmpdir = require('../common/tmpdir');
7+
8+
// The core has bug in handling pipe handle by ipc when platform is win32,
9+
// it can be triggered on win32. I will fix it in another pr.
10+
if (common.isWindows)
11+
common.skip('no setSimultaneousAccepts on pipe handle');
12+
13+
let connectionCount = 0;
14+
let listenCount = 0;
15+
let worker1;
16+
let worker2;
17+
18+
function request(path) {
19+
for (let i = 0; i < 10; i++) {
20+
net.connect(path);
21+
}
22+
}
23+
24+
function handleMessage(message) {
25+
assert.match(message.action, /listen|connection/);
26+
if (message.action === 'listen') {
27+
if (++listenCount === 2) {
28+
request(common.PIPE);
29+
}
30+
} else if (message.action === 'connection') {
31+
if (++connectionCount === 10) {
32+
worker1.send({ action: 'disconnect' });
33+
worker2.send({ action: 'disconnect' });
34+
}
35+
}
36+
}
37+
38+
if (cluster.isPrimary) {
39+
cluster.schedulingPolicy = cluster.SCHED_RR;
40+
tmpdir.refresh();
41+
worker1 = cluster.fork({ maxConnections: 1, pipePath: common.PIPE });
42+
worker2 = cluster.fork({ maxConnections: 9, pipePath: common.PIPE });
43+
worker1.on('message', common.mustCall((message) => {
44+
handleMessage(message);
45+
}, 2));
46+
worker2.on('message', common.mustCall((message) => {
47+
handleMessage(message);
48+
}, 10));
49+
} else {
50+
const server = net.createServer(common.mustCall((socket) => {
51+
process.send({ action: 'connection' });
52+
}, +process.env.maxConnections));
53+
54+
server.listen(process.env.pipePath, common.mustCall(() => {
55+
process.send({ action: 'listen' });
56+
}));
57+
58+
server.maxConnections = +process.env.maxConnections;
59+
60+
process.on('message', common.mustCall((message) => {
61+
assert.strictEqual(message.action, 'disconnect');
62+
process.disconnect();
63+
}));
64+
}

0 commit comments

Comments
 (0)
Please sign in to comment.