Skip to content

Commit 67f1d76

Browse files
committed
src: introduce native-layer stream piping
Provide a way to create pipes between native `StreamBase` instances that acts more directly than a `.pipe()` call would. PR-URL: #18936 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent f7f1437 commit 67f1d76

File tree

10 files changed

+358
-3
lines changed

10 files changed

+358
-3
lines changed

node.gyp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@
338338
'src/string_decoder.cc',
339339
'src/string_search.cc',
340340
'src/stream_base.cc',
341+
'src/stream_pipe.cc',
341342
'src/stream_wrap.cc',
342343
'src/tcp_wrap.cc',
343344
'src/timer_wrap.cc',
@@ -394,6 +395,7 @@
394395
'src/string_decoder-inl.h',
395396
'src/stream_base.h',
396397
'src/stream_base-inl.h',
398+
'src/stream_pipe.h',
397399
'src/stream_wrap.h',
398400
'src/tracing/agent.h',
399401
'src/tracing/node_trace_buffer.h',

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ namespace node {
5858
V(SHUTDOWNWRAP) \
5959
V(SIGNALWRAP) \
6060
V(STATWATCHER) \
61+
V(STREAMPIPE) \
6162
V(TCPCONNECTWRAP) \
6263
V(TCPSERVERWRAP) \
6364
V(TCPWRAP) \

src/env.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ struct PackageConfig {
222222
V(onstop_string, "onstop") \
223223
V(onstreamclose_string, "onstreamclose") \
224224
V(ontrailers_string, "ontrailers") \
225+
V(onunpipe_string, "onunpipe") \
225226
V(onwrite_string, "onwrite") \
226227
V(openssl_error_stack, "opensslErrorStack") \
227228
V(output_string, "output") \
@@ -233,6 +234,8 @@ struct PackageConfig {
233234
V(pbkdf2_error_string, "PBKDF2 Error") \
234235
V(pid_string, "pid") \
235236
V(pipe_string, "pipe") \
237+
V(pipe_target_string, "pipeTarget") \
238+
V(pipe_source_string, "pipeSource") \
236239
V(port_string, "port") \
237240
V(preference_string, "preference") \
238241
V(priority_string, "priority") \
@@ -255,9 +258,11 @@ struct PackageConfig {
255258
V(session_id_string, "sessionId") \
256259
V(shell_string, "shell") \
257260
V(signal_string, "signal") \
261+
V(sink_string, "sink") \
258262
V(size_string, "size") \
259263
V(sni_context_err_string, "Invalid SNI context") \
260264
V(sni_context_string, "sni_context") \
265+
V(source_string, "source") \
261266
V(stack_string, "stack") \
262267
V(status_string, "status") \
263268
V(stdio_string, "stdio") \

src/node_http2.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1813,7 +1813,9 @@ inline void Http2Stream::Close(int32_t code) {
18131813
}
18141814

18151815
int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
1816-
CHECK(!this->IsDestroyed());
1816+
if (IsDestroyed())
1817+
return UV_EPIPE;
1818+
18171819
{
18181820
Http2Scope h2scope(this);
18191821
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;

src/node_internals.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ struct sockaddr;
120120
V(serdes) \
121121
V(signal_wrap) \
122122
V(spawn_sync) \
123+
V(stream_pipe) \
123124
V(stream_wrap) \
124125
V(string_decoder) \
125126
V(tcp_wrap) \

src/stream_base-inl.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,14 @@ inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
6767

6868
inline StreamResource::~StreamResource() {
6969
while (listener_ != nullptr) {
70-
listener_->OnStreamDestroy();
71-
RemoveStreamListener(listener_);
70+
StreamListener* listener = listener_;
71+
listener->OnStreamDestroy();
72+
// Remove the listener if it didn’t remove itself. This makes the logic
73+
// logic in `OnStreamDestroy()` implementations easier, because they
74+
// may call generic cleanup functions which can just remove the
75+
// listener unconditionally.
76+
if (listener == listener_)
77+
RemoveStreamListener(listener_);
7278
}
7379
}
7480

src/stream_base.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ class StreamListener {
141141
// This is called immediately before the stream is destroyed.
142142
virtual void OnStreamDestroy() {}
143143

144+
// The stream this is currently associated with, or nullptr if there is none.
145+
inline StreamResource* stream() { return stream_; }
146+
144147
protected:
145148
// Pass along a read error to the `StreamListener` instance that was active
146149
// before this one. For example, a protocol parser does not care about read

src/stream_pipe.cc

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
#include "stream_pipe.h"
2+
#include "stream_base-inl.h"
3+
#include "node_buffer.h"
4+
#include "node_internals.h"
5+
6+
using v8::Context;
7+
using v8::External;
8+
using v8::FunctionCallbackInfo;
9+
using v8::FunctionTemplate;
10+
using v8::Local;
11+
using v8::Object;
12+
using v8::Value;
13+
14+
namespace node {
15+
16+
StreamPipe::StreamPipe(StreamBase* source,
17+
StreamBase* sink,
18+
Local<Object> obj)
19+
: AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
20+
MakeWeak(this);
21+
22+
CHECK_NE(sink, nullptr);
23+
CHECK_NE(source, nullptr);
24+
25+
source->PushStreamListener(&readable_listener_);
26+
sink->PushStreamListener(&writable_listener_);
27+
28+
CHECK(sink->HasWantsWrite());
29+
30+
// Set up links between this object and the source/sink objects.
31+
// In particular, this makes sure that they are garbage collected as a group,
32+
// if that applies to the given streams (for example, Http2Streams use
33+
// weak references).
34+
obj->Set(env()->context(), env()->source_string(), source->GetObject())
35+
.FromJust();
36+
source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
37+
.FromJust();
38+
obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
39+
.FromJust();
40+
sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
41+
.FromJust();
42+
}
43+
44+
StreamPipe::~StreamPipe() {
45+
CHECK(is_closed_);
46+
}
47+
48+
StreamBase* StreamPipe::source() {
49+
return static_cast<StreamBase*>(readable_listener_.stream());
50+
}
51+
52+
StreamBase* StreamPipe::sink() {
53+
return static_cast<StreamBase*>(writable_listener_.stream());
54+
}
55+
56+
void StreamPipe::Unpipe() {
57+
if (is_closed_)
58+
return;
59+
60+
// Note that we cannot use virtual methods on `source` and `sink` here,
61+
// because this function can be called from their destructors via
62+
// `OnStreamDestroy()`.
63+
64+
is_closed_ = true;
65+
is_reading_ = false;
66+
source()->RemoveStreamListener(&readable_listener_);
67+
sink()->RemoveStreamListener(&writable_listener_);
68+
69+
// Delay the JS-facing part with SetImmediate, because this might be from
70+
// inside the garbage collector, so we can’t run JS here.
71+
HandleScope handle_scope(env()->isolate());
72+
env()->SetImmediate([](Environment* env, void* data) {
73+
StreamPipe* pipe = static_cast<StreamPipe*>(data);
74+
75+
HandleScope handle_scope(env->isolate());
76+
Context::Scope context_scope(env->context());
77+
Local<Object> object = pipe->object();
78+
79+
if (object->Has(env->context(), env->onunpipe_string()).FromJust()) {
80+
pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked();
81+
}
82+
83+
// Set all the links established in the constructor to `null`.
84+
Local<Value> null = Null(env->isolate());
85+
86+
Local<Value> source_v;
87+
Local<Value> sink_v;
88+
source_v = object->Get(env->context(), env->source_string())
89+
.ToLocalChecked();
90+
sink_v = object->Get(env->context(), env->sink_string())
91+
.ToLocalChecked();
92+
CHECK(source_v->IsObject());
93+
CHECK(sink_v->IsObject());
94+
95+
object->Set(env->context(), env->source_string(), null).FromJust();
96+
object->Set(env->context(), env->sink_string(), null).FromJust();
97+
source_v.As<Object>()->Set(env->context(),
98+
env->pipe_target_string(),
99+
null).FromJust();
100+
sink_v.As<Object>()->Set(env->context(),
101+
env->pipe_source_string(),
102+
null).FromJust();
103+
}, static_cast<void*>(this), object());
104+
}
105+
106+
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
107+
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
108+
size_t size = std::min(suggested_size, pipe->wanted_data_);
109+
CHECK_GT(size, 0);
110+
return uv_buf_init(Malloc(size), size);
111+
}
112+
113+
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
114+
const uv_buf_t& buf) {
115+
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
116+
AsyncScope async_scope(pipe);
117+
if (nread < 0) {
118+
// EOF or error; stop reading and pass the error to the previous listener
119+
// (which might end up in JS).
120+
free(buf.base);
121+
pipe->is_eof_ = true;
122+
stream()->ReadStop();
123+
CHECK_NE(previous_listener_, nullptr);
124+
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
125+
// If we’re not writing, close now. Otherwise, we’ll do that in
126+
// `OnStreamAfterWrite()`.
127+
if (!pipe->is_writing_) {
128+
pipe->ShutdownWritable();
129+
pipe->Unpipe();
130+
}
131+
return;
132+
}
133+
134+
pipe->ProcessData(nread, buf);
135+
}
136+
137+
void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) {
138+
uv_buf_t buffer = uv_buf_init(buf.base, nread);
139+
StreamWriteResult res = sink()->Write(&buffer, 1);
140+
if (!res.async) {
141+
free(buf.base);
142+
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
143+
} else {
144+
is_writing_ = true;
145+
is_reading_ = false;
146+
res.wrap->SetAllocatedStorage(buf.base, buf.len);
147+
source()->ReadStop();
148+
}
149+
}
150+
151+
void StreamPipe::ShutdownWritable() {
152+
sink()->Shutdown();
153+
}
154+
155+
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
156+
int status) {
157+
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
158+
pipe->is_writing_ = false;
159+
if (pipe->is_eof_) {
160+
AsyncScope async_scope(pipe);
161+
pipe->ShutdownWritable();
162+
pipe->Unpipe();
163+
return;
164+
}
165+
166+
if (status != 0) {
167+
CHECK_NE(previous_listener_, nullptr);
168+
StreamListener* prev = previous_listener_;
169+
pipe->Unpipe();
170+
prev->OnStreamAfterWrite(w, status);
171+
return;
172+
}
173+
}
174+
175+
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
176+
int status) {
177+
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
178+
CHECK_NE(previous_listener_, nullptr);
179+
StreamListener* prev = previous_listener_;
180+
pipe->Unpipe();
181+
prev->OnStreamAfterShutdown(w, status);
182+
}
183+
184+
void StreamPipe::ReadableListener::OnStreamDestroy() {
185+
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
186+
if (!pipe->is_eof_) {
187+
OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
188+
}
189+
}
190+
191+
void StreamPipe::WritableListener::OnStreamDestroy() {
192+
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
193+
pipe->is_eof_ = true;
194+
pipe->Unpipe();
195+
}
196+
197+
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
198+
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
199+
pipe->wanted_data_ = suggested_size;
200+
if (pipe->is_reading_ || pipe->is_closed_)
201+
return;
202+
AsyncScope async_scope(pipe);
203+
pipe->is_reading_ = true;
204+
pipe->source()->ReadStart();
205+
}
206+
207+
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
208+
CHECK_NE(previous_listener_, nullptr);
209+
return previous_listener_->OnStreamAlloc(suggested_size);
210+
}
211+
212+
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
213+
const uv_buf_t& buf) {
214+
CHECK_NE(previous_listener_, nullptr);
215+
return previous_listener_->OnStreamRead(nread, buf);
216+
}
217+
218+
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
219+
CHECK(args.IsConstructCall());
220+
CHECK(args[0]->IsExternal());
221+
CHECK(args[1]->IsExternal());
222+
auto source = static_cast<StreamBase*>(args[0].As<External>()->Value());
223+
auto sink = static_cast<StreamBase*>(args[1].As<External>()->Value());
224+
225+
new StreamPipe(source, sink, args.This());
226+
}
227+
228+
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
229+
StreamPipe* pipe;
230+
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
231+
pipe->is_closed_ = false;
232+
if (pipe->wanted_data_ > 0)
233+
pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
234+
}
235+
236+
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
237+
StreamPipe* pipe;
238+
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
239+
pipe->Unpipe();
240+
}
241+
242+
namespace {
243+
244+
void InitializeStreamPipe(Local<Object> target,
245+
Local<Value> unused,
246+
Local<Context> context) {
247+
Environment* env = Environment::GetCurrent(context);
248+
249+
// Create FunctionTemplate for FileHandle::CloseReq
250+
Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
251+
Local<String> stream_pipe_string =
252+
FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
253+
env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
254+
env->SetProtoMethod(pipe, "start", StreamPipe::Start);
255+
AsyncWrap::AddWrapMethods(env, pipe);
256+
pipe->SetClassName(stream_pipe_string);
257+
pipe->InstanceTemplate()->SetInternalFieldCount(1);
258+
target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust();
259+
}
260+
261+
} // anonymous namespace
262+
263+
} // namespace node
264+
265+
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
266+
node::InitializeStreamPipe)

0 commit comments

Comments
 (0)