-
Notifications
You must be signed in to change notification settings - Fork 30.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
lib: move _stream_wrap into internals
This makes a subsequent possible deprecation easier. PR-URL: #16158 Backport-PR-URL: #16626 Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Tobias Nießen <tniessen@tnie.de>
- Loading branch information
Showing
3 changed files
with
229 additions
and
225 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,227 +1,3 @@ | ||
'use strict'; | ||
|
||
const assert = require('assert'); | ||
const util = require('util'); | ||
// TODO(bmeurer): Change this back to const once hole checks are | ||
// properly optimized away early in Ignition+TurboFan. | ||
var Buffer = require('buffer').Buffer; | ||
const { Socket } = require('net'); | ||
const { JSStream } = process.binding('js_stream'); | ||
const uv = process.binding('uv'); | ||
const debug = util.debuglog('stream_wrap'); | ||
|
||
function StreamWrap(stream) { | ||
const handle = new JSStream(); | ||
|
||
this.stream = stream; | ||
|
||
this._list = null; | ||
|
||
const self = this; | ||
handle.close = function(cb) { | ||
debug('close'); | ||
self.doClose(cb); | ||
}; | ||
handle.isAlive = function() { | ||
return self.isAlive(); | ||
}; | ||
handle.isClosing = function() { | ||
return self.isClosing(); | ||
}; | ||
handle.onreadstart = function() { | ||
return self.readStart(); | ||
}; | ||
handle.onreadstop = function() { | ||
return self.readStop(); | ||
}; | ||
handle.onshutdown = function(req) { | ||
return self.doShutdown(req); | ||
}; | ||
handle.onwrite = function(req, bufs) { | ||
return self.doWrite(req, bufs); | ||
}; | ||
|
||
this.stream.pause(); | ||
this.stream.on('error', function onerror(err) { | ||
self.emit('error', err); | ||
}); | ||
this.stream.on('data', function ondata(chunk) { | ||
if (!(chunk instanceof Buffer)) { | ||
// Make sure that no further `data` events will happen | ||
this.pause(); | ||
this.removeListener('data', ondata); | ||
|
||
self.emit('error', new Error('Stream has StringDecoder')); | ||
return; | ||
} | ||
|
||
debug('data', chunk.length); | ||
if (self._handle) | ||
self._handle.readBuffer(chunk); | ||
}); | ||
this.stream.once('end', function onend() { | ||
debug('end'); | ||
if (self._handle) | ||
self._handle.emitEOF(); | ||
}); | ||
|
||
Socket.call(this, { | ||
handle: handle | ||
}); | ||
} | ||
util.inherits(StreamWrap, Socket); | ||
module.exports = StreamWrap; | ||
|
||
// require('_stream_wrap').StreamWrap | ||
StreamWrap.StreamWrap = StreamWrap; | ||
|
||
StreamWrap.prototype.isAlive = function isAlive() { | ||
return true; | ||
}; | ||
|
||
StreamWrap.prototype.isClosing = function isClosing() { | ||
return !this.readable || !this.writable; | ||
}; | ||
|
||
StreamWrap.prototype.readStart = function readStart() { | ||
this.stream.resume(); | ||
return 0; | ||
}; | ||
|
||
StreamWrap.prototype.readStop = function readStop() { | ||
this.stream.pause(); | ||
return 0; | ||
}; | ||
|
||
StreamWrap.prototype.doShutdown = function doShutdown(req) { | ||
const self = this; | ||
const handle = this._handle; | ||
const item = this._enqueue('shutdown', req); | ||
|
||
this.stream.end(function() { | ||
// Ensure that write was dispatched | ||
setImmediate(function() { | ||
if (!self._dequeue(item)) | ||
return; | ||
|
||
handle.finishShutdown(req, 0); | ||
}); | ||
}); | ||
return 0; | ||
}; | ||
|
||
StreamWrap.prototype.doWrite = function doWrite(req, bufs) { | ||
const self = this; | ||
const handle = self._handle; | ||
|
||
var pending = bufs.length; | ||
|
||
// Queue the request to be able to cancel it | ||
const item = self._enqueue('write', req); | ||
|
||
self.stream.cork(); | ||
for (var n = 0; n < bufs.length; n++) | ||
self.stream.write(bufs[n], done); | ||
self.stream.uncork(); | ||
|
||
function done(err) { | ||
if (!err && --pending !== 0) | ||
return; | ||
|
||
// Ensure that this is called once in case of error | ||
pending = 0; | ||
|
||
// Ensure that write was dispatched | ||
setImmediate(function() { | ||
// Do not invoke callback twice | ||
if (!self._dequeue(item)) | ||
return; | ||
|
||
var errCode = 0; | ||
if (err) { | ||
if (err.code && uv['UV_' + err.code]) | ||
errCode = uv['UV_' + err.code]; | ||
else | ||
errCode = uv.UV_EPIPE; | ||
} | ||
|
||
handle.doAfterWrite(req); | ||
handle.finishWrite(req, errCode); | ||
}); | ||
} | ||
|
||
return 0; | ||
}; | ||
|
||
function QueueItem(type, req) { | ||
this.type = type; | ||
this.req = req; | ||
this.prev = this; | ||
this.next = this; | ||
} | ||
|
||
StreamWrap.prototype._enqueue = function _enqueue(type, req) { | ||
const item = new QueueItem(type, req); | ||
if (this._list === null) { | ||
this._list = item; | ||
return item; | ||
} | ||
|
||
item.next = this._list.next; | ||
item.prev = this._list; | ||
item.next.prev = item; | ||
item.prev.next = item; | ||
|
||
return item; | ||
}; | ||
|
||
StreamWrap.prototype._dequeue = function _dequeue(item) { | ||
assert(item instanceof QueueItem); | ||
|
||
var next = item.next; | ||
var prev = item.prev; | ||
|
||
if (next === null && prev === null) | ||
return false; | ||
|
||
item.next = null; | ||
item.prev = null; | ||
|
||
if (next === item) { | ||
prev = null; | ||
next = null; | ||
} else { | ||
prev.next = next; | ||
next.prev = prev; | ||
} | ||
|
||
if (this._list === item) | ||
this._list = next; | ||
|
||
return true; | ||
}; | ||
|
||
StreamWrap.prototype.doClose = function doClose(cb) { | ||
const self = this; | ||
const handle = self._handle; | ||
|
||
setImmediate(function() { | ||
while (self._list !== null) { | ||
const item = self._list; | ||
const req = item.req; | ||
self._dequeue(item); | ||
|
||
const errCode = uv.UV_ECANCELED; | ||
if (item.type === 'write') { | ||
handle.doAfterWrite(req); | ||
handle.finishWrite(req, errCode); | ||
} else if (item.type === 'shutdown') { | ||
handle.finishShutdown(req, errCode); | ||
} | ||
} | ||
|
||
// Should be already set by net.js | ||
assert(self._handle === null); | ||
cb(); | ||
}); | ||
}; | ||
module.exports = require('internal/wrap_js_stream'); |
Oops, something went wrong.