diff --git a/qiniu/storage/resume.js b/qiniu/storage/resume.js
index efa8003..1d50f33 100644
--- a/qiniu/storage/resume.js
+++ b/qiniu/storage/resume.js
@@ -17,6 +17,7 @@ const rpc = require('../rpc');
 const { SERVICE_NAME } = require('../httpc/region');
 const { ResponseWrapper } = require('../httpc/responseWrapper');
 const { Endpoint } = require('../httpc/endpoint');
+const { StaticRegionsProvider } = require('../httpc/regionsProvider');
 const { EndpointsRetryPolicy } = require('../httpc/endpointsRetryPolicy');
 const { RegionsRetryPolicy } = require('../httpc/regionsRetryPolicy');
 const { Retrier } = require('../retry');
@@ -86,6 +87,7 @@ function PutExtra (
     this.fname = fname || '';
     this.params = params || {};
     this.mimeType = mimeType || null;
+    // @deprecated use resumeRecorder and resumeKey instead
     this.resumeRecordFile = resumeRecordFile || null;
     this.progressCallback = progressCallback || null;
     this.partSize = partSize || conf.BLOCK_SIZE;
@@ -100,56 +102,83 @@ function PutExtra (
  * @param {Object} options
  * @param {string} options.accessKey
  * @param {string} options.bucketName
- * @param {boolean} [options.retryable]
- * @param {'v1' | 'v2' | string} [options.uploadApiVersion]
- * @param {JsonFileRecorder} [options.resumeRecorder]
- * @param {string} [options.resumeKey]
+ * @param {string} [options.key]
+ * @param {string} [options.filePath]
+ * @param {PutExtra} options.putExtra
+ *
+ * @returns {Retrier}
  */
 function _getRegionsRetrier (options) {
     const {
-        bucketName,
         accessKey,
-        retryable = true,
+        bucketName,
+        key,
+        filePath,
-        uploadApiVersion,
-        resumeRecorder,
-        resumeKey
+        putExtra
     } = options;
     const preferredScheme = this.config.useHttpsDomain ? 'https' : 'http';
-    let preferredEndpoints;
-    const isResumeAvailable = Boolean(resumeRecorder && resumeKey);
-    if (isResumeAvailable) {
-        const resumeInfo = resumeRecorder.getSync(resumeKey);
-        if (resumeInfo && Array.isArray(resumeInfo.upDomains)) {
-            preferredEndpoints = resumeInfo.upDomains.map(d =>
-                new Endpoint(d, { defaultScheme: preferredScheme }));
-        }
-    }
+
+    let regionsProviderPromise = this.config.getRegionsProvider({
+        accessKey,
+        bucketName
+    });
+
+    // generate the resume key if there is a recorder but no resume key
+    if (putExtra.resumeRecorder && !putExtra.resumeKey) {
+        regionsProviderPromise = regionsProviderPromise
+            .then(regionsProvider => regionsProvider.getRegions())
+            .then(regions => {
+                if (!regions || !regions.length) {
+                    return Promise.reject(new Error(`no region available for the bucket "${bucketName}"`));
+                }
+                const upAccEndpoints = regions[0].services[SERVICE_NAME.UP_ACC] || [];
+                const upEndpoints = regions[0].services[SERVICE_NAME.UP] || [];
+                const upHosts = upAccEndpoints.concat(upEndpoints).map(e => e.host);
+                putExtra.resumeKey = putExtra.resumeRecorder.generateKeySync({
+                    hosts: upHosts,
+                    accessKey: accessKey,
+                    bucketName: bucketName,
+                    key: key,
+                    filePath: filePath,
+                    version: putExtra.version,
+                    partSize: putExtra.partSize
+                });
+                return new StaticRegionsProvider(regions);
+            });
     }

-    return this.config.getRegionsProvider({
-        bucketName,
-        accessKey
-    })
+    return regionsProviderPromise
         .then(regionsProvider => {
+            // handle preferred endpoints
+            let preferredEndpoints;
+            if (putExtra.resumeRecorder && putExtra.resumeKey) {
+                const resumeInfo = putExtra.resumeRecorder.getSync(putExtra.resumeKey);
+                if (resumeInfo && Array.isArray(resumeInfo.upDomains)) {
+                    preferredEndpoints = resumeInfo.upDomains.map(d =>
+                        new Endpoint(d, { defaultScheme: preferredScheme }));
+                }
+            }
+
             const serviceNames = this.config.accelerateUploading
                 ? [SERVICE_NAME.UP_ACC, SERVICE_NAME.UP]
                 : [SERVICE_NAME.UP];
             const retryPolicies = [
                 new AccUnavailableRetryPolicy(),
                 new TokenExpiredRetryPolicy({
-                    uploadApiVersion,
+                    uploadApiVersion: putExtra.version,
                     recordExistsHandler: () => {
-                        if (!isResumeAvailable) {
+                        if (!putExtra.resumeRecorder || !putExtra.resumeKey) {
                             return;
                         }
-                        resumeRecorder.hasSync(resumeKey);
+                        putExtra.resumeRecorder.hasSync(putExtra.resumeKey);
                     },
                     recordDeleteHandler: () => {
-                        if (!isResumeAvailable) {
+                        if (!putExtra.resumeRecorder || !putExtra.resumeKey) {
                             return;
                         }
-                        resumeRecorder.deleteSync(resumeKey);
+                        putExtra.resumeRecorder.deleteSync(putExtra.resumeKey);
                     }
                 }),
                 new EndpointsRetryPolicy({
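Review note, not part of the patch: with the hunk above, callers no longer compute `resumeKey` by hand; supplying a recorder is enough, and `_getRegionsRetrier` derives the key from the resolved up hosts. A minimal usage sketch, assuming `JsonFileRecorder` and `PutExtra` are exposed via `qiniu.resume_up` (adjust to the module's actual exports):

```js
const os = require('os');
const qiniu = require('qiniu');

const putExtra = new qiniu.resume_up.PutExtra();
putExtra.version = 'v2'; // multipart upload v2
putExtra.partSize = 8 * 1024 * 1024;
// only the recorder is set; _getRegionsRetrier fills in putExtra.resumeKey
// via generateKeySync once the region's up hosts are known
putExtra.resumeRecorder = new qiniu.resume_up.JsonFileRecorder(os.tmpdir());
```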
@@ -159,10 +188,10 @@ function _getRegionsRetrier (options) {
                     regionsProvider,
                     serviceNames,
                     onChangedRegion: () => {
-                        if (!isResumeAvailable) {
+                        if (!putExtra.resumeRecorder || !putExtra.resumeKey) {
                             return;
                         }
-                        resumeRecorder.deleteSync(resumeKey);
+                        putExtra.resumeRecorder.deleteSync(putExtra.resumeKey);
                     },
                     preferredEndpoints
                 })
@@ -175,12 +204,12 @@ function _getRegionsRetrier (options) {
                     if (context.error.noNeedRetry) {
                         return false;
                     }
-                    return retryable;
+                    return true;
                 }
                 if (policy instanceof AccUnavailableRetryPolicy) {
                     return true;
                 }
-                return retryable && context.result && context.result.needRetry();
+                return context.result && context.result.needRetry();
             }
         });
     });
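Review note: after dropping the `retryable` flag, the predicate above reduces to the logic below. A standalone sketch for reviewers (names are illustrative; `isAccUnavailablePolicy` stands in for `policy instanceof AccUnavailableRetryPolicy`), assuming `context` carries the `error`/`result` shapes used in this file:

```js
// condensed form of the onBeforeRetry decision after this change
function shouldRetry (context, isAccUnavailablePolicy) {
    if (context.error) {
        // errors retry unless explicitly flagged as final
        return !context.error.noNeedRetry;
    }
    if (isAccUnavailablePolicy) {
        // acceleration domain unavailable: switch to normal up hosts and retry
        return true;
    }
    // otherwise defer to the response wrapper
    return Boolean(context.result && context.result.needRetry());
}
```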
@@ -219,33 +248,39 @@ ResumeUploader.prototype.putStream = function (
         }
     );

-    // Why need retrier even if retryable is false?
-    // Because the retrier is used to get the endpoints,
-    // which will be initialed by region policy.
-    const result = _getRegionsRetrier.call(this, {
-        bucketName: util.getBucketFromUptoken(uploadToken),
-        accessKey: util.getAKFromUptoken(uploadToken),
-        retryable: false
+    const bucketName = util.getBucketFromUptoken(uploadToken);
+    const accessKey = util.getAKFromUptoken(uploadToken);

-        // useless by not retryable
-        // uploadApiVersion: putExtra.version,
+    const result = this.config.getRegionsProvider({
+        bucketName,
+        accessKey
     })
-        .then(retrier => Promise.all([
-            retrier,
-            retrier.initContext()
-        ]))
-        .then(([retrier, context]) => retrier.retry({
-            func: context => putReq(
-                context.endpoint,
+        .then(regionsProvider => regionsProvider.getRegions())
+        .then(regions => {
+            if (!regions || !regions.length) {
+                return Promise.reject(new Error(`no region available for the bucket "${bucketName}"`));
+            }
+            const preferService = this.config.accelerateUploading
+                ? SERVICE_NAME.UP_ACC
+                : SERVICE_NAME.UP;
+            if (
+                !regions[0].services ||
+                !regions[0].services[preferService] ||
+                !regions[0].services[preferService].length
+            ) {
+                return Promise.reject(new Error(`no endpoint available for the bucket "${bucketName}"`));
+            }
+            const endpoint = regions[0].services[preferService][0];
+            return putReq(
+                endpoint,
                 preferredScheme,
                 uploadToken,
                 key,
                 rsStream,
                 rsStreamLen,
                 putExtra
-            ),
-            context
-        }));
+            );
+        });

     handleReqCallback(result, callbackFunc);
@@ -838,38 +873,36 @@ ResumeUploader.prototype.putFile = function (
         putExtra.fname = path.basename(localFile);
     }

-    const akFromToken = util.getAKFromUptoken(uploadToken);
-    const bucketFromToken = util.getBucketFromUptoken(uploadToken);
+    const accessKey = util.getAKFromUptoken(uploadToken);
+    const bucketName = util.getBucketFromUptoken(uploadToken);
+
     putExtra = getDefaultPutExtra(
         putExtra,
         {
-            accessKey: akFromToken,
-            bucketName: bucketFromToken,
-            key,
-            filePath: localFile
+            key
         }
     );

     const result = _getRegionsRetrier.call(this, {
-        accessKey: akFromToken,
-        bucketName: bucketFromToken,
+        accessKey,
+        bucketName,
+        key,
+        filePath: localFile,
-        uploadApiVersion: putExtra.version,
-        resumeRecorder: putExtra.resumeRecorder,
-        resumeKey: putExtra.resumeKey
+        putExtra
     })
         .then(retrier => Promise.all([
             retrier,
             retrier.initContext()
         ]))
         .then(([retrier, context]) => retrier.retry({
-            func: context => {
+            func: ctx => {
                 const rsStream = fs.createReadStream(localFile, {
                     highWaterMark: conf.BLOCK_SIZE
                 });
                 const rsStreamLen = fs.statSync(localFile).size;
                 const p = putReq(
-                    context.endpoint,
+                    ctx.endpoint,
                     preferredScheme,
                     uploadToken,
                     key,
@@ -914,10 +947,7 @@ ResumeUploader.prototype.putFileWithoutKey = function (
 /**
  * @param {PutExtra} putExtra
  * @param {Object} options
- * @param {string} [options.accessKey]
- * @param {string} [options.bucketName]
  * @param {string | null} [options.key]
- * @param {string} [options.filePath]
  * @returns {PutExtra}
  */
 function getDefaultPutExtra (putExtra, options) {
@@ -943,30 +973,6 @@ function getDefaultPutExtra (putExtra, options) {
         putExtra.resumeKey = parsedPath.name;
     }

-    // generate `resumeKey` if not exists
-    if (
-        putExtra.resumeRecorder &&
-        !putExtra.resumeKey &&
-        options.filePath &&
-        options.accessKey &&
-        options.bucketName
-    ) {
-        let fileLastModify;
-        try {
-            fileLastModify = options.filePath && fs.statSync(options.filePath).mtimeMs.toString();
-        } catch (_err) {
-            fileLastModify = '';
-        }
-        const recordValuesToHash = [
-            putExtra.version,
-            options.accessKey,
-            `${options.bucketName}:${options.key}`,
-            options.filePath,
-            fileLastModify
-        ];
-        putExtra.resumeKey = putExtra.resumeRecorder.generateKey(recordValuesToHash);
-    }
-
     return putExtra;
 }
@@ -1001,9 +1007,9 @@ JsonFileRecorder.prototype.setSync = function (key, data) {
  * @returns {undefined | Object.}
  */
 JsonFileRecorder.prototype.getSync = function (key) {
-    const filePath = path.join(this.baseDirPath, key);
     let result;
     try {
+        const filePath = path.join(this.baseDirPath, key);
         const recordContent = fs.readFileSync(
             filePath,
             {
@@ -1018,8 +1024,8 @@
 };

 JsonFileRecorder.prototype.hasSync = function (key) {
-    const filePath = path.join(this.baseDirPath, key);
     try {
+        const filePath = path.join(this.baseDirPath, key);
         return fs.existsSync(filePath);
     } catch (_err) {
         return false;
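Review note: moving `path.join` inside the `try` blocks looks deliberate: when `key` is `undefined` (e.g. `generateKeySync` bailed out), `path.join` throws a `TypeError`, and with this change the recorder degrades to a cache miss instead of crashing the upload. A small self-contained sketch, assuming the same semantics:

```js
const fs = require('fs');
const path = require('path');

// mirrors the patched hasSync: an invalid key (e.g. undefined, when
// generateKeySync returned nothing) now reads as "no record" instead
// of throwing a TypeError from path.join
function hasSyncPatched (baseDirPath, key) {
    try {
        const filePath = path.join(baseDirPath, key);
        return fs.existsSync(filePath);
    } catch (_err) {
        return false;
    }
}

console.log(hasSyncPatched('/tmp/records', undefined)); // false, no throw
```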
@@ -1027,15 +1033,62 @@
 };

 JsonFileRecorder.prototype.deleteSync = function (key) {
-    const filePath = path.join(this.baseDirPath, key);
     try {
+        const filePath = path.join(this.baseDirPath, key);
         fs.unlinkSync(filePath);
     } catch (_err) {
         // pass
     }
 };

-JsonFileRecorder.prototype.generateKey = function (fields) {
+/**
+ * @param {Object} options
+ * @param {string[]} options.hosts
+ * @param {string} options.accessKey
+ * @param {string} options.bucketName
+ * @param {string} options.key
+ * @param {string} options.filePath
+ * @param {string} options.version
+ * @param {number} options.partSize
+ * @returns {string | undefined}
+ */
+JsonFileRecorder.prototype.generateKeySync = function (options) {
+    // if any required option is missing, we can't generate a valid key
+    if (
+        [
+            Array.isArray(options.hosts),
+            options.accessKey,
+            options.bucketName,
+            options.key,
+            options.filePath,
+            options.version,
+            options.partSize
+        ].some(v => !v)
+    ) {
+        return;
+    }
+
+    let fileStats;
+    try {
+        fileStats = options.filePath && fs.statSync(options.filePath);
+    } catch (_err) {
+        return;
+    }
+
+    const fields = [
+        options.hosts.join(''),
+        options.accessKey,
+        options.bucketName,
+        options.key || '',
+        options.filePath,
+        fileStats ? fileStats.mtimeMs.toString() : '',
+        fileStats ? fileStats.size.toString() : '',
+        options.version, // the upload version
+        options.version === 'v1'
+            ? conf.BLOCK_SIZE.toString()
+            : options.partSize.toString(),
+        'json.v1' // the record file format version
+    ];
     const h = crypto.createHash('sha1');
     fields.forEach(v => {
         h.update(v);
diff --git a/qiniu/util.js b/qiniu/util.js
index 0509cee..c1678b8 100644
--- a/qiniu/util.js
+++ b/qiniu/util.js
@@ -380,7 +380,3 @@ exports.prepareZone = function (ctx, accessKey, bucket, callback) {
         });
     }
 };
-
-exports.writeOrCreateSync = function () {
-
-};
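Review note: `generateKeySync` replaces the old positional `generateKey(fields)` and binds the record to the up hosts, the file's mtime and size, the upload API version, and the part size. A hypothetical usage sketch; the export path and placeholder values are assumptions, not part of the patch:

```js
const os = require('os');
const path = require('path');
const qiniu = require('qiniu');

// export path assumed; JsonFileRecorder is defined in qiniu/storage/resume.js
const recorder = new qiniu.resume_up.JsonFileRecorder(os.tmpdir());

const resumeKey = recorder.generateKeySync({
    hosts: ['up-z0.qiniup.com'], // from the resolved region
    accessKey: '<accessKey>',
    bucketName: '<bucket>',
    key: 'object-key-in-bucket',
    filePath: path.join(os.tmpdir(), 'local-file-to-upload'),
    version: 'v2',
    partSize: 8 * 1024 * 1024
});
// a sha1-derived record file name, or undefined when a field is missing
// or the local file cannot be stat'ed
console.log(resumeKey);
```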
diff --git a/test/resume_up.test.js b/test/resume_up.test.js
index 0e4b53c..bf02c5a 100644
--- a/test/resume_up.test.js
+++ b/test/resume_up.test.js
@@ -467,6 +467,7 @@ describe('test resume up', function () {
         );
     });

+    // testResumeParams.length = 1;
     testResumeParams.forEach(testParam => {
         const {
             version,
@@ -509,21 +510,39 @@
             putExtra.partSize = partSize;
         }

-        const filepath = path.join(os.tmpdir(), key);
-        const result = createRandomFile(filepath, fileSizeMB * (1 << 20))
+        let upHosts = [];
+        const filePath = path.join(os.tmpdir(), key);
+        const result = createRandomFile(filePath, fileSizeMB * (1 << 20))
+            // mock file
             .then(() => {
                 // add to auto clean file
-                filepathListToDelete.push(filepath);
+                filepathListToDelete.push(filePath);
                 filepathListToDelete.push(putExtra.resumeRecordFile);
                 // upload and abort
                 putExtra.progressCallback = (_uploaded, _total) => {
                     throw new Error('mocked error');
                 };
+            })
+            // get up hosts for generating resume key later
+            .then(() => resumeUploader.config.getRegionsProvider({
+                accessKey: accessKey,
+                bucketName: bucketName
+            }))
+            .then(regionsProvider => regionsProvider.getRegions())
+            .then(regions => {
+                const serviceName = resumeUploader.config.accelerateUploading
+                    ? SERVICE_NAME.UP_ACC
+                    : SERVICE_NAME.UP;
+                upHosts = regions[0].services[serviceName].map(e => e.host);
+            })
+            // get up hosts end
+            // mock upload failed
+            .then(() => {
                 return resumeUploader.putFile(
                     uploadToken,
                     key,
-                    filepath,
+                    filePath,
                     putExtra
                 )
                     .catch(err => {
@@ -532,8 +551,8 @@
                         }
                     });
             })
+            // try to upload from resume point
             .then(() => {
-                // try to upload from resume point
                 const couldResume = Boolean(putExtra.resumeRecordFile || putExtra.resumeRecorder);
                 let isFirstPart = true;
                 putExtra.progressCallback = (uploaded, _total) => {
@@ -545,7 +564,6 @@
                         throw new Error('should resume');
                     }
                     if (!couldResume && uploaded / partSize > 1) {
-                        console.log('lihs debug:', { couldResume, uploaded });
                         throw new Error('should not resume');
                     }
                 };
@@ -553,7 +571,7 @@
                 resumeUploader.putFile(
                     uploadToken,
                     key,
-                    filepath,
+                    filePath,
                     putExtra,
                     callback
                 )
@@ -574,20 +592,15 @@
                 ));
             } else {
                 should.exist(putExtra.resumeRecorder);
-                let fileLastModify;
-                try {
-                    fileLastModify = filepath && fs.statSync(filepath).mtimeMs.toString();
-                } catch (_err) {
-                    fileLastModify = '';
-                }
-                const recordValuesToHash = [
-                    putExtra.version,
+                const expectResumeKey = putExtra.resumeRecorder.generateKeySync({
+                    hosts: upHosts,
                     accessKey,
-                    `${bucketName}:${key}`,
-                    filepath,
-                    fileLastModify
-                ];
-                const expectResumeKey = putExtra.resumeRecorder.generateKey(recordValuesToHash);
+                    bucketName,
+                    key,
+                    filePath,
+                    version: version || 'v1',
+                    partSize: partSize || qiniu.conf.BLOCK_SIZE
+                });
                 should.ok(!fs.existsSync(
                     path.join(
                         resumeRecorderOption.baseDirPath,