Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2,tls: store WriteWrap using BaseObjectPtr #35488

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1525,12 +1525,12 @@ void Http2Session::ClearOutgoing(int status) {
std::vector<NgHttp2StreamWrite> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const NgHttp2StreamWrite& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr) {
BaseObjectPtr<AsyncWrap> wrap = std::move(wr.req_wrap);
if (wrap) {
// TODO(addaleax): Pass `status` instead of 0, so that we actually error
// out with the error from the write to the underlying protocol,
// if one occurred.
wrap->Done(0);
WriteWrap::FromObject(wrap)->Done(0);
}
}
}
Expand Down Expand Up @@ -1813,7 +1813,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {

bool Http2Session::HasWritesOnSocketForStream(Http2Stream* stream) {
for (const NgHttp2StreamWrite& wr : outgoing_buffers_) {
if (wr.req_wrap != nullptr && wr.req_wrap->stream() == stream)
if (wr.req_wrap && WriteWrap::FromObject(wr.req_wrap)->stream() == stream)
return true;
}
return false;
Expand Down Expand Up @@ -1966,8 +1966,8 @@ void Http2Stream::Destroy() {
// we still have queued outbound writes.
while (!queue_.empty()) {
NgHttp2StreamWrite& head = queue_.front();
if (head.req_wrap != nullptr)
head.req_wrap->Done(UV_ECANCELED);
if (head.req_wrap)
WriteWrap::FromObject(head.req_wrap)->Done(UV_ECANCELED);
queue_.pop();
}

Expand Down Expand Up @@ -2196,7 +2196,8 @@ int Http2Stream::DoWrite(WriteWrap* req_wrap,
// Store the req_wrap on the last write info in the queue, so that it is
// only marked as finished once all buffers associated with it are finished.
queue_.emplace(NgHttp2StreamWrite {
i == nbufs - 1 ? req_wrap : nullptr,
BaseObjectPtr<AsyncWrap>(
i == nbufs - 1 ? req_wrap->GetAsyncWrap() : nullptr),
bufs[i]
});
IncrementAvailableOutboundLength(bufs[i].len);
Expand Down Expand Up @@ -2290,10 +2291,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
// find out when the HTTP2 stream wants to consume data, and because the
// StreamBase API allows empty input chunks.
while (!stream->queue_.empty() && stream->queue_.front().buf.len == 0) {
WriteWrap* finished = stream->queue_.front().req_wrap;
BaseObjectPtr<AsyncWrap> finished =
std::move(stream->queue_.front().req_wrap);
stream->queue_.pop();
if (finished != nullptr)
finished->Done(0);
if (finished)
WriteWrap::FromObject(finished)->Done(0);
}

if (!stream->queue_.empty()) {
Expand Down Expand Up @@ -2919,8 +2921,8 @@ void Http2Ping::DetachFromSession() {
}

void NgHttp2StreamWrite::MemoryInfo(MemoryTracker* tracker) const {
if (req_wrap != nullptr)
tracker->TrackField("req_wrap", req_wrap->GetAsyncWrap());
if (req_wrap)
tracker->TrackField("req_wrap", req_wrap);
tracker->TrackField("buf", buf);
}

Expand Down
6 changes: 3 additions & 3 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ using Http2Headers = NgHeaders<Http2HeadersTraits>;
using Http2RcBufferPointer = NgRcBufPointer<Http2RcBufferPointerTraits>;

struct NgHttp2StreamWrite : public MemoryRetainer {
WriteWrap* req_wrap = nullptr;
BaseObjectPtr<AsyncWrap> req_wrap;
uv_buf_t buf;

inline explicit NgHttp2StreamWrite(uv_buf_t buf_) : buf(buf_) {}
inline NgHttp2StreamWrite(WriteWrap* req, uv_buf_t buf_) :
req_wrap(req), buf(buf_) {}
inline NgHttp2StreamWrite(BaseObjectPtr<AsyncWrap> req_wrap, uv_buf_t buf_) :
req_wrap(std::move(req_wrap)), buf(buf_) {}

void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(NgHttp2StreamWrite)
Expand Down
22 changes: 22 additions & 0 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,28 @@ StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
StreamBase::kStreamBaseField));
}

WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
return static_cast<WriteWrap*>(StreamReq::FromObject(req_wrap_obj));
}

template <typename T, bool kIsWeak>
WriteWrap* WriteWrap::FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
if (!base_obj) return nullptr;
return FromObject(base_obj->object());
}

ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
return static_cast<ShutdownWrap*>(StreamReq::FromObject(req_wrap_obj));
}

template <typename T, bool kIsWeak>
ShutdownWrap* ShutdownWrap::FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
if (!base_obj) return nullptr;
return FromObject(base_obj->object());
}

