@@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
25
25
source->PushStreamListener (&readable_listener_);
26
26
sink->PushStreamListener (&writable_listener_);
27
27
28
- CHECK ( sink->HasWantsWrite () );
28
+ uses_wants_write_ = sink->HasWantsWrite ();
29
29
30
30
// Set up links between this object and the source/sink objects.
31
31
// In particular, this makes sure that they are garbage collected as a group,
@@ -66,7 +66,8 @@ void StreamPipe::Unpipe() {
66
66
is_closed_ = true ;
67
67
is_reading_ = false ;
68
68
source ()->RemoveStreamListener (&readable_listener_);
69
- sink ()->RemoveStreamListener (&writable_listener_);
69
+ if (pending_writes_ == 0 )
70
+ sink ()->RemoveStreamListener (&writable_listener_);
70
71
71
72
// Delay the JS-facing part with SetImmediate, because this might be from
72
73
// inside the garbage collector, so we can’t run JS here.
@@ -124,13 +125,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
124
125
// EOF or error; stop reading and pass the error to the previous listener
125
126
// (which might end up in JS).
126
127
pipe->is_eof_ = true ;
128
+ // Cache `sink()` here because the previous listener might do things
129
+ // that eventually lead to an `Unpipe()` call.
130
+ StreamBase* sink = pipe->sink ();
127
131
stream ()->ReadStop ();
128
132
CHECK_NOT_NULL (previous_listener_);
129
133
previous_listener_->OnStreamRead (nread, uv_buf_init (nullptr , 0 ));
130
134
// If we’re not writing, close now. Otherwise, we’ll do that in
131
135
// `OnStreamAfterWrite()`.
132
- if (! pipe->is_writing_ ) {
133
- pipe-> ShutdownWritable ();
136
+ if (pipe->pending_writes_ == 0 ) {
137
+ sink-> Shutdown ();
134
138
pipe->Unpipe ();
135
139
}
136
140
return ;
@@ -140,30 +144,38 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
140
144
}
141
145
142
146
void StreamPipe::ProcessData (size_t nread, AllocatedBuffer&& buf) {
147
+ CHECK (uses_wants_write_ || pending_writes_ == 0 );
143
148
uv_buf_t buffer = uv_buf_init (buf.data (), nread);
144
149
StreamWriteResult res = sink ()->Write (&buffer, 1 );
150
+ pending_writes_++;
145
151
if (!res.async ) {
146
152
writable_listener_.OnStreamAfterWrite (nullptr , res.err );
147
153
} else {
148
- is_writing_ = true ;
149
154
is_reading_ = false ;
150
155
res.wrap ->SetAllocatedStorage (std::move (buf));
151
156
if (source () != nullptr )
152
157
source ()->ReadStop ();
153
158
}
154
159
}
155
160
156
- void StreamPipe::ShutdownWritable () {
157
- sink ()->Shutdown ();
158
- }
159
-
160
161
void StreamPipe::WritableListener::OnStreamAfterWrite (WriteWrap* w,
161
162
int status) {
162
163
StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
163
- pipe->is_writing_ = false ;
164
+ pipe->pending_writes_ --;
165
+ if (pipe->is_closed_ ) {
166
+ if (pipe->pending_writes_ == 0 ) {
167
+ Environment* env = pipe->env ();
168
+ HandleScope handle_scope (env->isolate ());
169
+ Context::Scope context_scope (env->context ());
170
+ pipe->MakeCallback (env->oncomplete_string (), 0 , nullptr ).ToLocalChecked ();
171
+ stream ()->RemoveStreamListener (this );
172
+ }
173
+ return ;
174
+ }
175
+
164
176
if (pipe->is_eof_ ) {
165
177
AsyncScope async_scope (pipe);
166
- pipe->ShutdownWritable ();
178
+ pipe->sink ()-> Shutdown ();
167
179
pipe->Unpipe ();
168
180
return ;
169
181
}
@@ -175,6 +187,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
175
187
prev->OnStreamAfterWrite (w, status);
176
188
return ;
177
189
}
190
+
191
+ if (!pipe->uses_wants_write_ ) {
192
+ OnStreamWantsWrite (65536 );
193
+ }
178
194
}
179
195
180
196
void StreamPipe::WritableListener::OnStreamAfterShutdown (ShutdownWrap* w,
@@ -198,6 +214,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
198
214
StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
199
215
pipe->sink_destroyed_ = true ;
200
216
pipe->is_eof_ = true ;
217
+ pipe->pending_writes_ = 0 ;
201
218
pipe->Unpipe ();
202
219
}
203
220
@@ -236,8 +253,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
236
253
StreamPipe* pipe;
237
254
ASSIGN_OR_RETURN_UNWRAP (&pipe, args.Holder ());
238
255
pipe->is_closed_ = false ;
239
- if (pipe->wanted_data_ > 0 )
240
- pipe->writable_listener_ .OnStreamWantsWrite (pipe->wanted_data_ );
256
+ pipe->writable_listener_ .OnStreamWantsWrite (65536 );
241
257
}
242
258
243
259
void StreamPipe::Unpipe (const FunctionCallbackInfo<Value>& args) {
@@ -246,6 +262,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
246
262
pipe->Unpipe ();
247
263
}
248
264
265
+ void StreamPipe::IsClosed (const FunctionCallbackInfo<Value>& args) {
266
+ StreamPipe* pipe;
267
+ ASSIGN_OR_RETURN_UNWRAP (&pipe, args.Holder ());
268
+ args.GetReturnValue ().Set (pipe->is_closed_ );
269
+ }
270
+
271
+ void StreamPipe::PendingWrites (const FunctionCallbackInfo<Value>& args) {
272
+ StreamPipe* pipe;
273
+ ASSIGN_OR_RETURN_UNWRAP (&pipe, args.Holder ());
274
+ args.GetReturnValue ().Set (pipe->pending_writes_ );
275
+ }
276
+
249
277
namespace {
250
278
251
279
void InitializeStreamPipe (Local<Object> target,
@@ -260,6 +288,8 @@ void InitializeStreamPipe(Local<Object> target,
260
288
FIXED_ONE_BYTE_STRING (env->isolate (), " StreamPipe" );
261
289
env->SetProtoMethod (pipe, " unpipe" , StreamPipe::Unpipe);
262
290
env->SetProtoMethod (pipe, " start" , StreamPipe::Start);
291
+ env->SetProtoMethod (pipe, " isClosed" , StreamPipe::IsClosed);
292
+ env->SetProtoMethod (pipe, " pendingWrites" , StreamPipe::PendingWrites);
263
293
pipe->Inherit (AsyncWrap::GetConstructorTemplate (env));
264
294
pipe->SetClassName (stream_pipe_string);
265
295
pipe->InstanceTemplate ()->SetInternalFieldCount (1 );
0 commit comments