-
Notifications
You must be signed in to change notification settings - Fork 30k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PR-URL: #53073 Reviewed-By: Marco Ippolito <[email protected]> Reviewed-By: Matthew Aitken <[email protected]> Reviewed-By: Trivikram Kamat <[email protected]> Reviewed-By: Rafael Gonzaga <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
- Loading branch information
1 parent
494fa54
commit b941992
Showing
12 changed files
with
580 additions
and
135 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
'use strict' | ||
|
||
const { createInflateRaw, Z_DEFAULT_WINDOWBITS } = require('node:zlib') | ||
const { isValidClientWindowBits } = require('./util') | ||
|
||
const tail = Buffer.from([0x00, 0x00, 0xff, 0xff]) | ||
const kBuffer = Symbol('kBuffer') | ||
const kLength = Symbol('kLength') | ||
|
||
class PerMessageDeflate { | ||
/** @type {import('node:zlib').InflateRaw} */ | ||
#inflate | ||
|
||
#options = {} | ||
|
||
constructor (extensions) { | ||
this.#options.serverNoContextTakeover = extensions.has('server_no_context_takeover') | ||
this.#options.serverMaxWindowBits = extensions.get('server_max_window_bits') | ||
} | ||
|
||
decompress (chunk, fin, callback) { | ||
// An endpoint uses the following algorithm to decompress a message. | ||
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the | ||
// payload of the message. | ||
// 2. Decompress the resulting data using DEFLATE. | ||
|
||
if (!this.#inflate) { | ||
let windowBits = Z_DEFAULT_WINDOWBITS | ||
|
||
if (this.#options.serverMaxWindowBits) { // empty values default to Z_DEFAULT_WINDOWBITS | ||
if (!isValidClientWindowBits(this.#options.serverMaxWindowBits)) { | ||
callback(new Error('Invalid server_max_window_bits')) | ||
return | ||
} | ||
|
||
windowBits = Number.parseInt(this.#options.serverMaxWindowBits) | ||
} | ||
|
||
this.#inflate = createInflateRaw({ windowBits }) | ||
this.#inflate[kBuffer] = [] | ||
this.#inflate[kLength] = 0 | ||
|
||
this.#inflate.on('data', (data) => { | ||
this.#inflate[kBuffer].push(data) | ||
this.#inflate[kLength] += data.length | ||
}) | ||
|
||
this.#inflate.on('error', (err) => { | ||
this.#inflate = null | ||
callback(err) | ||
}) | ||
} | ||
|
||
this.#inflate.write(chunk) | ||
if (fin) { | ||
this.#inflate.write(tail) | ||
} | ||
|
||
this.#inflate.flush(() => { | ||
const full = Buffer.concat(this.#inflate[kBuffer], this.#inflate[kLength]) | ||
|
||
this.#inflate[kBuffer].length = 0 | ||
this.#inflate[kLength] = 0 | ||
|
||
callback(null, full) | ||
}) | ||
} | ||
} | ||
|
||
module.exports = { PerMessageDeflate } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ const { | |
} = require('./util') | ||
const { WebsocketFrameSend } = require('./frame') | ||
const { closeWebSocketConnection } = require('./connection') | ||
const { PerMessageDeflate } = require('./permessage-deflate') | ||
|
||
// This code was influenced by ws released under the MIT license. | ||
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]> | ||
|
@@ -33,10 +34,18 @@ class ByteParser extends Writable { | |
#info = {} | ||
#fragments = [] | ||
|
||
constructor (ws) { | ||
/** @type {Map<string, PerMessageDeflate>} */ | ||
#extensions | ||
|
||
constructor (ws, extensions) { | ||
super() | ||
|
||
this.ws = ws | ||
this.#extensions = extensions == null ? new Map() : extensions | ||
|
||
if (this.#extensions.has('permessage-deflate')) { | ||
this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions)) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -91,7 +100,16 @@ class ByteParser extends Writable { | |
// the negotiated extensions defines the meaning of such a nonzero | ||
// value, the receiving endpoint MUST _Fail the WebSocket | ||
// Connection_. | ||
if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) { | ||
// This document allocates the RSV1 bit of the WebSocket header for | ||
// PMCEs and calls the bit the "Per-Message Compressed" bit. On a | ||
// WebSocket connection where a PMCE is in use, this bit indicates | ||
// whether a message is compressed or not. | ||
if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) { | ||
failWebsocketConnection(this.ws, 'Expected RSV1 to be clear.') | ||
return | ||
} | ||
|
||
if (rsv2 !== 0 || rsv3 !== 0) { | ||
failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear') | ||
return | ||
} | ||
|
@@ -122,7 +140,7 @@ class ByteParser extends Writable { | |
return | ||
} | ||
|
||
if (isContinuationFrame(opcode) && this.#fragments.length === 0) { | ||
if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) { | ||
failWebsocketConnection(this.ws, 'Unexpected continuation frame') | ||
return | ||
} | ||
|
@@ -138,6 +156,7 @@ class ByteParser extends Writable { | |
|
||
if (isTextBinaryFrame(opcode)) { | ||
this.#info.binaryType = opcode | ||
this.#info.compressed = rsv1 !== 0 | ||
} | ||
|
||
this.#info.opcode = opcode | ||
|
@@ -185,21 +204,50 @@ class ByteParser extends Writable { | |
|
||
if (isControlFrame(this.#info.opcode)) { | ||
this.#loop = this.parseControlFrame(body) | ||
this.#state = parserStates.INFO | ||
} else { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
if (!this.#info.compressed) { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
} | ||
|
||
this.#state = parserStates.INFO | ||
} else { | ||
this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => { | ||
if (error) { | ||
closeWebSocketConnection(this.ws, 1007, error.message, error.message.length) | ||
return | ||
} | ||
|
||
this.#fragments.push(data) | ||
|
||
if (!this.#info.fin) { | ||
this.#state = parserStates.INFO | ||
this.#loop = true | ||
this.run(callback) | ||
return | ||
} | ||
|
||
websocketMessageReceived(this.ws, this.#info.binaryType, Buffer.concat(this.#fragments)) | ||
|
||
this.#loop = true | ||
this.#state = parserStates.INFO | ||
this.run(callback) | ||
this.#fragments.length = 0 | ||
}) | ||
|
||
this.#loop = false | ||
break | ||
} | ||
} | ||
|
||
this.#state = parserStates.INFO | ||
} | ||
} | ||
} | ||
|
@@ -333,7 +381,6 @@ class ByteParser extends Writable { | |
this.ws[kReadyState] = states.CLOSING | ||
this.ws[kReceivedClose] = true | ||
|
||
this.end() | ||
return false | ||
} else if (opcode === opcodes.PING) { | ||
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
'use strict' | ||
|
||
const { WebsocketFrameSend } = require('./frame') | ||
const { opcodes, sendHints } = require('./constants') | ||
|
||
/** @type {Uint8Array} */ | ||
const FastBuffer = Buffer[Symbol.species] | ||
|
||
class SendQueue { | ||
#queued = new Set() | ||
#size = 0 | ||
|
||
/** @type {import('net').Socket} */ | ||
#socket | ||
|
||
constructor (socket) { | ||
this.#socket = socket | ||
} | ||
|
||
add (item, cb, hint) { | ||
if (hint !== sendHints.blob) { | ||
const data = clone(item, hint) | ||
|
||
if (this.#size === 0) { | ||
this.#dispatch(data, cb, hint) | ||
} else { | ||
this.#queued.add([data, cb, true, hint]) | ||
this.#size++ | ||
|
||
this.#run() | ||
} | ||
|
||
return | ||
} | ||
|
||
const promise = item.arrayBuffer() | ||
const queue = [null, cb, false, hint] | ||
promise.then((ab) => { | ||
queue[0] = clone(ab, hint) | ||
queue[2] = true | ||
|
||
this.#run() | ||
}) | ||
|
||
this.#queued.add(queue) | ||
this.#size++ | ||
} | ||
|
||
#run () { | ||
for (const queued of this.#queued) { | ||
const [data, cb, done, hint] = queued | ||
|
||
if (!done) return | ||
|
||
this.#queued.delete(queued) | ||
this.#size-- | ||
|
||
this.#dispatch(data, cb, hint) | ||
} | ||
} | ||
|
||
#dispatch (data, cb, hint) { | ||
const frame = new WebsocketFrameSend() | ||
const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY | ||
|
||
frame.frameData = data | ||
const buffer = frame.createFrame(opcode) | ||
|
||
this.#socket.write(buffer, cb) | ||
} | ||
} | ||
|
||
function clone (data, hint) { | ||
switch (hint) { | ||
case sendHints.string: | ||
return Buffer.from(data) | ||
case sendHints.arrayBuffer: | ||
case sendHints.blob: | ||
return new FastBuffer(data) | ||
case sendHints.typedArray: | ||
return Buffer.copyBytesFrom(data) | ||
} | ||
} | ||
|
||
module.exports = { SendQueue } |
Oops, something went wrong.