Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/Kuzzle.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class Kuzzle extends KuzzleEventEmitter {
set jwt (encodedJwt) {
this.auth.authenticationToken = encodedJwt;
}

get connected () {
return this.protocol.connected;
}
Expand Down Expand Up @@ -368,9 +368,13 @@ class Kuzzle extends KuzzleEventEmitter {

if (!request.volatile) {
request.volatile = this.volatile;
} else if (typeof request.volatile !== 'object' || Array.isArray(request.volatile)) {
} else if (
typeof request.volatile !== 'object'
|| Array.isArray(request.volatile)
) {
throw new Error(`Kuzzle.query: Invalid volatile argument received: ${JSON.stringify(request.volatile)}`);
}

for (const item of Object.keys(this.volatile)) {
if (request.volatile[item] === undefined) {
request.volatile[item] = this.volatile[item];
Expand Down Expand Up @@ -460,15 +464,15 @@ Discarded request: ${JSON.stringify(request)}`));
}

const controller = new ControllerClass(this);

if (!(controller.name && controller.name.length > 0)) {
throw new Error('Controllers must have a name.');
}

if (controller.kuzzle !== this) {
throw new Error('You must pass the Kuzzle SDK instance to the parent constructor.');
}

if (this.__proxy__) {
this.__proxy__.registerProp(accessor);
}
Expand Down
44 changes: 23 additions & 21 deletions src/protocols/abstract/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
const
KuzzleError = require('../../KuzzleError'),
uuidv4 = require('../../uuidv4'),
KuzzleEventEmitter = require('../../eventEmitter');
KuzzleEventEmitter = require('../../eventEmitter'),
PendingRequest = require('./pendingRequest');

class AbstractWrapper extends KuzzleEventEmitter {

constructor (host, options = {}) {
super();

Expand Down Expand Up @@ -66,8 +66,6 @@ class AbstractWrapper extends KuzzleEventEmitter {
* Called when the client's connection is established
*/
clientConnected (state, wasConnected) {
this.on('disconnected', () => this.clear());

this.state = state || 'ready';
this.emit(wasConnected && 'reconnect' || 'connect');
}
Expand All @@ -87,29 +85,30 @@ class AbstractWrapper extends KuzzleEventEmitter {
Discarded request: ${JSON.stringify(request)}`));
}

this._pendingRequests.set(request.requestId, request);
const pending = new PendingRequest(request);
this._pendingRequests.set(request.requestId, pending);

return new Promise((resolve, reject) => {
this.once(request.requestId, response => {
this._pendingRequests.delete(request.requestId);
this.once(request.requestId, response => {
this._pendingRequests.delete(request.requestId);

if (response.error) {
const error = new KuzzleError(response.error);
if (response.error) {
const error = new KuzzleError(response.error);

this.emit('queryError', error, request);
this.emit('queryError', error, request);

if (request.action !== 'logout' && error.message === 'Token expired') {
this.emit('tokenExpired');
}

return reject(error);
if (request.action !== 'logout' && error.message === 'Token expired') {
this.emit('tokenExpired');
}

return resolve(response);
});
return pending.reject(error);
}

this.send(request);
pending.resolve(response);
});

this.send(request);

return pending.promise;
}

isReady () {
Expand All @@ -121,8 +120,11 @@ Discarded request: ${JSON.stringify(request)}`));
* Emits an event for each discarded pending request.
*/
clear () {
for (const request of this._pendingRequests.values()) {
this.emit('discarded', request);
const rejectedError = new Error('Network error: request was sent but no response has been received');
for (const pending of this._pendingRequests.values()) {
pending.reject(rejectedError);
this.removeAllListeners(pending.request.requestId);
this.emit('discarded', pending.request);
}

this._pendingRequests.clear();
Expand Down
23 changes: 23 additions & 0 deletions src/protocols/abstract/pendingRequest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

class PendingRequest {
constructor(request) {
this._resolve = null;
this._reject = null;
this.request = request;
this.promise = new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
}

resolve(...payload) {
this._resolve(...payload);
}

reject(error) {
this._reject(error);
}
}

module.exports = PendingRequest;
2 changes: 2 additions & 0 deletions src/protocols/abstract/realtime.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class RTWrapper extends KuzzleAbstractProtocol {
* Called when the client's connection is closed
*/
clientDisconnected() {
this.clear();
this.emit('disconnect');
}

Expand All @@ -53,6 +54,7 @@ class RTWrapper extends KuzzleAbstractProtocol {
*/
clientNetworkError(error) {
this.state = 'offline';
this.clear();

const connectionError = new Error(`Unable to connect to kuzzle server at ${this.host}:${this.port}`);
connectionError.internal = error;
Expand Down
33 changes: 27 additions & 6 deletions test/protocol/common.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const
should = require('should'),
sinon = require('sinon'),
KuzzleError = require('../../src/KuzzleError'),
AbstractWrapper = require('../../src/protocols/abstract/common');
AbstractWrapper = require('../../src/protocols/abstract/common'),
PendingRequest = require('../../src/protocols/abstract/pendingRequest');

describe('Common Protocol', () => {
let
Expand Down Expand Up @@ -52,7 +53,9 @@ describe('Common Protocol', () => {

protocol.query(request);

should(protocol.pendingRequests.get('bar')).be.eql(request);
const pending = protocol.pendingRequests.get('bar');

pending.should.be.an.instanceOf(PendingRequest).and.match({request});
});

it('should fire a "queryError" event and reject if an error occurred', () => {
Expand Down Expand Up @@ -103,7 +106,7 @@ describe('Common Protocol', () => {
should(res.error).be.null();
should(res.result).be.exactly(response.result);
should(res.status).be.exactly(42);
should(protocol.pendingRequests.get('bar')).be.undefined();
should(protocol.pendingRequests.has('bar')).be.false();
});
});

Expand Down Expand Up @@ -155,13 +158,23 @@ describe('Common Protocol', () => {
});

describe('#clear', () => {
it('should send "discarded" event for each pending request', () => {
it('should discard and reject cleared requests', () => {
const
request1 = { requestId: '12345', body: 'foobar' },
request2 = { requestId: '54321', body: 'barfoo' };

protocol._pendingRequests.set(request1, request1);
protocol._pendingRequests.set(request2, request2);
protocol.state = 'ready';
protocol.send = () => {};

protocol.query(request1);
protocol.query(request2);

protocol.listenerCount(request1.requestId).should.be.eql(1);
protocol.listenerCount(request2.requestId).should.be.eql(1);

const
pending1 = protocol._pendingRequests.get(request1.requestId),
pending2 = protocol._pendingRequests.get(request2.requestId);

const listener = sinon.stub();
protocol.on('discarded', listener);
Expand All @@ -171,6 +184,14 @@ describe('Common Protocol', () => {
should(listener).be.calledTwice();
should(listener.getCall(0).args).be.eql([request1]);
should(listener.getCall(1).args).be.eql([request2]);

should(protocol._pendingRequests).be.empty();

protocol.listenerCount(request1.requestId).should.be.eql(0);
protocol.listenerCount(request2.requestId).should.be.eql(0);

return pending1.promise.should.be.rejectedWith({message: 'Network error: request was sent but no response has been received'})
.then(() => pending2.promise.should.be.rejectedWith({message: 'Network error: request was sent but no response has been received'}));
});
});

Expand Down
6 changes: 6 additions & 0 deletions test/protocol/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ describe('WebSocket networking module', () => {
it('should call listeners on a "disconnect" event', () => {
const cb = sinon.stub();

sinon.spy(websocket, 'clear');
websocket.retrying = false;
websocket.addListener('disconnect', cb);
should(websocket.listeners('disconnect').length).be.eql(1);
Expand All @@ -169,11 +170,13 @@ describe('WebSocket networking module', () => {
clock.tick(10);
should(cb).be.calledOnce();
should(websocket.listeners('disconnect').length).be.eql(1);
websocket.clear.should.be.calledOnce();
});

it('should call error listeners on a "disconnect" event with an abnormal websocket code', () => {
const cb = sinon.stub();

sinon.spy(websocket, 'clear');
websocket.retrying = false;
websocket.addListener('networkError', cb);
should(websocket.listeners('networkError').length).be.eql(1);
Expand All @@ -188,8 +191,10 @@ describe('WebSocket networking module', () => {
should(cb.firstCall.args[0].internal.status).be.equal(4666);
should(cb.firstCall.args[0].internal.message).be.equal('foobar');
should(websocket.listeners('networkError').length).be.eql(1);
websocket.clear.should.be.calledOnce();

cb.reset();
websocket.clear.resetHistory();
websocket.connect();
clientStub.onclose({code: 4666, reason: 'foobar'});

Expand All @@ -199,6 +204,7 @@ describe('WebSocket networking module', () => {
should(cb.firstCall.args[0].internal.status).be.equal(4666);
should(cb.firstCall.args[0].internal.message).be.equal('foobar');
should(websocket.listeners('networkError').length).be.eql(1);
websocket.clear.should.be.calledOnce();
});

it('should be able to register ephemeral callbacks on an event', () => {
Expand Down