Skip to content

Commit

Permalink
net,src: refactor writeQueueSize tracking
Browse files Browse the repository at this point in the history
Currently, writeQueueSize is never used in C++ and barely used
within JS. Instead of constantly updating the value on the JS
object, create a getter that will retrieve the most up-to-date
value from C++.

For the vast majority of cases though, create a new prop on
Socket.prototype[kLastWriteQueueSize] using a Symbol. Use this
to track the current write size, entirely in JS land.

PR-URL: nodejs#17650
  • Loading branch information
apapirovski authored and addaleax committed Jan 10, 2018
1 parent 71a7ede commit f9a17f5
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 185 deletions.
5 changes: 0 additions & 5 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) {
var options = this._tlsOptions;
var ssl = this._handle;

// lib/net.js expect this value to be non-zero if write hasn't been flushed
// immediately. After the handshake is done this will represent the actual
// write queue size
ssl.writeQueueSize = 1;

this.server = options.server;

// For clients, we will always have either a given ca list or be using
Expand Down
33 changes: 21 additions & 12 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick');
const errors = require('internal/errors');
const dns = require('dns');

const kLastWriteQueueSize = Symbol('lastWriteQueueSize');

// `cluster` is only used by `listenInCluster` so for startup performance
// reasons it's lazy loaded.
var cluster = null;
Expand Down Expand Up @@ -198,6 +200,7 @@ function Socket(options) {
this._handle = null;
this._parent = null;
this._host = null;
this[kLastWriteQueueSize] = 0;

if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
Expand Down Expand Up @@ -398,12 +401,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {


Socket.prototype._onTimeout = function() {
if (this._handle) {
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
const handle = this._handle;
const lastWriteQueueSize = this[kLastWriteQueueSize];
if (lastWriteQueueSize > 0 && handle) {
// `lastWriteQueueSize !== writeQueueSize` means there is
// an active write in progress, so we suppress the timeout.
const prevWriteQueueSize = this._handle.writeQueueSize;
if (prevWriteQueueSize > 0 &&
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
const writeQueueSize = handle.writeQueueSize;
if (lastWriteQueueSize !== writeQueueSize) {
this[kLastWriteQueueSize] = writeQueueSize;
this._unrefTimer();
return;
}
Expand Down Expand Up @@ -473,7 +478,7 @@ Object.defineProperty(Socket.prototype, 'readyState', {
Object.defineProperty(Socket.prototype, 'bufferSize', {
get: function() {
if (this._handle) {
return this._handle.writeQueueSize + this.writableLength;
return this[kLastWriteQueueSize] + this.writableLength;
}
}
});
Expand Down Expand Up @@ -764,12 +769,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {

this._bytesDispatched += req.bytes;

// If it was entirely flushed, we can write some more right now.
// However, if more is left in the queue, then wait until that clears.
if (req.async && this._handle.writeQueueSize !== 0)
req.cb = cb;
else
if (!req.async) {
cb();
return;
}

req.cb = cb;
this[kLastWriteQueueSize] = req.bytes;
};


Expand Down Expand Up @@ -853,6 +859,9 @@ function afterWrite(status, handle, req, err) {
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);

if (req.async)
self[kLastWriteQueueSize] = 0;

// callback may come after call to destroy.
if (self.destroyed) {
debug('afterWrite destroyed');
Expand All @@ -872,7 +881,7 @@ function afterWrite(status, handle, req, err) {
debug('afterWrite call cb');

if (req.cb)
req.cb.call(self);
req.cb.call(undefined);
}


Expand Down
1 change: 0 additions & 1 deletion src/pipe_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ PipeWrap::PipeWrap(Environment* env,
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
UpdateWriteQueueSize();
}


Expand Down
13 changes: 10 additions & 3 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}

err = DoWrite(req_wrap, buf_list, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (HasWriteQueue())
req_wrap_obj->Set(env->async(), True(env->isolate()));

if (err)
req_wrap->Dispose();
Expand Down Expand Up @@ -249,7 +250,8 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
req_wrap = WriteWrap::New(env, req_wrap_obj, this);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (HasWriteQueue())
req_wrap_obj->Set(env->async(), True(env->isolate()));
req_wrap_obj->Set(env->buffer_string(), args[1]);

if (err)
Expand Down Expand Up @@ -373,7 +375,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
reinterpret_cast<uv_stream_t*>(send_handle));
}

req_wrap_obj->Set(env->async(), True(env->isolate()));
if (HasWriteQueue())
req_wrap_obj->Set(env->async(), True(env->isolate()));

if (err)
req_wrap->Dispose();
Expand Down Expand Up @@ -467,6 +470,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
return 0;
}

bool StreamResource::HasWriteQueue() {
return true;
}


const char* StreamResource::Error() const {
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class StreamResource {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
virtual bool HasWriteQueue();
virtual const char* Error() const;
virtual void ClearError();

Expand Down
48 changes: 28 additions & 20 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
namespace node {

using v8::Context;
using v8::DontDelete;
using v8::EscapableHandleScope;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::ReadOnly;
using v8::Signature;
using v8::Value;


Expand Down Expand Up @@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
void LibuvStreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
Local<FunctionTemplate> get_write_queue_size =
FunctionTemplate::New(env->isolate(),
GetWriteQueueSize,
env->as_external(),
Signature::New(env->isolate(), target));
target->PrototypeTemplate()->SetAccessorProperty(
env->write_queue_size_string(),
get_write_queue_size,
Local<FunctionTemplate>(),
static_cast<PropertyAttribute>(ReadOnly | DontDelete));
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
}
Expand Down Expand Up @@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() {
}


uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
HandleScope scope(env()->isolate());
uint32_t write_queue_size = stream()->write_queue_size;
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int LibuvStreamWrap::ReadStart() {
return uv_read_start(stream(), OnAlloc, OnRead);
}
Expand Down Expand Up @@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}


void LibuvStreamWrap::UpdateWriteQueueSize(
const FunctionCallbackInfo<Value>& args) {
void LibuvStreamWrap::GetWriteQueueSize(
const FunctionCallbackInfo<Value>& info) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());

if (wrap->stream() == nullptr) {
info.GetReturnValue().Set(0);
return;
}

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
uint32_t write_queue_size = wrap->stream()->write_queue_size;
info.GetReturnValue().Set(write_queue_size);
}


Expand Down Expand Up @@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
}

w->Dispatched();
UpdateWriteQueueSize();

return r;
}


bool LibuvStreamWrap::HasWriteQueue() {
return stream()->write_queue_size > 0;
}


void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = WriteWrap::from_req(req);
CHECK_NE(req_wrap, nullptr);
Expand All @@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {

void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
StreamBase::AfterWrite(w, status);
UpdateWriteQueueSize();
}

} // namespace node
Expand Down
6 changes: 3 additions & 3 deletions src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
bool HasWriteQueue() override;

inline uv_stream_t* stream() const {
return stream_;
Expand Down Expand Up @@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
}

AsyncWrap* GetAsyncWrap() override;
uint32_t UpdateWriteQueueSize();

static void AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags = StreamBase::kFlagNone);

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& info);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);

// Callbacks for libuv
Expand Down
1 change: 0 additions & 1 deletion src/tcp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
int r = uv_tcp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
UpdateWriteQueueSize();
}


