Skip to content

Commit ccfe892

Browse files
committed
Merge remote-tracking branch 'origin/6.x' into #102-auth-ctrl
# Conflicts: # src/Kuzzle.js # src/networkWrapper/protocols/abstract/realtime.js # test/network/offlineQueue.test.js # test/network/query.test.js
2 parents a715daf + 95e692b commit ccfe892

File tree

19 files changed

+1606
-9432
lines changed

19 files changed

+1606
-9432
lines changed

package-lock.json

Lines changed: 0 additions & 8755 deletions
This file was deleted.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"license": "Apache-2.0",
2929
"dependencies": {
3030
"bluebird": "3.5.1",
31+
"mock-require": "^3.0.1",
3132
"uuid": "3.2.1",
3233
"uws": "^9.14.0"
3334
},

src/Collection.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,9 @@ Collection.prototype.deleteDocument = function (arg, options, cb) {
227227

228228
this.kuzzle.query(this.buildQueryArgs('document', action), data, options, cb && function (err, res) {
229229
if (err) {
230-
cb(err);
231-
}
232-
else {
233-
cb(null, (action === 'delete' ? [res.result._id] : res.result.ids));
230+
return cb(err);
234231
}
232+
cb(null, (action === 'delete' ? [res.result._id] : res.result.ids));
235233
});
236234

237235
return this;

src/Kuzzle.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ class Kuzzle extends KuzzleEventEmitter {
345345
* @param {function} [cb] Connection callback
346346
*/
347347
connect (cb) {
348-
if (this.network.state !== 'offline') {
348+
if (this.network.isReady()) {
349349
if (cb) {
350350
cb(null, this);
351351
}
@@ -374,7 +374,10 @@ class Kuzzle extends KuzzleEventEmitter {
374374
});
375375

376376
this.network.addListener('disconnect', () => {
377-
this.disconnect();
377+
for (const collection of Object.keys(this.collections)) {
378+
delete this.collections[collection];
379+
}
380+
378381
this.emit('disconnected');
379382
});
380383

src/networkWrapper/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
function network(protocol, host, options) {
1010
switch (protocol) {
11+
case 'http':
12+
return new (require('./protocols/http'))(host, options);
1113
case 'websocket':
1214
if (typeof window !== 'undefined' && typeof WebSocket === 'undefined') {
1315
throw new Error('Aborting: no websocket support detected.');
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
'use strict';
2+
3+
const
4+
KuzzleEventEmitter = require('../../../eventEmitter');
5+
6+
class AbtractWrapper extends KuzzleEventEmitter {
7+
8+
constructor (host, options) {
9+
super();
10+
11+
Object.defineProperties(this, {
12+
host: {
13+
value: host,
14+
enumerable: true
15+
},
16+
port: {
17+
value: (options && typeof options.port === 'number') ? options.port : 7512,
18+
enumerable: true
19+
},
20+
ssl: {
21+
value: (options && typeof options.sslConnection === 'boolean') ? options.sslConnection : false,
22+
enumerable: true
23+
},
24+
queuing: {
25+
value: false,
26+
writable: true
27+
},
28+
// configuration properties
29+
autoQueue: {
30+
value: false,
31+
enumerable: true,
32+
writable: true
33+
},
34+
autoReplay: {
35+
value: false,
36+
enumerable: true,
37+
writable: true
38+
},
39+
state: {
40+
value: 'offline',
41+
enumerable: true,
42+
writable: true
43+
},
44+
/*
45+
Offline queue use the following format:
46+
[
47+
{
48+
ts: <query timestamp>,
49+
query: 'query',
50+
cb: callbackFunction
51+
}
52+
]
53+
*/
54+
offlineQueue: {
55+
value: [],
56+
enumerable: true,
57+
writable: true
58+
},
59+
queueFilter: {
60+
value: null,
61+
enumerable: true,
62+
writable: true
63+
},
64+
queueMaxSize: {
65+
value: 500,
66+
enumerable: true,
67+
writable: true
68+
},
69+
queueTTL: {
70+
value: 120000,
71+
enumerable: true,
72+
writable: true
73+
},
74+
replayInterval: {
75+
value: 10,
76+
enumerable: true,
77+
writable: true
78+
},
79+
offlineQueueLoader: {
80+
value: null,
81+
enumerable: true,
82+
writable: true
83+
}
84+
});
85+
86+
if (options) {
87+
Object.keys(options).forEach(opt => {
88+
if (this.hasOwnProperty(opt) && Object.getOwnPropertyDescriptor(this, opt).writable) {
89+
this[opt] = options[opt];
90+
}
91+
});
92+
}
93+
}
94+
95+
/* @Abstract */
96+
connect() {
97+
throw new Error('Method "connect" is not implemented');
98+
}
99+
100+
/**
101+
* Called when the client's connection is established
102+
*/
103+
clientConnected(state, wasConnected) {
104+
this.state = state || 'ready';
105+
this.emit(wasConnected && 'reconnect' || 'connect');
106+
107+
if (this.autoQueue) {
108+
this.stopQueuing();
109+
}
110+
111+
if (this.autoReplay) {
112+
this.playQueue();
113+
}
114+
}
115+
116+
/**
117+
* Called when the client's connection is closed
118+
*/
119+
disconnect() {
120+
this.state = 'offline';
121+
if (this.autoQueue) {
122+
this.startQueuing();
123+
}
124+
}
125+
126+
/**
127+
* Empties the offline queue without replaying it.
128+
*/
129+
flushQueue() {
130+
this.offlineQueue = [];
131+
}
132+
133+
/**
134+
* Replays the requests queued during offline mode.
135+
*/
136+
playQueue() {
137+
if (this.isReady()) {
138+
cleanQueue(this);
139+
dequeue(this);
140+
}
141+
}
142+
143+
/**
144+
* Starts the requests queuing. Works only during offline mode, and if the autoQueue option is set to false.
145+
*/
146+
startQueuing() {
147+
this.queuing = true;
148+
}
149+
150+
/**
151+
* Stops the requests queuing. Works only during offline mode, and if the autoQueue option is set to false.
152+
*/
153+
stopQueuing() {
154+
this.queuing = false;
155+
}
156+
157+
subscribe(object, options, notificationCB, cb) { // eslint-disable-line
158+
throw new Error('Not Implemented');
159+
}
160+
161+
unsubscribe(object, options, channel, cb) { // eslint-disable-line
162+
throw new Error('Not Implemented');
163+
}
164+
165+
query(object, options, cb) {
166+
let queuable = options && (options.queuable !== false) || true;
167+
168+
if (this.queueFilter) {
169+
queuable = queuable && this.queueFilter(object);
170+
}
171+
172+
if (this.queuing && queuable) {
173+
cleanQueue(this, object, cb);
174+
this.emit('offlineQueuePush', {query: object, cb: cb});
175+
return this.offlineQueue.push({ts: Date.now(), query: object, cb: cb});
176+
}
177+
178+
if (this.isReady()) {
179+
return emitRequest(this, object, cb);
180+
}
181+
182+
return discardRequest(object, cb);
183+
}
184+
185+
isReady() {
186+
return this.state === 'ready';
187+
}
188+
}
189+
190+
/**
191+
* Emit a request to Kuzzle
192+
*
193+
* @param {AbstractWrapper} network
194+
* @param {object} request
195+
* @param {responseCallback} [cb]
196+
*/
197+
function emitRequest (network, request, cb) {
198+
if (request.jwt !== undefined || cb) {
199+
network.once(request.requestId, response => {
200+
let error = null;
201+
202+
if (request.action !== 'logout' && response.error && response.error.message === 'Token expired') {
203+
network.emit('tokenExpired', request, cb);
204+
}
205+
206+
if (response.error) {
207+
error = new Error(response.error.message);
208+
Object.assign(error, response.error);
209+
error.status = response.status;
210+
network.emit('queryError', error, request, cb);
211+
}
212+
213+
if (cb) {
214+
cb(error, response);
215+
}
216+
});
217+
}
218+
// Track requests made to allow Room.subscribeToSelf to work
219+
network.send(request);
220+
}
221+
222+
function discardRequest(object, cb) {
223+
if (cb) {
224+
cb(new Error('Unable to execute request: not connected to a Kuzzle server.\nDiscarded request: ' + JSON.stringify(object)));
225+
}
226+
}
227+
228+
/**
229+
* Clean up the queue, ensuring the queryTTL and queryMaxSize properties are respected
230+
* @param {AbstractWrapper} network
231+
*/
232+
function cleanQueue (network) {
233+
const now = Date.now();
234+
let lastDocumentIndex = -1;
235+
236+
if (network.queueTTL > 0) {
237+
network.offlineQueue.forEach((query, index) => {
238+
if (query.ts < now - network.queueTTL) {
239+
lastDocumentIndex = index;
240+
}
241+
});
242+
243+
if (lastDocumentIndex !== -1) {
244+
network.offlineQueue
245+
.splice(0, lastDocumentIndex + 1)
246+
.forEach(droppedRequest => {
247+
network.emit('offlineQueuePop', droppedRequest.query);
248+
});
249+
}
250+
}
251+
252+
if (network.queueMaxSize > 0 && network.offlineQueue.length > network.queueMaxSize) {
253+
network.offlineQueue
254+
.splice(0, network.offlineQueue.length - network.queueMaxSize)
255+
.forEach(droppedRequest => {
256+
network.emit('offlineQueuePop', droppedRequest.query);
257+
});
258+
}
259+
}
260+
261+
/**
262+
* Play all queued requests, in order.
263+
*/
264+
function dequeue (network) {
265+
const
266+
uniqueQueue = {},
267+
dequeuingProcess = () => {
268+
if (network.offlineQueue.length > 0) {
269+
emitRequest(network, network.offlineQueue[0].query, network.offlineQueue[0].cb);
270+
network.emit('offlineQueuePop', network.offlineQueue.shift());
271+
272+
setTimeout(() => {
273+
dequeuingProcess();
274+
}, Math.max(0, network.replayInterval));
275+
}
276+
};
277+
278+
if (network.offlineQueueLoader) {
279+
if (typeof network.offlineQueueLoader !== 'function') {
280+
throw new Error('Invalid value for offlineQueueLoader property. Expected: function. Got: ' + typeof network.offlineQueueLoader);
281+
}
282+
283+
const additionalQueue = network.offlineQueueLoader();
284+
if (Array.isArray(additionalQueue)) {
285+
network.offlineQueue = additionalQueue
286+
.concat(network.offlineQueue)
287+
.filter(request => {
288+
// throws if the query object does not contain required attributes
289+
if (!request.query || request.query.requestId === undefined || !request.query.action || !request.query.controller) {
290+
throw new Error('Invalid offline queue request. One or more missing properties: requestId, action, controller.');
291+
}
292+
293+
return uniqueQueue.hasOwnProperty(request.query.requestId) ? false : (uniqueQueue[request.query.requestId] = true);
294+
});
295+
} else {
296+
throw new Error('Invalid value returned by the offlineQueueLoader function. Expected: array. Got: ' + typeof additionalQueue);
297+
}
298+
}
299+
300+
dequeuingProcess();
301+
}
302+
303+
module.exports = AbtractWrapper;

0 commit comments

Comments
 (0)