Skip to content

Commit

Permalink
feat: Transfer Manager Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
d-goog committed Sep 14, 2023
1 parent 9bba9e3 commit ad05114
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 18 deletions.
9 changes: 8 additions & 1 deletion src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
ApiError,
Duplexify,
DuplexifyConstructor,
GCCL_GCS_CMD_KEY,
} from './nodejs-common/util';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const duplexify: DuplexifyConstructor = require('duplexify');
Expand Down Expand Up @@ -221,6 +222,7 @@ type PublicResumableUploadOptions =
export interface CreateResumableUploadOptions
extends Pick<resumableUpload.UploadConfig, PublicResumableUploadOptions> {
preconditionOpts?: PreconditionOptions;
[GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY];
}

export type CreateResumableUploadResponse = [string];
Expand Down Expand Up @@ -371,6 +373,7 @@ export interface CreateReadStreamOptions {
start?: number;
end?: number;
decompress?: boolean;
[GCCL_GCS_CMD_KEY]?: string;
}

export interface SaveOptions extends CreateWriteStreamOptions {
Expand Down Expand Up @@ -1580,12 +1583,16 @@ class File extends ServiceObject<File, FileMetadata> {
headers.Range = `bytes=${tailRequest ? end : `${start}-${end}`}`;
}

const reqOpts = {
const reqOpts: DecorateRequestOptions = {
uri: '',
headers,
qs: query,
};

if (options[GCCL_GCS_CMD_KEY]) {
reqOpts[GCCL_GCS_CMD_KEY] = options[GCCL_GCS_CMD_KEY];
}

this.requestStream(reqOpts)
.on('error', err => {
throughStream.destroy(err);
Expand Down
5 changes: 5 additions & 0 deletions src/nodejs-common/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {Interceptor} from './service-object';
import {
BodyResponseCallback,
DecorateRequestOptions,
GCCL_GCS_CMD_KEY,
MakeAuthenticatedRequest,
PackageJson,
util,
Expand Down Expand Up @@ -253,6 +254,10 @@ export class Service {
} gccl-invocation-id/${uuid.v4()}`,
};

if (reqOpts[GCCL_GCS_CMD_KEY]) {
reqOpts.headers['x-goog-api-client'] += reqOpts[GCCL_GCS_CMD_KEY];
}

if (reqOpts.shouldReturnStream) {
return this.makeAuthenticatedRequest(reqOpts) as {} as r.Request;
} else {
Expand Down
23 changes: 19 additions & 4 deletions src/nodejs-common/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ import {getRuntimeTrackingString} from '../util';

const packageJson = require('../../../package.json');

export const GCCL_GCS_CMD_KEY = Symbol('GCCL_GCS_CMD');

export interface GCCL_GCS_CMD {
[GCCL_GCS_CMD_KEY]?: string;
}

// eslint-disable-next-line @typescript-eslint/no-var-requires
const duplexify: DuplexifyConstructor = require('duplexify');

Expand Down Expand Up @@ -233,6 +239,7 @@ export interface DecorateRequestOptions extends r.CoreOptions {
interceptors_?: Interceptor[];
shouldReturnStream?: boolean;
projectId?: string;
[GCCL_GCS_CMD_KEY]?: string;
}

export interface ParsedHttpResponseBody {
Expand Down Expand Up @@ -530,7 +537,7 @@ export class Util {
body: writeStream,
},
],
} as {} as r.OptionsWithUri;
} as {} as r.OptionsWithUri & GCCL_GCS_CMD;

options.makeAuthenticatedRequest(reqOpts, {
onAuthenticated(err, authenticatedReqOpts) {
Expand All @@ -539,7 +546,9 @@ export class Util {
return;
}

requestDefaults.headers = util._getDefaultHeaders();
requestDefaults.headers = util._getDefaultHeaders(
reqOpts[GCCL_GCS_CMD_KEY]
);
const request = teenyRequest.defaults(requestDefaults);
request(authenticatedReqOpts!, (err, resp, body) => {
util.handleResp(err, resp, body, (err, data) => {
Expand Down Expand Up @@ -1014,13 +1023,19 @@ export class Util {
: [optionsOrCallback as T, cb as C];
}

_getDefaultHeaders() {
return {
_getDefaultHeaders(gcclGcsCmd?: string) {
const headers = {
'User-Agent': util.getUserAgentFromPackageJson(packageJson),
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${uuid.v4()}`,
};

if (gcclGcsCmd) {
headers['x-goog-api-client'] += ` ${gcclGcsCmd}`;
}

return headers;
}
}

Expand Down
33 changes: 24 additions & 9 deletions src/resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import retry = require('async-retry');
import {RetryOptions, PreconditionOptions} from './storage';
import * as uuid from 'uuid';
import {getRuntimeTrackingString} from './util';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util';

const NOT_FOUND_STATUS_CODE = 404;
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
Expand Down Expand Up @@ -193,6 +194,8 @@ export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
* Configuration options for retrying retryable errors.
*/
retryOptions: RetryOptions;

[GCCL_GCS_CMD_KEY]?: string;
}

export interface ConfigMetadata {
Expand Down Expand Up @@ -274,6 +277,7 @@ export class Upload extends Writable {
private localWriteCache: Buffer[] = [];
private localWriteCacheByteLength = 0;
private upstreamEnded = false;
#gcclGcsCMD?: string;

constructor(cfg: UploadConfig) {
super(cfg);
Expand Down Expand Up @@ -347,6 +351,8 @@ export class Upload extends Writable {
: NaN;
this.contentLength = isNaN(contentLength) ? '*' : contentLength;

this.#gcclGcsCMD = cfg[GCCL_GCS_CMD_KEY];

this.once('writing', () => {
if (this.uri) {
this.continueUploading();
Expand Down Expand Up @@ -585,6 +591,11 @@ export class Upload extends Writable {
delete metadata.contentType;
}

const gcsCommandSuffix = this.#gcclGcsCMD ? ` ${this.#gcclGcsCMD}` : '';
const googAPIClient = `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.uri}${gcsCommandSuffix}`;

// Check if headers already exist before creating new ones
const reqOpts: GaxiosOptions = {
method: 'POST',
Expand All @@ -598,9 +609,7 @@ export class Upload extends Writable {
),
data: metadata,
headers: {
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.uri}`,
'x-goog-api-client': googAPIClient,
...headers,
},
};
Expand Down Expand Up @@ -766,10 +775,13 @@ export class Upload extends Writable {
},
});

const gcsCommandSuffix = this.#gcclGcsCMD ? ` ${this.#gcclGcsCMD}` : '';
const googAPIClient = `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.chunk}${gcsCommandSuffix}`;

const headers: GaxiosOptions['headers'] = {
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.chunk}`,
'x-goog-api-client': googAPIClient,
};

// If using multiple chunk upload, set appropriate header
Expand Down Expand Up @@ -904,15 +916,18 @@ export class Upload extends Writable {
}

private async getAndSetOffset() {
const gcsCommandSuffix = this.#gcclGcsCMD ? ` ${this.#gcclGcsCMD}` : '';
const googAPIClient = `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.offset}${gcsCommandSuffix}`;

const opts: GaxiosOptions = {
method: 'PUT',
url: this.uri!,
headers: {
'Content-Length': 0,
'Content-Range': 'bytes */*',
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.offset}`,
'x-goog-api-client': googAPIClient,
},
};
try {
Expand Down
53 changes: 49 additions & 4 deletions src/transfer-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import * as retry from 'async-retry';
import {ApiError} from './nodejs-common';
import {GaxiosResponse, Headers} from 'gaxios';
import {createHash} from 'crypto';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util';

/**
* Default number of concurrently executing promises to use when calling uploadManyFiles.
Expand Down Expand Up @@ -64,6 +65,21 @@ const UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024;
const DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT = 2;

const EMPTY_REGEX = '(?:)';

/**
* The `gccl-gcs-cmd` value for the `X-Goog-API-Client` header.
* Example: `gccl-gcs-cmd/tm.upload_many`
*
* @see {@link GCCL_GCS_CMD}.
* @see {@link GCCL_GCS_CMD_KEY}.
*/
const GCCL_GCS_CMD_FEATURE = {
UPLOAD_MANY: 'tm.upload_many',
DOWNLOAD_MANY: 'tm.download_many',
UPLOAD_SHARDED: 'tm.upload_sharded',
DOWNLOAD_SHARDED: 'tm.download_sharded',
};

export interface UploadManyFilesOptions {
concurrencyLimit?: number;
skipIfExists?: boolean;
Expand Down Expand Up @@ -168,7 +184,9 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
this.bucket = bucket;
this.fileName = fileName;
// eslint-disable-next-line prettier/prettier
this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`;
this.baseUrl = `https://${bucket.name}.${
new URL(this.bucket.storage.apiEndpoint).hostname
}/${fileName}`;
this.xmlBuilder = new XMLBuilder({arrayNodeName: 'Part'});
this.xmlParser = new XMLParser();
this.partsMap = partsMap || new Map<number, string>();
Expand All @@ -189,7 +207,20 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
const url = `${this.baseUrl}?uploads`;
return retry(async bail => {
try {
const headers = await this.authClient.getRequestHeaders();

for (const [key, value] of Object.entries(headers)) {
if (key.toLocaleLowerCase().trim() === 'x-goog-api-client') {
// Prepend command feature to value, if not already there
if (!value.includes(GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED)) {
headers[key] = `${value} ${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
}
break;
}
}

const res = await this.authClient.request({
headers,
method: 'POST',
url,
});
Expand Down Expand Up @@ -314,6 +345,10 @@ export class TransferManager {
this.bucket = bucket;
}

#setMethodHeader() {
// .
}

/**
* @typedef {object} UploadManyFilesOptions
* @property {number} [concurrencyLimit] The number of concurrently executing promises
Expand Down Expand Up @@ -394,6 +429,7 @@ export class TransferManager {

const passThroughOptionsCopy: UploadOptions = {
...options.passthroughOptions,
[GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.UPLOAD_MANY,
};

passThroughOptionsCopy.destination = filePath;
Expand All @@ -403,6 +439,7 @@ export class TransferManager {
passThroughOptionsCopy.destination
);
}

promises.push(
limit(() =>
this.bucket.upload(filePath, passThroughOptionsCopy as UploadOptions)
Expand Down Expand Up @@ -487,6 +524,7 @@ export class TransferManager {
for (const file of files) {
const passThroughOptionsCopy = {
...options.passthroughOptions,
[GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_MANY,
};

if (options.prefix) {
Expand All @@ -499,6 +537,7 @@ export class TransferManager {
if (options.stripPrefix) {
passThroughOptionsCopy.destination = file.name.replace(regex, '');
}

promises.push(limit(() => file.download(passThroughOptionsCopy)));
}

Expand Down Expand Up @@ -569,9 +608,15 @@ export class TransferManager {
chunkEnd = chunkEnd > size ? size : chunkEnd;
promises.push(
limit(() =>
file.download({start: chunkStart, end: chunkEnd}).then(resp => {
return fileToWrite.write(resp[0], 0, resp[0].length, chunkStart);
})
file
.download({
start: chunkStart,
end: chunkEnd,
[GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_SHARDED,
})
.then(resp => {
return fileToWrite.write(resp[0], 0, resp[0].length, chunkStart);
})
)
);

Expand Down

0 comments on commit ad05114

Please sign in to comment.