Expand Down
45 changes: 23 additions & 22 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ namespace node {
using crypto::SecureContext;
using crypto::SSLWrap;
using v8::Context;
using v8::DontDelete;
using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::ReadOnly;
using v8::Signature;
using v8::String;
using v8::Value;

Expand Down Expand Up @@ -309,7 +311,6 @@ void TLSWrap::EncOut() {

// No data to write
if (BIO_pending(enc_out_) == 0) {
UpdateWriteQueueSize();
if (clear_in_->Length() == 0)
InvokeQueued(0);
return;
Expand Down Expand Up @@ -555,17 +556,6 @@ bool TLSWrap::IsClosing() {
}


uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
HandleScope scope(env()->isolate());
if (write_queue_size == 0)
write_queue_size = BIO_pending(enc_out_);
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int TLSWrap::ReadStart() {
if (stream_ != nullptr)
Expand Down Expand Up @@ -612,9 +602,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0) {
// net.js expects writeQueueSize to be > 0 if the write isn't
// immediately flushed
UpdateWriteQueueSize(1);
return stream_->DoWrite(w, bufs, count, send_handle);
}
}
Expand Down Expand Up @@ -666,7 +653,6 @@ int TLSWrap::DoWrite(WriteWrap* w,

// Try writing data immediately
EncOut();
UpdateWriteQueueSize();

return 0;
}
Expand Down Expand Up @@ -938,12 +924,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB


void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo<Value>& info) {
TLSWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
if (wrap->clear_in_ == nullptr) {
info.GetReturnValue().Set(0);
return;
}

uint32_t write_queue_size = BIO_pending(wrap->enc_out_);
info.GetReturnValue().Set(write_queue_size);
}


Expand All @@ -966,14 +957,24 @@ void TLSWrap::Initialize(Local<Object> target,
t->InstanceTemplate()->SetInternalFieldCount(1);
t->SetClassName(tlsWrapString);

Local<FunctionTemplate> get_write_queue_size =
FunctionTemplate::New(env->isolate(),
GetWriteQueueSize,
env->as_external(),
Signature::New(env->isolate(), t));
t->PrototypeTemplate()->SetAccessorProperty(
env->write_queue_size_string(),
get_write_queue_size,
Local<FunctionTemplate>(),
static_cast<PropertyAttribute>(ReadOnly | DontDelete));

AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
env->SetProtoMethod(t, "receive", Receive);
env->SetProtoMethod(t, "start", Start);
env->SetProtoMethod(t, "setVerifyMode", SetVerifyMode);
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);

StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
Expand Down
Loading

0 comments on commit f9a17f5

Please sign in to comment.