Skip to content

Commit

Permalink
feat: add headers option to MPU (#2303)
Browse files Browse the repository at this point in the history
* feat: add headers option to MPU

* remove proxyquire from transfer manager tests

* add abort option to MPU

* turn auto abort on by default and add option to disable

* change wording in docs
  • Loading branch information
ddelgrosso1 authored Sep 21, 2023
1 parent 9724239 commit 7f58f30
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 187 deletions.
53 changes: 50 additions & 3 deletions src/transfer-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,25 @@ export interface UploadFileInChunksOptions {
uploadName?: string;
maxQueueSize?: number;
uploadId?: string;
autoAbortFailure?: boolean;
partsMap?: Map<number, string>;
validation?: 'md5' | false;
headers?: {[key: string]: string};
}

export interface MultiPartUploadHelper {
bucket: Bucket;
fileName: string;
uploadId?: string;
partsMap?: Map<number, string>;
initiateUpload(): Promise<void>;
initiateUpload(headers?: {[key: string]: string}): Promise<void>;
uploadPart(
partNumber: number,
chunk: Buffer,
validation?: 'md5' | false
): Promise<void>;
completeUpload(): Promise<GaxiosResponse | undefined>;
abortUpload(): Promise<void>;
}

export type MultiPartHelperGenerator = (
Expand Down Expand Up @@ -185,13 +188,14 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
*
* @returns {Promise<void>}
*/
async initiateUpload(): Promise<void> {
async initiateUpload(headers: {[key: string]: string} = {}): Promise<void> {
const url = `${this.baseUrl}?uploads`;
return retry(async bail => {
try {
const res = await this.authClient.request({
method: 'POST',
url,
headers,
});
if (res.data && res.data.error) {
throw res.data.error;
Expand Down Expand Up @@ -285,6 +289,30 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
}, this.retryOptions);
}

/**
* Aborts an multipart upload that is in progress. Once aborted, any parts in the process of being uploaded fail,
* and future requests using the upload ID fail.
*
* @returns {Promise<void>}
*/
async abortUpload(): Promise<void> {
const url = `${this.baseUrl}?uploadId=${this.uploadId}`;
return retry(async bail => {
try {
const res = await this.authClient.request({
url,
method: 'DELETE',
});
if (res.data && res.data.error) {
throw res.data.error;
}
} catch (e) {
this.#handleErrorResponse(e as Error, bail);
return;
}
}, this.retryOptions);
}

/**
* Handles error responses and calls the bail function if the error should not be retried.
*
Expand Down Expand Up @@ -615,6 +643,10 @@ export class TransferManager {
* @property {string} [uploadId] If specified attempts to resume a previous upload.
* @property {Map} [partsMap] If specified alongside uploadId, attempts to resume a previous upload from the last chunk
* specified in partsMap
* @property {object} [headers] headers to be sent when initiating the multipart upload.
* See {@link https://cloud.google.com/storage/docs/xml-api/post-object-multipart#request_headers| Request Headers: Initiate a Multipart Upload}
* @property {boolean} [autoAbortFailure] boolean to indicate if an in progress upload session will be automatically aborted upon failure. If not set,
* failures will be automatically aborted.
* @experimental
*/
/**
Expand Down Expand Up @@ -669,7 +701,7 @@ export class TransferManager {
let promises: Promise<void>[] = [];
try {
if (options.uploadId === undefined) {
await mpuHelper.initiateUpload();
await mpuHelper.initiateUpload(options.headers);
}
const startOrResumptionByte = mpuHelper.partsMap!.size * chunkSize;
const readStream = createReadStream(filePath, {
Expand All @@ -692,6 +724,21 @@ export class TransferManager {
await Promise.all(promises);
return await mpuHelper.completeUpload();
} catch (e) {
if (
(options.autoAbortFailure === undefined || options.autoAbortFailure) &&
mpuHelper.uploadId
) {
try {
await mpuHelper.abortUpload();
return;
} catch (e) {
throw new MultiPartUploadError(
(e as Error).message,
mpuHelper.uploadId!,
mpuHelper.partsMap!
);
}
}
throw new MultiPartUploadError(
(e as Error).message,
mpuHelper.uploadId!,
Expand Down
Loading

0 comments on commit 7f58f30

Please sign in to comment.