Skip to content

Emit error when backend unexpectedly disconnects #1316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ var Client = function(config) {
var c = config || {};

this._types = new TypeOverrides(c.types);
this._ending = false;
this._connecting = false;
this._connectionError = false;

this.connection = c.connection || new Connection({
stream: c.stream,
Expand All @@ -50,6 +53,7 @@ util.inherits(Client, EventEmitter);
Client.prototype.connect = function(callback) {
var self = this;
var con = this.connection;
this._connecting = true;

if(this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port);
Expand Down Expand Up @@ -107,6 +111,7 @@ Client.prototype.connect = function(callback) {
//hook up query handling events to connection
//after the connection initially becomes ready for queries
con.once('readyForQuery', function() {
self._connecting = false;

//delegate rowDescription to active query
con.on('rowDescription', function(msg) {
Expand Down Expand Up @@ -175,34 +180,52 @@ Client.prototype.connect = function(callback) {
});

con.on('error', function(error) {
if(self.activeQuery) {
if(this.activeQuery) {
var activeQuery = self.activeQuery;
self.activeQuery = null;
this.activeQuery = null;
return activeQuery.handleError(error, con);
}

if (this._connecting) {
// set a flag indicating we've seen an error during connection
// the backend will terminate the connection and we don't want
// to throw a second error when the connection is terminated
this._connectionError = true;
}

if(!callback) {
return self.emit('error', error);
return this.emit('error', error);
}

con.end(); // make sure ECONNRESET errors don't cause error events
callback(error);
callback = null;
});
}.bind(this));

con.once('end', function() {
if ( callback ) {
// haven't received a connection message yet !
if (callback) {
// haven't received a connection message yet!
var err = new Error('Connection terminated');
callback(err);
callback = null;
return;
}
if(self.activeQuery) {
if(this.activeQuery) {
var disconnectError = new Error('Connection terminated');
self.activeQuery.handleError(disconnectError, con);
self.activeQuery = null;
this.activeQuery.handleError(disconnectError, con);
this.activeQuery = null;
}
self.emit('end');
});
if (!this._ending) {
// if the connection is ended without us calling .end()
// on this client then we have an unexpected disconnection
// treat this as an error unless we've already emitted an error
// during connection.
if (!this._connectionError) {
this.emit('error', new Error('Connection terminated unexpectedly'));
}
}
this.emit('end');
}.bind(this));


con.on('notice', function(msg) {
Expand Down Expand Up @@ -342,6 +365,7 @@ Client.prototype.query = function(config, values, callback) {
};

Client.prototype.end = function(cb) {
this._ending = true;
this.connection.end();
if (cb) {
this.connection.once('end', cb);
Expand Down
5 changes: 0 additions & 5 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ Connection.prototype.connect = function(port, host) {
});

this.stream.on('close', function() {
// NOTE: node-0.10 emits both 'end' and 'close'
// for streams closed by the peer, while
// node-0.8 only emits 'close'
self.emit('end');
});

Expand Down Expand Up @@ -143,8 +140,6 @@ Connection.prototype.attachListeners = function(stream) {
};

Connection.prototype.requestSsl = function() {
this.checkSslResponse = true;

var bodyBuffer = this.writer
.addInt16(0x04D2)
.addInt16(0x162F).flush();
Expand Down
22 changes: 19 additions & 3 deletions test/integration/client/error-handling-tests.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
var helper = require(__dirname + '/test-helper');
var util = require('util');


test('non-query error with callback', function () {
var client = new Client({
user:'asldkfjsadlfkj'
});
client.connect(assert.calls(function (err) {
assert(err);
}));
});

test('non-query error', function() {
var client = new Client({
user:'asldkfjsadlfkj'
});
assert.emits(client, 'error');
client.connect();
});

var createErorrClient = function() {
var client = helper.client();
client.once('error', function(err) {
Expand All @@ -11,9 +29,8 @@ var createErorrClient = function() {
return client;
};

test('error handling', function(){
test('error handling', function() {
test('within a simple query', function() {

var client = createErorrClient();

var query = client.query("select omfg from yodas_dsflsd where pixistix = 'zoiks!!!'");
Expand Down Expand Up @@ -77,7 +94,6 @@ test('error handling', function(){
});

test('non-query error', function() {

var client = new Client({
user:'asldkfjsadlfkj'
});
Expand Down
90 changes: 90 additions & 0 deletions test/integration/client/network-partition-tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
var co = require('co')

var buffers = require('../../test-buffers')
var helper = require('./test-helper')

var net = require('net')

var Server = function(response) {
this.server = undefined
this.socket = undefined
this.response = response
}

Server.prototype.start = function (cb) {
// this is our fake postgres server
// it responds with our specified response immediatley after receiving every buffer
// this is sufficient into convincing the client its connectet to a valid backend
// if we respond with a readyForQuery message
this.server = net.createServer(function (socket) {
this.socket = socket
if (this.response) {
this.socket.on('data', function (data) {
// deny request for SSL
if (data.length == 8) {
this.socket.write(new Buffer('N', 'utf8'))
// consider all authentication requests as good
} else if (!data[0]) {
this.socket.write(buffers.authenticationOk())
// respond with our canned response
} else {
this.socket.write(this.response)
}
}.bind(this))
}
}.bind(this))

var port = 54321

var options = {
host: 'localhost',
port: port,
}
this.server.listen(options.port, options.host, function () {
cb(options)
})
}

Server.prototype.drop = function () {
this.socket.end()
}

Server.prototype.close = function (cb) {
this.server.close(cb)
}

var testServer = function (server, cb) {
// wait for our server to start
server.start(function(options) {
// connect a client to it
var client = new helper.Client(options)
client.connect()

// after 50 milliseconds, drop the client
setTimeout(function() {
server.drop()
}, 50)

// blow up if we don't receive an error
var timeoutId = setTimeout(function () {
throw new Error('Client should have emitted an error but it did not.')
}, 5000)

// return our wait token
client.on('error', function () {
clearTimeout(timeoutId)
server.close(cb)
})
})
}

// test being disconnected after readyForQuery
const respondingServer = new Server(buffers.readyForQuery())
testServer(respondingServer, function () {
process.stdout.write('.')
// test being disconnected from a server that never responds
const silentServer = new Server()
testServer(silentServer, function () {
process.stdout.write('.')
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ test('query killed during query execution of prepared statement', function() {
var client = new Client(helper.args);
client.connect(assert.success(function() {
var sleepQuery = 'select pg_sleep($1)';
var query1 = client.query({

const queryConfig = {
name: 'sleep query',
text: sleepQuery,
values: [5] },
assert.calls(function(err, result) {
assert.equal(err.message, 'terminating connection due to administrator command');
values: [5],
};

// client should emit an error because it is unexpectedly disconnected
assert.emits(client, 'error')

var query1 = client.query(queryConfig, assert.calls(function(err, result) {
assert.equal(err.message, 'terminating connection due to administrator command');
}));

query1.on('error', function(err) {
Expand All @@ -53,7 +59,6 @@ test('query killed during query execution of prepared statement', function() {
}));
});


test('client end during query execution of prepared statement', function() {
var client = new Client(helper.args);
client.connect(assert.success(function() {
Expand Down
2 changes: 2 additions & 0 deletions test/unit/client/stream-and-query-error-interaction-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ test('emits end when not in query', function() {
stream.write = function() {
//NOOP
}

var client = new Client({connection: new Connection({stream: stream})});
client.connect(assert.calls(function() {
client.query('SELECT NOW()', assert.calls(function(err, result) {
assert(err);
}));
}));
assert.emits(client, 'error');
assert.emits(client, 'end');
client.connection.emit('connect');
process.nextTick(function() {
Expand Down