Skip to content

Commit 56b1079

Browse files
committed
Forget CC members on DatabaseUnavailable
This commit makes routing driver forget Causal Cluster members when they fail with `Neo.TransientError.General.DatabaseUnavailable` error code. It means database is either shutting down or doing store copy. Generally we should not hammer database in such state with new requests and give it some time to either recover (in case of store copy) or just stop using it. Also restructured a bit error handling code in `RoutingDriver` and `RoutingSession`.
1 parent e42c360 commit 56b1079

File tree

3 files changed

+111
-29
lines changed

3 files changed

+111
-29
lines changed

src/v1/routing-driver.js

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './inte
3131
class RoutingDriver extends Driver {
3232

3333
constructor(url, routingContext, userAgent, token = {}, config = {}) {
34-
super(url, userAgent, token, RoutingDriver._validateConfig(config));
34+
super(url, userAgent, token, validateConfig(config));
3535
this._routingContext = routingContext;
3636
}
3737

@@ -42,16 +42,18 @@ class RoutingDriver extends Driver {
4242

4343
_createSession(mode, connectionProvider, bookmark, config) {
4444
return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => {
45-
if (error.code === SESSION_EXPIRED) {
46-
this._forgetConnection(conn);
45+
if (!conn) {
46+
// connection can be undefined if error happened before connection was acquired
4747
return error;
48-
} else if (RoutingDriver._isFailureToWrite(error)) {
49-
let url = 'UNKNOWN';
50-
// connection is undefined if error happened before connection was acquired
51-
if (conn) {
52-
url = conn.url;
53-
this._connectionProvider.forgetWriter(conn.url);
54-
}
48+
}
49+
50+
const url = conn.url;
51+
52+
if (error.code === SESSION_EXPIRED || isDatabaseUnavailable(error)) {
53+
this._connectionProvider.forget(url);
54+
return error;
55+
} else if (isFailureToWrite(error)) {
56+
this._connectionProvider.forgetWriter(url);
5557
return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED);
5658
} else {
5759
return error;
@@ -65,30 +67,12 @@ class RoutingDriver extends Driver {
6567
return SESSION_EXPIRED;
6668
}
6769

68-
_forgetConnection(connection) {
69-
// connection is undefined if error happened before connection was acquired
70-
if (connection) {
71-
this._connectionProvider.forget(connection.url);
72-
}
73-
}
74-
75-
static _validateConfig(config) {
76-
if(config.trust === 'TRUST_ON_FIRST_USE') {
77-
throw newError('The chosen trust mode is not compatible with a routing driver');
78-
}
79-
return config;
80-
}
81-
82-
static _isFailureToWrite(error) {
83-
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
84-
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
85-
}
86-
8770
/**
8871
* Create new load balancing strategy based on the config.
8972
* @param {object} config the user provided config.
9073
* @param {Pool} connectionPool the connection pool for this driver.
9174
* @return {LoadBalancingStrategy} new strategy.
75+
* @private
9276
*/
9377
static _createLoadBalancingStrategy(config, connectionPool) {
9478
const configuredValue = config.loadBalancingStrategy;
@@ -102,6 +86,34 @@ class RoutingDriver extends Driver {
10286
}
10387
}
10488

89+
/**
90+
* @private
91+
*/
92+
function validateConfig(config) {
93+
if (config.trust === 'TRUST_ON_FIRST_USE') {
94+
throw newError('The chosen trust mode is not compatible with a routing driver');
95+
}
96+
return config;
97+
}
98+
99+
/**
100+
* @private
101+
*/
102+
function isFailureToWrite(error) {
103+
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
104+
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
105+
}
106+
107+
/**
108+
* @private
109+
*/
110+
function isDatabaseUnavailable(error) {
111+
return error.code === 'Neo.TransientError.General.DatabaseUnavailable';
112+
}
113+
114+
/**
115+
* @private
116+
*/
105117
class RoutingSession extends Session {
106118
constructor(mode, connectionProvider, bookmark, config, onFailedConnection) {
107119
super(mode, connectionProvider, bookmark, config);
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO ACK_FAILURE
5+
!: AUTO RUN "ROLLBACK" {}
6+
!: AUTO RUN "BEGIN" {}
7+
!: AUTO RUN "COMMIT" {}
8+
9+
C: RUN "{{{query}}}" {}
10+
C: PULL_ALL
11+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database is busy doing store copy"}
12+
S: IGNORED

test/v1/routing.driver.boltkit.it.js

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,6 +1896,52 @@ describe('routing driver', () => {
18961896
});
18971897
});
18981898

1899+
it('should forget writer on database unavailable error', done => {
1900+
testAddressPurgeOnDatabaseError(`CREATE (n {name:'Bob'})`, WRITE, done);
1901+
});
1902+
1903+
it('should forget reader on database unavailable error', done => {
1904+
testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done);
1905+
});
1906+
1907+
function testAddressPurgeOnDatabaseError(query, accessMode, done) {
1908+
const kit = new boltkit.BoltKit();
1909+
1910+
const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010);
1911+
1912+
const serverPort = accessMode === READ ? 9005 : 9007;
1913+
const serverAddress = '127.0.0.1:' + serverPort;
1914+
const serverTemplateScript = './test/resources/boltkit/address_unavailable_template.script.mst';
1915+
const server = kit.startWithTemplate(serverTemplateScript, {query: query}, serverPort);
1916+
1917+
kit.run(() => {
1918+
const driver = newDriver('bolt+routing://127.0.0.1:9010');
1919+
1920+
const session = driver.session(accessMode);
1921+
session.run(query).catch(error => {
1922+
expect(error.message).toEqual('Database is busy doing store copy');
1923+
expect(error.code).toEqual('Neo.TransientError.General.DatabaseUnavailable');
1924+
1925+
expect(hasAddressInConnectionPool(driver, serverAddress)).toBeFalsy();
1926+
expect(hasRouterInRoutingTable(driver, serverAddress)).toBeFalsy();
1927+
expect(hasReaderInRoutingTable(driver, serverAddress)).toBeFalsy();
1928+
expect(hasWriterInRoutingTable(driver, serverAddress)).toBeFalsy();
1929+
1930+
session.close(() => {
1931+
driver.close();
1932+
1933+
router.exit(code1 => {
1934+
server.exit(code2 => {
1935+
expect(code1).toEqual(0);
1936+
expect(code2).toEqual(0);
1937+
done();
1938+
});
1939+
});
1940+
});
1941+
});
1942+
});
1943+
}
1944+
18991945
function moveNextDateNow30SecondsForward() {
19001946
const currentTime = Date.now();
19011947
hijackNextDateNowCall(currentTime + 30 * 1000 + 1);
@@ -2028,6 +2074,18 @@ describe('routing driver', () => {
20282074
return getConnectionPool(driver).has(address);
20292075
}
20302076

2077+
function hasRouterInRoutingTable(driver, expectedRouter) {
2078+
return getRoutingTable(driver).routers.indexOf(expectedRouter) > -1;
2079+
}
2080+
2081+
function hasReaderInRoutingTable(driver, expectedReader) {
2082+
return getRoutingTable(driver).readers.indexOf(expectedReader) > -1;
2083+
}
2084+
2085+
function hasWriterInRoutingTable(driver, expectedWriter) {
2086+
return getRoutingTable(driver).writers.indexOf(expectedWriter) > -1;
2087+
}
2088+
20312089
function assertHasRouters(driver, expectedRouters) {
20322090
expect(getRoutingTable(driver).routers).toEqual(expectedRouters);
20332091
}

0 commit comments

Comments
 (0)