Skip to content

Commit

Permalink
websocket: improve frame parsing (#3447)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsctx authored Dec 19, 2024
1 parent 8044a43 commit c2933ef
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 28 deletions.
88 changes: 61 additions & 27 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const { PerMessageDeflate } = require('./permessage-deflate')

class ByteParser extends Writable {
#buffers = []
#fragmentsBytes = 0
#byteOffset = 0
#loop = false

Expand Down Expand Up @@ -208,16 +209,14 @@ class ByteParser extends Writable {
this.#state = parserStates.INFO
} else {
if (!this.#info.compressed) {
this.#fragments.push(body)
this.writeFragments(body)

// If the frame is not fragmented, a message has been received.
// If the frame is fragmented, it will terminate with a fin bit set
// and an opcode of 0 (continuation), therefore we handle that when
// parsing continuation frames, not here.
if (!this.#info.fragmented && this.#info.fin) {
const fullMessage = Buffer.concat(this.#fragments)
websocketMessageReceived(this.#handler, this.#info.binaryType, fullMessage)
this.#fragments.length = 0
websocketMessageReceived(this.#handler, this.#info.binaryType, this.consumeFragments())
}

this.#state = parserStates.INFO
Expand All @@ -228,7 +227,7 @@ class ByteParser extends Writable {
return
}

this.#fragments.push(data)
this.writeFragments(data)

if (!this.#info.fin) {
this.#state = parserStates.INFO
Expand All @@ -237,11 +236,10 @@ class ByteParser extends Writable {
return
}

websocketMessageReceived(this.#handler, this.#info.binaryType, Buffer.concat(this.#fragments))
websocketMessageReceived(this.#handler, this.#info.binaryType, this.consumeFragments())

this.#loop = true
this.#state = parserStates.INFO
this.#fragments.length = 0
this.run(callback)
})

Expand All @@ -265,34 +263,70 @@ class ByteParser extends Writable {
return emptyBuffer
}

if (this.#buffers[0].length === n) {
this.#byteOffset -= this.#buffers[0].length
this.#byteOffset -= n

const first = this.#buffers[0]

if (first.length > n) {
// replace with remaining buffer
this.#buffers[0] = first.subarray(n, first.length)
return first.subarray(0, n)
} else if (first.length === n) {
// prefect match
return this.#buffers.shift()
} else {
let offset = 0
// If Buffer.allocUnsafe is used, extra copies will be made because the offset is non-zero.
const buffer = Buffer.allocUnsafeSlow(n)
while (offset !== n) {
const next = this.#buffers[0]
const length = next.length

if (length + offset === n) {
buffer.set(this.#buffers.shift(), offset)
break
} else if (length + offset > n) {
buffer.set(next.subarray(0, n - offset), offset)
this.#buffers[0] = next.subarray(n - offset)
break
} else {
buffer.set(this.#buffers.shift(), offset)
offset += length
}
}

return buffer
}
}

writeFragments (fragment) {
this.#fragmentsBytes += fragment.length
this.#fragments.push(fragment)
}

consumeFragments () {
const fragments = this.#fragments

if (fragments.length === 1) {
// single fragment
this.#fragmentsBytes = 0
return fragments.shift()
}

const buffer = Buffer.allocUnsafe(n)
let offset = 0
// If Buffer.allocUnsafe is used, extra copies will be made because the offset is non-zero.
const output = Buffer.allocUnsafeSlow(this.#fragmentsBytes)

while (offset !== n) {
const next = this.#buffers[0]
const { length } = next

if (length + offset === n) {
buffer.set(this.#buffers.shift(), offset)
break
} else if (length + offset > n) {
buffer.set(next.subarray(0, n - offset), offset)
this.#buffers[0] = next.subarray(n - offset)
break
} else {
buffer.set(this.#buffers.shift(), offset)
offset += next.length
}
for (let i = 0; i < fragments.length; ++i) {
const buffer = fragments[i]
output.set(buffer, offset)
offset += buffer.length
}

this.#byteOffset -= n
this.#fragments = []
this.#fragmentsBytes = 0

return buffer
return output
}

parseCloseBody (data) {
Expand Down
2 changes: 1 addition & 1 deletion lib/web/websocket/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function toArrayBuffer (buffer) {
if (buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer
}
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
return new Uint8Array(buffer).buffer
}

/**
Expand Down

0 comments on commit c2933ef

Please sign in to comment.