Skip to content

Commit 45e28a8

Browse files
addaleaxMylesBorins
authored andcommitted
src: minor refactoring to StreamBase writes
Instead of having per-request callbacks, always call a callback on the `StreamBase` instance itself for `WriteWrap` and `ShutdownWrap`. This makes `WriteWrap` cleanup consistent for all stream classes, since the after-write callback is always the same now. If special handling is needed for writes that happen to a sub-class, `AfterWrite` can be overridden by that class, rather than that class providing its own callback (e.g. updating the write queue size for libuv streams). If special handling is needed for writes that happen on another stream instance, the existing `after_write_cb()` callback is used for that (e.g. custom code after writing to the transport from a TLS stream). As a nice bonus, this also makes `WriteWrap` and `ShutdownWrap` instances slightly smaller. Backport-PR-URL: #20456 PR-URL: #17564 Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 07a6770 commit 45e28a8

File tree

9 files changed

+73
-89
lines changed

9 files changed

+73
-89
lines changed

src/js_stream.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
181181
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
182182
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
183183

184-
wrap->OnAfterWrite(w);
184+
w->Done(0);
185185
}
186186

187187

src/node_http2.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -975,9 +975,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
975975

976976
WriteWrap* Http2Session::AllocateSend() {
977977
HandleScope scope(env()->isolate());
978-
auto AfterWrite = [](WriteWrap* req, int status) {
979-
req->Dispose();
980-
};
981978
Local<Object> obj =
982979
env()->write_wrap_constructor_function()
983980
->NewInstance(env()->context()).ToLocalChecked();
@@ -987,7 +984,7 @@ WriteWrap* Http2Session::AllocateSend() {
987984
session(),
988985
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
989986
// Max frame size + 9 bytes for the header
990-
return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9);
987+
return WriteWrap::New(env(), obj, stream_, size + 9);
991988
}
992989

993990
void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {

src/stream_base-inl.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
144144
}
145145

146146

147+
inline void ShutdownWrap::OnDone(int status) {
148+
stream()->AfterShutdown(this, status);
149+
}
150+
151+
147152
WriteWrap* WriteWrap::New(Environment* env,
148153
Local<Object> obj,
149154
StreamBase* wrap,
150-
DoneCb cb,
151155
size_t extra) {
152156
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
153157
char* storage = new char[storage_size];
154158

155-
return new(storage) WriteWrap(env, obj, wrap, cb, storage_size);
159+
return new(storage) WriteWrap(env, obj, wrap, storage_size);
156160
}
157161

158162

@@ -172,6 +176,10 @@ size_t WriteWrap::ExtraSize() const {
172176
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
173177
}
174178

179+
inline void WriteWrap::OnDone(int status) {
180+
stream()->AfterWrite(this, status);
181+
}
182+
175183
} // namespace node
176184

177185
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

src/stream_base.cc

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
5555
AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id());
5656
ShutdownWrap* req_wrap = new ShutdownWrap(env,
5757
req_wrap_obj,
58-
this,
59-
AfterShutdown);
58+
this);
6059

6160
int err = DoShutdown(req_wrap);
6261
if (err)
@@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
6665

6766

6867
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
69-
StreamBase* wrap = req_wrap->wrap();
7068
Environment* env = req_wrap->env();
7169

7270
// The wrap and request objects should still be there.
@@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
7876
Local<Object> req_wrap_obj = req_wrap->object();
7977
Local<Value> argv[3] = {
8078
Integer::New(env->isolate(), status),
81-
wrap->GetObject(),
79+
GetObject(),
8280
req_wrap_obj
8381
};
8482

@@ -159,8 +157,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
159157
CHECK_NE(wrap, nullptr);
160158
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
161159
wrap->get_async_id());
162-
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite,
163-
storage_size);
160+
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
164161
}
165162

166163
offset = 0;
@@ -252,7 +249,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
252249
CHECK_NE(wrap, nullptr);
253250
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
254251
wrap->get_async_id());
255-
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
252+
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
256253
}
257254

258255
err = DoWrite(req_wrap, bufs, count, nullptr);
@@ -338,8 +335,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
338335
CHECK_NE(wrap, nullptr);
339336
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
340337
wrap->get_async_id());
341-
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite,
342-
storage_size);
338+
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
343339
}
344340

345341
data = req_wrap->Extra();
@@ -401,7 +397,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
401397

402398

403399
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
404-
StreamBase* wrap = req_wrap->wrap();
405400
Environment* env = req_wrap->env();
406401

