Skip to content

Commit

Permalink
stream: improve Readable legacy compat
Browse files Browse the repository at this point in the history
The popular stream-shift library accesses internal Readable
state which has been modified.

Refs: googleapis/nodejs-storage#2391
Refs: mafintosh/stream-shift#10
PR-URL: nodejs#51470
  • Loading branch information
ronag committed Jan 15, 2024
1 parent ee61c2c commit fe8212b
Showing 1 changed file with 68 additions and 21 deletions.
89 changes: 68 additions & 21 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const {
ObjectKeys,
ObjectSetPrototypeOf,
Promise,
Proxy,
SafeSet,
SafeWeakMap,
Symbol,
SymbolAsyncDispose,
SymbolAsyncIterator,
Expand Down Expand Up @@ -102,6 +104,8 @@ const kErroredValue = Symbol('kErroredValue');
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kDecoderValue = Symbol('kDecoderValue');
const kEncodingValue = Symbol('kEncodingValue');
const kBuffer = Symbol('kBuffer');
const kBufferIndex = Symbol('kBufferIndex');

const kEnded = 1 << 9;
const kEndEmitted = 1 << 10;
Expand Down Expand Up @@ -276,8 +280,8 @@ function ReadableState(options, stream, isDuplex) {
getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
getDefaultHighWaterMark(false);

this.buffer = [];
this.bufferIndex = 0;
this[kBuffer] = [];
this[kBufferIndex] = 0;
this.length = 0;
this.pipes = [];

Expand Down Expand Up @@ -561,13 +565,13 @@ function addChunk(stream, state, chunk, addToFront) {
// Update the buffer info.
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
if (addToFront) {
if (state.bufferIndex > 0) {
state.buffer[--state.bufferIndex] = chunk;
if (state[kBufferIndex] > 0) {
state[kBuffer][--state[kBufferIndex]] = chunk;
} else {
state.buffer.unshift(chunk); // Slow path
state[kBuffer].unshift(chunk); // Slow path
}
} else {
state.buffer.push(chunk);
state[kBuffer].push(chunk);
}

if ((state[kState] & kNeedReadable) !== 0)
Expand All @@ -592,14 +596,14 @@ Readable.prototype.setEncoding = function(enc) {

// Iterate over current buffer to convert already stored Buffers:
let content = '';
for (const data of state.buffer.slice(state.bufferIndex)) {
for (const data of state[kBuffer].slice(state[kBufferIndex])) {
content += decoder.write(data);
}
state.buffer.length = 0;
state.bufferIndex = 0;
state[kBuffer].length = 0;
state[kBufferIndex] = 0;

if (content !== '')
state.buffer.push(content);
state[kBuffer].push(content);
state.length = content.length;
return this;
};
Expand Down Expand Up @@ -633,7 +637,7 @@ function howMuchToRead(n, state) {
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer[state.bufferIndex].length;
return state[kBuffer][state[kBufferIndex]].length;
return state.length;
}
if (n <= state.length)
Expand Down Expand Up @@ -790,7 +794,7 @@ function onEofChunk(stream, state) {
if (decoder) {
const chunk = decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state[kBuffer].push(chunk);
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
}
}
Expand Down Expand Up @@ -1459,7 +1463,7 @@ ObjectDefineProperties(Readable.prototype, {
__proto__: null,
enumerable: false,
get: function() {
return this._readableState && this._readableState.buffer;
return this._readableState && this._readableState[kBuffer];
},
},

Expand Down Expand Up @@ -1541,9 +1545,9 @@ ObjectDefineProperties(Readable.prototype, {
return this._readableState ? this._readableState.endEmitted : false;
},
},

});

const proxyCache = new SafeWeakMap();
ObjectDefineProperties(ReadableState.prototype, {
// Legacy getter for `pipesCount`.
pipesCount: {
Expand All @@ -1568,6 +1572,49 @@ ObjectDefineProperties(ReadableState.prototype, {
}
},
},

// Legacy compat
buffer: {
__proto__: null,
get() {
const r = this._readableState;
let proxy = proxyCache.get(r);
if (!proxy) {
proxy = new Proxy(r[kBuffer], {
__proto__: null,
get(target, name) {
if (name === 'length') {
return target[kBuffer].length - target[kBufferIndex];
}
if (name === '0') {
return target[kBuffer][target[kBufferIndex]];
}
if (target[kBufferIndex] > 0) {
target.splice(0, target[kBufferIndex]);
target[kBufferIndex] = 0;
}
return target[name];
},
set(target, name, value, receiver) {
if (target[kBufferIndex] > 0) {
target.splice(0, target[kBufferIndex]);
target[kBufferIndex] = 0;
}
target[name] = value;
return true;
}
});
proxyCache.set(r, proxy);
}
return proxy
},
},
bufferIndex: {
__proto__: null,
get() {
return 0;
}
}
});

// Exposed for testing purposes only.
Expand All @@ -1582,10 +1629,10 @@ function fromList(n, state) {
if (state.length === 0)
return null;

let idx = state.bufferIndex;
let idx = state[kBufferIndex];
let ret;

const buf = state.buffer;
const buf = state[kBuffer];
const len = buf.length;

if ((state[kState] & kObjectMode) !== 0) {
Expand Down Expand Up @@ -1665,13 +1712,13 @@ function fromList(n, state) {
}

if (idx === len) {
state.buffer.length = 0;
state.bufferIndex = 0;
state[kBuffer].length = 0;
state[kBufferIndex] = 0;
} else if (idx > 1024) {
state.buffer.splice(0, idx);
state.bufferIndex = 0;
state[kBuffer].splice(0, idx);
state[kBufferIndex] = 0;
} else {
state.bufferIndex = idx;
state[kBufferIndex] = idx;
}

return ret;
Expand Down

0 comments on commit fe8212b

Please sign in to comment.