void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
CHECK_NULL(storage_.data());
storage_ = std::move(storage);
Expand Down
8 changes: 6 additions & 2 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,16 @@ StreamResource::~StreamResource() {

ShutdownWrap* StreamBase::CreateShutdownWrap(
Local<Object> object) {
return new SimpleShutdownWrap<AsyncWrap>(this, object);
auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
wrap->MakeWeak();
return wrap;
}

WriteWrap* StreamBase::CreateWriteWrap(
Local<Object> object) {
return new SimpleWriteWrap<AsyncWrap>(this, object);
auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
wrap->MakeWeak();
return wrap;
}

} // namespace node
10 changes: 10 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class ShutdownWrap : public StreamReq {
StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj);

static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
template <typename T, bool kIsWeak>
static inline ShutdownWrap* FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj);

// Call stream()->EmitAfterShutdown() and dispose of this request wrap.
void OnDone(int status) override;
};
Expand All @@ -89,6 +94,11 @@ class WriteWrap : public StreamReq {
StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj);

static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
template <typename T, bool kIsWeak>
static inline WriteWrap* FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj);

// Call stream()->EmitAfterWrite() and dispose of this request wrap.
void OnDone(int status) override;

Expand Down
29 changes: 16 additions & 13 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ bool TLSWrap::InvokeQueued(int status, const char* error_str) {
if (!write_callback_scheduled_)
return false;

if (current_write_ != nullptr) {
WriteWrap* w = current_write_;
current_write_ = nullptr;
if (current_write_) {
BaseObjectPtr<AsyncWrap> current_write = std::move(current_write_);
current_write_.reset();
WriteWrap* w = WriteWrap::FromObject(current_write);
w->Done(status, error_str);
}

Expand Down Expand Up @@ -301,7 +302,7 @@ void TLSWrap::EncOut() {
}

// Split-off queue
if (established_ && current_write_ != nullptr) {
if (established_ && current_write_) {
Debug(this, "EncOut() setting write_callback_scheduled_");
write_callback_scheduled_ = true;
}
Expand Down Expand Up @@ -372,10 +373,12 @@ void TLSWrap::EncOut() {

void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
Debug(this, "OnStreamAfterWrite(status = %d)", status);
if (current_empty_write_ != nullptr) {
if (current_empty_write_) {
Debug(this, "Had empty write");
WriteWrap* finishing = current_empty_write_;
current_empty_write_ = nullptr;
BaseObjectPtr<AsyncWrap> current_empty_write =
std::move(current_empty_write_);
current_empty_write_.reset();
WriteWrap* finishing = WriteWrap::FromObject(current_empty_write);
finishing->Done(status);
return;
}
Expand Down Expand Up @@ -735,23 +738,23 @@ int TLSWrap::DoWrite(WriteWrap* w,
ClearOut();
if (BIO_pending(enc_out_) == 0) {
Debug(this, "No pending encrypted output, writing to underlying stream");
CHECK_NULL(current_empty_write_);
current_empty_write_ = w;
CHECK(!current_empty_write_);
current_empty_write_.reset(w->GetAsyncWrap());
StreamWriteResult res =
underlying_stream()->Write(bufs, count, send_handle);
if (!res.async) {
BaseObjectPtr<TLSWrap> strong_ref{this};
env()->SetImmediate([this, strong_ref](Environment* env) {
OnStreamAfterWrite(current_empty_write_, 0);
OnStreamAfterWrite(WriteWrap::FromObject(current_empty_write_), 0);
});
}
return 0;
}
}

// Store the current write wrap
CHECK_NULL(current_write_);
current_write_ = w;
CHECK(!current_write_);
current_write_.reset(w->GetAsyncWrap());

// Write encrypted data to underlying stream and call Done().
if (length == 0) {
Expand Down Expand Up @@ -804,7 +807,7 @@ int TLSWrap::DoWrite(WriteWrap* w,
// If we stopped writing because of an error, it's fatal, discard the data.
if (!arg.IsEmpty()) {
Debug(this, "Got SSL error (%d), returning UV_EPROTO", err);
current_write_ = nullptr;
current_write_.reset();
return UV_EPROTO;
}

Expand Down
4 changes: 2 additions & 2 deletions src/tls_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ class TLSWrap : public AsyncWrap,
// Waiting for ClearIn() to pass to SSL_write().
AllocatedBuffer pending_cleartext_input_;
size_t write_size_ = 0;
WriteWrap* current_write_ = nullptr;
BaseObjectPtr<AsyncWrap> current_write_;
bool in_dowrite_ = false;
WriteWrap* current_empty_write_ = nullptr;
BaseObjectPtr<AsyncWrap> current_empty_write_;
bool write_callback_scheduled_ = false;
bool started_ = false;
bool established_ = false;
Expand Down