Skip to content

Fix polling transports and add tests for closing transports #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions lib/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ function Socket (id, server, transport, req) {
this.writeBuffer = [];
this.packetsFn = [];
this.sentCallbackFn = [];
this.cleanupFn = [];
this.request = req;

// Cache IP since it might not be in the req later
Expand Down Expand Up @@ -93,7 +94,6 @@ Socket.prototype.onPacket = function (packet) {
break;

case 'error':
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason of socket closing becomes "transport close" if we call here.
This method will be called on clearTransport() anyway.

this.transport.close();
this.onClose('parse error');
break;

Expand Down Expand Up @@ -129,7 +129,6 @@ Socket.prototype.setPingTimeout = function () {
var self = this;
clearTimeout(self.pingTimeoutTimer);
self.pingTimeoutTimer = setTimeout(function () {
self.transport.close();
self.onClose('ping timeout');
}, self.server.pingInterval + self.server.pingTimeout);
};
Expand All @@ -142,13 +141,25 @@ Socket.prototype.setPingTimeout = function () {
*/

Socket.prototype.setTransport = function (transport) {
var onError = this.onError.bind(this);
var onPacket = this.onPacket.bind(this);
var flush = this.flush.bind(this);
var onClose = this.onClose.bind(this, 'transport close');

this.transport = transport;
this.transport.once('error', this.onError.bind(this));
this.transport.on('packet', this.onPacket.bind(this));
this.transport.on('drain', this.flush.bind(this));
this.transport.once('close', this.onClose.bind(this, 'transport close'));
this.transport.once('error', onError);
this.transport.on('packet', onPacket);
this.transport.on('drain', flush);
this.transport.once('close', onClose);
//this function will manage packet events (also message callbacks)
this.setupSendCallback();

this.cleanupFn.push(function() {
transport.removeListener('error', onError);
transport.removeListener('packet', onPacket);
transport.removeListener('drain', flush);
transport.removeListener('close', onClose);
});
};

/**
Expand Down Expand Up @@ -178,6 +189,7 @@ Socket.prototype.maybeUpgrade = function (transport) {
function onPacket(packet){
if ('ping' == packet.type && 'probe' == packet.data) {
transport.send([{ type: 'pong', data: 'probe', options: { compress: true } }]);
self.emit('upgrading', transport);
clearInterval(self.checkIntervalTimer);
self.checkIntervalTimer = setInterval(check, 100);
} else if ('upgrade' == packet.type && self.readyState != 'closed') {
Expand Down Expand Up @@ -252,6 +264,9 @@ Socket.prototype.maybeUpgrade = function (transport) {
*/

Socket.prototype.clearTransport = function () {
var cleanup;
while (cleanup = this.cleanupFn.shift()) cleanup();

// silence further transport errors and prevent uncaught exceptions
this.transport.on('error', function(){
debug('error triggered by discarded transport');
Expand All @@ -271,6 +286,7 @@ Socket.prototype.clearTransport = function () {

Socket.prototype.onClose = function (reason, description) {
if ('closed' != this.readyState) {
this.readyState = 'closed';
clearTimeout(this.pingTimeoutTimer);
clearInterval(this.checkIntervalTimer);
this.checkIntervalTimer = null;
Expand All @@ -284,7 +300,6 @@ Socket.prototype.onClose = function (reason, description) {
this.packetsFn = [];
this.sentCallbackFn = [];
this.clearTransport();
this.readyState = 'closed';
this.emit('close', reason, description);
}
};
Expand All @@ -297,8 +312,14 @@ Socket.prototype.onClose = function (reason, description) {

Socket.prototype.setupSendCallback = function () {
var self = this;
this.transport.on('drain', onDrain);

this.cleanupFn.push(function() {
self.transport.removeListener('drain', onDrain);
});

//the message was sent successfully, execute the callback
this.transport.on('drain', function() {
function onDrain() {
if (self.sentCallbackFn.length > 0) {
var seqFn = self.sentCallbackFn.splice(0,1)[0];
if ('function' == typeof seqFn) {
Expand All @@ -313,7 +334,7 @@ Socket.prototype.setupSendCallback = function () {
}
}
}
});
}
};

/**
Expand Down
16 changes: 14 additions & 2 deletions lib/transports/polling.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ module.exports = Polling;

function Polling (req) {
Transport.call(this, req);

this.closeTimeout = 30 * 1000;
}

/**
Expand Down Expand Up @@ -364,6 +366,9 @@ Polling.prototype.compress = function (data, encoding, callback) {
Polling.prototype.doClose = function (fn) {
debug('closing');

var self = this;
var closeTimeoutTimer;

if (this.dataReq) {
debug('aborting ongoing data request');
this.dataReq.destroy();
Expand All @@ -372,9 +377,16 @@ Polling.prototype.doClose = function (fn) {
if (this.writable) {
debug('transport writable - closing right away');
this.send([{ type: 'close', options: { compress: true } }]);
fn();
onClose();
} else {
debug('transport not writable - buffering orderly close');
this.shouldClose = fn;
this.shouldClose = onClose;
closeTimeoutTimer = setTimeout(onClose, this.closeTimeout);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the case that client didn't send a poll request anymore.

}

function onClose() {
clearTimeout(closeTimeoutTimer);
fn();
self.onClose();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onClose would never be called if we didn't manually call it.

}
};
78 changes: 78 additions & 0 deletions test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,84 @@ describe('server', function () {
});
});

it('should close transport upon ping timeout (ws)', function (done) {
var opts = { allowUpgrades: false, transports: ['websocket'], pingInterval: 50, pingTimeout: 30 };
var engine = listen(opts, function (port) {
engine.on('connection', function (conn) {
conn.transport.on('close', done);
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] });
// override to simulate an inactive client
socket.sendPacket = socket.onHeartbeat = function (){};
});
});

it('should close transport upon ping timeout (polling)', function (done) {
var opts = { allowUpgrades: false, transports: ['polling'], pingInterval: 50, pingTimeout: 30 };
var engine = listen(opts, function (port) {
engine.on('connection', function (conn) {
conn.transport.on('close', done);
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] });
// override to simulate an inactive client
socket.sendPacket = socket.onHeartbeat = function (){};
});
});

it('should close transport upon parse error (ws)', function (done) {
var opts = { allowUpgrades: false, transports: ['websocket'] };
var engine = listen(opts, function (port) {
engine.on('connection', function (conn) {
conn.transport.on('close', done);
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['websocket'] });
socket.on('open', function () {
socket.transport.ws.send('invalid');
});
});
});

it('should close transport upon parse error (polling)', function (done) {
var opts = { allowUpgrades: false, transports: ['polling'] };
var engine = listen(opts, function (port) {
engine.on('connection', function (conn) {
conn.transport.closeTimeout = 100;
conn.transport.on('close', done);
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port), { transports: ['polling'] });
socket.on('open', function () {
socket.transport.doWrite('invalid', function (){});
});
});
});

it('should close upgrading transport upon socket close', function (done) {
var engine = listen(function (port) {
engine.on('connection', function (conn) {
conn.on('upgrading', function (transport) {
transport.on('close', done);
conn.close();
});
});
new eioc.Socket('ws://localhost:%d'.s(port));
});
});

it('should close upgrading transport upon upgrade timeout', function (done) {
var opts = { upgradeTimeout: 100 };
var engine = listen(opts, function (port) {
engine.on('connection', function (conn) {
conn.on('upgrading', function (transport) {
transport.on('close', done);
});
});
var socket = new eioc.Socket('ws://localhost:%d'.s(port));
socket.on('upgrading', function (transport) {
// override not to complete upgrading
transport.send = function (){};
});
});
});
});

describe('messages', function () {
Expand Down