@@ -8,216 +8,213 @@ const uv = process.binding('uv');
88const debug = util . debuglog ( 'stream_wrap' ) ;
99const errors = require ( 'internal/errors' ) ;
1010
11- function StreamWrap ( stream ) {
12- const handle = new JSStream ( ) ;
13-
14- this . stream = stream ;
15-
16- this . _list = null ;
17-
18- const self = this ;
19- handle . close = function ( cb ) {
20- debug ( 'close' ) ;
21- self . doClose ( cb ) ;
22- } ;
23- handle . isAlive = function ( ) {
24- return self . isAlive ( ) ;
25- } ;
26- handle . isClosing = function ( ) {
27- return self . isClosing ( ) ;
28- } ;
29- handle . onreadstart = function ( ) {
30- return self . readStart ( ) ;
31- } ;
32- handle . onreadstop = function ( ) {
33- return self . readStop ( ) ;
34- } ;
35- handle . onshutdown = function ( req ) {
36- return self . doShutdown ( req ) ;
37- } ;
38- handle . onwrite = function ( req , bufs ) {
39- return self . doWrite ( req , bufs ) ;
40- } ;
41-
42- this . stream . pause ( ) ;
43- this . stream . on ( 'error' , function onerror ( err ) {
44- self . emit ( 'error' , err ) ;
45- } ) ;
46- this . stream . on ( 'data' , function ondata ( chunk ) {
47- if ( typeof chunk === 'string' || this . _readableState . objectMode === true ) {
48- // Make sure that no further `data` events will happen
49- this . pause ( ) ;
50- this . removeListener ( 'data' , ondata ) ;
51-
52- self . emit ( 'error' , new errors . Error ( 'ERR_STREAM_WRAP' ) ) ;
53- return ;
54- }
55-
56- debug ( 'data' , chunk . length ) ;
57- if ( self . _handle )
58- self . _handle . readBuffer ( chunk ) ;
59- } ) ;
60- this . stream . once ( 'end' , function onend ( ) {
61- debug ( 'end' ) ;
62- if ( self . _handle )
63- self . _handle . emitEOF ( ) ;
64- } ) ;
65-
66- Socket . call ( this , {
67- handle : handle
68- } ) ;
69- }
70- util . inherits ( StreamWrap , Socket ) ;
71- module . exports = StreamWrap ;
72-
73- // require('_stream_wrap').StreamWrap
74- StreamWrap . StreamWrap = StreamWrap ;
75-
76- StreamWrap . prototype . isAlive = function isAlive ( ) {
77- return true ;
78- } ;
79-
80- StreamWrap . prototype . isClosing = function isClosing ( ) {
81- return ! this . readable || ! this . writable ;
82- } ;
83-
84- StreamWrap . prototype . readStart = function readStart ( ) {
85- this . stream . resume ( ) ;
86- return 0 ;
87- } ;
88-
89- StreamWrap . prototype . readStop = function readStop ( ) {
90- this . stream . pause ( ) ;
91- return 0 ;
92- } ;
93-
94- StreamWrap . prototype . doShutdown = function doShutdown ( req ) {
95- const self = this ;
96- const handle = this . _handle ;
97- const item = this . _enqueue ( 'shutdown' , req ) ;
98-
99- this . stream . end ( function ( ) {
100- // Ensure that write was dispatched
101- setImmediate ( function ( ) {
102- if ( ! self . _dequeue ( item ) )
11+ /* This class serves as a wrapper for when the C++ side of Node wants access
12+ * to a standard JS stream. For example, TLS or HTTP do not operate on network
13+ * resources conceptually, although that is the common case and what we are
14+ * optimizing for; in theory, they are completely composable and can work with
15+ * any stream resource they see.
16+ *
17+ * For the common case, i.e. a TLS socket wrapping around a net.Socket, we
18+ * can skip going through the JS layer and let TLS access the raw C++ handle
19+ * of a net.Socket. The flipside of this is that, to maintain composability,
20+ * we need a way to create "fake" net.Socket instances that call back into a
21+ * "real" JavaScript stream. JSStreamWrap is exactly this.
22+ */
23+ class JSStreamWrap extends Socket {
24+ constructor ( stream ) {
25+ const handle = new JSStream ( ) ;
26+ handle . close = ( cb ) => {
27+ debug ( 'close' ) ;
28+ this . doClose ( cb ) ;
29+ } ;
30+ handle . isAlive = ( ) => this . isAlive ( ) ;
31+ handle . isClosing = ( ) => this . isClosing ( ) ;
32+ handle . onreadstart = ( ) => this . readStart ( ) ;
33+ handle . onreadstop = ( ) => this . readStop ( ) ;
34+ handle . onshutdown = ( req ) => this . doShutdown ( req ) ;
35+ handle . onwrite = ( req , bufs ) => this . doWrite ( req , bufs ) ;
36+
37+ stream . pause ( ) ;
38+ stream . on ( 'error' , ( err ) => this . emit ( 'error' , err ) ) ;
39+ const ondata = ( chunk ) => {
40+ if ( typeof chunk === 'string' ||
41+ stream . _readableState . objectMode === true ) {
42+ // Make sure that no further `data` events will happen.
43+ stream . pause ( ) ;
44+ stream . removeListener ( 'data' , ondata ) ;
45+
46+ this . emit ( 'error' , new errors . Error ( 'ERR_STREAM_WRAP' ) ) ;
10347 return ;
48+ }
10449
105- handle . finishShutdown ( req , 0 ) ;
50+ debug ( 'data' , chunk . length ) ;
51+ if ( this . _handle )
52+ this . _handle . readBuffer ( chunk ) ;
53+ } ;
54+ stream . on ( 'data' , ondata ) ;
55+ stream . once ( 'end' , ( ) => {
56+ debug ( 'end' ) ;
57+ if ( this . _handle )
58+ this . _handle . emitEOF ( ) ;
10659 } ) ;
107- } ) ;
108- return 0 ;
109- } ;
11060
111- StreamWrap . prototype . doWrite = function doWrite ( req , bufs ) {
112- const self = this ;
113- const handle = self . _handle ;
61+ super ( { handle, manualStart : true } ) ;
62+ this . stream = stream ;
63+ this . _list = null ;
64+ this . read ( 0 ) ;
65+ }
11466
115- var pending = bufs . length ;
67+ // Legacy
68+ static get StreamWrap ( ) {
69+ return JSStreamWrap ;
70+ }
11671
117- // Queue the request to be able to cancel it
118- const item = self . _enqueue ( 'write' , req ) ;
72+ isAlive ( ) {
73+ return true ;
74+ }
11975
120- self . stream . cork ( ) ;
121- for ( var n = 0 ; n < bufs . length ; n ++ )
122- self . stream . write ( bufs [ n ] , done ) ;
123- self . stream . uncork ( ) ;
76+ isClosing ( ) {
77+ return ! this . readable || ! this . writable ;
78+ }
12479
125- function done ( err ) {
126- if ( ! err && -- pending !== 0 )
127- return ;
80+ readStart ( ) {
81+ this . stream . resume ( ) ;
82+ return 0 ;
83+ }
12884
129- // Ensure that this is called once in case of error
130- pending = 0 ;
85+ readStop ( ) {
86+ this . stream . pause ( ) ;
87+ return 0 ;
88+ }
13189
132- let errCode = 0 ;
133- if ( err ) {
134- const code = uv [ `UV_${ err . code } ` ] ;
135- errCode = ( err . code && code ) ? code : uv . UV_EPIPE ;
136- }
90+ doShutdown ( req ) {
91+ const handle = this . _handle ;
92+ const item = this . _enqueue ( 'shutdown' , req ) ;
13793
138- // Ensure that write was dispatched
139- setImmediate ( function ( ) {
140- // Do not invoke callback twice
141- if ( ! self . _dequeue ( item ) )
142- return ;
94+ this . stream . end ( ( ) => {
95+ // Ensure that write was dispatched
96+ setImmediate ( ( ) => {
97+ if ( ! this . _dequeue ( item ) )
98+ return ;
14399
144- handle . doAfterWrite ( req ) ;
145- handle . finishWrite ( req , errCode ) ;
100+ handle . finishShutdown ( req , 0 ) ;
101+ } ) ;
146102 } ) ;
103+ return 0 ;
147104 }
148105
149- return 0 ;
150- } ;
106+ doWrite ( req , bufs ) {
107+ const self = this ;
108+ const handle = this . _handle ;
151109
152- function QueueItem ( type , req ) {
153- this . type = type ;
154- this . req = req ;
155- this . prev = this ;
156- this . next = this ;
157- }
110+ var pending = bufs . length ;
158111
159- StreamWrap . prototype . _enqueue = function _enqueue ( type , req ) {
160- const item = new QueueItem ( type , req ) ;
161- if ( this . _list === null ) {
162- this . _list = item ;
163- return item ;
164- }
112+ // Queue the request to be able to cancel it
113+ const item = this . _enqueue ( 'write' , req ) ;
114+
115+ this . stream . cork ( ) ;
116+ for ( var n = 0 ; n < bufs . length ; n ++ )
117+ this . stream . write ( bufs [ n ] , done ) ;
118+ this . stream . uncork ( ) ;
119+
120+ function done ( err ) {
121+ if ( ! err && -- pending !== 0 )
122+ return ;
123+
124+ // Ensure that this is called once in case of error
125+ pending = 0 ;
165126
166- item . next = this . _list . next ;
167- item . prev = this . _list ;
168- item . next . prev = item ;
169- item . prev . next = item ;
127+ let errCode = 0 ;
128+ if ( err ) {
129+ const code = uv [ `UV_${ err . code } ` ] ;
130+ errCode = ( err . code && code ) ? code : uv . UV_EPIPE ;
131+ }
170132
171- return item ;
172- } ;
133+ // Ensure that write was dispatched
134+ setImmediate ( function ( ) {
135+ // Do not invoke callback twice
136+ if ( ! self . _dequeue ( item ) )
137+ return ;
173138
174- StreamWrap . prototype . _dequeue = function _dequeue ( item ) {
175- assert ( item instanceof QueueItem ) ;
139+ handle . doAfterWrite ( req ) ;
140+ handle . finishWrite ( req , errCode ) ;
141+ } ) ;
142+ }
176143
177- var next = item . next ;
178- var prev = item . prev ;
144+ return 0 ;
145+ }
179146
180- if ( next === null && prev === null )
181- return false ;
147+ _enqueue ( type , req ) {
148+ const item = new QueueItem ( type , req ) ;
149+ if ( this . _list === null ) {
150+ this . _list = item ;
151+ return item ;
152+ }
182153
183- item . next = null ;
184- item . prev = null ;
154+ item . next = this . _list . next ;
155+ item . prev = this . _list ;
156+ item . next . prev = item ;
157+ item . prev . next = item ;
185158
186- if ( next === item ) {
187- prev = null ;
188- next = null ;
189- } else {
190- prev . next = next ;
191- next . prev = prev ;
159+ return item ;
192160 }
193161
194- if ( this . _list === item )
195- this . _list = next ;
162+ _dequeue ( item ) {
163+ assert ( item instanceof QueueItem ) ;
196164
197- return true ;
198- } ;
165+ var next = item . next ;
166+ var prev = item . prev ;
199167
200- StreamWrap . prototype . doClose = function doClose ( cb ) {
201- const self = this ;
202- const handle = self . _handle ;
168+ if ( next === null && prev === null )
169+ return false ;
203170
204- setImmediate ( function ( ) {
205- while ( self . _list !== null ) {
206- const item = self . _list ;
207- const req = item . req ;
208- self . _dequeue ( item ) ;
171+ item . next = null ;
172+ item . prev = null ;
209173
210- const errCode = uv . UV_ECANCELED ;
211- if ( item . type === 'write' ) {
212- handle . doAfterWrite ( req ) ;
213- handle . finishWrite ( req , errCode ) ;
214- } else if ( item . type === 'shutdown' ) {
215- handle . finishShutdown ( req , errCode ) ;
216- }
174+ if ( next === item ) {
175+ prev = null ;
176+ next = null ;
177+ } else {
178+ prev . next = next ;
179+ next . prev = prev ;
217180 }
218181
219- // Should be already set by net.js
220- assert ( self . _handle === null ) ;
221- cb ( ) ;
222- } ) ;
223- } ;
182+ if ( this . _list === item )
183+ this . _list = next ;
184+
185+ return true ;
186+ }
187+
188+ doClose ( cb ) {
189+ const handle = this . _handle ;
190+
191+ setImmediate ( ( ) => {
192+ while ( this . _list !== null ) {
193+ const item = this . _list ;
194+ const req = item . req ;
195+ this . _dequeue ( item ) ;
196+
197+ const errCode = uv . UV_ECANCELED ;
198+ if ( item . type === 'write' ) {
199+ handle . doAfterWrite ( req ) ;
200+ handle . finishWrite ( req , errCode ) ;
201+ } else if ( item . type === 'shutdown' ) {
202+ handle . finishShutdown ( req , errCode ) ;
203+ }
204+ }
205+
206+ // Should be already set by net.js
207+ assert . strictEqual ( this . _handle , null ) ;
208+ cb ( ) ;
209+ } ) ;
210+ }
211+ }
212+
213+ function QueueItem ( type , req ) {
214+ this . type = type ;
215+ this . req = req ;
216+ this . prev = this ;
217+ this . next = this ;
218+ }
219+
220+ module . exports = JSStreamWrap ;
0 commit comments