Skip to content

Commit

Permalink
fix: refactor createReadStream to remove unnecessary stream (#2153)
Browse files Browse the repository at this point in the history
* fix: refactor createReadStream to remove unnecessary stream

* remove instantiation of HashStreamValidator until necessary

* turn on keepAlive for createReadStream

* fix unit test

* Update src/hash-stream-validator.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/hash-stream-validator.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/hash-stream-validator.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/hash-stream-validator.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/file.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/hash-stream-validator.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/hash-stream-validator.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/file.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/file.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/file.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* Update src/file.ts

Co-authored-by: Daniel Bankhead <[email protected]>

* fix merge / logic problems

---------

Co-authored-by: Daniel Bankhead <[email protected]>
  • Loading branch information
ddelgrosso1 and danielbankhead authored Mar 2, 2023
1 parent 4006dfe commit 2c97310
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 155 deletions.
244 changes: 94 additions & 150 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ export interface SetStorageClassCallback {
(err?: Error | null, apiResponse?: Metadata): void;
}

class RequestError extends Error {
export class RequestError extends Error {
code?: string;
errors?: Error[];
}
Expand Down Expand Up @@ -1387,10 +1387,8 @@ class File extends ServiceObject<File> {

const throughStream = new PassThroughShim();

let isCompressed = true;
let crc32c = true;
let md5 = false;
let safeToValidate = true;

if (typeof options.validation === 'string') {
const value = options.validation.toLowerCase().trim();
Expand All @@ -1415,6 +1413,98 @@ class File extends ServiceObject<File> {
md5 = false;
}

const onComplete = (err: Error | null) => {
if (err) {
throughStream.destroy(err);
}
};

// We listen to the response event from the request stream so that we
// can...
//
// 1) Intercept any data from going to the user if an error occurred.
// 2) Calculate the hashes from the http.IncomingMessage response
// stream,
// which will return the bytes from the source without decompressing
// gzip'd content. We then send it through decompressed, if
// applicable, to the user.
const onResponse = (
err: Error | null,
_body: ResponseBody,
rawResponseStream: Metadata
) => {
if (err) {
// Get error message from the body.
this.getBufferFromReadable(rawResponseStream).then(body => {
err.message = body.toString('utf8');
throughStream.destroy(err);
});

return;
}

const headers = rawResponseStream.toJSON().headers;
const isCompressed = headers['content-encoding'] === 'gzip';
const hashes: {crc32c?: string; md5?: string} = {};

// The object is safe to validate if:
// 1. It was stored gzip and returned to us gzip OR
// 2. It was never stored as gzip
const safeToValidate =
(headers['x-goog-stored-content-encoding'] === 'gzip' &&
isCompressed) ||
headers['x-goog-stored-content-encoding'] === 'identity';

const transformStreams: Transform[] = [];

if (shouldRunValidation) {
// The x-goog-hash header should be set with a crc32c and md5 hash.
// ex: headers['x-goog-hash'] = 'crc32c=xxxx,md5=xxxx'
if (typeof headers['x-goog-hash'] === 'string') {
headers['x-goog-hash']
.split(',')
.forEach((hashKeyValPair: string) => {
const delimiterIndex = hashKeyValPair.indexOf('=');
const hashType = hashKeyValPair.substr(0, delimiterIndex);
const hashValue = hashKeyValPair.substr(delimiterIndex + 1);
hashes[hashType as 'crc32c' | 'md5'] = hashValue;
});
}

validateStream = new HashStreamValidator({
crc32c,
md5,
crc32cGenerator: this.crc32cGenerator,
crc32cExpected: hashes.crc32c,
md5Expected: hashes.md5,
});
}

if (md5 && !hashes.md5) {
const hashError = new RequestError(
FileExceptionMessages.MD5_NOT_AVAILABLE
);
hashError.code = 'MD5_NOT_AVAILABLE';
throughStream.destroy(hashError);
return;
}

if (safeToValidate && shouldRunValidation && validateStream) {
transformStreams.push(validateStream);
}

if (isCompressed && options.decompress) {
transformStreams.push(zlib.createGunzip());
}

pipeline(
rawResponseStream,
...(transformStreams as [Transform]),
throughStream,
onComplete
);
};

// Authenticate the request, then pipe the remote API request to the stream
// returned to the user.
const makeRequest = () => {
Expand Down Expand Up @@ -1445,14 +1535,11 @@ class File extends ServiceObject<File> {
}

const reqOpts = {
forever: false,
uri: '',
headers,
qs: query,
};

const hashes: {crc32c?: string; md5?: string} = {};

this.requestStream(reqOpts)
.on('error', err => {
throughStream.destroy(err);
Expand All @@ -1462,151 +1549,7 @@ class File extends ServiceObject<File> {
util.handleResp(null, res, null, onResponse);
})
.resume();

// We listen to the response event from the request stream so that we
// can...
//
// 1) Intercept any data from going to the user if an error occurred.
// 2) Calculate the hashes from the http.IncomingMessage response
// stream,
// which will return the bytes from the source without decompressing
// gzip'd content. We then send it through decompressed, if
// applicable, to the user.
const onResponse = (
err: Error | null,
_body: ResponseBody,
rawResponseStream: Metadata
) => {
if (err) {
// Get error message from the body.
this.getBufferFromReadable(rawResponseStream).then(body => {
err.message = body.toString('utf8');
throughStream.destroy(err);
});

return;
}

rawResponseStream.on('error', onComplete);

const headers = rawResponseStream.toJSON().headers;
isCompressed = headers['content-encoding'] === 'gzip';

// The object is safe to validate if:
// 1. It was stored gzip and returned to us gzip OR
// 2. It was never stored as gzip
safeToValidate =
(headers['x-goog-stored-content-encoding'] === 'gzip' &&
isCompressed) ||
headers['x-goog-stored-content-encoding'] === 'identity';

const transformStreams: Transform[] = [];

if (shouldRunValidation) {
// The x-goog-hash header should be set with a crc32c and md5 hash.
// ex: headers['x-goog-hash'] = 'crc32c=xxxx,md5=xxxx'
if (typeof headers['x-goog-hash'] === 'string') {
headers['x-goog-hash']
.split(',')
.forEach((hashKeyValPair: string) => {
const delimiterIndex = hashKeyValPair.indexOf('=');
const hashType = hashKeyValPair.substr(0, delimiterIndex);
const hashValue = hashKeyValPair.substr(delimiterIndex + 1);
hashes[hashType as 'crc32c' | 'md5'] = hashValue;
});
}

validateStream = new HashStreamValidator({
crc32c,
md5,
crc32cGenerator: this.crc32cGenerator,
});

transformStreams.push(validateStream);
}

if (isCompressed && options.decompress) {
transformStreams.push(zlib.createGunzip());
}

const handoffStream = new PassThrough({
final: cb => {
// Preserving `onComplete`'s ability to
// destroy `throughStream` before pipeline
// attempts to.
onComplete(null)
.then(() => {
cb();
})
.catch(cb);
},
});

pipeline(
rawResponseStream,
...(transformStreams as [Transform]),
handoffStream,
throughStream,
onComplete
);
};

// This is hooked to the `complete` event from the request stream. This is
// our chance to validate the data and let the user know if anything went
// wrong.
let onCompleteCalled = false;
const onComplete = async (err: Error | null) => {
if (onCompleteCalled) {
return;
}

onCompleteCalled = true;

if (err) {
throughStream.destroy(err);
return;
}

if (rangeRequest || !shouldRunValidation) {
return;
}

// If we're doing validation, assume the worst-- a data integrity
// mismatch. If not, these tests won't be performed, and we can assume
// the best.
// We must check if the server decompressed the data on serve because hash
// validation is not possible in this case.
let failed = (crc32c || md5) && safeToValidate;
if (validateStream && safeToValidate) {
if (crc32c && hashes.crc32c) {
failed = !validateStream.test('crc32c', hashes.crc32c);
}

if (md5 && hashes.md5) {
failed = !validateStream.test('md5', hashes.md5);
}
}

if (md5 && !hashes.md5) {
const hashError = new RequestError(
FileExceptionMessages.MD5_NOT_AVAILABLE
);
hashError.code = 'MD5_NOT_AVAILABLE';

throughStream.destroy(hashError);
} else if (failed) {
const mismatchError = new RequestError(
FileExceptionMessages.DOWNLOAD_MISMATCH
);
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

throughStream.destroy(mismatchError);
} else {
return;
}
};
};

throughStream.on('reading', makeRequest);

return throughStream;
Expand Down Expand Up @@ -1971,6 +1914,7 @@ class File extends ServiceObject<File> {
crc32c,
md5,
crc32cGenerator: this.crc32cGenerator,
updateHashesOnly: true,
});

const fileWriteStream = duplexify();
Expand Down
51 changes: 47 additions & 4 deletions src/hash-stream-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,41 @@ import {
CRC32C_DEFAULT_VALIDATOR_GENERATOR,
CRC32CValidator,
} from './crc32c';
import {FileExceptionMessages, RequestError} from './file';

interface HashStreamValidatorOptions {
/** Enables CRC32C calculation. To validate a provided value use `crc32cExpected`. */
crc32c: boolean;
/** Enables MD5 calculation. To validate a provided value use `md5Expected`. */
md5: boolean;
/** Set a custom CRC32C generator */
crc32cGenerator: CRC32CValidatorGenerator;
/** Sets the expected CRC32C value to verify once all data has been consumed. Also sets the `crc32c` option to `true` */
crc32cExpected?: string;
/** Sets the expected MD5 value to verify once all data has been consumed. Also sets the `md5` option to `true` */
md5Expected?: string;
/** Indicates whether or not to run a validation check or only update the hash values */
updateHashesOnly?: boolean;
}

class HashStreamValidator extends Transform {
readonly crc32cEnabled: boolean;
readonly md5Enabled: boolean;
readonly crc32cExpected: string | undefined;
readonly md5Expected: string | undefined;
readonly updateHashesOnly: boolean = false;

#crc32cHash?: CRC32CValidator = undefined;
#md5Hash?: Hash = undefined;

#md5Digest = '';

constructor(options: Partial<HashStreamValidatorOptions> = {}) {
super();

this.crc32cEnabled = !!options.crc32c;
this.md5Enabled = !!options.md5;
this.updateHashesOnly = !!options.updateHashesOnly;
this.crc32cExpected = options.crc32cExpected;
this.md5Expected = options.md5Expected;

if (this.crc32cEnabled) {
const crc32cGenerator =
Expand All @@ -54,12 +68,41 @@ class HashStreamValidator extends Transform {
}
}

_flush(callback: () => void) {
_flush(callback: (error?: Error | null | undefined) => void) {
if (this.#md5Hash) {
this.#md5Digest = this.#md5Hash.digest('base64');
}

callback();
if (this.updateHashesOnly) {
callback();
return;
}

// If we're doing validation, assume the worst-- a data integrity
// mismatch. If not, these tests won't be performed, and we can assume
// the best.
// We must check if the server decompressed the data on serve because hash
// validation is not possible in this case.
let failed = this.crc32cEnabled || this.md5Enabled;

if (this.crc32cEnabled && this.crc32cExpected) {
failed = !this.test('crc32c', this.crc32cExpected);
}

if (this.md5Enabled && this.md5Expected) {
failed = !this.test('md5', this.md5Expected);
}

if (failed) {
const mismatchError = new RequestError(
FileExceptionMessages.DOWNLOAD_MISMATCH
);
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

callback(mismatchError);
} else {
callback();
}
}

_transform(
Expand Down
1 change: 0 additions & 1 deletion test/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,6 @@ describe('File', () => {
it('should create an authenticated request', done => {
file.requestStream = (opts: DecorateRequestOptions) => {
assert.deepStrictEqual(opts, {
forever: false,
uri: '',
headers: {
'Accept-Encoding': 'gzip',
Expand Down

0 comments on commit 2c97310

Please sign in to comment.