Skip to content

Commit d8b783f

Browse files
committed
Use custom StreamListener instead of fixed buffer
1 parent f84b416 commit d8b783f

12 files changed

+69
-134
lines changed

lib/net.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,20 +103,18 @@ function getFlags(ipv6Only) {
103103
return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0;
104104
}
105105

106-
function createHandle(fd, is_server, buf) {
106+
function createHandle(fd, is_server) {
107107
validateInt32(fd, 'fd', 0);
108108
const type = TTYWrap.guessHandleType(fd);
109109
if (type === 'PIPE') {
110110
return new Pipe(
111-
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET,
112-
buf
111+
is_server ? PipeConstants.SERVER : PipeConstants.SOCKET
113112
);
114113
}
115114

116115
if (type === 'TCP') {
117116
return new TCP(
118-
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET,
119-
buf
117+
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
120118
);
121119
}
122120

@@ -223,6 +221,10 @@ function initSocketHandle(self) {
223221
self._handle[owner_symbol] = self;
224222
self._handle.onread = onStreamRead;
225223
self[async_id_symbol] = getNewAsyncId(self._handle);
224+
225+
if (self[kBuffer]) {
226+
self._handle.useUserBuffer(self[kBuffer]);
227+
}
226228
}
227229
}
228230

@@ -904,8 +906,8 @@ Socket.prototype.connect = function(...args) {
904906

905907
if (!this._handle) {
906908
this._handle = pipe ?
907-
new Pipe(PipeConstants.SOCKET, this[kBuffer]) :
908-
new TCP(TCPConstants.SOCKET, this[kBuffer]);
909+
new Pipe(PipeConstants.SOCKET) :
910+
new TCP(TCPConstants.SOCKET);
909911
initSocketHandle(this);
910912
}
911913

src/connection_wrap.cc

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,6 @@ ConnectionWrap<WrapType, UVType>::ConnectionWrap(Environment* env,
2929
reinterpret_cast<uv_stream_t*>(&handle_),
3030
provider) {}
3131

32-
template <typename WrapType, typename UVType>
33-
ConnectionWrap<WrapType, UVType>::ConnectionWrap(Environment* env,
34-
Local<Object> object,
35-
ProviderType provider,
36-
uv_buf_t buf)
37-
: LibuvStreamWrap(env,
38-
object,
39-
reinterpret_cast<uv_stream_t*>(&handle_),
40-
provider,
41-
buf) {}
42-
4332

4433
template <typename WrapType, typename UVType>
4534
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,
@@ -127,23 +116,11 @@ template ConnectionWrap<PipeWrap, uv_pipe_t>::ConnectionWrap(
127116
Local<Object> object,
128117
ProviderType provider);
129118

130-
template ConnectionWrap<PipeWrap, uv_pipe_t>::ConnectionWrap(
131-
Environment* env,
132-
Local<Object> object,
133-
ProviderType provider,
134-
uv_buf_t buf);
135-
136119
template ConnectionWrap<TCPWrap, uv_tcp_t>::ConnectionWrap(
137120
Environment* env,
138121
Local<Object> object,
139122
ProviderType provider);
140123

141-
template ConnectionWrap<TCPWrap, uv_tcp_t>::ConnectionWrap(
142-
Environment* env,
143-
Local<Object> object,
144-
ProviderType provider,
145-
uv_buf_t buf);
146-
147124
template void ConnectionWrap<PipeWrap, uv_pipe_t>::OnConnection(
148125
uv_stream_t* handle, int status);
149126

src/connection_wrap.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ class ConnectionWrap : public LibuvStreamWrap {
1919
ConnectionWrap(Environment* env,
2020
v8::Local<v8::Object> object,
2121
ProviderType provider);
22-
ConnectionWrap(Environment* env,
23-
v8::Local<v8::Object> object,
24-
ProviderType provider,
25-
uv_buf_t buf);
2622

2723
UVType handle_;
2824
};

src/pipe_wrap.cc

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,7 @@ void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
149149
UNREACHABLE();
150150
}
151151

152-
if (args.Length() > 1 && Buffer::HasInstance(args[1])) {
153-
uv_buf_t buf;
154-
buf.base = Buffer::Data(args[1]);
155-
buf.len = Buffer::Length(args[1]);
156-
new PipeWrap(env, args.This(), provider, ipc, buf);
157-
} else {
158-
new PipeWrap(env, args.This(), provider, ipc);
159-
}
152+
new PipeWrap(env, args.This(), provider, ipc);
160153
}
161154

