Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"bluebird": "3.4.7",
"eslint-loader": "^1.6.1",
"uuid": "3.0.1",
"ws": "^2.0.3"
"ws": "3.0.0"
},
"peerDependencies": {
"bufferutil": "^2.0.1",
Expand All @@ -54,4 +54,4 @@
"engines": {
"node": ">= 6.9.1"
}
}
}
28 changes: 22 additions & 6 deletions src/Kuzzle.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ var
* @param host - Server name or IP Address to the Kuzzle instance
* @param [options] - Connection options
* @param {responseCallback} [cb] - Handles connection response
* @constructor
*/
function Kuzzle (host, options, cb) {
var self = this;
Expand Down Expand Up @@ -80,11 +79,15 @@ function Kuzzle (host, options, cb) {
subscriptions: {
/*
Contains the centralized subscription list in the following format:
pending: <number of pending subscriptions>
pending: {
subscriptionUid_1: kuzzleRoomInstance_1,
subscriptionUid_2: kuzzleRoomInstance_2,
subscriptionUid_...: kuzzleRoomInstance_...
},
'roomId': {
kuzzleRoomID_1: kuzzleRoomInstance_1,
kuzzleRoomID_2: kuzzleRoomInstance_2,
kuzzleRoomID_...: kuzzleRoomInstance_...
subscriptionUid_1: kuzzleRoomInstance_1,
subscriptionUid_2: kuzzleRoomInstance_2,
subscriptionUid_...: kuzzleRoomInstance_...
}

This was made to allow multiple subscriptions on the same set of filters, something that Kuzzle does not permit.
Expand Down Expand Up @@ -360,12 +363,14 @@ Kuzzle.prototype.connect = function () {
});

self.network.onConnectError(function (error) {
var connectionError = new Error('Unable to connect to kuzzle proxy server at "' + self.host + '"');
var connectionError = new Error('Unable to connect to kuzzle proxy server at "' + self.host + ':' + self.port + '"');

connectionError.internal = error;
self.state = 'error';
self.emitEvent('networkError', connectionError);

disableAllSubscriptions.call(self);

if (self.connectCB) {
self.connectCB(connectionError);
}
Expand Down Expand Up @@ -1541,4 +1546,15 @@ function discardRequest(object, cb) {
}
}

function disableAllSubscriptions() {
var self = this;

Object.keys(self.subscriptions).forEach(function (roomId) {
Object.keys(self.subscriptions[roomId]).forEach(function (subscriptionId) {
var subscription = self.subscriptions[roomId][subscriptionId];
subscription.subscribing = false;
});
});
}

module.exports = Kuzzle;
12 changes: 6 additions & 6 deletions src/Room.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ Room.prototype.count = function (cb) {
*/
Room.prototype.renew = function (filters, notificationCB, cb) {
var
self = this,
now = Date.now(),
subscribeQuery = {
scope: this.scope,
state: this.state,
users: this.users
},
self = this;
scope: self.scope,
state: self.state,
users: self.users
};

if (typeof filters === 'function') {
cb = notificationCB;
Expand Down Expand Up @@ -212,7 +212,7 @@ Room.prototype.renew = function (filters, notificationCB, cb) {
self.kuzzle.subscriptions.pending[self.id] = self;

subscribeQuery.body = self.filters;
subscribeQuery = self.kuzzle.addHeaders(subscribeQuery, this.headers);
subscribeQuery = self.kuzzle.addHeaders(subscribeQuery, self.headers);

self.kuzzle.query(self.collection.buildQueryArgs('realtime', 'subscribe'), subscribeQuery, {volatile: self.volatile}, function (error, response) {
delete self.kuzzle.subscriptions.pending[self.id];
Expand Down
89 changes: 85 additions & 4 deletions src/networkWrapper/wrappers/socketio.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ function SocketIO(host, port, ssl) {
this.port = port;
this.ssl = ssl;
this.socket = null;
this.wasConnected = false;
this.forceDisconnect = false;
this.handlers = {
connect: [],
reconnect: [],
connectError: [],
disconnect: []
};
this.retrying = false;

/**
* Creates a new socket from the provided arguments
Expand All @@ -12,11 +21,50 @@ function SocketIO(host, port, ssl) {
* @param {int} reconnectionDelay
*/
this.connect = function (autoReconnect, reconnectionDelay) {
var self = this;

this.socket = window.io((this.ssl ? 'https://' : 'http://') + this.host + ':' + this.port, {
reconnection: autoReconnect,
reconnectionDelay: reconnectionDelay,
forceNew: true
});

this.socket.on('connect', function() {
if (self.wasConnected) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to socket.io documentation, connect event is fired "upon a connection including a successful reconnection".
If this is the case, don't we have a double trigger with the on('reconnect',..) one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, this is different cases:

  • reconnection on connect event is used when proxy close connection
  • reconnection on reconnect event is used when socket hang out and come back again via socket io reconnection logic

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I see that you intend to use them differently but this is not what the doc says. connect is fired each time a connection is made, including socket.io reconnections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so I removed reconnect listener

self.handlers.reconnect.forEach(function(handler) {
handler();
});
}
else {
self.handlers.connect.forEach(function(handler) {
handler();
});
}

self.wasConnected = true;
});

this.socket.on('connect_error', function(error) {
onClientNetworkError(self, autoReconnect, reconnectionDelay, error);
});

this.socket.on('disconnect', function() {
var error;

if (self.forceDisconnect) {
self.handlers.disconnect.forEach(function(handler) {
handler();
});
}
else {
error = new Error('An error occurred, this may due that kuzzle was not ready yet');
error.status = 500;

onClientNetworkError(self, autoReconnect, reconnectionDelay, error);
}

self.forceDisconnect = false;
});
};

/**
Expand All @@ -25,31 +73,39 @@ function SocketIO(host, port, ssl) {
* @param {function} callback
*/
this.onConnect = function (callback) {
this.socket.on('connect', callback);
if (this.handlers.connect.indexOf(callback) === -1) {
this.handlers.connect.push(callback);
}
};

/**
* Fires the provided callback whenever a connection error is received
* @param {function} callback
*/
this.onConnectError = function (callback) {
this.socket.on('connect_error', callback);
if (this.handlers.connectError.indexOf(callback) === -1) {
this.handlers.connectError.push(callback);
}
};

/**
* Fires the provided callback whenever a disconnection occurred
* @param {function} callback
*/
this.onDisconnect = function (callback) {
this.socket.on('disconnect', callback);
if (this.handlers.disconnect.indexOf(callback) === -1) {
this.handlers.disconnect.push(callback);
}
};

/**
* Fires the provided callback whenever a connection has been reestablished
* @param {function} callback
*/
this.onReconnect = function (callback) {
this.socket.on('reconnect', callback);
if (this.handlers.reconnect.indexOf(callback) === -1) {
this.handlers.reconnect.push(callback);
}
};

/**
Expand Down Expand Up @@ -97,9 +153,34 @@ function SocketIO(host, port, ssl) {
* Closes the connection
*/
this.close = function () {
this.forceDisconnect = true;

this.socket.close();
this.socket = null;
};
}

/**
* Called when the connection closes with an error state
*
* @param {SocketIO}
* @param {boolean} autoReconnect
* @param {number} reconnectionDelay
* @param {Error} error
*/
function onClientNetworkError(socketio, autoReconnect, reconnectionDelay, error) {
if (autoReconnect && !socketio.retrying && !socketio.stopRetryingToConnect) {
socketio.retrying = true;
setTimeout(function () {
socketio.retrying = false;
socketio.connect(autoReconnect, reconnectionDelay);
}, reconnectionDelay);
}

socketio.handlers.connectError.forEach(function(handler) {
handler(error);
});
}


module.exports = SocketIO;
47 changes: 34 additions & 13 deletions src/networkWrapper/wrappers/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,39 @@ function WSNode(host, port, ssl) {
self.stopRetryingToConnect = false;
};

this.client.onclose = function (code, message) {
if (code === 1000) {
this.client.onclose = function (closeEvent, message) {
var error;
var status;
var reason = message;

if (typeof closeEvent === 'number') {
status = closeEvent;
}
else {
status = closeEvent.code;

if (closeEvent.reason) {
reason = closeEvent.reason;
}
}

if (status === 1000) {
self.emitEvent('disconnect');
}
else {
onClientError.call(self, autoReconnect, reconnectionDelay, message);
error = new Error(reason);
error.status = status;

onClientNetworkError(self, autoReconnect, reconnectionDelay, error);
}
};

this.client.onerror = function (error) {
onClientError.call(self, autoReconnect, reconnectionDelay, error);
if (!(error instanceof Error)) {
error = new Error(error);
}

onClientNetworkError(self, autoReconnect, reconnectionDelay, error);
};

this.client.onmessage = function (payload) {
Expand Down Expand Up @@ -133,22 +155,21 @@ WSNode.prototype.constructor = WSNode;
/**
* Called when the connection closes with an error state
*
* @param {WSNode}
* @param {boolean} autoReconnect
* @param {number} reconnectionDelay
* @param {string|Object} message
* @param {Error} error
*/
function onClientError(autoReconnect, reconnectionDelay, message) {
var self = this;

if (autoReconnect && !self.retrying && !self.stopRetryingToConnect) {
self.retrying = true;
function onClientNetworkError(ws, autoReconnect, reconnectionDelay, error) {
if (autoReconnect && !ws.retrying && !ws.stopRetryingToConnect) {
ws.retrying = true;
setTimeout(function () {
self.retrying = false;
self.connect(autoReconnect, reconnectionDelay);
ws.retrying = false;
ws.connect(autoReconnect, reconnectionDelay);
}, reconnectionDelay);
}

self.emitEvent('networkError', new Error(message));
ws.emitEvent('networkError', error);
}

module.exports = WSNode;
12 changes: 12 additions & 0 deletions test/Room/methods.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ describe('Room methods', function () {
should(room.queue).match([{action: 'renew', args: [{}, cb, cb]}]);
});

it('should reset subscribing when network error occurs', function () {
var cb = sinon.stub();
room.renew({}, cb, cb);
should(room.subscribing).be.true();

kuzzle.connect();
kuzzle.network.emit('networkError', new Error('foo'));

should(room.subscribing).be.false();
should(kuzzle.query).be.called();
});

it('should register itself in the global subscription list', function () {
room.renew({}, sinon.stub());
kuzzle.query.yield(null, result);
Expand Down
2 changes: 1 addition & 1 deletion test/kuzzle/connect.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe('Kuzzle connect', function () {
it('should call the provided callback on a connection error', function (done) {
var kuzzle = new Kuzzle('nowhere', {connect: 'manual'}, function (err, res) {
should(err).be.instanceOf(Error);
should(err.message).be.exactly('Unable to connect to kuzzle proxy server at "nowhere"');
should(err.message).be.exactly('Unable to connect to kuzzle proxy server at "nowhere:7512"');
should(err.internal.message).be.exactly('Mock Error');
should(res).be.undefined();
should(kuzzle.state).be.exactly('error');
Expand Down
Loading