Skip to content

Commit b79830e

Browse files
committed
Merge pull request #348 from nkzawa/patch-7
Fix polling transports and add tests for closing transports
2 parents 8163660 + 02f9ed8 commit b79830e

File tree

3 files changed

+122
-11
lines changed

3 files changed

+122
-11
lines changed

lib/socket.js

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ function Socket (id, server, transport, req) {
2626
this.writeBuffer = [];
2727
this.packetsFn = [];
2828
this.sentCallbackFn = [];
29+
this.cleanupFn = [];
2930
this.request = req;
3031

3132
// Cache IP since it might not be in the req later
@@ -93,7 +94,6 @@ Socket.prototype.onPacket = function (packet) {
9394
break;
9495

9596
case 'error':
96-
this.transport.close();
9797
this.onClose('parse error');
9898
break;
9999

@@ -129,7 +129,6 @@ Socket.prototype.setPingTimeout = function () {
129129
var self = this;
130130
clearTimeout(self.pingTimeoutTimer);
131131
self.pingTimeoutTimer = setTimeout(function () {
132-
self.transport.close();
133132
self.onClose('ping timeout');
134133
}, self.server.pingInterval + self.server.pingTimeout);
135134
};
@@ -142,13 +141,25 @@ Socket.prototype.setPingTimeout = function () {
142141
*/
143142

144143
Socket.prototype.setTransport = function (transport) {
144+
var onError = this.onError.bind(this);
145+
var onPacket = this.onPacket.bind(this);
146+
var flush = this.flush.bind(this);
147+
var onClose = this.onClose.bind(this, 'transport close');
148+
145149
this.transport = transport;
146-
this.transport.once('error', this.onError.bind(this));
147-
this.transport.on('packet', this.onPacket.bind(this));
148-
this.transport.on('drain', this.flush.bind(this));
149-
this.transport.once('close', this.onClose.bind(this, 'transport close'));
150+
this.transport.once('error', onError);
151+
this.transport.on('packet', onPacket);
152+
this.transport.on('drain', flush);
153+
this.transport.once('close', onClose);
150154
//this function will manage packet events (also message callbacks)
151155
this.setupSendCallback();
156+
157+
this.cleanupFn.push(function() {
158+
transport.removeListener('error', onError);
159+
transport.removeListener('packet', onPacket);
160+
transport.removeListener('drain', flush);
161+
transport.removeListener('close', onClose);
162+
});
152163
};
153164

154165
/**
@@ -178,6 +189,7 @@ Socket.prototype.maybeUpgrade = function (transport) {
178189
function onPacket(packet){
179190
if ('ping' == packet.type && 'probe' == packet.data) {
180191
transport.send([{ type: 'pong', data: 'probe', options: { compress: true } }]);
192+
self.emit('upgrading', transport);
181193
clearInterval(self.checkIntervalTimer);
182194
self.checkIntervalTimer = setInterval(check, 100);
183195
} else if ('upgrade' == packet.type && self.readyState != 'closed') {
@@ -252,6 +264,9 @@ Socket.prototype.maybeUpgrade = function (transport) {
252264
*/
253265

254266
Socket.prototype.clearTransport = function () {
267+
var cleanup;
268+
while (cleanup = this.cleanupFn.shift()) cleanup();
269+
255270
// silence further transport errors and prevent uncaught exceptions
256271
this.transport.on('error', function(){
257272
debug('error triggered by discarded transport');
@@ -271,6 +286,7 @@ Socket.prototype.clearTransport = function () {
271286

272287
Socket.prototype.onClose = function (reason, description) {
273288
if ('closed' != this.readyState) {
289+
this.readyState = 'closed';
274290
clearTimeout(this.pingTimeoutTimer);
275291
clearInterval(this.checkIntervalTimer);
276292
this.checkIntervalTimer = null;
@@ -284,7 +300,6 @@ Socket.prototype.onClose = function (reason, description) {
284300
this.packetsFn = [];
285301
this.sentCallbackFn = [];
286302
this.clearTransport();
287-
this.readyState = 'closed';
288303
this.emit('close', reason, description);
289304
}
290305
};
@@ -297,8 +312,14 @@ Socket.prototype.onClose = function (reason, description) {
297312

298313
Socket.prototype.setupSendCallback = function () {
299314
var self = this;
315+
this.transport.on('drain', onDrain);
316+
317+
this.cleanupFn.push(function() {
318+
self.transport.removeListener('drain', onDrain);
319+
});
320+
300321
//the message was sent successfully, execute the callback
301-
this.transport.on('drain', function() {
322+
function onDrain() {
302323
if (self.sentCallbackFn.length > 0) {
303324
var seqFn = self.sentCallbackFn.splice(0,1)[0];
304325
if ('function' == typeof seqFn) {
@@ -313,7 +334,7 @@ Socket.prototype.setupSendCallback = function () {
313334
}
314335
}
315336
}
316-
});
337+
}
317338
};
318339

319340
/**

lib/transports/polling.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ module.exports = Polling;
2828

2929
function Polling (req) {
3030
Transport.call(this, req);
31+
32+
this.closeTimeout = 30 * 1000;
3133
}
3234

3335
/**
@@ -366,6 +368,9 @@ Polling.prototype.compress = function (data, encoding, callback) {
366368
Polling.prototype.doClose = function (fn) {
367369
debug('closing');
368370

371+
var self = this;
372+
var closeTimeoutTimer;
373+
369374
if (this.dataReq) {
370375
debug('aborting ongoing data request');
371376
this.dataReq.destroy();
@@ -374,9 +379,16 @@ Polling.prototype.doClose = function (fn) {
374379
if (this.writable) {
375380
debug('transport writable - closing right away');
376381
this.send([{ type: 'close', options: { compress: true } }]);
377-
fn();
382+
onClose();
378383
} else {
379384
debug('transport not writable - buffering orderly close');
380-
this.shouldClose = fn;
385+
this.shouldClose = onClose;
386+
closeTimeoutTimer = setTimeout(onClose, this.closeTimeout);
387+
}
388+
389+
function onClose() {
390+
clearTimeout(closeTimeoutTimer);
391+
fn();
392+
self.onClose();
381393
}
382394
};

test/server.js

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,84 @@ describe('server', function () {
843843
});
844844
});
845845

846+
it('should close transport upon ping timeout (ws)', function (done) {
847+
var opts = { allowUpgrades: false, transports: ['websocket'], pingInterval: 50, pingTimeout: 30 };
848+
var engine = listen(opts, function (port) {
849+
engine.on('connection', function (conn) {
850+
conn.transport.on('close', done);
851+
});
852+
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] });
853+
// override to simulate an inactive client
854+
socket.sendPacket = socket.onHeartbeat = function (){};
855+
});
856+
});
857+
858+
it('should close transport upon ping timeout (polling)', function (done) {
859+
var opts = { allowUpgrades: false, transports: ['polling'], pingInterval: 50, pingTimeout: 30 };
860+
var engine = listen(opts, function (port) {
861+
engine.on('connection', function (conn) {
862+
conn.transport.on('close', done);
863+
});
864+
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] });
865+
// override to simulate an inactive client
866+
socket.sendPacket = socket.onHeartbeat = function (){};
867+
});
868+
});
869+
870+
it('should close transport upon parse error (ws)', function (done) {
871+
var opts = { allowUpgrades: false, transports: ['websocket'] };
872+
var engine = listen(opts, function (port) {
873+
engine.on('connection', function (conn) {
874+
conn.transport.on('close', done);
875+
});
876+
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] });
877+
socket.on('open', function () {
878+
socket.transport.ws.send('invalid');
879+
});
880+
});
881+
});
882+
883+
it('should close transport upon parse error (polling)', function (done) {
884+
var opts = { allowUpgrades: false, transports: ['polling'] };
885+
var engine = listen(opts, function (port) {
886+
engine.on('connection', function (conn) {
887+
conn.transport.closeTimeout = 100;
888+
conn.transport.on('close', done);
889+
});
890+
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] });
891+
socket.on('open', function () {
892+
socket.transport.doWrite('invalid', function (){});
893+
});
894+
});
895+
});
896+
897+
it('should close upgrading transport upon socket close', function (done) {
898+
var engine = listen(function (port) {
899+
engine.on('connection', function (conn) {
900+
conn.on('upgrading', function (transport) {
901+
transport.on('close', done);
902+
conn.close();
903+
});
904+
});
905+
new eioc.Socket('ws://localhost:%d'.s(port));
906+
});
907+
});
908+
909+
it('should close upgrading transport upon upgrade timeout', function (done) {
910+
var opts = { upgradeTimeout: 100 };
911+
var engine = listen(opts, function (port) {
912+
engine.on('connection', function (conn) {
913+
conn.on('upgrading', function (transport) {
914+
transport.on('close', done);
915+
});
916+
});
917+
var socket = new eioc.Socket('ws://localhost:%d'.s(port));
918+
socket.on('upgrading', function (transport) {
919+
// override not to complete upgrading
920+
transport.send = function (){};
921+
});
922+
});
923+
});
846924
});
847925

848926
describe('messages', function () {

0 commit comments

Comments
 (0)