162155

@@ -170,17 +163,6 @@ PipeWrap::PipeWrap(Environment* env,
170163
// Suggestion: uv_pipe_init() returns void.
171164
}
172165

173-
PipeWrap::PipeWrap(Environment* env,
174-
Local<Object> object,
175-
ProviderType provider,
176-
bool ipc,
177-
uv_buf_t buf)
178-
: ConnectionWrap(env, object, provider, buf) {
179-
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
180-
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
181-
// Suggestion: uv_pipe_init() returns void.
182-
}
183-
184166

185167
void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
186168
PipeWrap* wrap;

src/pipe_wrap.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
5555
v8::Local<v8::Object> object,
5656
ProviderType provider,
5757
bool ipc);
58-
PipeWrap(Environment* env,
59-
v8::Local<v8::Object> object,
60-
ProviderType provider,
61-
bool ipc,
62-
uv_buf_t buf);
6358

6459
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
6560
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);

src/stream_base-inl.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,25 +150,13 @@ inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
150150
}
151151

152152
inline StreamBase::StreamBase(Environment* env) : env_(env) {
153-
buf_.base = nullptr;
154-
buf_.len = 0;
155-
PushStreamListener(&default_listener_);
156-
}
157-
158-
inline StreamBase::StreamBase(Environment* env, uv_buf_t buf)
159-
: env_(env),
160-
buf_(buf) {
161153
PushStreamListener(&default_listener_);
162154
}
163155

164156
inline Environment* StreamBase::stream_env() const {
165157
return env_;
166158
}
167159

168-
inline uv_buf_t StreamBase::stream_buf() const {
169-
return buf_;
170-
}
171-
172160
inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
173161
Environment* env = stream_env();
174162

@@ -338,6 +326,9 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
338326
env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
339327
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
340328
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
329+
env->SetProtoMethod(t,
330+
"useUserBuffer",
331+
JSMethod<Base, &StreamBase::UseUserBuffer>);
341332
env->SetProtoMethod(t, "writev", JSMethod<Base, &StreamBase::Writev>);
342333
env->SetProtoMethod(t,
343334
"writeBuffer",

src/stream_base.cc

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
4646
return ReadStop();
4747
}
4848

49+
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
50+
CHECK(Buffer::HasInstance(args[0]));
51+
52+
uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
53+
PushStreamListener(new CustomBufferJSListener(buf));
54+
return 0;
55+
}
4956

5057
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
5158
CHECK(args[0]->IsObject());
@@ -295,7 +302,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread,
295302
DCHECK_EQ(static_cast<int32_t>(nread), nread);
296303
DCHECK_LE(offset, INT32_MAX);
297304

