Skip to content

Commit

Permalink
net: shutdown gracefully
Browse files Browse the repository at this point in the history
Wait for the `shutdown` request completion before emitting the `finish`
event and destroying the socket. While this might not be that relevant
in case of plain TCP sockets, the TLS implementation sends close-notify
packet during shutdown request. Destroying socket whilst this write is
in progress tends to cause ECONNRESET errors in our tests.
  • Loading branch information
indutny committed Jun 6, 2015
1 parent 6e78e5f commit f3503ba
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 28 deletions.
22 changes: 20 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ function WriteReq(chunk, encoding, cb) {
this.next = null;
}

function _end(stream, cb) {
cb(null);
}

function WritableState(options, stream) {
options = options || {};

Expand Down Expand Up @@ -106,6 +110,13 @@ function WritableState(options, stream) {

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// when `state._end()` was called
this.flushed = false;

// NOTE: added here to not pollute the prototype of the WritableStream and to
// avoid conflicts with user-land methods
this._end = options._end || _end;
}

WritableState.prototype.getBuffer = function writableStateGetBuffer() {
Expand Down Expand Up @@ -447,6 +458,7 @@ function needFinish(state) {
state.length === 0 &&
state.bufferedRequest === null &&
!state.finished &&
!state.flushed &&
!state.writing);
}

Expand All @@ -462,8 +474,14 @@ function finishMaybe(stream, state) {
if (need) {
if (state.pendingcb === 0) {
prefinish(stream, state);
state.finished = true;
stream.emit('finish');
state.flushed = true;
state._end(stream, function(err) {
if (err)
stream.emit('error', err);

state.finished = true;
stream.emit('finish');
});
} else {
prefinish(stream, state);
}
Expand Down
75 changes: 49 additions & 26 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ function Socket(options) {

stream.Duplex.call(this, options);

// NOTE: do it here to avoid copying `options`
this._writableState._end = _end;

if (options.handle) {
this._handle = options.handle; // private
} else if (options.fd !== undefined) {
Expand Down Expand Up @@ -199,32 +202,7 @@ function onSocketFinish() {
if (!this._handle || !this._handle.shutdown)
return this.destroy();

var req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.handle = this._handle;
var err = this._handle.shutdown(req);

if (err)
return this._destroy(errnoException(err, 'shutdown'));
}


function afterShutdown(status, handle, req) {
var self = handle.owner;

debug('afterShutdown destroyed=%j', self.destroyed,
self._readableState);

// callback may come after call to destroy.
if (self.destroyed)
return;

if (self._readableState.ended) {
debug('readableState ended, destroying');
self.destroy();
} else {
self.once('_socketEnd', self.destroy);
}
this.once('_socketEnd', this.destroy);
}

// the EOF has been received, and no more bytes are coming.
Expand Down Expand Up @@ -690,6 +668,50 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};

function _end(socket, cb) {
debug('_end');

// If still connecting - defer handling 'finish' until 'connect' will happen
if (socket._connecting) {
debug('_end: not yet connected');
return socket.once('connect', function() {
_end(socket, cb);
});
}

if (!socket.readable || socket._readableState.ended) {
debug('_end: not readable or ended');
return cb();
}

// otherwise, just shutdown, or destroy() if not possible
if (!socket._handle || !socket._handle.shutdown) {
debug('_end: no handle or handle does not support shutdown');
return cb();
}

var req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.handle = this._handle;
req.flushCb = cb;
var err = socket._handle.shutdown(req);

if (err) {
debug('_end: errno %s', err);
return cb(errnoException(err, 'shutdown'));
}
}


function afterShutdown(status, handle, req) {
var self = handle.owner;

debug('afterShutdown destroyed=%j', self.destroyed,
self._readableState);

req.flushCb();
}

function createWriteReq(req, handle, data, encoding) {
switch (encoding) {
case 'binary':
Expand Down Expand Up @@ -864,6 +886,7 @@ Socket.prototype.connect = function(options, cb) {
this._readableState.endEmitted = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.flushed = false;
this._writableState.finished = false;
this._writableState.errorEmitted = false;
this.destroyed = false;
Expand Down
22 changes: 22 additions & 0 deletions test/parallel/test-stream2-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,25 @@ test('finish is emitted if last chunk is empty', function(t) {
w.write(Buffer(1));
w.end(Buffer(0));
});

test('finish is emitted after shutdown', function(t) {
var w = new W();
var shutdown = false;

w._writableState._end = function(stream, cb) {
assert(stream === w);
setTimeout(function() {
shutdown = true;
cb();
}, 100);
};
w._write = function(chunk, e, cb) {
process.nextTick(cb);
};
w.on('finish', function() {
assert(shutdown);
t.end();
});
w.write(Buffer(1));
w.end(Buffer(0));
});

0 comments on commit f3503ba

Please sign in to comment.