@@ -17,48 +17,23 @@ const {
17
17
validateObject,
18
18
} = require ( 'internal/validators' ) ;
19
19
20
+ const {
21
+ isClosed,
22
+ isReadable,
23
+ isReadableNodeStream,
24
+ isReadableFinished,
25
+ isWritable,
26
+ isWritableNodeStream,
27
+ isWritableFinished,
28
+ willEmitClose : _willEmitClose ,
29
+ } = require ( 'internal/streams/utils' ) ;
30
+
20
31
function isRequest ( stream ) {
21
32
return stream . setHeader && typeof stream . abort === 'function' ;
22
33
}
23
34
24
- function isServerResponse ( stream ) {
25
- return (
26
- typeof stream . _sent100 === 'boolean' &&
27
- typeof stream . _removedConnection === 'boolean' &&
28
- typeof stream . _removedContLen === 'boolean' &&
29
- typeof stream . _removedTE === 'boolean' &&
30
- typeof stream . _closed === 'boolean'
31
- ) ;
32
- }
33
-
34
- function isReadable ( stream ) {
35
- return typeof stream . readable === 'boolean' ||
36
- typeof stream . readableEnded === 'boolean' ||
37
- ! ! stream . _readableState ;
38
- }
39
-
40
- function isWritable ( stream ) {
41
- return typeof stream . writable === 'boolean' ||
42
- typeof stream . writableEnded === 'boolean' ||
43
- ! ! stream . _writableState ;
44
- }
45
-
46
- function isWritableFinished ( stream ) {
47
- if ( stream . writableFinished ) return true ;
48
- const wState = stream . _writableState ;
49
- if ( ! wState || wState . errored ) return false ;
50
- return wState . finished || ( wState . ended && wState . length === 0 ) ;
51
- }
52
-
53
35
const nop = ( ) => { } ;
54
36
55
- function isReadableEnded ( stream ) {
56
- if ( stream . readableEnded ) return true ;
57
- const rState = stream . _readableState ;
58
- if ( ! rState || rState . errored ) return false ;
59
- return rState . endEmitted || ( rState . ended && rState . length === 0 ) ;
60
- }
61
-
62
37
function eos ( stream , options , callback ) {
63
38
if ( arguments . length === 2 ) {
64
39
callback = options ;
@@ -74,13 +49,12 @@ function eos(stream, options, callback) {
74
49
callback = once ( callback ) ;
75
50
76
51
const readable = options . readable ||
77
- ( options . readable !== false && isReadable ( stream ) ) ;
52
+ ( options . readable !== false && isReadableNodeStream ( stream ) ) ;
78
53
const writable = options . writable ||
79
- ( options . writable !== false && isWritable ( stream ) ) ;
54
+ ( options . writable !== false && isWritableNodeStream ( stream ) ) ;
80
55
81
56
const wState = stream . _writableState ;
82
57
const rState = stream . _readableState ;
83
- const state = wState || rState ;
84
58
85
59
const onlegacyfinish = ( ) => {
86
60
if ( ! stream . writable ) onfinish ( ) ;
@@ -89,16 +63,13 @@ function eos(stream, options, callback) {
89
63
// TODO (ronag): Improve soft detection to include core modules and
90
64
// common ecosystem modules that do properly emit 'close' but fail
91
65
// this generic check.
92
- let willEmitClose = isServerResponse ( stream ) || (
93
- state &&
94
- state . autoDestroy &&
95
- state . emitClose &&
96
- state . closed === false &&
97
- isReadable ( stream ) === readable &&
98
- isWritable ( stream ) === writable
66
+ let willEmitClose = (
67
+ _willEmitClose ( stream ) &&
68
+ isReadableNodeStream ( stream ) === readable &&
69
+ isWritableNodeStream ( stream ) === writable
99
70
) ;
100
71
101
- let writableFinished = stream . writableFinished || wState ?. finished ;
72
+ let writableFinished = isWritableFinished ( stream , false ) ;
102
73
const onfinish = ( ) => {
103
74
writableFinished = true ;
104
75
// Stream should not be destroyed here. If it is that
@@ -107,12 +78,12 @@ function eos(stream, options, callback) {
107
78
if ( stream . destroyed ) willEmitClose = false ;
108
79
109
80
if ( willEmitClose && ( ! stream . readable || readable ) ) return ;
110
- if ( ! readable || readableEnded ) callback . call ( stream ) ;
81
+ if ( ! readable || readableFinished ) callback . call ( stream ) ;
111
82
} ;
112
83
113
- let readableEnded = stream . readableEnded || rState ?. endEmitted ;
84
+ let readableFinished = isReadableFinished ( stream , false ) ;
114
85
const onend = ( ) => {
115
- readableEnded = true ;
86
+ readableFinished = true ;
116
87
// Stream should not be destroyed here. If it is that
117
88
// means that user space is doing something differently and
118
89
// we cannot trust willEmitClose.
@@ -126,7 +97,7 @@ function eos(stream, options, callback) {
126
97
callback . call ( stream , err ) ;
127
98
} ;
128
99
129
- let closed = wState ?. closed || rState ?. closed ;
100
+ let closed = isClosed ( stream ) ;
130
101
131
102
const onclose = ( ) => {
132
103
closed = true ;
@@ -137,13 +108,13 @@ function eos(stream, options, callback) {
137
108
return callback . call ( stream , errored ) ;
138
109
}
139
110
140
- if ( readable && ! readableEnded ) {
141
- if ( ! isReadableEnded ( stream ) )
111
+ if ( readable && ! readableFinished ) {
112
+ if ( ! isReadableFinished ( stream , false ) )
142
113
return callback . call ( stream ,
143
114
new ERR_STREAM_PREMATURE_CLOSE ( ) ) ;
144
115
}
145
116
if ( writable && ! writableFinished ) {
146
- if ( ! isWritableFinished ( stream ) )
117
+ if ( ! isWritableFinished ( stream , false ) )
147
118
return callback . call ( stream ,
148
119
new ERR_STREAM_PREMATURE_CLOSE ( ) ) ;
149
120
}
@@ -185,19 +156,16 @@ function eos(stream, options, callback) {
185
156
}
186
157
} else if (
187
158
! readable &&
188
- ( ! willEmitClose || stream . readable ) &&
189
- writableFinished
159
+ ( ! willEmitClose || isReadable ( stream ) ) &&
160
+ ( writableFinished || ! isWritable ( stream ) )
190
161
) {
191
162
process . nextTick ( onclose ) ;
192
163
} else if (
193
164
! writable &&
194
- ( ! willEmitClose || stream . writable ) &&
195
- readableEnded
165
+ ( ! willEmitClose || isWritable ( stream ) ) &&
166
+ ( readableFinished || ! isReadable ( stream ) )
196
167
) {
197
168
process . nextTick ( onclose ) ;
198
- } else if ( ! wState && ! rState && stream . _closed === true ) {
199
- // _closed is for OutgoingMessage which is not a proper Writable.
200
- process . nextTick ( onclose ) ;
201
169
} else if ( ( rState && stream . req && stream . aborted ) ) {
202
170
process . nextTick ( onclose ) ;
203
171
}
0 commit comments