407402
HandleScope handle_scope(env->isolate());
@@ -413,19 +408,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
413408
// Unref handle property
414409
Local<Object> req_wrap_obj = req_wrap->object();
415410
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
416-
wrap->OnAfterWrite(req_wrap);
411+
OnAfterWrite(req_wrap, status);
417412

418413
Local<Value> argv[] = {
419414
Integer::New(env->isolate(), status),
420-
wrap->GetObject(),
415+
GetObject(),
421416
req_wrap_obj,
422417
Undefined(env->isolate())
423418
};
424419

425-
const char* msg = wrap->Error();
420+
const char* msg = Error();
426421
if (msg != nullptr) {
427422
argv[3] = OneByteString(env->isolate(), msg);
428-
wrap->ClearError();
423+
ClearError();
429424
}
430425

431426
if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())

src/stream_base.h

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,37 @@ namespace node {
1616
// Forward declarations
1717
class StreamBase;
1818

19-
template <class Req>
19+
template<typename Base>
2020
class StreamReq {
2121
public:
22-
typedef void (*DoneCb)(Req* req, int status);
23-
24-
explicit StreamReq(DoneCb cb) : cb_(cb) {
22+
explicit StreamReq(StreamBase* stream) : stream_(stream) {
2523
}
2624

2725
inline void Done(int status, const char* error_str = nullptr) {
28-
Req* req = static_cast<Req*>(this);
26+
Base* req = static_cast<Base*>(this);
2927
Environment* env = req->env();
3028
if (error_str != nullptr) {
3129
req->object()->Set(env->error_string(),
3230
OneByteString(env->isolate(), error_str));
3331
}
3432

35-
cb_(req, status);
33+
req->OnDone(status);
3634
}
3735

36+
inline StreamBase* stream() const { return stream_; }
37+
3838
private:
39-
DoneCb cb_;
39+
StreamBase* const stream_;
4040
};
4141

4242
class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
4343
public StreamReq<ShutdownWrap> {
4444
public:
4545
ShutdownWrap(Environment* env,
4646
v8::Local<v8::Object> req_wrap_obj,
47-
StreamBase* wrap,
48-
DoneCb cb)
47+
StreamBase* stream)
4948
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
50-
StreamReq<ShutdownWrap>(cb),
51-
wrap_(wrap) {
49+
StreamReq<ShutdownWrap>(stream) {
5250
Wrap(req_wrap_obj, this);
5351
}
5452

@@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
6058
return ContainerOf(&ShutdownWrap::req_, req);
6159
}
6260

63-
inline StreamBase* wrap() const { return wrap_; }
6461
size_t self_size() const override { return sizeof(*this); }
6562

66-
private:
67-
StreamBase* const wrap_;
63+
inline void OnDone(int status); // Just calls stream()->AfterShutdown()
6864
};
6965

70-
class WriteWrap: public ReqWrap<uv_write_t>,
71-
public StreamReq<WriteWrap> {
66+
class WriteWrap : public ReqWrap<uv_write_t>,
67+
public StreamReq<WriteWrap> {
7268
public:
7369
static inline WriteWrap* New(Environment* env,
7470
v8::Local<v8::Object> obj,
75-
StreamBase* wrap,
76-
DoneCb cb,
71+
StreamBase* stream,
7772
size_t extra = 0);
7873
inline void Dispose();
7974
inline char* Extra(size_t offset = 0);
8075
inline size_t ExtraSize() const;
8176

82-
inline StreamBase* wrap() const { return wrap_; }
83-
8477
size_t self_size() const override { return storage_size_; }
8578

8679
static WriteWrap* from_req(uv_write_t* req) {
@@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap<uv_write_t>,
9184

9285
WriteWrap(Environment* env,
9386
v8::Local<v8::Object> obj,
94-
StreamBase* wrap,
95-
DoneCb cb)
87+
StreamBase* stream)
9688
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
97-
StreamReq<WriteWrap>(cb),
98-
wrap_(wrap),
89+
StreamReq<WriteWrap>(stream),
9990
storage_size_(0) {
10091
Wrap(obj, this);
10192
}
10293

94+
inline void OnDone(int status); // Just calls stream()->AfterWrite()
95+
10396
protected:
10497
WriteWrap(Environment* env,
10598
v8::Local<v8::Object> obj,
106-
StreamBase* wrap,
107-
DoneCb cb,
99+
StreamBase* stream,
108100
size_t storage_size)
109101
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
110-
StreamReq<WriteWrap>(cb),
111-
wrap_(wrap),
102+
StreamReq<WriteWrap>(stream),
112103
storage_size_(storage_size) {
113104
Wrap(obj, this);
114105
}
@@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap<uv_write_t>,
129120
// WriteWrap. Ensure this never happens.
130121
void operator delete(void* ptr) { UNREACHABLE(); }
131122

132-
StreamBase* const wrap_;
133123
const size_t storage_size_;
134124
};
135125

@@ -151,7 +141,7 @@ class StreamResource {
151141
void* ctx;
152142
};
153143

154-
typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
144+
typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
155145
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
156146
typedef void (*ReadCb)(ssize_t nread,
157147
const uv_buf_t* buf,
@@ -176,9 +166,9 @@ class StreamResource {
176166
virtual void ClearError();
177167

178168
// Events
179-
inline void OnAfterWrite(WriteWrap* w) {
169+
inline void OnAfterWrite(WriteWrap* w, int status) {
180170
if (!after_write_cb_.is_empty())
181-
after_write_cb_.fn(w, after_write_cb_.ctx);
171+
after_write_cb_.fn(w, status, after_write_cb_.ctx);
182172
}
183173

184174
inline void OnAlloc(size_t size, uv_buf_t* buf) {
@@ -208,14 +198,12 @@ class StreamResource {
208198
inline Callback<ReadCb> read_cb() { return read_cb_; }
209199
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }
210200

211-
private:
201+
protected:
212202
Callback<AfterWriteCb> after_write_cb_;
213203
Callback<AllocCb> alloc_cb_;
214204
Callback<ReadCb> read_cb_;
215205
Callback<DestructCb> destruct_cb_;
216206
uint64_t bytes_read_;
217-
218-
friend class StreamBase;
219207
};
220208

221209
class StreamBase : public StreamResource {
@@ -257,6 +245,10 @@ class StreamBase : public StreamResource {
257245
v8::Local<v8::Object> buf,
258246
v8::Local<v8::Object> handle);
259247

248+
// These are called by the respective {Write,Shutdown}Wrap class.
249+
virtual void AfterShutdown(ShutdownWrap* req, int status);
250+
virtual void AfterWrite(WriteWrap* req, int status);
251+
260252
protected:
261253
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
262254
}
@@ -267,10 +259,6 @@ class StreamBase : public StreamResource {
267259
virtual AsyncWrap* GetAsyncWrap() = 0;
268260
virtual v8::Local<v8::Object> GetObject();
269261

270-
// Libuv callbacks
271-
static void AfterShutdown(ShutdownWrap* req, int status);
272-
static void AfterWrite(WriteWrap* req, int status);
273-
274262
// JS Methods
275263
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
276264
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);

src/stream_wrap.cc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
9292
provider),
9393
StreamBase(env),
9494
stream_(stream) {
95-
set_after_write_cb({ OnAfterWriteImpl, this });
9695
set_alloc_cb({ OnAllocImpl, this });
9796
set_read_cb({ OnReadImpl, this });
9897
}
@@ -299,13 +298,13 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
299298

300299
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
301300
int err;
302-
err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown);
301+
err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
303302
req_wrap->Dispatched();
304303
return err;
305304
}
306305

307306

308-
void LibuvStreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
307+
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
309308
ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
310309
CHECK_NE(req_wrap, nullptr);
311310
HandleScope scope(req_wrap->env()->isolate());
@@ -360,9 +359,9 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
360359
uv_stream_t* send_handle) {
361360
int r;
362361
if (send_handle == nullptr) {
363-
r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
362+
r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
364363
} else {
365-
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
364+
r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite);
366365
}
367366

368367
if (!r) {
@@ -383,7 +382,7 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
383382
}
384383

385384

386-
void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
385+
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
387386
WriteWrap* req_wrap = WriteWrap::from_req(req);
388387
CHECK_NE(req_wrap, nullptr);
389388
HandleScope scope(req_wrap->env()->isolate());
@@ -392,9 +391,9 @@ void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) {
392391
}
393392

394393

395-
void LibuvStreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
396-
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(ctx);
397-
wrap->UpdateWriteQueueSize();
394+
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
395+
StreamBase::AfterWrite(w, status);
396+
UpdateWriteQueueSize();
398397
}
399398

400399
} // namespace node

0 commit comments

Comments
 (0)