diff --git a/src/async_wrap.h b/src/async_wrap.h index 735ae1f7dbe54e..b52b4d7ee7d69a 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -60,8 +60,10 @@ namespace node { V(PROCESSWRAP) \ V(PROMISE) \ V(QUERYWRAP) \ + V(QUIC_ENDPOINT) \ V(QUIC_LOGSTREAM) \ V(QUIC_PACKET) \ + V(QUIC_UDP) \ V(SHUTDOWNWRAP) \ V(SIGNALWRAP) \ V(STATWATCHER) \ diff --git a/src/quic/bindingdata.cc b/src/quic/bindingdata.cc index 9690031773781b..13baa23c54250b 100644 --- a/src/quic/bindingdata.cc +++ b/src/quic/bindingdata.cc @@ -65,6 +65,7 @@ void BindingData::Initialize(Environment* env, Local target) { void BindingData::RegisterExternalReferences( ExternalReferenceRegistry* registry) { + registry->Register(IllegalConstructor); registry->Register(SetCallbacks); registry->Register(FlushPacketFreelist); } @@ -199,6 +200,15 @@ bool NgHttp3CallbackScope::in_nghttp3_callback(Environment* env) { return binding.in_nghttp3_callback_scope; } +CallbackScopeBase::CallbackScopeBase(Environment* env) + : env(env), context_scope(env->context()), try_catch(env->isolate()) {} + +CallbackScopeBase::~CallbackScopeBase() { + if (try_catch.HasCaught() && !try_catch.HasTerminated()) { + errors::TriggerUncaughtException(env->isolate(), try_catch); + } +} + void IllegalConstructor(const FunctionCallbackInfo& args) { THROW_ERR_ILLEGAL_CONSTRUCTOR(Environment::GetCurrent(args)); } diff --git a/src/quic/bindingdata.h b/src/quic/bindingdata.h index e9e8e719c4d892..a5bfe57d8ea6e6 100644 --- a/src/quic/bindingdata.h +++ b/src/quic/bindingdata.h @@ -12,6 +12,8 @@ #include #include #include +#include +#include #include namespace node { @@ -26,6 +28,9 @@ enum class Side { }; constexpr size_t kDefaultMaxPacketLength = NGTCP2_MAX_UDP_PAYLOAD_SIZE; +constexpr size_t kMaxSizeT = std::numeric_limits::max(); +constexpr uint64_t kMaxSafeJsInteger = 9007199254740991; +constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; // ============================================================================ @@ -44,7 +49,6 @@ constexpr size_t kDefaultMaxPacketLength = NGTCP2_MAX_UDP_PAYLOAD_SIZE; // internalBinding('quic') is first loaded. #define QUIC_JS_CALLBACKS(V) \ V(endpoint_close, EndpointClose) \ - V(endpoint_error, EndpointError) \ V(session_new, SessionNew) \ V(session_close, SessionClose) \ V(session_error, SessionError) \ @@ -66,12 +70,15 @@ constexpr size_t kDefaultMaxPacketLength = NGTCP2_MAX_UDP_PAYLOAD_SIZE; #define QUIC_STRINGS(V) \ V(ack_delay_exponent, "ackDelayExponent") \ V(active_connection_id_limit, "activeConnectionIDLimit") \ + V(address_lru_size, "addressLRUSize") \ V(alpn, "alpn") \ V(ca, "ca") \ V(certs, "certs") \ + V(cc_algorithm, "cc") \ V(crl, "crl") \ V(ciphers, "ciphers") \ V(disable_active_migration, "disableActiveMigration") \ + V(disable_stateless_reset, "disableStatelessReset") \ V(enable_tls_trace, "tlsTrace") \ V(endpoint, "Endpoint") \ V(endpoint_udp, "Endpoint::UDP") \ @@ -84,18 +91,35 @@ constexpr size_t kDefaultMaxPacketLength = NGTCP2_MAX_UDP_PAYLOAD_SIZE; V(initial_max_stream_data_uni, "initialMaxStreamDataUni") \ V(initial_max_streams_bidi, "initialMaxStreamsBidi") \ V(initial_max_streams_uni, "initialMaxStreamsUni") \ + V(ipv6_only, "ipv6Only") \ V(keylog, "keylog") \ V(keys, "keys") \ V(logstream, "LogStream") \ V(max_ack_delay, "maxAckDelay") \ + V(max_connections_per_host, "maxConnectionsPerHost") \ + V(max_connections_total, "maxConnectionsTotal") \ V(max_datagram_frame_size, "maxDatagramFrameSize") \ V(max_idle_timeout, "maxIdleTimeout") \ + V(max_payload_size, "maxPayloadSize") \ + V(max_retries, "maxRetries") \ + V(max_stateless_resets, "maxStatelessResetsPerHost") \ V(packetwrap, "PacketWrap") \ V(reject_unauthorized, "rejectUnauthorized") \ + V(retry_token_expiration, "retryTokenExpiration") \ V(request_peer_certificate, "requestPeerCertificate") \ + V(reset_token_secret, "resetTokenSecret") \ + V(rx_loss, "rxDiagnosticLoss") \ V(session, "Session") \ V(session_id_ctx, "sessionIDContext") \ V(stream, "Stream") \ + V(token_expiration, "tokenExpiration") \ + V(token_secret, "tokenSecret") \ + V(tx_loss, "txDiagnosticLoss") \ + V(udp_receive_buffer_size, "udpReceiveBufferSize") \ + V(udp_send_buffer_size, "udpSendBufferSize") \ + V(udp_ttl, "udpTTL") \ + V(unacknowledged_packet_threshold, "unacknowledgedPacketThreshold") \ + V(validate_address, "validateAddress") \ V(verify_hostname_identity, "verifyHostnameIdentity") // ============================================================================= @@ -133,6 +157,8 @@ class BindingData final std::vector> packet_freelist; + std::unordered_map> listening_endpoints; + // Purge the packet free list to free up memory. static void FlushPacketFreelist( const v8::FunctionCallbackInfo& args); @@ -203,6 +229,26 @@ struct NgHttp3CallbackScope { static bool in_nghttp3_callback(Environment* env); }; +struct CallbackScopeBase { + Environment* env; + v8::Context::Scope context_scope; + v8::TryCatch try_catch; + + explicit CallbackScopeBase(Environment* env); + CallbackScopeBase(const CallbackScopeBase&) = delete; + CallbackScopeBase(CallbackScopeBase&&) = delete; + CallbackScopeBase& operator=(const CallbackScopeBase&) = delete; + CallbackScopeBase& operator=(CallbackScopeBase&&) = delete; + ~CallbackScopeBase(); +}; + +template +struct CallbackScope final : public CallbackScopeBase { + BaseObjectPtr ref; + explicit CallbackScope(const T* ptr) + : CallbackScopeBase(ptr->env()), ref(ptr) {} +}; + } // namespace quic } // namespace node diff --git a/src/quic/defs.h b/src/quic/defs.h index 6b8048d040e991..d7123d16f11cab 100644 --- a/src/quic/defs.h +++ b/src/quic/defs.h @@ -90,13 +90,20 @@ uint64_t GetStat(Stats* stats) { return stats->*member; } -#define STAT_INCREMENT(Type, name) IncrementStat(&stats_); +#define STAT_INCREMENT(Type, name) \ + IncrementStat(stats_.Data()); #define STAT_INCREMENT_N(Type, name, amt) \ - IncrementStat(&stats_, amt); + IncrementStat(stats_.Data(), amt); #define STAT_RECORD_TIMESTAMP(Type, name) \ - RecordTimestampStat(&stats_); -#define STAT_SET(Type, name, val) SetStat(&stats_, val); -#define STAT_GET(Type, name) GetStat(&stats_); + RecordTimestampStat(stats_.Data()); +#define STAT_SET(Type, name, val) \ + SetStat(stats_.Data(), val); +#define STAT_GET(Type, name) GetStat(stats_.Data()); +#define STAT_FIELD(_, name) uint64_t name; +#define STAT_STRUCT(name) \ + struct Stats final { \ + name##_STATS(STAT_FIELD) \ + }; } // namespace quic } // namespace node diff --git a/src/quic/endpoint.cc b/src/quic/endpoint.cc new file mode 100644 index 00000000000000..4b29de39a8df0d --- /dev/null +++ b/src/quic/endpoint.cc @@ -0,0 +1,1343 @@ +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include "endpoint.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "defs.h" + +namespace node { + +using v8::ArrayBufferView; +using v8::BackingStore; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Int32; +using v8::Integer; +using v8::Just; +using v8::Local; +using v8::Maybe; +using v8::Nothing; +using v8::Number; +using v8::Object; +using v8::PropertyAttribute; +using v8::String; +using v8::Value; + +namespace quic { + +// ============================================================================ + +namespace { +#ifdef DEBUG +bool is_diagnostic_packet_loss(double probability) { + if (LIKELY(probability == 0.0)) return false; + unsigned char c = 255; + CHECK(crypto::CSPRNG(&c, 1).is_ok()); + return (static_cast(c) / 255) < probability; +} +#endif // DEBUG + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + v8::Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + if (!value->IsUndefined()) { + int num = value.As()->Value(); + switch (num) { + case NGTCP2_CC_ALGO_RENO: + [[fallthrough]]; + case NGTCP2_CC_ALGO_CUBIC: + [[fallthrough]]; + case NGTCP2_CC_ALGO_BBR: + [[fallthrough]]; + case NGTCP2_CC_ALGO_BBR2: + break; + default: + THROW_ERR_INVALID_ARG_VALUE(env, "The cc_algorithm is invalid"); + return false; + } + options->*member = static_cast(num); + } + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + v8::Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + if (!value->IsUndefined()) { + CHECK(value->IsNumber()); + options->*member = value.As()->Value(); + } + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + v8::Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + if (!value->IsUndefined()) { + CHECK(value->IsNumber()); + options->*member = value.As()->Value(); + } + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + v8::Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + if (!value->IsUndefined()) { + CHECK(value->IsNumber()); + options->*member = value.As()->Value(); + } + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + v8::Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + if (!value->IsUndefined()) { + CHECK(value->IsArrayBufferView()); + Store store(value.As()); + CHECK_EQ(store.length(), TokenSecret::QUIC_TOKENSECRET_LEN); + ngtcp2_vec buf = store; + options->*member = buf.base; + } + return true; +} +} // namespace + +Maybe Endpoint::Options::From(Environment* env, + Local value) { + if (value.IsEmpty() || !value->IsObject()) { + THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object"); + return Nothing(); + } + + auto& state = BindingData::Get(env); + auto params = value.As(); + Options options; + +#define SET(name) \ + SetOption( \ + env, &options, params, state.name##_string()) + + if (!SET(retry_token_expiration) || !SET(token_expiration) || + !SET(max_connections_per_host) || !SET(max_connections_total) || + !SET(max_stateless_resets) || !SET(address_lru_size) || + !SET(max_retries) || !SET(max_payload_size) || + !SET(unacknowledged_packet_threshold) || !SET(validate_address) || + !SET(disable_stateless_reset) || !SET(ipv6_only) || +#ifdef DEBUG + !SET(rx_loss) || !SET(tx_loss) || +#endif + !SET(cc_algorithm) || !SET(udp_receive_buffer_size) || + !SET(udp_send_buffer_size) || !SET(udp_ttl) || !SET(reset_token_secret) || + !SET(token_secret)) { + return Nothing(); + } + + return Just(options); + +#undef SET +} + +void Endpoint::Options::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("reset_token_secret", reset_token_secret); + tracker->TrackField("token_secret", token_secret); +} + +// ====================================================================================== +// Endpoint::UDP and Endpoint::UDP::Impl + +class Endpoint::UDP::Impl final : public HandleWrap { + public: + static Local GetConstructorTemplate(Environment* env) { + auto& state = BindingData::Get(env); + auto tmpl = state.udp_constructor_template(); + if (tmpl.IsEmpty()) { + tmpl = NewFunctionTemplate(env->isolate(), IllegalConstructor); + tmpl->Inherit(HandleWrap::GetConstructorTemplate(env)); + tmpl->InstanceTemplate()->SetInternalFieldCount( + HandleWrap::kInternalFieldCount); + tmpl->SetClassName(state.endpoint_udp_string()); + state.set_udp_constructor_template(tmpl); + } + return tmpl; + } + + static BaseObjectPtr Create(Endpoint* endpoint) { + Local obj; + if (!GetConstructorTemplate(endpoint->env()) + ->InstanceTemplate() + ->NewInstance(endpoint->env()->context()) + .ToLocal(&obj)) { + return BaseObjectPtr(); + } + + return MakeDetachedBaseObject(endpoint, obj); + } + + static Impl* From(uv_udp_t* handle) { + return ContainerOf(&Impl::handle_, handle); + } + + static Impl* From(uv_handle_t* handle) { + return From(reinterpret_cast(handle)); + } + + Impl(Endpoint* endpoint, Local object) + : HandleWrap(endpoint->env(), + object, + reinterpret_cast(&handle_), + AsyncWrap::PROVIDER_QUIC_UDP), + endpoint_(endpoint) { + CHECK_EQ(uv_udp_init(endpoint->env()->event_loop(), &handle_), 0); + handle_.data = this; + } + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(Endpoint::UDP::Impl) + SET_SELF_SIZE(Impl) + + private: + static void ClosedCb(uv_handle_t* handle) { + std::unique_ptr ptr(From(handle)); + } + + static void OnAlloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + *buf = From(handle)->env()->allocate_managed_buffer(suggested_size); + } + + static void OnReceive(uv_udp_t* handle, + ssize_t nread, + const uv_buf_t* buf, + const sockaddr* addr, + unsigned int flags) { + // Nothing to do in these cases. Specifically, if the nread + // is zero or we've received a partial packet, we're just + // going to ignore it. + if (nread == 0 || flags & UV_UDP_PARTIAL) return; + + auto impl = From(handle); + DCHECK_NOT_NULL(impl); + DCHECK_NOT_NULL(impl->endpoint_); + + if (nread < 0) { + impl->endpoint_->Destroy(CloseContext::RECEIVE_FAILURE, + static_cast(nread)); + return; + } + + impl->endpoint_->Receive(uv_buf_t{buf->base, static_cast(nread)}, + SocketAddress(addr)); + } + + uv_udp_t handle_; + Endpoint* endpoint_; + + friend class UDP; +}; + +Endpoint::UDP::UDP(Endpoint* endpoint) : impl_(Impl::Create(endpoint)) { + endpoint->env()->AddCleanupHook(CleanupHook, this); +} + +Endpoint::UDP::~UDP() { + Close(); +} + +int Endpoint::UDP::Bind(const Endpoint::Options& options) { + if (is_bound_) return UV_EALREADY; + if (is_closed() || impl_->IsHandleClosing()) return UV_EBADF; + + int flags = 0; + if (options.local_address.family() == AF_INET6 && options.ipv6_only) + flags |= UV_UDP_IPV6ONLY; + int err = uv_udp_bind(&impl_->handle_, options.local_address.data(), flags); + int size; + + if (!err) { + is_bound_ = true; + size = static_cast(options.udp_receive_buffer_size); + if (size > 0) { + err = uv_recv_buffer_size(reinterpret_cast(&impl_->handle_), + &size); + if (err) return err; + } + + size = static_cast(options.udp_send_buffer_size); + if (size > 0) { + err = uv_send_buffer_size(reinterpret_cast(&impl_->handle_), + &size); + if (err) return err; + } + + size = static_cast(options.udp_ttl); + if (size > 0) { + err = uv_udp_set_ttl(&impl_->handle_, size); + if (err) return err; + } + } + + return err; +} + +void Endpoint::UDP::Ref() { + if (!is_closed()) uv_ref(reinterpret_cast(&impl_->handle_)); +} + +void Endpoint::UDP::Unref() { + if (!is_closed()) uv_unref(reinterpret_cast(&impl_->handle_)); +} + +int Endpoint::UDP::Start() { + if (is_closed() || impl_->IsHandleClosing()) return UV_EBADF; + if (is_started_) return 0; + int err = uv_udp_recv_start(&impl_->handle_, Impl::OnAlloc, Impl::OnReceive); + is_started_ = (err == 0); + return err; +} + +void Endpoint::UDP::Stop() { + if (is_closed() || impl_->IsHandleClosing() || !is_started_) return; + USE(uv_udp_recv_stop(&impl_->handle_)); + is_started_ = false; +} + +void Endpoint::UDP::Close() { + if (is_closed() || impl_->IsHandleClosing()) return; + Stop(); + is_bound_ = false; + impl_->env()->RemoveCleanupHook(CleanupHook, this); + impl_->Close(); + impl_.reset(); +} + +bool Endpoint::UDP::is_bound() const { + return is_bound_; +} + +bool Endpoint::UDP::is_closed() const { + return !impl_; +} +Endpoint::UDP::operator bool() const { + return !impl_; +} + +SocketAddress Endpoint::UDP::local_address() const { + CHECK(!is_closed() && is_bound()); + return SocketAddress::FromSockName(impl_->handle_); +} + +int Endpoint::UDP::Send(BaseObjectPtr packet) { + if (is_closed() || impl_->IsHandleClosing()) return UV_EBADF; + CHECK(packet && !packet->is_sending()); + uv_buf_t buf = *packet; + return packet->Dispatch( + uv_udp_send, + &impl_->handle_, + &buf, + 1, + packet->destination().data(), + uv_udp_send_cb{[](uv_udp_send_t* req, int status) { + auto ptr = static_cast(ReqWrap::from_req(req)); + ptr->Done(status); + }}); +} + +void Endpoint::UDP::MemoryInfo(MemoryTracker* tracker) const { + if (impl_) tracker->TrackField("impl", impl_); +} + +void Endpoint::UDP::CleanupHook(void* data) { + static_cast(data)->Close(); +} + +// ============================================================================ + +bool Endpoint::HasInstance(Environment* env, Local value) { + return GetConstructorTemplate(env)->HasInstance(value); +} + +Local Endpoint::GetConstructorTemplate(Environment* env) { + auto& state = BindingData::Get(env); + auto tmpl = state.endpoint_constructor_template(); + if (tmpl.IsEmpty()) { + auto isolate = env->isolate(); + tmpl = NewFunctionTemplate(isolate, IllegalConstructor); + tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); + tmpl->SetClassName(state.endpoint_string()); + tmpl->InstanceTemplate()->SetInternalFieldCount( + Endpoint::kInternalFieldCount); + SetProtoMethod(isolate, tmpl, "listen", DoListen); + SetProtoMethod(isolate, tmpl, "closeGracefully", DoCloseGracefully); + SetProtoMethod(isolate, tmpl, "connect", DoConnect); + SetProtoMethod(isolate, tmpl, "markBusy", MarkBusy); + SetProtoMethod(isolate, tmpl, "ref", Ref); + SetProtoMethod(isolate, tmpl, "unref", Unref); + SetProtoMethodNoSideEffect(isolate, tmpl, "address", LocalAddress); + state.set_endpoint_constructor_template(tmpl); + } + return tmpl; +} + +void Endpoint::Initialize(Environment* env, Local target) { + SetMethod(env->context(), target, "createEndpoint", CreateEndpoint); + +#define V(name, _) NODE_DEFINE_CONSTANT(target, IDX_STATS_ENDPOINT_##name); + ENDPOINT_STATS(V) +#undef V +#define V(name, _, __) NODE_DEFINE_CONSTANT(target, IDX_STATE_ENDPOINT_##name); + ENDPOINT_STATE(V) +#undef V +} + +void Endpoint::RegisterExternalReferences(ExternalReferenceRegistry* registry) { + registry->Register(CreateEndpoint); + registry->Register(DoConnect); + registry->Register(DoListen); + registry->Register(DoCloseGracefully); + registry->Register(LocalAddress); + registry->Register(Ref); + registry->Register(Unref); +} + +BaseObjectPtr Endpoint::Create(Environment* env, + const Endpoint::Options& options) { + Local obj; + if (!GetConstructorTemplate(env) + ->InstanceTemplate() + ->NewInstance(env->context()) + .ToLocal(&obj)) { + return BaseObjectPtr(); + } + + return MakeDetachedBaseObject(env, obj, options); +} + +Endpoint::Endpoint(Environment* env, + Local object, + const Endpoint::Options& options) + : AsyncWrap(env, object, AsyncWrap::PROVIDER_QUIC_ENDPOINT), + stats_(env->isolate()), + state_(env->isolate()), + options_(std::move(options)), + udp_(this), + addrLRU_(options_.address_lru_size) { + MakeWeak(); + + const auto defineProperty = [&](auto name, auto value) { + object + ->DefineOwnProperty( + env->context(), name, value, PropertyAttribute::ReadOnly) + .Check(); + }; + + defineProperty(env->state_string(), state_.GetArrayBuffer()); + defineProperty(env->stats_string(), stats_.GetArrayBuffer()); +} + +Endpoint::~Endpoint() { + udp_.Close(); + DCHECK_EQ(state_->pending_callbacks, 0); + DCHECK(sessions_.empty()); + DCHECK(is_closed()); +} + +SocketAddress Endpoint::local_address() const { + CHECK(!is_closed()); + return udp_.local_address(); +} + +void Endpoint::MarkAsBusy(bool on) { + state_->busy = on ? 1 : 0; +} + +Maybe Endpoint::GenerateNewToken( + uint32_t version, const SocketAddress& remote_address) { + if (is_closed() || is_closing()) { + THROW_ERR_INVALID_STATE(env(), + "Endpoint is closed. Unable to create token."); + return Nothing(); + } + + return Just(RegularToken(version, remote_address, options_.token_secret)); +} + +Maybe Endpoint::GenerateNewStatelessResetToken( + uint8_t* token, const CID& cid) const { + if (is_closed() || is_closing()) { + THROW_ERR_INVALID_STATE(env(), + "Endpoint is closed. Unable to create token."); + return Nothing(); + } + return Just(StatelessResetToken(token, options_.reset_token_secret, cid)); +} + +void Endpoint::AddSession(const CID& cid, BaseObjectPtr session) { + if (is_closed() || is_closing()) return; + sessions_[cid] = session; + IncrementSocketAddressCounter(session->remote_address()); + if (session->is_server()) { + STAT_INCREMENT(Stats, server_sessions); + } else { + STAT_INCREMENT(Stats, client_sessions); + } + if (session->is_server()) EmitNewSession(session); +} + +void Endpoint::RemoveSession(const CID& cid) { + if (is_closed()) return; + auto session = FindSession(cid); + if (!session) return; + DecrementSocketAddressCounter(session->remote_address()); + sessions_.erase(cid); + if (state_->waiting_for_callbacks == 1) MaybeDestroy(); +} + +BaseObjectPtr Endpoint::FindSession(const CID& cid) { + BaseObjectPtr session; + auto session_it = sessions_.find(cid); + if (session_it == std::end(sessions_)) { + auto scid_it = dcid_to_scid_.find(cid); + if (scid_it != std::end(dcid_to_scid_)) { + session_it = sessions_.find(scid_it->second); + CHECK_NE(session_it, std::end(sessions_)); + session = session_it->second; + } + } else { + session = session_it->second; + } + return session; +} + +void Endpoint::AssociateCID(const CID& cid, const CID& scid) { + if (!is_closed() && !is_closing() && cid && scid && cid != scid && + dcid_to_scid_[cid] != scid) { + dcid_to_scid_.emplace(cid, scid); + } +} + +void Endpoint::DisassociateCID(const CID& cid) { + if (!is_closed() && cid) dcid_to_scid_.erase(cid); +} + +void Endpoint::AssociateStatelessResetToken(const StatelessResetToken& token, + Session* session) { + if (is_closed() || is_closing()) return; + token_map_[token] = session; +} + +void Endpoint::DisassociateStatelessResetToken( + const StatelessResetToken& token) { + if (!is_closed()) token_map_.erase(token); +} + +void Endpoint::Send(BaseObjectPtr packet) { +#ifdef DEBUG + // When diagnostic packet loss is enabled, the packet will be randomly + // dropped. This can happen to any type of packet. We use this only in + // testing to test various reliability issues. + if (UNLIKELY(is_diagnostic_packet_loss(options_.tx_loss))) { + packet->Done(0); + // Simulating tx packet loss + return; + } +#endif // DEBUG + + if (is_closed() || is_closing() || packet->length() == 0) return; + state_->pending_callbacks++; + int err = udp_.Send(packet); + + if (err != 0) { + packet->Done(err); + Destroy(CloseContext::SEND_FAILURE, err); + } + STAT_INCREMENT_N(Stats, bytes_sent, packet->length()); + STAT_INCREMENT(Stats, packets_sent); +} + +void Endpoint::SendRetry(const PathDescriptor& options) { + // Generating and sending retry packets does consume some system resources, + // and it is possible for a malicious peer to trigger sending a large number + // of retry packets, resulting in a potential DOS vector. To help ward that + // off, we track how many retry packets we send to a particular host and + // enforce limits. Note that since we are using an LRU cache these limits + // aren't strict. If a retry is sent, we increment the retry_count statistic + // to give application code a means of detecting and responding to abuse on + // its own. What this count does not give is the rate of retry, so it is still + // somewhat limited. + auto info = addrLRU_.Upsert(options.remote_address); + if (++(info->retry_count) <= options_.max_retries) { + auto packet = + Packet::CreateRetryPacket(env(), this, options, options_.token_secret); + if (packet) { + STAT_INCREMENT(Stats, retry_count); + Send(std::move(packet)); + } + + // If creating the retry is unsuccessful, we just drop things on the floor. + // It's not worth committing any further resources to this one packet. We + // might want to log the failure at some point tho. + } +} + +void Endpoint::SendVersionNegotiation(const PathDescriptor& options) { + // While creating and sending a version negotiation packet does consume a + // small amount of system resources, and while it is fairly trivial for a + // malicious peer to force a version negotiation to be sent, these are more + // trivial to create than the cryptographically generated retry and stateless + // reset packets. If the packet is sent, then we'll at least increment the + // version_negotiation_count statistic so that application code can keep an + // eye on it. + auto packet = Packet::CreateVersionNegotiationPacket(env(), this, options); + if (packet) { + STAT_INCREMENT(Stats, version_negotiation_count); + Send(std::move(packet)); + } + + // If creating the packet is unsuccessful, we just drop things on the floor. + // It's not worth committing any further resources to this one packet. We + // might want to log the failure at some point tho. +} + +bool Endpoint::SendStatelessReset(const PathDescriptor& options, + size_t source_len) { + if (UNLIKELY(options_.disable_stateless_reset)) return false; + + const auto exceeds_limits = [&] { + SocketAddressInfoTraits::Type* counts = + addrLRU_.Peek(options.remote_address); + auto count = counts != nullptr ? counts->reset_count : 0; + return count >= options_.max_stateless_resets; + }; + + // Per the QUIC spec, we need to protect against sending too many stateless + // reset tokens to an endpoint to prevent endless looping. + if (exceeds_limits()) return false; + + auto packet = Packet::CreateStatelessResetPacket( + env(), this, options, options_.reset_token_secret, source_len); + + if (packet) { + addrLRU_.Upsert(options.remote_address)->reset_count++; + STAT_INCREMENT(Stats, stateless_reset_count); + Send(std::move(packet)); + return true; + } + return false; +} + +void Endpoint::SendImmediateConnectionClose(const PathDescriptor& options, + QuicError reason) { + // While it is possible for a malicious peer to cause us to create a large + // number of these, generating them is fairly trivial. + auto packet = Packet::CreateImmediateConnectionClosePacket( + env(), this, options, reason); + if (packet) { + STAT_INCREMENT(Stats, immediate_close_count); + Send(std::move(packet)); + } +} + +bool Endpoint::Start() { + if (is_closed() || is_closing()) return false; + if (state_->receiving == 1) return true; + + int err = 0; + if (state_->bound == 0) { + err = udp_.Bind(options_); + if (err != 0) { + // If we failed to bind, destroy the endpoint. There's nothing we can do. + Destroy(CloseContext::BIND_FAILURE, err); + return false; + } + state_->bound = 1; + } + + err = udp_.Start(); + if (err != 0) { + // If we failed to start listening, destroy the endpoint. There's nothing we + // can do. + Destroy(CloseContext::START_FAILURE, err); + return false; + } + + BindingData::Get(env()).listening_endpoints[this] = + BaseObjectPtr(this); + state_->receiving = 1; + return true; +} + +void Endpoint::Listen(const Session::Options& options) { + if (is_closed() || is_closing() || state_->listening == 1) return; + server_options_ = options; + if (Start()) state_->listening = 1; +} + +BaseObjectPtr Endpoint::Connect( + const SocketAddress& remote_address, + const Session::Options& options, + std::optional sessionTicket) { + // TODO(@jasnell): Implement as part of Session... + // If starting fails, the endpoint will be destroyed. + if (!Start()) return BaseObjectPtr(); + + // auto config = Session::Config( + // Side::CLIENT, + // *this, + // // For client sessions, we always generate a random intial CID for the + // // server. This is generally just a throwaway. The server will generate + // // it's own CID and send that back to us. + // CIDFactory::random().Generate(NGTCP2_MIN_INITIAL_DCIDLEN), + // local_address(), + // remote_address); + + // if (options.qlog) config.EnableQLog(); + + // config.session_ticket = sessionTicket; + + // auto session = + // Session::Create(BaseObjectPtr(this), config, options); + // if (!session) return BaseObjectPtr(); + + // session->set_wrapped(); + + // auto on_exit = OnScopeLeave([&] { session->SendPendingData(); }); + + return BaseObjectPtr(); +} + +void Endpoint::MaybeDestroy() { + if (!is_closing() && sessions_.empty() && state_->pending_callbacks == 0 && + state_->listening == 0) { + Destroy(); + } +} + +void Endpoint::Destroy(CloseContext context, int status) { + if (is_closed() || is_closing()) return; + + STAT_RECORD_TIMESTAMP(Stats, destroyed_at); + + state_->closing = 1; + state_->listening = 0; + + close_context_ = context; + close_status_ = status; + + // If there are open sessions still, shut them down. As those clean themselves + // up, they will remove themselves. The cleanup here will be synchronous and + // no attempt will be made to communicate further with the peer. + // Intentionally copy the sessions map so that we can safely iterate over it + // while those clean themselves up. + auto sessions = sessions_; + for (auto& session : sessions) + session.second->Close(Session::CloseMethod::SILENT); + sessions.clear(); + CHECK(sessions_.empty()); + token_map_.clear(); + dcid_to_scid_.clear(); + + udp_.Close(); + state_->closing = 0; + state_->bound = 0; + state_->receiving = 0; + BindingData::Get(env()).listening_endpoints.erase(this); + + EmitClose(close_context_, close_status_); +} + +void Endpoint::CloseGracefully() { + if (!is_closed() && !is_closing() && state_->waiting_for_callbacks == 0) { + state_->listening = 0; + state_->waiting_for_callbacks = 1; + } + + // Maybe we can go ahead and destroy now? + MaybeDestroy(); +} + +void Endpoint::Receive(const uv_buf_t& buf, + const SocketAddress& remote_address) { + const auto receive = [&](Store&& store, + const SocketAddress& local_address, + const SocketAddress& remote_address, + const CID& dcid, + const CID& scid) { + STAT_INCREMENT_N(Stats, bytes_received, store.length()); + auto session = FindSession(dcid); + return session && !session->is_destroyed() + ? session->Receive( + std::move(store), local_address, remote_address) + : false; + }; + + const auto accept = [&](const Session::Config& config, Store&& store) { + if (is_closed() || is_closing() || !is_listening()) return false; + + auto session = Session::Create( + BaseObjectPtr(this), config, server_options_.value()); + + return session ? session->Receive(std::move(store), + config.local_address, + config.remote_address) + : false; + }; + + const auto acceptInitialPacket = [&](const uint32_t version, + const CID& dcid, + const CID& scid, + Store&& store, + const SocketAddress& local_address, + const SocketAddress& remote_address) { + // Conditionally accept an initial packet to create a new session. + + // If we're not listening, do not accept. + if (state_->listening == 0) return false; + + ngtcp2_pkt_hd hd; + + // This is our first condition check... A minimal check to see if ngtcp2 can + // even recognize this packet as a quic packet with the correct version. + ngtcp2_vec vec = store; + switch (ngtcp2_accept(&hd, vec.base, vec.len)) { + case 1: + // The requested QUIC protocol version is not supported + SendVersionNegotiation( + PathDescriptor{version, dcid, scid, local_address, remote_address}); + // The packet was successfully processed, even if we did refuse the + // connection and send a version negotiation in response. + return true; + case -1: + // The packet is invalid and we're just going to ignore it. + return false; + } + + // This is the second condition check... If the server has been marked busy + // or the remote peer has exceeded their maximum number of concurrent + // connections, any new connections will be shut down immediately. + const auto limits_exceeded = [&] { + if (sessions_.size() >= options_.max_connections_total) return true; + + SocketAddressInfoTraits::Type* counts = addrLRU_.Peek(remote_address); + auto count = counts != nullptr ? counts->active_connections : 0; + return count >= options_.max_connections_per_host; + }; + + if (state_->busy || limits_exceeded()) { + // Endpoint is busy or the connection count is exceeded. The connection is + // refused. + if (state_->busy) STAT_INCREMENT(Stats, server_busy_count); + SendImmediateConnectionClose( + PathDescriptor{version, scid, dcid, local_address, remote_address}, + QuicError::ForTransport(NGTCP2_CONNECTION_REFUSED)); + // The packet was successfully processed, even if we did refuse the + // connection. + return true; + } + + // At this point, we start to set up the configuration for our local + // session. The second argument to the Config constructor here is the dcid. + // We pass the received scid here as the value because that is the value + // *this* session will use as it's outbound dcid. + auto config = Session::Config(Side::SERVER, + *this, + scid, + local_address, + remote_address, + version, + version, + dcid); + + // The this point, the config.scid and config.dcid represent *our* views of + // the CIDs. Specifically, config.dcid identifies the peer and config.scid + // identifies us. config.dcid should equal scid. config.scid should *not* + // equal dcid. + + const auto is_remote_address_validated = [&] { + auto info = addrLRU_.Peek(remote_address); + return info != nullptr ? info->validated : false; + }; + + // QUIC has address validation built in to the handshake but allows for + // an additional explicit validation request using RETRY frames. If we + // are using explicit validation, we check for the existence of a valid + // token in the packet. If one does not exist, we send a retry with + // a new token. If it does exist, and if it's valid, we grab the original + // cid and continue. + if (!is_remote_address_validated()) { + switch (hd.type) { + case NGTCP2_PKT_INITIAL: + // First, let's see if we need to do anything here. + + if (options_.validate_address) { + // If there is no token, generate and send one. + if (hd.token.len == 0) { + SendRetry(PathDescriptor{ + version, + dcid, + scid, + local_address, + remote_address, + }); + // We still consider this a successfully handled packet even + // if we send a retry. + return true; + } + + // We have two kinds of tokens, each prefixed with a different magic + // byte. + switch (hd.token.base[0]) { + case RetryToken::kTokenMagic: { + RetryToken token(hd.token.base, hd.token.len); + auto ocid = token.Validate( + version, + remote_address, + dcid, + options_.token_secret, + options_.retry_token_expiration * NGTCP2_SECONDS); + if (ocid == std::nullopt) { + // Invalid retry token was detected. Close the connection. + SendImmediateConnectionClose( + PathDescriptor{ + version, scid, dcid, local_address, remote_address}, + QuicError::ForTransport(NGTCP2_CONNECTION_REFUSED)); + // We still consider this a successfully handled packet even + // if we send a connection close. + return true; + } + + // The ocid is the original dcid that was encoded into the + // original retry packet sent to the client. We use it for + // validation. + config.ocid.emplace(ocid.value()); + config.retry_scid.emplace(dcid); + break; + } + case RegularToken::kTokenMagic: { + RegularToken token(hd.token.base, hd.token.len); + if (!token.Validate( + version, + remote_address, + options_.token_secret, + options_.token_expiration * NGTCP2_SECONDS)) { + SendRetry(PathDescriptor{ + version, + dcid, + scid, + local_address, + remote_address, + }); + // We still consider this to be a successfully handled packet + // if a retry is sent. + return true; + } + hd.token.base = nullptr; + hd.token.len = 0; + break; + } + default: { + SendRetry(PathDescriptor{ + version, + dcid, + scid, + local_address, + remote_address, + }); + return true; + } + } + + // Ok! If we've got this far, our token is valid! Which means our + // path to the remote address is valid (for now). Let's record that + // so we don't have to do this dance again for this endpoint + // instance. + addrLRU_.Upsert(remote_address)->validated = true; + } else if (hd.token.len > 0) { + // If validation is turned off and there is a token, that's weird. + // The peer should only have a token if we sent it to them and we + // wouldn't have sent it unless validation was turned on. Let's + // assume the peer is buggy or malicious and drop the packet on the + // floor. + return false; + } + break; + case NGTCP2_PKT_0RTT: + // If it's a 0RTT packet, we're always going to perform path + // validation no matter what. This is a bit unfortunate since + // ORTT is supposed to be, you know, 0RTT, but sending a retry + // forces a round trip... but if the remote address is not + // validated, there's a possibility that this 0RTT is forged + // or otherwise suspicious. Before we can do anything with it, + // we have to validate it. Keep in mind that this means the + // client needs to respond with a proper initial packet in + // order to proceed. + // TODO(@jasnell): Validate this further to ensure this is + // the correct behavior. + SendRetry(PathDescriptor{ + version, + dcid, + scid, + local_address, + remote_address, + }); + return true; + } + } + + return accept(config, std::move(store)); + }; + + // When a received packet contains a QUIC short header but cannot be matched + // to a known Session, it is either (a) garbage, (b) a valid packet for a + // connection we no longer have state for, or (c) a stateless reset. Because + // we do not yet know if we are going to process the packet, we need to try to + // quickly determine -- with as little cost as possible -- whether the packet + // contains a reset token. We do so by checking the final + // NGTCP2_STATELESS_RESET_TOKENLEN bytes in the packet to see if they match + // one of the known reset tokens previously given by the remote peer. If + // there's a match, then it's a reset token, if not, we move on the to the + // next check. It is very important that this check be as inexpensive as + // possible to avoid a DOS vector. + const auto maybeStatelessReset = [&](const CID& dcid, + const CID& scid, + Store& store, + const SocketAddress& local_address, + const SocketAddress& remote_address) { + if (options_.disable_stateless_reset || + store.length() < NGTCP2_STATELESS_RESET_TOKENLEN) + return false; + + ngtcp2_vec vec = store; + vec.base += vec.len; + vec.base -= NGTCP2_STATELESS_RESET_TOKENLEN; + + Session* session = nullptr; + auto it = token_map_.find(StatelessResetToken(vec.base)); + if (it != token_map_.end()) session = it->second; + + return session != nullptr ? receive(std::move(store), + local_address, + remote_address, + dcid, + scid) + : false; + }; + +#ifdef DEBUG + // When diagnostic packet loss is enabled, the packet will be randomly + // dropped. + if (UNLIKELY(is_diagnostic_packet_loss(options_.rx_loss))) { + // Simulating rx packet loss + return; + } +#endif // DEBUG + + // TODO(@jasnell): Implement blocklist support + // if (UNLIKELY(block_list_->Apply(remote_address))) { + // Debug(this, "Ignoring blocked remote address: %s", remote_address); + // return; + // } + + std::shared_ptr backing = env()->release_managed_buffer(buf); + if (UNLIKELY(!backing)) + return Destroy(CloseContext::RECEIVE_FAILURE, UV_ENOMEM); + + Store store(backing, buf.len, 0); + + ngtcp2_vec vec = store; + ngtcp2_version_cid pversion_cid; + + // This is our first check to see if the received data can be processed as a + // QUIC packet. If this fails, then the QUIC packet header is invalid and + // cannot be processed; all we can do is ignore it. If it succeeds, we have a + // valid QUIC header but there is still no guarantee that the packet can be + // successfully processed. + if (ngtcp2_pkt_decode_version_cid( + &pversion_cid, vec.base, vec.len, NGTCP2_MAX_CIDLEN) < 0) { + return; // Ignore the packet! + } + + // QUIC currently requires CID lengths of max NGTCP2_MAX_CIDLEN. The ngtcp2 + // API allows non-standard lengths, and we may want to allow non-standard + // lengths later. But for now, we're going to ignore any packet with a + // non-standard CID length. + if (pversion_cid.dcidlen > NGTCP2_MAX_CIDLEN || + pversion_cid.scidlen > NGTCP2_MAX_CIDLEN) + return; // Ignore the packet! + + // Each QUIC peer has two CIDs: The Source Connection ID (or scid), and the + // Destination Connection ID (or dcid). For each peer, the dcid is the CID + // identifying the other peer, and the scid is the CID identifying itself. + // That is, the client's scid is the server dcid; likewise the server's scid + // is the client's dcid. + // + // The dcid and scid below are the values sent from the peer received in the + // current packet, so in this case, dcid represents who the peer sent the + // packet too (this endpoint) and the scid represents who sent the packet. + CID dcid(pversion_cid.dcid, pversion_cid.dcidlen); + CID scid(pversion_cid.scid, pversion_cid.scidlen); + + // We index the current sessions by the dcid of the client. For initial + // packets, the dcid is some random value and the scid is omitted from the + // header (it uses what quic calls a "short header"). It is unlikely (but not + // impossible) that this randomly selected dcid will be in our index. If we do + // happen to have a collision, as unlikely as it is, ngtcp2 will do the right + // thing when it tries to process the packet so we really don't have to worry + // about it here. If the dcid is not known, the listener here will be nullptr. + // + // When the session is established, this peer will create it's own scid and + // will send that back to the remote peer to use as it's new dcid on + // subsequent packets. When that session is added, we will index it by the + // local scid, so as long as the client sends the subsequent packets with the + // right dcid, everything will just work. + + auto session = FindSession(dcid); + auto addr = local_address(); + + HandleScope handle_scope(env()->isolate()); + + // If a session is not found, there are four possible reasons: + // 1. The session has not been created yet + // 2. The session existed once but we've lost the local state for it + // 3. The packet is a stateless reset sent by the peer + // 4. This is a malicious or malformed packet. + if (!session) { + // No existing session. + + // Handle possible reception of a stateless reset token... If it is a + // stateless reset, the packet will be handled with no additional action + // necessary here. We want to return immediately without committing any + // further resources. + if (!scid && maybeStatelessReset(dcid, scid, store, addr, remote_address)) + return; // Stateless reset! Don't do any further processing. + + if (acceptInitialPacket(pversion_cid.version, + dcid, + scid, + std::move(store), + addr, + remote_address)) { + // Packet was successfully received. + STAT_INCREMENT(Stats, packets_received); + } + return; + } + + // If we got here, the dcid matched the scid of a known local session. Yay! + if (receive(std::move(store), addr, remote_address, dcid, scid)) + STAT_INCREMENT(Stats, packets_received); +} + +void Endpoint::PacketDone(int status) { + if (is_closed()) return; + state_->pending_callbacks--; + // Can we go ahead and close now? + if (state_->waiting_for_callbacks == 1) { + // MaybeDestroy potentially creates v8 handles so let's make sure + // we have a HandleScope on the stack. + HandleScope scope(env()->isolate()); + MaybeDestroy(); + } +} + +void Endpoint::IncrementSocketAddressCounter(const SocketAddress& addr) { + addrLRU_.Upsert(addr)->active_connections++; +} + +void Endpoint::DecrementSocketAddressCounter(const SocketAddress& addr) { + auto* counts = addrLRU_.Peek(addr); + if (counts != nullptr && counts->active_connections > 0) + counts->active_connections--; +} + +bool Endpoint::is_closed() const { + return !udp_; +} +bool Endpoint::is_closing() const { + return state_->closing; +} +bool Endpoint::is_listening() const { + return state_->listening; +} + +void Endpoint::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("options", options_); + tracker->TrackField("udp", udp_); + if (server_options_.has_value()) { + tracker->TrackField("server_options", server_options_.value()); + } + tracker->TrackField("token_map", token_map_); + tracker->TrackField("sessions", sessions_); + tracker->TrackField("cid_map", dcid_to_scid_); + tracker->TrackField("address LRU", addrLRU_); +} + +// ====================================================================================== +// Endpoint::SocketAddressInfoTraits + +bool Endpoint::SocketAddressInfoTraits::CheckExpired( + const SocketAddress& address, const Type& type) { + return (uv_hrtime() - type.timestamp) > kSocketAddressInfoTimeout; +} + +void Endpoint::SocketAddressInfoTraits::Touch(const SocketAddress& address, + Type* type) { + type->timestamp = uv_hrtime(); +} + +// ====================================================================================== +// JavaScript call outs + +void Endpoint::EmitNewSession(const BaseObjectPtr& session) { + if (!env()->can_call_into_js()) return; + CallbackScope scope(this); + session->set_wrapped(); + Local arg = session->object(); + + MakeCallback(BindingData::Get(env()).session_new_callback(), 1, &arg); +} + +void Endpoint::EmitClose(CloseContext context, int status) { + if (!env()->can_call_into_js()) return; + CallbackScope scope(this); + auto isolate = env()->isolate(); + Local argv[] = {Integer::New(isolate, static_cast(context)), + Integer::New(isolate, static_cast(status))}; + + MakeCallback( + BindingData::Get(env()).endpoint_close_callback(), arraysize(argv), argv); +} + +// ====================================================================================== +// Endpoint JavaScript API + +void Endpoint::CreateEndpoint(const FunctionCallbackInfo& args) { + CHECK(!args.IsConstructCall()); + auto env = Environment::GetCurrent(args); + CHECK(args[0]->IsObject()); + Options options; + if (!Options::From(env, args[0]).To(&options)) { + // There was an error. Just exit to propagate. + return; + } + + auto endpoint = Endpoint::Create(env, options); + if (endpoint) args.GetReturnValue().Set(endpoint->object()); +} + +void Endpoint::DoConnect(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + + // args[0] is a SocketAddress + // args[1] is a Session OptionsObject (see session.cc) + // args[2] is an optional SessionTicket + + DCHECK(SocketAddressBase::HasInstance(env, args[0])); + SocketAddressBase* address; + ASSIGN_OR_RETURN_UNWRAP(&address, args[0]); + + DCHECK(args[1]->IsObject()); + Session::Options options; + if (!Session::Options::From(env, args[1]).To(&options)) { + // There was an error. Return to propagate + return; + } + + BaseObjectPtr session; + + if (!args[2]->IsUndefined()) { + SessionTicket ticket; + if (SessionTicket::FromV8Value(env, args[2]).To(&ticket)) { + session = endpoint->Connect(*address->address(), options, ticket); + } + } else { + session = endpoint->Connect(*address->address(), options); + } + + if (session) args.GetReturnValue().Set(session->object()); +} + +void Endpoint::DoListen(const FunctionCallbackInfo& args) { + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + auto env = Environment::GetCurrent(args); + + Session::Options options; + if (Session::Options::From(env, args[0]).To(&options)) { + endpoint->Listen(options); + } +} + +void Endpoint::MarkBusy(const FunctionCallbackInfo& args) { + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + endpoint->MarkAsBusy(args[0]->IsTrue()); +} + +void Endpoint::DoCloseGracefully(const FunctionCallbackInfo& args) { + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + endpoint->CloseGracefully(); +} + +void Endpoint::LocalAddress(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + if (endpoint->is_closed()) return; + auto local_address = endpoint->local_address(); + auto addr = SocketAddressBase::Create( + env, std::make_shared(local_address)); + if (addr) args.GetReturnValue().Set(addr->object()); +} + +void Endpoint::Ref(const FunctionCallbackInfo& args) { + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + endpoint->udp_.Ref(); +} + +void Endpoint::Unref(const FunctionCallbackInfo& args) { + Endpoint* endpoint; + ASSIGN_OR_RETURN_UNWRAP(&endpoint, args.Holder()); + endpoint->udp_.Unref(); +} + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC diff --git a/src/quic/endpoint.h b/src/quic/endpoint.h new file mode 100644 index 00000000000000..37d764977ad208 --- /dev/null +++ b/src/quic/endpoint.h @@ -0,0 +1,487 @@ +#pragma once + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include +#include +#include +#include +#include +#include +#include +#include +#include "bindingdata.h" +#include "defs.h" +#include "packet.h" +#include "session.h" +#include "sessionticket.h" +#include "tokens.h" + +namespace node { +namespace quic { + +#define ENDPOINT_STATS(V) \ + V(CREATED_AT, created_at) \ + V(DESTROYED_AT, destroyed_at) \ + V(BYTES_RECEIVED, bytes_received) \ + V(BYTES_SENT, bytes_sent) \ + V(PACKETS_RECEIVED, packets_received) \ + V(PACKETS_SENT, packets_sent) \ + V(SERVER_SESSIONS, server_sessions) \ + V(CLIENT_SESSIONS, client_sessions) \ + V(SERVER_BUSY_COUNT, server_busy_count) \ + V(RETRY_COUNT, retry_count) \ + V(VERSION_NEGOTIATION_COUNT, version_negotiation_count) \ + V(STATELESS_RESET_COUNT, stateless_reset_count) \ + V(IMMEDIATE_CLOSE_COUNT, immediate_close_count) + +#define ENDPOINT_STATE(V) \ + /* Bound to the UDP port */ \ + V(BOUND, bound, uint8_t) \ + /* Receiving packets on the UDP port */ \ + V(RECEIVING, receiving, uint8_t) \ + /* Listening as a QUIC server */ \ + V(LISTENING, listening, uint8_t) \ + /* In the process of closing down */ \ + V(CLOSING, closing, uint8_t) \ + /* In the process of closing down, waiting for pending send callbacks */ \ + V(WAITING_FOR_CALLBACKS, waiting_for_callbacks, uint8_t) \ + /* Temporarily paused serving new initial requests */ \ + V(BUSY, busy, uint8_t) \ + /* The number of pending send callbacks */ \ + V(PENDING_CALLBACKS, pending_callbacks, size_t) + +// An Endpoint encapsulates the UDP local port binding and is responsible for +// sending and receiving QUIC packets. A single endpoint can act as both a QUIC +// client and server simultaneously. +class Endpoint final : public AsyncWrap, public Packet::Listener { + public: + STAT_STRUCT(ENDPOINT) + + struct State final { +#define V(_, name, type) type name; + ENDPOINT_STATE(V) +#undef V + }; + + static constexpr size_t DEFAULT_MAX_CONNECTIONS = + std::min(kMaxSizeT, static_cast(kMaxSafeJsInteger)); + static constexpr size_t DEFAULT_MAX_CONNECTIONS_PER_HOST = 100; + static constexpr size_t DEFAULT_MAX_SOCKETADDRESS_LRU_SIZE = + (DEFAULT_MAX_CONNECTIONS_PER_HOST * 10); + static constexpr size_t DEFAULT_MAX_STATELESS_RESETS = 10; + static constexpr size_t DEFAULT_MAX_RETRY_LIMIT = 10; + + // Endpoint configuration options + struct Options final : public MemoryRetainer { + // The local socket address to which the UDP port will be bound. The port + // may be 0 to have Node.js select an available port. IPv6 or IPv4 addresses + // may be used. When using IPv6, dual mode will be supported by default. + SocketAddress local_address; + + // Retry tokens issued by the Endpoint are time-limited. By default, retry + // tokens expire after DEFAULT_RETRYTOKEN_EXPIRATION *seconds*. This is an + // arbitrary choice that is not mandated by the QUIC specification; so we + // can choose any value that makes sense here. Retry tokens are sent to the + // client, which echoes them back to the server in a subsequent set of + // packets, which means the expiration must be set high enough to allow a + // reasonable round-trip time for the session TLS handshake to complete. + uint64_t retry_token_expiration = + RetryToken::QUIC_DEFAULT_RETRYTOKEN_EXPIRATION / NGTCP2_SECONDS; + + // Tokens issued using NEW_TOKEN are time-limited. By default, tokens expire + // after DEFAULT_TOKEN_EXPIRATION *seconds*. + uint64_t token_expiration = + RegularToken::QUIC_DEFAULT_REGULARTOKEN_EXPIRATION / NGTCP2_SECONDS; + + // Each Endpoint places limits on the number of concurrent connections from + // a single host, and the total number of concurrent connections allowed as + // a whole. These are set to fairly modest, and arbitrary defaults. We can + // set these to whatever we'd like. + uint64_t max_connections_per_host = DEFAULT_MAX_CONNECTIONS_PER_HOST; + uint64_t max_connections_total = DEFAULT_MAX_CONNECTIONS; + + // A stateless reset in QUIC is a discrete mechanism that one endpoint can + // use to communicate to a peer that it has lost whatever state it + // previously held about a session. Because generating a stateless reset + // consumes resources (even very modestly), they can be a DOS vector in + // which a malicious peer intentionally sends a large number of stateless + // reset eliciting packets. To protect against that risk, we limit the + // number of stateless resets that may be generated for a given remote host + // within a window of time. This is not mandated by QUIC, and the limit is + // arbitrary. We can set it to whatever we'd like. + uint64_t max_stateless_resets = DEFAULT_MAX_STATELESS_RESETS; + + // For tracking the number of connections per host, the number of stateless + // resets that have been sent, and tracking the path verification status of + // a remote host, we maintain an LRU cache of the most recently seen hosts. + // The address_lru_size parameter determines the size of that cache. The + // default is set modestly at 10 times the default max connections per host. + uint64_t address_lru_size = DEFAULT_MAX_SOCKETADDRESS_LRU_SIZE; + + // Similar to stateless resets, we enforce a limit on the number of retry + // packets that can be generated and sent for a remote host. Generating + // retry packets consumes a modest amount of resources and it's fairly + // trivial for a malcious peer to trigger generation of a large number of + // retries, so limiting them helps prevent a DOS vector. + uint64_t max_retries = DEFAULT_MAX_RETRY_LIMIT; + + // The max_payload_size is the maximum size of a serialized QUIC packet. It + // should always be set small enough to fit within a single MTU without + // fragmentation. The default is set by the QUIC specification at 1200. This + // value should not be changed unless you know for sure that the entire path + // supports a given MTU without fragmenting at any point in the path. + uint64_t max_payload_size = kDefaultMaxPacketLength; + + // The unacknowledged_packet_threshold is the maximum number of + // unacknowledged packets that an ngtcp2 session will accumulate before + // sending an acknowledgement. Setting this to 0 uses the ngtcp2 defaults, + // which is what most will want. The value can be changed to fine tune some + // of the performance characteristics of the session. This should only be + // changed if you have a really good reason for doing so. + uint64_t unacknowledged_packet_threshold = 0; + + // The validate_address parameter instructs the Endpoint to perform explicit + // address validation using retry tokens. This is strongly recommended and + // should only be disabled in trusted, closed environments as a performance + // optimization. + bool validate_address = true; + + // The stateless reset mechanism can be disabled. This should rarely ever be + // needed, and should only ever be done in trusted, closed environments as a + // performance optimization. + bool disable_stateless_reset = false; + +#ifdef DEBUG + // The rx_loss and tx_loss parameters are debugging tools that allow the + // Endpoint to simulate random packet loss. The value for each parameter is + // a value between 0.0 and 1.0 indicating a probability of packet loss. Each + // time a packet is sent or received, the packet loss bit is calculated and + // if true, the packet is silently dropped. This should only ever be used + // for testing and debugging. There is never a reason why rx_loss and + // tx_loss should ever be used in a production system. + double rx_loss = 0.0; + double tx_loss = 0.0; +#endif // DEBUG + + // There are several common congestion control algorithms that ngtcp2 uses + // to determine how it manages the flow control window: RENO, CUBIC, BBR, + // and BBR2. The details of how each works is not relevant here. The choice + // of which to use by default is arbitrary and we can choose whichever we'd + // like. Additional performance profiling will be needed to determine which + // is the better of the two for our needs. + ngtcp2_cc_algo cc_algorithm = NGTCP2_CC_ALGO_CUBIC; + + // By default, when Node.js starts, it will generate a reset_token_secret at + // random. This is a secret used in generating stateless reset tokens. In + // order for stateless reset to be effective, however, it is necessary to + // use a deterministic secret that persists across ngtcp2 endpoints and + // sessions. + TokenSecret reset_token_secret; + + // The secret used for generating new tokens. + TokenSecret token_secret; + + // When the local_address specifies an IPv6 local address to bind to, the + // ipv6_only parameter determines whether dual stack mode (supporting both + // IPv6 and IPv4) transparently is supported. This sets the UV_UDP_IPV6ONLY + // flag on the underlying uv_udp_t. + bool ipv6_only = false; + + uint32_t udp_receive_buffer_size = 0; + uint32_t udp_send_buffer_size = 0; + + // The UDP TTL configuration is the number of network hops a packet will be + // forwarded through. The default is 64. The value is in the range 1 to 255. + // Setting to 0 uses the default. + uint8_t udp_ttl = 0; + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Endpoint::Config) + SET_SELF_SIZE(Options) + + static v8::Maybe From(Environment* env, + v8::Local value); + }; + + bool HasInstance(Environment* env, v8::Local value); + static v8::Local GetConstructorTemplate( + Environment* env); + static void Initialize(Environment* env, v8::Local target); + static void RegisterExternalReferences(ExternalReferenceRegistry* registry); + + static BaseObjectPtr Create(Environment* env, + const Endpoint::Options& config); + + Endpoint(Environment* env, + v8::Local object, + const Endpoint::Options& options); + ~Endpoint() override; + + inline const Options& options() const { + return options_; + } + inline const Stats& stats() const { + return *stats_.Data(); + } + + // While the busy flag is set, the Endpoint will reject all initial packets + // with a SERVER_BUSY response. This allows us to build a circuit breaker + // directly into the implementation, explicitly signaling that the server is + // blocked when activity is too high. + void MarkAsBusy(bool on = true); + + // Use the endpoint's token secret to generate a new token. + v8::Maybe GenerateNewToken(uint32_t version, + const SocketAddress& remote_address); + + // Use the endpoint's reset token secret to generate a new stateless reset. + v8::Maybe GenerateNewStatelessResetToken( + uint8_t* token, const CID& cid) const; + + void AddSession(const CID& cid, BaseObjectPtr session); + void RemoveSession(const CID& cid); + BaseObjectPtr FindSession(const CID& cid); + + // A single session may be associated with multiple CIDs. + // AssociateCID registers the mapping both in the Endpoint and the inner + // Endpoint. + void AssociateCID(const CID& cid, const CID& scid); + void DisassociateCID(const CID& cid); + + // Associates a given stateless reset token with the session. This allows + // stateless reset tokens to be recognized and dispatched to the proper + // Endpoint and Session for processing. + void AssociateStatelessResetToken(const StatelessResetToken& token, + Session* session); + void DisassociateStatelessResetToken(const StatelessResetToken& token); + + void Send(BaseObjectPtr packet); + + // Generates and sends a retry packet. This is terminal for the connection. + // Retry packets are used to force explicit path validation by issuing a token + // to the peer that it must thereafter include in all subsequent initial + // packets. Upon receiving a retry packet, the peer must termination it's + // initial attempt to establish a connection and start a new attempt. + // + // Retry packets will only ever be generated by QUIC servers, and only if the + // QuicSocket is configured for explicit path validation. There is no way for + // a client to force a retry packet to be created. However, once a client + // determines that explicit path validation is enabled, it could attempt to + // DOS by sending a large number of malicious initial packets to intentionally + // ellicit retry packets (It can do so by intentionally sending initial + // packets that ignore the retry token). To help mitigate that risk, we limit + // the number of retries we send to a given remote endpoint. + void SendRetry(const PathDescriptor& options); + + // Sends a version negotiation packet. This is terminal for the connection and + // is sent only when a QUIC packet is received for an unsupported QUIC + // version. It is possible that a malicious packet triggered this so we need + // to be careful not to commit too many resources. + void SendVersionNegotiation(const PathDescriptor& options); + + // Possibly generates and sends a stateless reset packet. This is terminal for + // the connection. It is possible that a malicious packet triggered this so we + // need to be careful not to commit too many resources. + bool SendStatelessReset(const PathDescriptor& options, size_t source_len); + + // Shutdown a connection prematurely, before a Session is created. This should + // only be called at the start of a session before the crypto keys have been + // established. + void SendImmediateConnectionClose(const PathDescriptor& options, + QuicError error); + + // Listen for connections (act as a server). + void Listen(const Session::Options& options); + + // Create a new client-side Session. + BaseObjectPtr Connect( + const SocketAddress& remote_address, + const Session::Options& options, + std::optional sessionTicket = std::nullopt); + + // Returns the local address only if the endpoint has been bound. Before + // the endpoint is bound, or after it is closed, this will abort due to + // a failed check so it is important to check `is_closed()` before calling. + SocketAddress local_address() const; + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Endpoint) + SET_SELF_SIZE(Endpoint) + + private: +#define V(name, _) IDX_STATS_ENDPOINT_##name, + enum EndpointStatsIdx { ENDPOINT_STATS(V) IDX_STATS_ENDPOINT_COUNT }; +#undef V + +#define V(name, key, __) \ + IDX_STATE_ENDPOINT_##name = OffsetOf(&Endpoint::State::key), + enum EndpointStateIdx { ENDPOINT_STATE(V) }; +#undef V + + class UDP final : public MemoryRetainer { + public: + explicit UDP(Endpoint* endpoint); + ~UDP() override; + + int Bind(const Endpoint::Options& config); + int Start(); + void Stop(); + void Close(); + int Send(BaseObjectPtr packet); + + // Returns the local UDP socket address to which we are bound, + // or fail with an assert if we are not bound. + SocketAddress local_address() const; + + bool is_bound() const; + bool is_closed() const; + operator bool() const; + + void Ref(); + void Unref(); + + void MemoryInfo(node::MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Endpoint::UDP) + SET_SELF_SIZE(UDP) + + private: + class Impl; + + static void CleanupHook(void* data); + + BaseObjectPtr impl_; + bool is_bound_ = false; + bool is_started_ = false; + }; + + bool is_closed() const; + bool is_closing() const; + bool is_listening() const; + + bool Start(); + + // Destroy the endpoint if... + // * There are no sessions, + // * There are no sent packets with pending done callbacks, and + // * We're not listening for new initial packets. + void MaybeDestroy(); + + // Specifies the general reason the endpoint is being destroyed. + enum class CloseContext { + CLOSE, + BIND_FAILURE, + START_FAILURE, + RECEIVE_FAILURE, + SEND_FAILURE, + LISTEN_FAILURE, + }; + + void Destroy(CloseContext context = CloseContext::CLOSE, int status = 0); + + // A graceful close will destroy the endpoint once all existing sessions + // have ended normally. Creating new sessions (inbound or outbound) will + // be prevented. + void CloseGracefully(); + + void Release(); + + void PacketDone(int status) override; + + void EmitNewSession(const BaseObjectPtr& session); + void EmitClose(CloseContext context, int status); + + void IncrementSocketAddressCounter(const SocketAddress& address); + void DecrementSocketAddressCounter(const SocketAddress& address); + + // JavaScript API + + // Create a new Endpoint instance. `createEndpoint()` is exposed as a method + // on the internalBinding('quic') object. + // @param Endpoint::Options options - Options to configure the Endpoint. + static void CreateEndpoint(const v8::FunctionCallbackInfo& args); + + // Methods on the Endpoint instance: + + // Create a new client Session on this endpoint. + // @param node::SocketAddress local_address - The local address to bind to. + // @param Session::Options options - Options to configure the Session. + // @param v8::ArrayBufferView session_ticket - The session ticket to use for + // the Session. + // @param v8::ArrayBufferView remote_transport_params - The remote transport + // params. + static void DoConnect(const v8::FunctionCallbackInfo& args); + + // Start listening as a QUIC server + // @param Session::Options options - Options to configure the Session. + static void DoListen(const v8::FunctionCallbackInfo& args); + + // Mark the Endpoint as busy, temporarily pausing handling of new initial + // packets. + // @param bool on - If true, mark the Endpoint as busy. + static void MarkBusy(const v8::FunctionCallbackInfo& args); + + // DoCloseGracefully is the signal that endpoint should close. Any packets + // that are already in the queue or in flight will be allowed to finish, but + // the EndpoingWrap will be otherwise no longer able to receive or send + // packets. + static void DoCloseGracefully( + const v8::FunctionCallbackInfo& args); + + // Get the local address of the Endpoint. + // @return node::SocketAddress - The local address of the Endpoint. + static void LocalAddress(const v8::FunctionCallbackInfo& args); + + // Ref() causes a listening Endpoint to keep the event loop active. + static void Ref(const v8::FunctionCallbackInfo& args); + + // Unref() allows the event loop to close even if the Endpoint is listening. + static void Unref(const v8::FunctionCallbackInfo& args); + + void Receive(const uv_buf_t& buf, const SocketAddress& from); + + AliasedStruct stats_; + AliasedStruct state_; + const Options options_; + UDP udp_; + + // Set if/when the endpoint is configured to listen. + std::optional server_options_{}; + + // A Session is generally identified by one or more CIDs. We use two + // maps for this rather than one to avoid creating a whole bunch of + // BaseObjectPtr references. The primary map (sessions_) just maps + // the original CID to the Session, the second map (dcid_to_scid_) + // maps the additional CIDs to the the primary. + CID::Map> sessions_; + CID::Map dcid_to_scid_; + StatelessResetToken::Map token_map_; + + struct SocketAddressInfoTraits final { + struct Type final { + size_t active_connections; + size_t reset_count; + size_t retry_count; + uint64_t timestamp; + bool validated; + }; + + static bool CheckExpired(const SocketAddress& address, const Type& type); + static void Touch(const SocketAddress& address, Type* type); + }; + + SocketAddressLRU addrLRU_; + + CloseContext close_context_ = CloseContext::CLOSE; + int close_status_ = 0; + + friend class UDP; + friend class Packet; +}; + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/quic/packet.cc b/src/quic/packet.cc index 27ba21d69a5e90..2277110aff56b8 100644 --- a/src/quic/packet.cc +++ b/src/quic/packet.cc @@ -286,7 +286,6 @@ BaseObjectPtr Packet::CreateConnectionClosePacket( BaseObjectPtr Packet::CreateImmediateConnectionClosePacket( Environment* env, Listener* listener, - const SocketAddress& destination, const PathDescriptor& path_descriptor, const QuicError& reason) { auto packet = Packet::Create(env, diff --git a/src/quic/packet.h b/src/quic/packet.h index 156174ebac8379..228f67d1e86187 100644 --- a/src/quic/packet.h +++ b/src/quic/packet.h @@ -130,7 +130,6 @@ class Packet final : public ReqWrap { static BaseObjectPtr CreateImmediateConnectionClosePacket( Environment* env, Listener* listener, - const SocketAddress& destination, const PathDescriptor& path_descriptor, const QuicError& reason); @@ -146,15 +145,15 @@ class Packet final : public ReqWrap { Listener* listener, const PathDescriptor& path_descriptor); + // Called when the packet is done being sent. + void Done(int status); + private: static BaseObjectPtr FromFreeList(Environment* env, std::shared_ptr data, Listener* listener, const SocketAddress& destination); - // Called when the packet is done being sent. - void Done(int status); - Listener* listener_; SocketAddress destination_; std::shared_ptr data_; diff --git a/src/quic/session.h b/src/quic/session.h new file mode 100644 index 00000000000000..6fdd7d584327d6 --- /dev/null +++ b/src/quic/session.h @@ -0,0 +1,94 @@ +#pragma once + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include +#include +#include +#include +#include "bindingdata.h" +#include "cid.h" +#include "data.h" + +namespace node { +namespace quic { + +class Endpoint; + +// TODO(@jasnell): This is a placeholder definition of Session that +// includes only the pieces needed by Endpoint right now. The full +// Session definition will be provided separately. +class Session final : public AsyncWrap { + public: + struct Config { + SocketAddress local_address; + SocketAddress remote_address; + std::optional ocid = std::nullopt; + std::optional retry_scid = std::nullopt; + + Config(Side side, + const Endpoint& endpoint, + const CID& scid, + const SocketAddress& local_address, + const SocketAddress& remote_address, + uint32_t min_quic_version, + uint32_t max_quic_version, + const CID& ocid = CID::kInvalid); + }; + struct Options : public MemoryRetainer { + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(Session::Options) + SET_SELF_SIZE(Options) + + static v8::Maybe From(Environment* env, + v8::Local value); + }; + + static BaseObjectPtr Create(BaseObjectPtr endpoint, + const Config& config, + const Options& options); + + enum class CloseMethod { + // Roundtrip through JavaScript, causing all currently opened streams + // to be closed. An attempt will be made to send a CONNECTION_CLOSE + // frame to the peer. If closing while within the ngtcp2 callback scope, + // sending the CONNECTION_CLOSE will be deferred until the scope exits. + DEFAULT, + // The connected peer will not be notified. + SILENT, + // Closing gracefully disables the ability to open or accept new streams for + // this Session. Existing streams are allowed to close naturally on their + // own. + // Once called, the Session will be immediately closed once there are no + // remaining streams. No notification is given to the connected peer that we + // are in a graceful closing state. A CONNECTION_CLOSE will be sent only + // once + // Close() is called. + GRACEFUL + }; + + Session(Environment* env, v8::Local object); + + void Close(CloseMethod method = CloseMethod::DEFAULT); + bool Receive(Store&& store, + const SocketAddress& local_address, + const SocketAddress& remote_address); + + bool is_destroyed() const; + bool is_server() const; + // The session is "wrapped" if it has been passed out to JavaScript + // via the New Session event or returned by the connect method. + void set_wrapped(); + SocketAddress remote_address() const; + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(Session) + SET_SELF_SIZE(Session) +}; + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/quic/tlscontext.cc b/src/quic/tlscontext.cc index b75f5c56312ec0..ec14a732bc16fc 100644 --- a/src/quic/tlscontext.cc +++ b/src/quic/tlscontext.cc @@ -1,9 +1,6 @@ #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC #include "tlscontext.h" -#include "bindingdata.h" -#include "defs.h" -#include "transportparams.h" #include #include #include @@ -12,6 +9,9 @@ #include #include #include +#include "bindingdata.h" +#include "defs.h" +#include "transportparams.h" namespace node { @@ -555,6 +555,7 @@ ngtcp2_conn* TLSContext::getConnection(ngtcp2_crypto_conn_ref* ref) { Maybe TLSContext::Options::From(Environment* env, Local value) { if (value.IsEmpty() || !value->IsObject()) { + THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object"); return Nothing(); }