Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: track bytesWritten in C++ land #19551

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/internal/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function makeSyncWrite(fd) {
if (enc !== 'buffer')
chunk = Buffer.from(chunk, enc);

this._bytesDispatched += chunk.length;
this._handle.bytesWritten += chunk.length;

const ctx = {};
writeBuffer(fd, chunk, 0, chunk.length, null, undefined, ctx);
Expand Down
28 changes: 20 additions & 8 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ function normalizeArgs(args) {
// called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) {
self._undestroy();
self._bytesDispatched = 0;
self._sockname = null;

// Handle creation may be deferred to bind() or connect() time.
Expand All @@ -222,7 +221,8 @@ function initSocketHandle(self) {
}


const BYTES_READ = Symbol('bytesRead');
const kBytesRead = Symbol('kBytesRead');
const kBytesWritten = Symbol('kBytesWritten');


function Socket(options) {
Expand Down Expand Up @@ -278,6 +278,11 @@ function Socket(options) {

this._writev = null;
this._write = makeSyncWrite(fd);
// makeSyncWrite adjusts this value like the original handle would, so
// we need to let it do that by turning it into a writable, own property.
Object.defineProperty(this._handle, 'bytesWritten', {
value: 0, writable: true
});
}
} else {
// these will be set once there is a connection
Expand Down Expand Up @@ -316,7 +321,8 @@ function Socket(options) {
this._server = null;

// Used after `.destroy()`
this[BYTES_READ] = 0;
this[kBytesRead] = 0;
this[kBytesWritten] = 0;
}
util.inherits(Socket, stream.Duplex);

Expand Down Expand Up @@ -588,8 +594,9 @@ Socket.prototype._destroy = function(exception, cb) {
if (this !== process.stderr)
debug('close handle');
var isException = exception ? true : false;
// `bytesRead` should be accessible after `.destroy()`
this[BYTES_READ] = this._handle.bytesRead;
// `bytesRead` and `kBytesWritten` should be accessible after `.destroy()`
this[kBytesRead] = this._handle.bytesRead;
this[kBytesWritten] = this._handle.bytesWritten;

this._handle.close(() => {
debug('emit close');
Expand Down Expand Up @@ -689,7 +696,7 @@ function protoGetter(name, callback) {
}

protoGetter('bytesRead', function bytesRead() {
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
return this._handle ? this._handle.bytesRead : this[kBytesRead];
});

protoGetter('remoteAddress', function remoteAddress() {
Expand Down Expand Up @@ -761,8 +768,6 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// Bail out if handle.write* returned an error
if (ret) return ret;

this._bytesDispatched += req.bytes;

if (!req.async) {
cb();
return;
Expand All @@ -782,6 +787,13 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};


// Legacy alias. Having this is probably being overly cautious, but it doesn't
// really hurt anyone either. This can probably be removed safely if desired.
protoGetter('_bytesDispatched', function _bytesDispatched() {
return this._handle ? this._handle.bytesWritten : this[kBytesWritten];
});

protoGetter('bytesWritten', function bytesWritten() {
var bytes = this._bytesDispatched;
const state = this._writableState;
Expand Down
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct PackageConfig {
V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \
V(cached_data_string, "cachedData") \
V(cached_data_produced_string, "cachedDataProduced") \
V(cached_data_rejected_string, "cachedDataRejected") \
Expand Down
34 changes: 31 additions & 3 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,16 @@ inline StreamWriteResult StreamBase::Write(
v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
int err;

size_t total_bytes = 0;
for (size_t i = 0; i < count; ++i)
total_bytes += bufs[i].len;
bytes_written_ += total_bytes;

if (send_handle == nullptr) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult { false, err, nullptr };
return StreamWriteResult { false, err, nullptr, total_bytes };
}
}

Expand Down Expand Up @@ -226,7 +232,7 @@ inline StreamWriteResult StreamBase::Write(
ClearError();
}

return StreamWriteResult { async, err, req_wrap };
return StreamWriteResult { async, err, req_wrap, total_bytes };
}

template <typename OtherBase>
Expand Down Expand Up @@ -301,6 +307,12 @@ void StreamBase::AddMethods(Environment* env,
env->as_external(),
signature);

Local<FunctionTemplate> get_bytes_written_templ =
FunctionTemplate::New(env->isolate(),
GetBytesWritten<Base>,
env->as_external(),
signature);

t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(),
get_fd_templ,
Local<FunctionTemplate>(),
Expand All @@ -316,6 +328,11 @@ void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate>(),
attributes);

t->PrototypeTemplate()->SetAccessorProperty(env->bytes_written_string(),
get_bytes_written_templ,
Local<FunctionTemplate>(),
attributes);

env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
if ((flags & kFlagNoShutdown) == 0)
Expand Down Expand Up @@ -357,7 +374,6 @@ void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {

template <class Base>
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
// The handle instance hasn't been set. So no bytes could have been read.
Base* handle;
ASSIGN_OR_RETURN_UNWRAP(&handle,
args.This(),
Expand All @@ -368,6 +384,18 @@ void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
}

template <class Base>
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
Base* handle;
ASSIGN_OR_RETURN_UNWRAP(&handle,
args.This(),
args.GetReturnValue().Set(0));

StreamBase* wrap = static_cast<StreamBase*>(handle);
// uint64_t -> double. 53bits is enough for all real cases.
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
}

template <class Base>
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
Base* handle;
Expand Down
22 changes: 12 additions & 10 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
inline void SetWriteResultPropertiesOnWrapObject(
Environment* env,
Local<Object> req_wrap_obj,
const StreamWriteResult& res,
size_t bytes) {
const StreamWriteResult& res) {
req_wrap_obj->Set(
env->context(),
env->bytes_string(),
Number::New(env->isolate(), bytes)).FromJust();
Number::New(env->isolate(), res.bytes)).FromJust();
req_wrap_obj->Set(
env->context(),
env->async(),
Expand All @@ -91,7 +90,6 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
MaybeStackBuffer<uv_buf_t, 16> bufs(count);

size_t storage_size = 0;
uint32_t bytes = 0;
size_t offset;

if (!all_buffers) {
Expand Down Expand Up @@ -123,7 +121,6 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
Local<Value> chunk = chunks->Get(i);
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
}
}

Expand All @@ -140,7 +137,6 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (Buffer::HasInstance(chunk)) {
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
continue;
}

Expand All @@ -160,12 +156,11 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
bufs[i].base = str_storage;
bufs[i].len = str_size;
offset += str_size;
bytes += str_size;
}
}

StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res, bytes);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
if (res.wrap != nullptr && storage) {
res.wrap->SetAllocatedStorage(storage.release(), storage_size);
}
Expand Down Expand Up @@ -193,7 +188,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {

if (res.async)
req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res, buf.len);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);

return res.err;
}
Expand Down Expand Up @@ -228,6 +223,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// Try writing immediately if write size isn't too big
char stack_storage[16384]; // 16kb
size_t data_size;
size_t synchronously_written = 0;
uv_buf_t buf;

bool try_write = storage_size <= sizeof(stack_storage) &&
Expand All @@ -243,6 +239,11 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
uv_buf_t* bufs = &buf;
size_t count = 1;
err = DoTryWrite(&bufs, &count);
// Keep track of the bytes written here, because we're taking a shortcut
// by using `DoTryWrite()` directly instead of using the utilities
// provided by `Write()`.
synchronously_written = count == 0 ? data_size : data_size - buf.len;
bytes_written_ += synchronously_written;

// Immediate failure or success
if (err != 0 || count == 0) {
Expand Down Expand Up @@ -298,8 +299,9 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
}

StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
res.bytes += synchronously_written;

SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res, data_size);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(data.release(), data_size);
}
Expand Down
5 changes: 5 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct StreamWriteResult {
bool async;
int err;
WriteWrap* wrap;
size_t bytes;
};


Expand Down Expand Up @@ -247,6 +248,7 @@ class StreamResource {

StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;

friend class StreamListener;
};
Expand Down Expand Up @@ -324,6 +326,9 @@ class StreamBase : public StreamResource {
template <class Base>
static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base>
static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base,
int (StreamBase::*Method)(
const v8::FunctionCallbackInfo<v8::Value>& args)>
Expand Down
67 changes: 67 additions & 0 deletions test/parallel/test-net-bytes-written-large.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');

// Regression test for https://github.com/nodejs/node/issues/19562:
// Writing to a socket first tries to push through as much data as possible
// without blocking synchronously, and, if that is not enough, queues more
// data up for asynchronous writing.
// Check that `bytesWritten` accounts for both parts of a write.

const N = 10000000;
{
// Variant 1: Write a Buffer.
const server = net.createServer(common.mustCall((socket) => {
socket.end(Buffer.alloc(N), common.mustCall(() => {
assert.strictEqual(socket.bytesWritten, N);
}));
assert.strictEqual(socket.bytesWritten, N);
})).listen(0, common.mustCall(() => {
const client = net.connect(server.address().port);
client.resume();
client.on('close', common.mustCall(() => {
assert.strictEqual(client.bytesRead, N);
server.close();
}));
}));
}

{
// Variant 2: Write a string.
const server = net.createServer(common.mustCall((socket) => {
socket.end('a'.repeat(N), common.mustCall(() => {
assert.strictEqual(socket.bytesWritten, N);
}));
assert.strictEqual(socket.bytesWritten, N);
})).listen(0, common.mustCall(() => {
const client = net.connect(server.address().port);
client.resume();
client.on('close', common.mustCall(() => {
assert.strictEqual(client.bytesRead, N);
server.close();
}));
}));
}

{
// Variant 2: writev() with mixed data.
const server = net.createServer(common.mustCall((socket) => {
socket.cork();
socket.write('a'.repeat(N));
assert.strictEqual(socket.bytesWritten, N);
socket.write(Buffer.alloc(N));
assert.strictEqual(socket.bytesWritten, 2 * N);
socket.end('', common.mustCall(() => {
assert.strictEqual(socket.bytesWritten, 2 * N);
}));
socket.uncork();
})).listen(0, common.mustCall(() => {
const client = net.connect(server.address().port);
client.resume();
client.on('close', common.mustCall(() => {
assert.strictEqual(client.bytesRead, 2 * N);
server.close();
}));
}));
}