diff --git a/src/file.ts b/src/file.ts index d6299acf3..dd4fc6760 100644 --- a/src/file.ts +++ b/src/file.ts @@ -394,7 +394,7 @@ export interface SetStorageClassCallback { (err?: Error | null, apiResponse?: Metadata): void; } -class RequestError extends Error { +export class RequestError extends Error { code?: string; errors?: Error[]; } @@ -1387,10 +1387,8 @@ class File extends ServiceObject { const throughStream = new PassThroughShim(); - let isCompressed = true; let crc32c = true; let md5 = false; - let safeToValidate = true; if (typeof options.validation === 'string') { const value = options.validation.toLowerCase().trim(); @@ -1415,6 +1413,98 @@ class File extends ServiceObject { md5 = false; } + const onComplete = (err: Error | null) => { + if (err) { + throughStream.destroy(err); + } + }; + + // We listen to the response event from the request stream so that we + // can... + // + // 1) Intercept any data from going to the user if an error occurred. + // 2) Calculate the hashes from the http.IncomingMessage response + // stream, + // which will return the bytes from the source without decompressing + // gzip'd content. We then send it through decompressed, if + // applicable, to the user. + const onResponse = ( + err: Error | null, + _body: ResponseBody, + rawResponseStream: Metadata + ) => { + if (err) { + // Get error message from the body. + this.getBufferFromReadable(rawResponseStream).then(body => { + err.message = body.toString('utf8'); + throughStream.destroy(err); + }); + + return; + } + + const headers = rawResponseStream.toJSON().headers; + const isCompressed = headers['content-encoding'] === 'gzip'; + const hashes: {crc32c?: string; md5?: string} = {}; + + // The object is safe to validate if: + // 1. It was stored gzip and returned to us gzip OR + // 2. It was never stored as gzip + const safeToValidate = + (headers['x-goog-stored-content-encoding'] === 'gzip' && + isCompressed) || + headers['x-goog-stored-content-encoding'] === 'identity'; + + const transformStreams: Transform[] = []; + + if (shouldRunValidation) { + // The x-goog-hash header should be set with a crc32c and md5 hash. + // ex: headers['x-goog-hash'] = 'crc32c=xxxx,md5=xxxx' + if (typeof headers['x-goog-hash'] === 'string') { + headers['x-goog-hash'] + .split(',') + .forEach((hashKeyValPair: string) => { + const delimiterIndex = hashKeyValPair.indexOf('='); + const hashType = hashKeyValPair.substr(0, delimiterIndex); + const hashValue = hashKeyValPair.substr(delimiterIndex + 1); + hashes[hashType as 'crc32c' | 'md5'] = hashValue; + }); + } + + validateStream = new HashStreamValidator({ + crc32c, + md5, + crc32cGenerator: this.crc32cGenerator, + crc32cExpected: hashes.crc32c, + md5Expected: hashes.md5, + }); + } + + if (md5 && !hashes.md5) { + const hashError = new RequestError( + FileExceptionMessages.MD5_NOT_AVAILABLE + ); + hashError.code = 'MD5_NOT_AVAILABLE'; + throughStream.destroy(hashError); + return; + } + + if (safeToValidate && shouldRunValidation && validateStream) { + transformStreams.push(validateStream); + } + + if (isCompressed && options.decompress) { + transformStreams.push(zlib.createGunzip()); + } + + pipeline( + rawResponseStream, + ...(transformStreams as [Transform]), + throughStream, + onComplete + ); + }; + // Authenticate the request, then pipe the remote API request to the stream // returned to the user. const makeRequest = () => { @@ -1445,14 +1535,11 @@ class File extends ServiceObject { } const reqOpts = { - forever: false, uri: '', headers, qs: query, }; - const hashes: {crc32c?: string; md5?: string} = {}; - this.requestStream(reqOpts) .on('error', err => { throughStream.destroy(err); @@ -1462,151 +1549,7 @@ class File extends ServiceObject { util.handleResp(null, res, null, onResponse); }) .resume(); - - // We listen to the response event from the request stream so that we - // can... - // - // 1) Intercept any data from going to the user if an error occurred. - // 2) Calculate the hashes from the http.IncomingMessage response - // stream, - // which will return the bytes from the source without decompressing - // gzip'd content. We then send it through decompressed, if - // applicable, to the user. - const onResponse = ( - err: Error | null, - _body: ResponseBody, - rawResponseStream: Metadata - ) => { - if (err) { - // Get error message from the body. - this.getBufferFromReadable(rawResponseStream).then(body => { - err.message = body.toString('utf8'); - throughStream.destroy(err); - }); - - return; - } - - rawResponseStream.on('error', onComplete); - - const headers = rawResponseStream.toJSON().headers; - isCompressed = headers['content-encoding'] === 'gzip'; - - // The object is safe to validate if: - // 1. It was stored gzip and returned to us gzip OR - // 2. It was never stored as gzip - safeToValidate = - (headers['x-goog-stored-content-encoding'] === 'gzip' && - isCompressed) || - headers['x-goog-stored-content-encoding'] === 'identity'; - - const transformStreams: Transform[] = []; - - if (shouldRunValidation) { - // The x-goog-hash header should be set with a crc32c and md5 hash. - // ex: headers['x-goog-hash'] = 'crc32c=xxxx,md5=xxxx' - if (typeof headers['x-goog-hash'] === 'string') { - headers['x-goog-hash'] - .split(',') - .forEach((hashKeyValPair: string) => { - const delimiterIndex = hashKeyValPair.indexOf('='); - const hashType = hashKeyValPair.substr(0, delimiterIndex); - const hashValue = hashKeyValPair.substr(delimiterIndex + 1); - hashes[hashType as 'crc32c' | 'md5'] = hashValue; - }); - } - - validateStream = new HashStreamValidator({ - crc32c, - md5, - crc32cGenerator: this.crc32cGenerator, - }); - - transformStreams.push(validateStream); - } - - if (isCompressed && options.decompress) { - transformStreams.push(zlib.createGunzip()); - } - - const handoffStream = new PassThrough({ - final: cb => { - // Preserving `onComplete`'s ability to - // destroy `throughStream` before pipeline - // attempts to. - onComplete(null) - .then(() => { - cb(); - }) - .catch(cb); - }, - }); - - pipeline( - rawResponseStream, - ...(transformStreams as [Transform]), - handoffStream, - throughStream, - onComplete - ); - }; - - // This is hooked to the `complete` event from the request stream. This is - // our chance to validate the data and let the user know if anything went - // wrong. - let onCompleteCalled = false; - const onComplete = async (err: Error | null) => { - if (onCompleteCalled) { - return; - } - - onCompleteCalled = true; - - if (err) { - throughStream.destroy(err); - return; - } - - if (rangeRequest || !shouldRunValidation) { - return; - } - - // If we're doing validation, assume the worst-- a data integrity - // mismatch. If not, these tests won't be performed, and we can assume - // the best. - // We must check if the server decompressed the data on serve because hash - // validation is not possible in this case. - let failed = (crc32c || md5) && safeToValidate; - if (validateStream && safeToValidate) { - if (crc32c && hashes.crc32c) { - failed = !validateStream.test('crc32c', hashes.crc32c); - } - - if (md5 && hashes.md5) { - failed = !validateStream.test('md5', hashes.md5); - } - } - - if (md5 && !hashes.md5) { - const hashError = new RequestError( - FileExceptionMessages.MD5_NOT_AVAILABLE - ); - hashError.code = 'MD5_NOT_AVAILABLE'; - - throughStream.destroy(hashError); - } else if (failed) { - const mismatchError = new RequestError( - FileExceptionMessages.DOWNLOAD_MISMATCH - ); - mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; - - throughStream.destroy(mismatchError); - } else { - return; - } - }; }; - throughStream.on('reading', makeRequest); return throughStream; @@ -1971,6 +1914,7 @@ class File extends ServiceObject { crc32c, md5, crc32cGenerator: this.crc32cGenerator, + updateHashesOnly: true, }); const fileWriteStream = duplexify(); diff --git a/src/hash-stream-validator.ts b/src/hash-stream-validator.ts index db4a527f4..300ada7d9 100644 --- a/src/hash-stream-validator.ts +++ b/src/hash-stream-validator.ts @@ -20,20 +20,31 @@ import { CRC32C_DEFAULT_VALIDATOR_GENERATOR, CRC32CValidator, } from './crc32c'; +import {FileExceptionMessages, RequestError} from './file'; interface HashStreamValidatorOptions { + /** Enables CRC32C calculation. To validate a provided value use `crc32cExpected`. */ crc32c: boolean; + /** Enables MD5 calculation. To validate a provided value use `md5Expected`. */ md5: boolean; + /** Set a custom CRC32C generator */ crc32cGenerator: CRC32CValidatorGenerator; + /** Sets the expected CRC32C value to verify once all data has been consumed. Also sets the `crc32c` option to `true` */ + crc32cExpected?: string; + /** Sets the expected MD5 value to verify once all data has been consumed. Also sets the `md5` option to `true` */ + md5Expected?: string; + /** Indicates whether or not to run a validation check or only update the hash values */ + updateHashesOnly?: boolean; } - class HashStreamValidator extends Transform { readonly crc32cEnabled: boolean; readonly md5Enabled: boolean; + readonly crc32cExpected: string | undefined; + readonly md5Expected: string | undefined; + readonly updateHashesOnly: boolean = false; #crc32cHash?: CRC32CValidator = undefined; #md5Hash?: Hash = undefined; - #md5Digest = ''; constructor(options: Partial = {}) { @@ -41,6 +52,9 @@ class HashStreamValidator extends Transform { this.crc32cEnabled = !!options.crc32c; this.md5Enabled = !!options.md5; + this.updateHashesOnly = !!options.updateHashesOnly; + this.crc32cExpected = options.crc32cExpected; + this.md5Expected = options.md5Expected; if (this.crc32cEnabled) { const crc32cGenerator = @@ -54,12 +68,41 @@ class HashStreamValidator extends Transform { } } - _flush(callback: () => void) { + _flush(callback: (error?: Error | null | undefined) => void) { if (this.#md5Hash) { this.#md5Digest = this.#md5Hash.digest('base64'); } - callback(); + if (this.updateHashesOnly) { + callback(); + return; + } + + // If we're doing validation, assume the worst-- a data integrity + // mismatch. If not, these tests won't be performed, and we can assume + // the best. + // We must check if the server decompressed the data on serve because hash + // validation is not possible in this case. + let failed = this.crc32cEnabled || this.md5Enabled; + + if (this.crc32cEnabled && this.crc32cExpected) { + failed = !this.test('crc32c', this.crc32cExpected); + } + + if (this.md5Enabled && this.md5Expected) { + failed = !this.test('md5', this.md5Expected); + } + + if (failed) { + const mismatchError = new RequestError( + FileExceptionMessages.DOWNLOAD_MISMATCH + ); + mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; + + callback(mismatchError); + } else { + callback(); + } } _transform( diff --git a/test/file.ts b/test/file.ts index 73ddef124..14b493f22 100644 --- a/test/file.ts +++ b/test/file.ts @@ -1038,7 +1038,6 @@ describe('File', () => { it('should create an authenticated request', done => { file.requestStream = (opts: DecorateRequestOptions) => { assert.deepStrictEqual(opts, { - forever: false, uri: '', headers: { 'Accept-Encoding': 'gzip',