Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move out more h2 from core client #2860

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor: move out more h2 from core client
  • Loading branch information
ronag committed Feb 27, 2024
commit 079c08453fd1a66777161bb378f829d9249f5120
4 changes: 3 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ module.exports = {
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
kListeners: Symbol('listeners'),
kHTTPWrite: Symbol('write'),
kMaxConcurrentStreams: Symbol('max concurrent streams')
}
4 changes: 3 additions & 1 deletion lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const {
kHTTPConnVersion,
kListeners,
kOnError,
kResume
kResume,
kHTTPWrite
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
Expand Down Expand Up @@ -645,6 +646,7 @@ function onParserTimeout (parser) {

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'
client[kHTTPWrite] = (...args) => writeH1(client, ...args)

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
Expand Down
46 changes: 27 additions & 19 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ const {
kHTTPConnVersion,
kOnError,
// HTTP2
kHost,
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2SessionState,
kHTTP2CopyHeaders,
kResume
kResume,
kHTTPWrite
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')

// Experimental
let h2ExperimentalWarned = false

Expand All @@ -58,6 +60,10 @@ const {

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'
client[kHTTPWrite] = (...args) => {
// TODO (fix): return
writeH2(client, ...args)
}

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
Expand All @@ -68,9 +74,10 @@ async function connectH2 (client, socket) {

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
})

session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
Expand Down Expand Up @@ -217,9 +224,10 @@ function writeH2 (client, request) {

/** @type {import('node:http2').ClientHttp2Stream} */
let stream
const h2State = client[kHTTP2SessionState]

headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
const { hostname, port } = client[kUrl]

headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
headers[HTTP2_HEADER_METHOD] = method

try {
Expand All @@ -235,8 +243,8 @@ function writeH2 (client, request) {
if (stream != null) {
util.destroy(stream, err)

h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}
Expand All @@ -257,18 +265,18 @@ function writeH2 (client, request) {

if (stream.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
} else {
stream.once('ready', () => {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
})
}

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) session.unref()
if (session[kOpenStreams] === 0) session.unref()
})

return true
Expand Down Expand Up @@ -348,7 +356,7 @@ function writeH2 (client, request) {
}

// Increment counter as we have new several streams open
++h2State.openStreams
++session[kOpenStreams]

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
Expand All @@ -371,8 +379,8 @@ function writeH2 (client, request) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}

Expand All @@ -388,16 +396,16 @@ function writeH2 (client, request) {
})

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) {
if (session[kOpenStreams] === 0) {
session.unref()
}
})

stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand All @@ -407,7 +415,7 @@ function writeH2 (client, request) {
errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand Down
39 changes: 11 additions & 28 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ const {
kMaxResponseSize,
kHTTPConnVersion,
kOnError,
kHTTPWrite,
// HTTP2
kHost,
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2SessionState,
kHTTP2BuildRequest,
kHTTP1BuildRequest,
kResume
} = require('../core/symbols.js')
const { connectH1, writeH1, resumeH1 } = require('./client-h1.js')
const { connectH2, writeH2 } = require('./client-h2.js')
const { connectH1, resumeH1 } = require('./client-h1.js')
const { connectH2 } = require('./client-h2.js')

const kClosedResolve = Symbol('kClosedResolve')

Expand Down Expand Up @@ -107,8 +107,8 @@ class Client extends DispatcherBase {
autoSelectFamily,
autoSelectFamilyAttemptTimeout,
// h2
allowH2,
maxConcurrentStreams
maxConcurrentStreams,
allowH2
} = {}) {
super()

Expand Down Expand Up @@ -237,17 +237,10 @@ class Client extends DispatcherBase {
this[kClosedResolve] = null
this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
this[kHTTPConnVersion] = null
this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server

// HTTP/2
this[kHTTP2Session] = null
this[kHTTP2SessionState] = !allowH2
? null
: {
// streams: null, // Fixed queue of streams - For future support of `push`
openStreams: 0, // Keep track of them to decide whether or not unref the session
maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
}
this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
Expand Down Expand Up @@ -309,6 +302,7 @@ class Client extends DispatcherBase {
[kDispatch] (opts, handler) {
const origin = opts.origin || this[kUrl].origin

// TODO (fix): Why do these need tp be
const request = this[kHTTPConnVersion] === 'h2'
? Request[kHTTP2BuildRequest](origin, opts, handler)
: Request[kHTTP1BuildRequest](origin, opts, handler)
Expand Down Expand Up @@ -361,13 +355,12 @@ class Client extends DispatcherBase {
}

if (this[kHTTP2Session] != null) {
util.destroy(this[kHTTP2Session], err)
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
this[kHTTP2SessionState] = null
}

if (this[kSocket]) {
util.destroy(this[kSocket].on('close', callback), err)
this[kSocket].destroy(err).on('close', callback)
} else {
queueMicrotask(callback)
}
Expand Down Expand Up @@ -652,24 +645,14 @@ function _resume (client, sync) {
}
}

if (!request.aborted && write(client, request)) {
if (!request.aborted && client[kHTTPWrite](request)) {
client[kPendingIdx]++
} else {
client[kQueue].splice(client[kPendingIdx], 1)
}
}
}

function write (client, request) {
if (client[kHTTPConnVersion] === 'h2') {
// TODO (fix): Why does this not return the value
// from writeH2.
writeH2(client, request)
} else {
return writeH1(client, request)
}
}

function errorRequest (client, request, err) {
try {
request.onError(err)
Expand Down
Loading