Skip to content

Commit

Permalink
feat: add hosts to generate resume file name
Browse files Browse the repository at this point in the history
supplementary adjustment for fa92d96
  • Loading branch information
lihsai0 committed Jul 2, 2024
1 parent 64ffaf9 commit 3d75323
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 117 deletions.
239 changes: 146 additions & 93 deletions qiniu/storage/resume.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const rpc = require('../rpc');
const { SERVICE_NAME } = require('../httpc/region');
const { ResponseWrapper } = require('../httpc/responseWrapper');
const { Endpoint } = require('../httpc/endpoint');
const { StaticRegionsProvider } = require('../httpc/regionsProvider');
const { EndpointsRetryPolicy } = require('../httpc/endpointsRetryPolicy');
const { RegionsRetryPolicy } = require('../httpc/regionsRetryPolicy');
const { Retrier } = require('../retry');
Expand Down Expand Up @@ -86,6 +87,7 @@ function PutExtra (
this.fname = fname || '';
this.params = params || {};
this.mimeType = mimeType || null;
// @deprecated use resumeRecorder and resumeKey instead
this.resumeRecordFile = resumeRecordFile || null;
this.progressCallback = progressCallback || null;
this.partSize = partSize || conf.BLOCK_SIZE;
Expand All @@ -100,56 +102,83 @@ function PutExtra (
* @param {Object} options
* @param {string} options.accessKey
* @param {string} options.bucketName
* @param {boolean} [options.retryable]
* @param {'v1' | 'v2' | string} [options.uploadApiVersion]
* @param {JsonFileRecorder} [options.resumeRecorder]
* @param {string} [options.resumeKey]
* @param {string} [options.key]
* @param {string} [options.filePath]
* @param {PutExtra} options.putExtra
*
* @returns Retrier
*/
function _getRegionsRetrier (options) {
const {
bucketName,
accessKey,
retryable = true,
bucketName,
key,
filePath,

uploadApiVersion,
resumeRecorder,
resumeKey
putExtra
} = options;

const preferredScheme = this.config.useHttpsDomain ? 'https' : 'http';
let preferredEndpoints;
const isResumeAvailable = Boolean(resumeRecorder && resumeKey);
if (isResumeAvailable) {
const resumeInfo = resumeRecorder.getSync(resumeKey);
if (resumeInfo && Array.isArray(resumeInfo.upDomains)) {
preferredEndpoints = resumeInfo.upDomains.map(d =>
new Endpoint(d, { defaultScheme: preferredScheme }));
}

let regionsProviderPromise = this.config.getRegionsProvider({
accessKey,
bucketName
});

// generate resume key, if there is a recorder but not resume key
if (putExtra.resumeRecorder && !putExtra.resumeKey) {
regionsProviderPromise = regionsProviderPromise
.then(regionsProvider => regionsProvider.getRegions())
.then(regions => {
if (!regions || !regions.length) {
return Promise.reject(new Error(`no region available for the bucket "${bucketName}"`));
}
const upAccEndpoints = regions[0].services[SERVICE_NAME.UP_ACC] || [];
const upEndpoints = regions[0].services[SERVICE_NAME.UP] || [];
const upHosts = upAccEndpoints.concat(upEndpoints).map(e => e.host);
putExtra.resumeKey = putExtra.resumeRecorder.generateKeySync({
hosts: upHosts,
accessKey: accessKey,
bucketName: bucketName,
key: key,
filePath: filePath,
version: putExtra.version,
partSize: putExtra.partSize
});
return new StaticRegionsProvider(regions);
});
}

return this.config.getRegionsProvider({
bucketName,
accessKey
})
return regionsProviderPromise
.then(regionsProvider => {
// handle preferred endpoints
let preferredEndpoints;
if (putExtra.resumeRecorder && putExtra.resumeKey) {
const resumeInfo = putExtra.resumeRecorder.getSync(putExtra.resumeKey);
if (resumeInfo && Array.isArray(resumeInfo.upDomains)) {
preferredEndpoints = resumeInfo.upDomains.map(d =>
new Endpoint(d, { defaultScheme: preferredScheme }));
}
}

const serviceNames = this.config.accelerateUploading
? [SERVICE_NAME.UP_ACC, SERVICE_NAME.UP]
: [SERVICE_NAME.UP];
const retryPolicies = [
new AccUnavailableRetryPolicy(),
new TokenExpiredRetryPolicy({
uploadApiVersion,
uploadApiVersion: putExtra.version,
recordExistsHandler: () => {
if (!isResumeAvailable) {
if (!putExtra.resumeRecorder || !putExtra.resumeKey) {
return;
}
resumeRecorder.hasSync(resumeKey);
putExtra.resumeRecorder.hasSync(putExtra.resumeKey);
},
recordDeleteHandler: () => {
if (!isResumeAvailable) {
if (!putExtra.resumeRecorder || !putExtra.resumeKey) {
return;
}
resumeRecorder.deleteSync(resumeKey);
putExtra.resumeRecorder.deleteSync(putExtra.resumeKey);
}
}),
new EndpointsRetryPolicy({
Expand All @@ -159,10 +188,10 @@ function _getRegionsRetrier (options) {
regionsProvider,
serviceNames,
onChangedRegion: () => {
if (!isResumeAvailable) {
if (!putExtra.resumeRecorder || !putExtra.resumeKey) {
return;
}
resumeRecorder.deleteSync(resumeKey);
putExtra.resumeRecorder.deleteSync(putExtra.resumeKey);
},
preferredEndpoints
})
Expand All @@ -175,12 +204,12 @@ function _getRegionsRetrier (options) {
if (context.error.noNeedRetry) {
return false;
}
return retryable;
return true;
}
if (policy instanceof AccUnavailableRetryPolicy) {
return true;
}
return retryable && context.result && context.result.needRetry();
return context.result && context.result.needRetry();
}
});
});
Expand Down Expand Up @@ -219,33 +248,39 @@ ResumeUploader.prototype.putStream = function (
}
);

// Why need retrier even if retryable is false?
// Because the retrier is used to get the endpoints,
// which will be initialed by region policy.
const result = _getRegionsRetrier.call(this, {
bucketName: util.getBucketFromUptoken(uploadToken),
accessKey: util.getAKFromUptoken(uploadToken),
retryable: false
const bucketName = util.getBucketFromUptoken(uploadToken);
const accessKey = util.getAKFromUptoken(uploadToken);

// useless by not retryable
// uploadApiVersion: putExtra.version,
const result = this.config.getRegionsProvider({
bucketName,
accessKey
})
.then(retrier => Promise.all([
retrier,
retrier.initContext()
]))
.then(([retrier, context]) => retrier.retry({
func: context => putReq(
context.endpoint,
.then(regionsProvider => regionsProvider.getRegions())
.then(regions => {
if (!regions || !regions.length) {
return Promise.reject(new Error('no region available for the bucket', bucketName));
}
const preferService = this.config.accelerateUploading
? SERVICE_NAME.UP_ACC
: SERVICE_NAME.UP;
if (
!regions[0].services ||
!regions[0].services[preferService] ||
!regions[0].services[preferService].length
) {
return Promise.reject(new Error('no endpoint available for the bucket', bucketName));
}
const endpoint = regions[0].services[preferService][0];
return putReq(
endpoint,
preferredScheme,
uploadToken,
key,
rsStream,
rsStreamLen,
putExtra
),
context
}));
);
});

handleReqCallback(result, callbackFunc);

Expand Down Expand Up @@ -838,38 +873,36 @@ ResumeUploader.prototype.putFile = function (
putExtra.fname = path.basename(localFile);
}

const akFromToken = util.getAKFromUptoken(uploadToken);
const bucketFromToken = util.getBucketFromUptoken(uploadToken);
const accessKey = util.getAKFromUptoken(uploadToken);
const bucketName = util.getBucketFromUptoken(uploadToken);

putExtra = getDefaultPutExtra(
putExtra,
{
accessKey: akFromToken,
bucketName: bucketFromToken,
key,
filePath: localFile
key
}
);

const result = _getRegionsRetrier.call(this, {
accessKey: akFromToken,
bucketName: bucketFromToken,
accessKey,
bucketName,
key,
filePath: localFile,

uploadApiVersion: putExtra.version,
resumeRecorder: putExtra.resumeRecorder,
resumeKey: putExtra.resumeKey
putExtra
})
.then(retrier => Promise.all([
retrier,
retrier.initContext()
]))
.then(([retrier, context]) => retrier.retry({
func: context => {
func: ctx => {
const rsStream = fs.createReadStream(localFile, {
highWaterMark: conf.BLOCK_SIZE
});
const rsStreamLen = fs.statSync(localFile).size;
const p = putReq(
context.endpoint,
ctx.endpoint,
preferredScheme,
uploadToken,
key,
Expand Down Expand Up @@ -914,10 +947,7 @@ ResumeUploader.prototype.putFileWithoutKey = function (
/**
* @param {PutExtra} putExtra
* @param {Object} options
* @param {string} [options.accessKey]
* @param {string} [options.bucketName]
* @param {string | null} [options.key]
* @param {string} [options.filePath]
* @returns {PutExtra}
*/
function getDefaultPutExtra (putExtra, options) {
Expand All @@ -943,30 +973,6 @@ function getDefaultPutExtra (putExtra, options) {
putExtra.resumeKey = parsedPath.name;
}

// generate `resumeKey` if not exists
if (
putExtra.resumeRecorder &&
!putExtra.resumeKey &&
options.filePath &&
options.accessKey &&
options.bucketName
) {
let fileLastModify;
try {
fileLastModify = options.filePath && fs.statSync(options.filePath).mtimeMs.toString();
} catch (_err) {
fileLastModify = '';
}
const recordValuesToHash = [
putExtra.version,
options.accessKey,
`${options.bucketName}:${options.key}`,
options.filePath,
fileLastModify
];
putExtra.resumeKey = putExtra.resumeRecorder.generateKey(recordValuesToHash);
}

return putExtra;
}

Expand Down Expand Up @@ -1001,9 +1007,9 @@ JsonFileRecorder.prototype.setSync = function (key, data) {
* @returns {undefined | Object.<string, any>}
*/
JsonFileRecorder.prototype.getSync = function (key) {
const filePath = path.join(this.baseDirPath, key);
let result;
try {
const filePath = path.join(this.baseDirPath, key);
const recordContent = fs.readFileSync(
filePath,
{
Expand All @@ -1018,24 +1024,71 @@ JsonFileRecorder.prototype.getSync = function (key) {
};

JsonFileRecorder.prototype.hasSync = function (key) {
const filePath = path.join(this.baseDirPath, key);
try {
const filePath = path.join(this.baseDirPath, key);
return fs.existsSync(filePath);
} catch (_err) {
return false;
}
};

JsonFileRecorder.prototype.deleteSync = function (key) {
const filePath = path.join(this.baseDirPath, key);
try {
const filePath = path.join(this.baseDirPath, key);
fs.unlinkSync(filePath);
} catch (_err) {
// pass
}
};

JsonFileRecorder.prototype.generateKey = function (fields) {
/**
* @param {Object} options
* @param {string[]} options.hosts
* @param {string} options.accessKey
* @param {string} options.bucketName
* @param {string} options.key
* @param {string} options.filePath
* @param {string} options.version
* @param {string} options.partSize
* @returns {string | undefined}
*/
JsonFileRecorder.prototype.generateKeySync = function (options) {
// if some options not pass in, can't generate a valid key
if (
[
Array.isArray(options.hosts),
options.accessKey,
options.bucketName,
options.key,
options.filePath,
options.version,
options.partSize
].some(v => !v)
) {
return;
}

let fileStats;
try {
fileStats = options.filePath && fs.statSync(options.filePath);
} catch (_err) {
return;
}

const fields = [
options.hosts.join(''),
options.accessKey,
options.bucketName,
options.key || '',
options.filePath,
fileStats ? fileStats.mtimeMs.toString() : '',
fileStats ? fileStats.size.toString() : '',
options.version, // the upload version
options.version === 'v1'
? conf.BLOCK_SIZE.toString()
: options.partSize.toString(),
'json.v1' // the record file format version
];
const h = crypto.createHash('sha1');
fields.forEach(v => {
h.update(v);
Expand Down
4 changes: 0 additions & 4 deletions qiniu/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,3 @@ exports.prepareZone = function (ctx, accessKey, bucket, callback) {
});
}
};

exports.writeOrCreateSync = function () {

};
Loading

0 comments on commit 3d75323

Please sign in to comment.