Skip to content

Commit 69e5137

Browse files
committed
Refactor Session and Transaction maintains a promise
Instead of keeping a connection `Session` and `Transaction` now keep a promise to a connection instead of a resolved connection.
1 parent 822a438 commit 69e5137

File tree

8 files changed

+139
-74
lines changed

8 files changed

+139
-74
lines changed

src/v1/driver.js

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class Driver {
116116
}
117117

118118
_createSession(conn) {
119-
return new Session(conn, (cb) => {
119+
return new Session(new Promise((resolve, reject) => resolve(conn)), (cb) => {
120120
// This gets called on Session#close(), and is where we return
121121
// the pooled 'connection' instance.
122122

@@ -204,6 +204,47 @@ class RoundRobinArray {
204204
}
205205

206206
let GET_SERVERS = "CALL dbms.cluster.routing.getServers";
207+
208+
class ClusterView {
209+
constructor(expires, routers, readers, writers) {
210+
this.expires = expires;
211+
this.routers = routers;
212+
this.readers = readers;
213+
this.routers = writers;
214+
}
215+
}
216+
217+
function newClusterView(session) {
218+
return session.run(GET_SERVERS)
219+
.then((res) => {
220+
session.close();
221+
let record = res.records[0];
222+
//Note we are loosing precision here but we are not
223+
//terribly worried since it is only
224+
//for dates more than 140000 years into the future.
225+
let expires = record.get('ttl').toNumber();
226+
let servers = record.get('servers');
227+
let routers = new RoundRobinArray();
228+
let readers = new RoundRobinArray();
229+
let writers = new RoundRobinArray();
230+
for (let i = 0; i <= servers.length; i++) {
231+
let server = servers[i];
232+
233+
let role = server['role'];
234+
let addresses = server['addresses'];
235+
if (role === 'ROUTE') {
236+
routers.pushAll(addresses);
237+
} else if (role === 'WRITE') {
238+
writers.pushAll(addresses);
239+
} else if (role === 'READ') {
240+
readers.pushAll(addresses);
241+
}
242+
}
243+
244+
return new ClusterView(expires, routers, readers, writers);
245+
});
246+
}
247+
207248
class RoutingDriver extends Driver {
208249

209250
constructor(url, userAgent = 'neo4j-javascript/0.0', token = {}, config = {}) {
@@ -213,15 +254,25 @@ class RoutingDriver extends Driver {
213254
this._readers = new RoundRobinArray();
214255
this._writers = new RoundRobinArray();
215256
this._expires = Date.now();
216-
this._checkServers();
217257
}
218258

219-
_checkServers() {
259+
//TODO make nice, expose constants?
260+
session(mode) {
261+
//Check so that we have servers available
262+
this._checkServers().then( () => {
263+
let conn = this._acquireConnection(mode);
264+
return this._createSession(conn);
265+
});
266+
}
267+
268+
async _checkServers() {
220269
if (this._expires < Date.now() ||
221270
this._routers.empty() ||
222271
this._readers.empty() ||
223272
this._writers.empty()) {
224-
this._callServers();
273+
return await this._callServers();
274+
} else {
275+
return new Promise((resolve, reject) => resolve(false));
225276
}
226277
}
227278

@@ -239,16 +290,19 @@ class RoutingDriver extends Driver {
239290
let url = this._routers.hop();
240291
try {
241292
let res = await this._call(url);
293+
console.log("got result");
242294
if (res.records.length != 1) continue;
243295
let record = res.records[0];
244296
//Note we are loosing precision here but we are not
245297
//terribly worried since it is only
246298
//for dates more than 140000 years into the future.
247299
this._expires += record.get('ttl').toNumber();
248300
let servers = record.get('servers');
301+
console.log(servers);
249302
for (let i = 0; i <= servers.length; i++) {
250303
let server = servers[i];
251-
seen.delete(server);
304+
seen.remove(server);
305+
252306
let role = server['role'];
253307
let addresses = server['addresses'];
254308
if (role === 'ROUTE') {
@@ -266,47 +320,42 @@ class RoutingDriver extends Driver {
266320
//these are no longer valid according to server
267321
let self = this;
268322
seen.forEach((key) => {
269-
self._pool.purge(key);
323+
console.log("remove seen");
324+
self._pools.purge(key);
270325
});
271326
success = true;
272-
return;
327+
return new Promise((resolve, reject) => resolve(true));
273328
} catch (error) {
274329
//continue
275-
this._forget(url);
276330
console.log(error);
331+
this._forget(url);
277332
}
278333
}
279334

335+
let errorMsg = "Server could not perform discovery, please open a new driver with a different seed address.";
280336
if (this.onError) {
281-
this.onError("Server could not perform discovery, please open a new driver with a different seed address.");
337+
this.onError(errorMsg);
282338
}
283-
this.close();
284-
}
285339

286-
//TODO make nice, expose constants?
287-
session(mode) {
288-
let conn = this._aquireConnection(mode);
289-
return this._createSession(conn);
340+
return new Promise((resolve, reject) => reject(errorMsg));
290341
}
291342

292-
_aquireConnection(mode) {
343+
_acquireConnection(mode) {
293344
//make sure we have enough servers
294-
this._checkServers();
295-
296345
let m = mode || WRITE;
297346
if (m === READ) {
298-
return this._pools.acquire(this._readers.hop());
347+
return this._pool.acquire(this._readers.hop());
299348
} else if (m === WRITE) {
300-
return this._pools.acquire(this._writers.hop());
349+
return this._pool.acquire(this._writers.hop());
301350
} else {
302351
//TODO fail
303352
}
304353
}
305354

306355
_allServers() {
307356
let seen = new Set(this._routers.toArray());
308-
let writers = this._writers.toArray()
309-
let readers = this._readers.toArray()
357+
let writers = this._writers.toArray();
358+
let readers = this._readers.toArray();
310359
for (let i = 0; i < writers.length; i++) {
311360
seen.add(writers[i]);
312361
}
@@ -319,18 +368,19 @@ class RoutingDriver extends Driver {
319368
async _call(url) {
320369
let conn = this._pool.acquire(url);
321370
let session = this._createSession(conn);
322-
console.log("calling " + GET_SERVERS);
323371
return session.run(GET_SERVERS)
324372
.then((res) => {
325373
session.close();
326374
return res;
327375
}).catch((err) => {
376+
console.log(err);
328377
this._forget(url);
329378
return Promise.reject(err);
330379
});
331380
}
332381

333382
_forget(url) {
383+
console.log("forget");
334384
this._pools.purge(url);
335385
this._routers.remove(url);
336386
this._readers.remove(url);
@@ -426,9 +476,12 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
426476
function driver(url, authToken, config = {}) {
427477
let sch = scheme(url);
428478
if (sch === "bolt+routing://") {
429-
return new RoutingDriver(url, USER_AGENT, authToken, config);
430-
} else {
479+
return new RoutingDriver(url, USER_AGENT, authToken, config);
480+
} else if (sch === "bolt://") {
431481
return new Driver(url, USER_AGENT, authToken, config);
482+
} else {
483+
throw new Error("Unknown scheme: " + sch);
484+
432485
}
433486
}
434487

src/v1/session.js

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,15 @@ import {newError} from "./error";
3333
class Session {
3434
/**
3535
* @constructor
36-
* @param {Connection} conn - A connection to use
36+
* @param {Promise} connectionPromise - A connection to use
3737
* @param {function()} onClose - Function to be called on connection close
3838
*/
39-
constructor( conn, onClose ) {
40-
this._conn = conn;
39+
constructor( connectionPromise, onClose ) {
40+
this._connectionPromise = connectionPromise;
4141
this._onClose = onClose;
4242
this._hasTx = false;
4343
}
4444

45-
isEncrypted() {
46-
return this._conn.isEncrypted();
47-
}
48-
4945
/**
5046
* Run Cypher statement
5147
* Could be called with a statement object i.e.: {statement: "MATCH ...", parameters: {param: 1}}
@@ -61,9 +57,11 @@ class Session {
6157
}
6258
let streamObserver = new _RunObserver();
6359
if (!this._hasTx) {
64-
this._conn.run(statement, parameters, streamObserver);
65-
this._conn.pullAll(streamObserver);
66-
this._conn.sync();
60+
this._connectionPromise.then((conn) => {
61+
conn.run(statement, parameters, streamObserver);
62+
conn.pullAll(streamObserver);
63+
conn.sync();
64+
}).catch((err) => streamObserver.onError(err));
6765
} else {
6866
streamObserver.onError(newError("Statements cannot be run directly on a "
6967
+ "session with an open transaction; either run from within the "
@@ -82,13 +80,13 @@ class Session {
8280
*/
8381
beginTransaction() {
8482
if (this._hasTx) {
85-
throw new newError("You cannot begin a transaction on a session with an "
83+
throw newError("You cannot begin a transaction on a session with an "
8684
+ "open transaction; either run from within the transaction or use a "
8785
+ "different session.")
8886
}
8987

9088
this._hasTx = true;
91-
return new Transaction(this._conn, () => {this._hasTx = false});
89+
return new Transaction(this._connectionPromise, () => {this._hasTx = false});
9290
}
9391

9492
/**

src/v1/transaction.js

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ import Result from './result';
2727
class Transaction {
2828
/**
2929
* @constructor
30-
* @param {Connection} conn - A connection to use
30+
* @param {Promise} connectionPromise - A connection to use
3131
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
3232
*/
33-
constructor(conn, onClose) {
34-
this._conn = conn;
33+
constructor(connectionPromise, onClose) {
34+
this._connectionPromise = connectionPromise;
3535
let streamObserver = new _TransactionStreamObserver(this);
36-
this._conn.run("BEGIN", {}, streamObserver);
37-
this._conn.discardAll(streamObserver);
36+
this._connectionPromise.then((conn) => {
37+
conn.run("BEGIN", {}, streamObserver);
38+
conn.discardAll(streamObserver);
39+
}).catch(streamObserver.onError);
40+
3841
this._state = _states.ACTIVE;
3942
this._onClose = onClose;
4043
}
@@ -52,7 +55,7 @@ class Transaction {
5255
parameters = statement.parameters || {};
5356
statement = statement.text;
5457
}
55-
return this._state.run(this._conn, new _TransactionStreamObserver(this), statement, parameters);
58+
return this._state.run(this._connectionPromise, new _TransactionStreamObserver(this), statement, parameters);
5659
}
5760

5861
/**
@@ -63,7 +66,7 @@ class Transaction {
6366
* @returns {Result} - New Result
6467
*/
6568
commit() {
66-
let committed = this._state.commit(this._conn, new _TransactionStreamObserver(this));
69+
let committed = this._state.commit(this._connectionPromise, new _TransactionStreamObserver(this));
6770
this._state = committed.state;
6871
//clean up
6972
this._onClose();
@@ -79,7 +82,7 @@ class Transaction {
7982
* @returns {Result} - New Result
8083
*/
8184
rollback() {
82-
let committed = this._state.rollback(this._conn, new _TransactionStreamObserver(this));
85+
let committed = this._state.rollback(this._connectionPromise, new _TransactionStreamObserver(this));
8386
this._state = committed.state;
8487
//clean up
8588
this._onClose();
@@ -114,17 +117,20 @@ class _TransactionStreamObserver extends StreamObserver {
114117
let _states = {
115118
//The transaction is running with no explicit success or failure marked
116119
ACTIVE: {
117-
commit: (conn, observer) => {
118-
return {result: _runDiscardAll("COMMIT", conn, observer),
120+
commit: (connectionPromise, observer) => {
121+
return {result: _runDiscardAll("COMMIT", connectionPromise, observer),
119122
state: _states.SUCCEEDED}
120123
},
121-
rollback: (conn, observer) => {
122-
return {result: _runDiscardAll("ROLLBACK", conn, observer), state: _states.ROLLED_BACK};
124+
rollback: (connectionPromise, observer) => {
125+
return {result: _runDiscardAll("ROLLBACK", connectionPromise, observer), state: _states.ROLLED_BACK};
123126
},
124-
run: (conn, observer, statement, parameters) => {
125-
conn.run( statement, parameters || {}, observer );
126-
conn.pullAll( observer );
127-
conn.sync();
127+
run: (connectionPromise, observer, statement, parameters) => {
128+
connectionPromise.then((conn) => {
129+
conn.run( statement, parameters || {}, observer );
130+
conn.pullAll( observer );
131+
conn.sync();
132+
}).catch(observer.onError);
133+
128134
return new Result( observer, statement, parameters );
129135
}
130136
},
@@ -196,10 +202,13 @@ let _states = {
196202
}
197203
};
198204

199-
function _runDiscardAll(msg, conn, observer) {
200-
conn.run(msg, {}, observer );
201-
conn.discardAll(observer);
202-
conn.sync();
205+
function _runDiscardAll(msg, connectionPromise, observer) {
206+
connectionPromise.then((conn) => {
207+
conn.run(msg, {}, observer);
208+
conn.discardAll(observer);
209+
conn.sync();
210+
}).catch(observer.onError);
211+
203212
return new Result(observer, msg, {});
204213
}
205214

test/v1/boltkit.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ BoltKit.prototype.start = function(script, port) {
4949

5050
var Server = function(){};
5151
//give process some time to exit
52-
Server.prototype.exit = function(callback) {setTimeout(function(){callback(code);}, 500)};
52+
Server.prototype.exit = function(callback) {setTimeout(function(){callback(code);}, 1000)};
5353

5454
return new Server();
5555
};
5656

5757
//Make sure boltstub is started before running
5858
//user code
5959
BoltKit.prototype.run = function(callback) {
60-
setTimeout(callback, 500);
60+
setTimeout(callback, 1000);
6161
};
6262

6363
function boltKitSupport() {

test/v1/driver.test.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ describe('driver', function() {
4848
driver.session();
4949
});
5050

51+
it('should handle wrong scheme ', function() {
52+
expect(function(){neo4j.driver("tank://localhost", neo4j.auth.basic("neo4j", "neo4j"))}).toThrow(new Error("Unknown scheme: tank://"));
53+
});
54+
5155
it('should fail early on wrong credentials', function(done) {
5256
// Given
5357
var driver = neo4j.driver("bolt://localhost", neo4j.auth.basic("neo4j", "who would use such a password"));

0 commit comments

Comments
 (0)