diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 13d0a11aa1e705..44904658217d77 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -455,7 +455,10 @@ function setupChannel(target, channel) { var jsonBuffer = ''; var pendingHandle = null; channel.buffering = false; - channel.onread = function(nread, pool, recvHandle) { + channel.pendingHandle = null; + channel.onread = function(nread, pool) { + const recvHandle = channel.pendingHandle; + channel.pendingHandle = null; // TODO(bnoordhuis) Check that nread > 0. if (pool) { if (recvHandle) diff --git a/src/env.h b/src/env.h index 5ebd56a3fff633..89027767f3885a 100644 --- a/src/env.h +++ b/src/env.h @@ -215,6 +215,7 @@ class ModuleWrap; V(owner_string, "owner") \ V(parse_error_string, "Parse Error") \ V(path_string, "path") \ + V(pending_handle_string, "pendingHandle") \ V(pbkdf2_error_string, "PBKDF2 Error") \ V(pid_string, "pid") \ V(pipe_string, "pipe") \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 287978a87034eb..76922c1d8af77d 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -33,9 +33,7 @@ inline StreamListener::~StreamListener() { inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { CHECK_NE(previous_listener_, nullptr); - previous_listener_->OnStreamRead(nread, - uv_buf_init(nullptr, 0), - UV_UNKNOWN_HANDLE); + previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); } @@ -85,12 +83,10 @@ inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { return listener_->OnStreamAlloc(suggested_size); } -inline void StreamResource::EmitRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) { +inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { if (nread > 0) bytes_read_ += static_cast(nread); - listener_->OnStreamRead(nread, buf, pending); + listener_->OnStreamRead(nread, buf); } inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { diff --git a/src/stream_base.cc b/src/stream_base.cc index 9acf2273abd78b..8bdcebe88ab19f 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -437,23 +437,17 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { } -void StreamBase::CallJSOnreadMethod(ssize_t nread, - Local buf, - Local handle) { +void StreamBase::CallJSOnreadMethod(ssize_t nread, Local buf) { Environment* env = env_; Local argv[] = { Integer::New(env->isolate(), nread), - buf, - handle + buf }; if (argv[1].IsEmpty()) argv[1] = Undefined(env->isolate()); - if (argv[2].IsEmpty()) - argv[2] = Undefined(env->isolate()); - AsyncWrap* wrap = GetAsyncWrap(); CHECK_NE(wrap, nullptr); wrap->MakeCallback(env->onread_string(), arraysize(argv), argv); @@ -495,19 +489,6 @@ uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) { return uv_buf_init(Malloc(suggested_size), suggested_size); } -void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { - // This cannot be virtual because it is just as valid to override the other - // OnStreamRead() callback. - CHECK(0 && "OnStreamRead() needs to be implemented"); -} - -void StreamListener::OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - OnStreamRead(nread, buf); -} - void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_NE(stream_, nullptr); diff --git a/src/stream_base.h b/src/stream_base.h index 0b176d11819fca..f18b6bda0a08a4 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -150,17 +150,8 @@ class StreamListener { // with base nullpptr in case of an error. // `nread` is the number of read bytes (which is at most the buffer length), // or, if negative, a libuv error code. - // The variant with a `uv_handle_type` argument is used by libuv-backed - // streams for handle transfers (e.g. passing net.Socket instances between - // cluster workers). For all other streams, overriding the simple variant - // should be sufficient. - // By default, the second variant crashes if `pending` is set and otherwise - // calls the simple variant. virtual void OnStreamRead(ssize_t nread, const uv_buf_t& buf) = 0; - virtual void OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending); // This is called once a Write has finished. `status` may be 0 or, // if negative, a libuv error code. @@ -229,9 +220,7 @@ class StreamResource { uv_buf_t EmitAlloc(size_t suggested_size); // Call the current listener's OnStreamRead() method and update the // stream's read byte counter. - void EmitRead(ssize_t nread, - const uv_buf_t& buf = uv_buf_init(nullptr, 0), - uv_handle_type pending = UV_UNKNOWN_HANDLE); + void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); // Call the current listener's OnStreamAfterWrite() method. void EmitAfterWrite(WriteWrap* w, int status); @@ -260,10 +249,7 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - void CallJSOnreadMethod( - ssize_t nread, - v8::Local buf, - v8::Local handle = v8::Local()); + void CallJSOnreadMethod(ssize_t nread, v8::Local buf); // These are called by the respective {Write,Shutdown}Wrap class. virtual void AfterShutdown(ShutdownWrap* req, int status); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 0be73f9114adb1..bc10cf80e828f1 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -93,7 +93,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, provider), StreamBase(env), stream_(stream) { - PushStreamListener(this); } @@ -146,7 +145,13 @@ bool LibuvStreamWrap::IsIPCPipe() { int LibuvStreamWrap::ReadStart() { - return uv_read_start(stream(), OnAlloc, OnRead); + return uv_read_start(stream(), [](uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + static_cast(handle->data)->OnUvAlloc(suggested_size, buf); + }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { + static_cast(stream->data)->OnUvRead(nread, buf); + }); } @@ -155,16 +160,11 @@ int LibuvStreamWrap::ReadStop() { } -void LibuvStreamWrap::OnAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { - LibuvStreamWrap* wrap = static_cast(handle->data); - HandleScope scope(wrap->env()->isolate()); - Context::Scope context_scope(wrap->env()->context()); - - CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); +void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); - *buf = wrap->EmitAlloc(suggested_size); + *buf = EmitAlloc(suggested_size); } @@ -190,64 +190,47 @@ static Local AcceptHandle(Environment* env, LibuvStreamWrap* parent) { } -void LibuvStreamWrap::OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) { - HandleScope handle_scope(env()->isolate()); +void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { + HandleScope scope(env()->isolate()); Context::Scope context_scope(env()->context()); - - if (nread <= 0) { - free(buf.base); - if (nread < 0) - CallJSOnreadMethod(nread, Local()); - return; - } - - CHECK_LE(static_cast(nread), buf.len); - - Local pending_obj; - - if (pending == UV_TCP) { - pending_obj = AcceptHandle(env(), this); - } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env(), this); - } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env(), this); - } else { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - } - - Local obj = Buffer::New(env(), buf.base, nread).ToLocalChecked(); - CallJSOnreadMethod(nread, obj, pending_obj); -} - - -void LibuvStreamWrap::OnRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf) { - LibuvStreamWrap* wrap = static_cast(handle->data); - HandleScope scope(wrap->env()->isolate()); - Context::Scope context_scope(wrap->env()->context()); uv_handle_type type = UV_UNKNOWN_HANDLE; - if (wrap->is_named_pipe_ipc() && - uv_pipe_pending_count(reinterpret_cast(handle)) > 0) { - type = uv_pipe_pending_type(reinterpret_cast(handle)); + if (is_named_pipe_ipc() && + uv_pipe_pending_count(reinterpret_cast(stream())) > 0) { + type = uv_pipe_pending_type(reinterpret_cast(stream())); } // We should not be getting this callback if someone as already called // uv_close() on the handle. - CHECK_EQ(wrap->persistent().IsEmpty(), false); + CHECK_EQ(persistent().IsEmpty(), false); if (nread > 0) { - if (wrap->is_tcp()) { + if (is_tcp()) { NODE_COUNT_NET_BYTES_RECV(nread); - } else if (wrap->is_named_pipe()) { + } else if (is_named_pipe()) { NODE_COUNT_PIPE_BYTES_RECV(nread); } + + Local pending_obj; + + if (type == UV_TCP) { + pending_obj = AcceptHandle(env(), this); + } else if (type == UV_NAMED_PIPE) { + pending_obj = AcceptHandle(env(), this); + } else if (type == UV_UDP) { + pending_obj = AcceptHandle(env(), this); + } else { + CHECK_EQ(type, UV_UNKNOWN_HANDLE); + } + + if (!pending_obj.IsEmpty()) { + object()->Set(env()->context(), + env()->pending_handle_string(), + pending_obj).FromJust(); + } } - wrap->EmitRead(nread, *buf, type); + EmitRead(nread, *buf); } @@ -373,11 +356,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { req_wrap->Done(status); } - -void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) { - StreamBase::AfterWrite(w, status); -} - } // namespace node NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap, diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 129006b1600c6c..e5ad25b91e6fea 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -33,9 +33,7 @@ namespace node { -class LibuvStreamWrap : public HandleWrap, - public StreamListener, - public StreamBase { +class LibuvStreamWrap : public HandleWrap, public StreamBase { public: static void Initialize(v8::Local target, v8::Local unused, @@ -93,30 +91,12 @@ class LibuvStreamWrap : public HandleWrap, static void SetBlocking(const v8::FunctionCallbackInfo& args); // Callbacks for libuv - static void OnAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf); + void OnUvAlloc(size_t suggested_size, uv_buf_t* buf); + void OnUvRead(ssize_t nread, const uv_buf_t* buf); - static void OnRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf); static void AfterUvWrite(uv_write_t* req, int status); static void AfterUvShutdown(uv_shutdown_t* req, int status); - // Resource interface implementation - void OnStreamRead(ssize_t nread, - const uv_buf_t& buf) override { - CHECK(0 && "must not be called"); - } - void OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) override; - void OnStreamAfterWrite(WriteWrap* w, int status) override { - previous_listener_->OnStreamAfterWrite(w, status); - } - - void AfterWrite(WriteWrap* req_wrap, int status) override; - uv_stream_t* const stream_; };