298-
if (ab.IsEmpty() && buf_.base == nullptr) {
305+
if (ab.IsEmpty()) {
299306
DCHECK_EQ(offset, 0);
300307
DCHECK_LE(nread, 0);
301308
} else {
@@ -347,12 +354,7 @@ void StreamResource::ClearError() {
347354

348355

349356
uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
350-
StreamBase* stream = static_cast<StreamBase*>(stream_);
351-
const uv_buf_t stream_buf = stream->stream_buf();
352-
if (stream_buf.base != nullptr)
353-
return stream_buf;
354-
else
355-
return uv_buf_init(Malloc(suggested_size), suggested_size);
357+
return uv_buf_init(Malloc(suggested_size), suggested_size);
356358
}
357359

358360

@@ -362,32 +364,44 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
362364
Environment* env = stream->stream_env();
363365
HandleScope handle_scope(env->isolate());
364366
Context::Scope context_scope(env->context());
365-
const uv_buf_t stream_buf = stream->stream_buf();
366-
Local<ArrayBuffer> obj;
367367

368368
if (nread <= 0) {
369-
if (stream_buf.base == nullptr)
370-
free(buf.base);
371-
if (nread == 0)
372-
return;
373-
} else if (stream_buf.base != nullptr) {
374-
CHECK_LE(static_cast<size_t>(nread), stream_buf.len);
375-
} else {
376-
CHECK_LE(static_cast<size_t>(nread), buf.len);
369+
free(buf.base);
370+
if (nread < 0)
371+
stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
372+
return;
373+
}
377374

378-
char* base = Realloc(buf.base, nread);
375+
CHECK_LE(static_cast<size_t>(nread), buf.len);
376+
char* base = Realloc(buf.base, nread);
379377

380-
obj = ArrayBuffer::New(
381-
env->isolate(),
382-
base,
383-
nread,
384-
// Transfer ownership to V8.
385-
v8::ArrayBufferCreationMode::kInternalized);
386-
}
378+
Local<ArrayBuffer> obj = ArrayBuffer::New(
379+
env->isolate(),
380+
base,
381+
nread,
382+
v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8.
387383
stream->CallJSOnreadMethod(nread, obj);
388384
}
389385

390386

387+
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
388+
return buffer_;
389+
}
390+
391+
392+
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
393+
CHECK_NOT_NULL(stream_);
394+
CHECK_EQ(buf.base, buffer_.base);
395+
396+
StreamBase* stream = static_cast<StreamBase*>(stream_);
397+
Environment* env = stream->stream_env();
398+
HandleScope handle_scope(env->isolate());
399+
Context::Scope context_scope(env->context());
400+
401+
stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
402+
}
403+
404+
391405
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
392406
StreamReq* req_wrap, int status) {
393407
StreamBase* stream = static_cast<StreamBase*>(stream_);

src/stream_base.h

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,21 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
185185
};
186186

187187

188+
// An alternative listener that uses a custom, user-provided buffer
189+
// for reading data.
190+
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
191+
public:
192+
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
193+
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
194+
void OnStreamDestroy() override { delete this; }
195+
196+
explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
197+
198+
private:
199+
uv_buf_t buffer_;
200+
};
201+
202+
188203
// A generic stream, comparable to JS land’s `Duplex` streams.
189204
// A stream is always controlled through one `StreamListener` instance.
190205
class StreamResource {
@@ -271,7 +286,6 @@ class StreamBase : public StreamResource {
271286
// This is named `stream_env` to avoid name clashes, because a lot of
272287
// subclasses are also `BaseObject`s.
273288
Environment* stream_env() const;
274-
uv_buf_t stream_buf() const;
275289

276290
// Shut down the current stream. This request can use an existing
277291
// ShutdownWrap object (that was created in JS), or a new one will be created.
@@ -302,7 +316,6 @@ class StreamBase : public StreamResource {
302316

303317
protected:
304318
explicit StreamBase(Environment* env);
305-
explicit StreamBase(Environment* env, uv_buf_t buf);
306319

307320
// JS Methods
308321
int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -312,6 +325,7 @@ class StreamBase : public StreamResource {
312325
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
313326
template <enum encoding enc>
314327
int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
328+
int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
315329

316330
template <class Base>
317331
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -341,7 +355,6 @@ class StreamBase : public StreamResource {
341355

342356
private:
343357
Environment* env_;
344-
uv_buf_t buf_;
345358
EmitToJSStreamListener default_listener_;
346359

347360
void SetWriteResult(const StreamWriteResult& res);

src/stream_wrap.cc

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,19 +106,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
106106
stream_(stream) {
107107
}
108108

109-
LibuvStreamWrap::LibuvStreamWrap(Environment* env,
110-
Local<Object> object,
111-
uv_stream_t* stream,
112-
AsyncWrap::ProviderType provider,
113-
uv_buf_t buf)
114-
: HandleWrap(env,
115-
object,
116-
reinterpret_cast<uv_handle_t*>(stream),
117-
provider),
118-
StreamBase(env, buf),
119-
stream_(stream) {
120-
}
121-
122109

123110
Local<FunctionTemplate> LibuvStreamWrap::GetConstructorTemplate(
124111
Environment* env) {

src/stream_wrap.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
8484
v8::Local<v8::Object> object,
8585
uv_stream_t* stream,
8686
AsyncWrap::ProviderType provider);
87-
LibuvStreamWrap(Environment* env,
88-
v8::Local<v8::Object> object,
89-
uv_stream_t* stream,
90-
AsyncWrap::ProviderType provider,
91-
uv_buf_t buf);
9287

9388
AsyncWrap* GetAsyncWrap() override;
9489

0 commit comments

Comments
 (0)