diff --git a/.aegir.js b/.aegir.js index 8bf94327..e060d1c9 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,4 +1,4 @@ module.exports = { - bundlesize: { maxSize: '197kB' } + bundlesize: { maxSize: '222kB' } } \ No newline at end of file diff --git a/package.json b/package.json index 5604797d..ce06bca9 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "merge-options": "^1.0.1", "multihashes": "~0.4.14", "multihashing-async": "~0.5.2", + "p-queue": "^5.0.0", "peer-id": "~0.12.2", "peer-info": "~0.15.1", "priorityqueue": "~0.2.1", @@ -62,6 +63,7 @@ "protons": "^1.0.1", "pull-length-prefixed": "^1.3.2", "pull-stream": "^3.6.9", + "pull-stream-to-async-iterator": "^1.0.1", "varint": "^5.0.0", "xor-distance": "^2.0.0" }, diff --git a/src/index.js b/src/index.js index 7134dd11..d8f4025a 100644 --- a/src/index.js +++ b/src/index.js @@ -537,7 +537,8 @@ class KadDHT extends EventEmitter { const errors = [] waterfall([ - (cb) => this.providers.addProvider(key, this.peerInfo.id, cb), + // TODO: refactor this in method in async and remove this wrapper + (cb) => promiseToCallback(this.providers.addProvider(key, this.peerInfo.id))(err => cb(err)), (cb) => this.getClosestPeers(key.buffer, cb), (peers, cb) => { const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0) diff --git a/src/private.js b/src/private.js index 20e9bc87..4215a38e 100644 --- a/src/private.js +++ b/src/private.js @@ -517,7 +517,7 @@ module.exports = (dht) => ({ async _findNProvidersAsync (key, providerTimeout, n) { const out = new LimitedPeerList(n) - const provs = await promisify(cb => dht.providers.getProviders(key, cb))() + const provs = await dht.providers.getProviders(key) provs.forEach((id) => { let info diff --git a/src/providers.js b/src/providers.js index 85e79987..db31eef6 100644 --- a/src/providers.js +++ b/src/providers.js @@ -2,11 +2,11 @@ const cache = require('hashlru') const varint = require('varint') -const each = require('async/each') -const pull = require('pull-stream') 
-const CID = require('cids') const PeerId = require('peer-id') const Key = require('interface-datastore').Key +const Queue = require('p-queue') +const promisify = require('promisify-es6') +const toIterator = require('pull-stream-to-async-iterator') const c = require('./constants') const utils = require('./utils') @@ -56,6 +56,8 @@ class Providers { this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE this.providers = cache(this.lruCacheSize) + + this.syncQueue = new Queue({ concurrency: 1 }) } /** @@ -71,134 +73,89 @@ class Providers { } /** - * Check all providers if they are still valid, and if not - * delete them. + * Check all providers if they are still valid, and if not delete them. * - * @returns {undefined} + * @returns {Promise} * * @private */ _cleanup () { - this._getProviderCids((err, cids) => { - if (err) { - return this._log.error('Failed to get cids', err) - } - - each(cids, (cid, cb) => { - this._getProvidersMap(cid, (err, provs) => { - if (err) { - return cb(err) + return this.syncQueue.add(async () => { + this._log('start cleanup') + const start = Date.now() + + let count = 0 + let deleteCount = 0 + const deleted = new Map() + const batch = this.datastore.batch() + + // Get all provider entries from the datastore + const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }) + for await (const entry of toIterator(query)) { + try { + // Add a delete to the batch for each expired entry + const { cid, peerId } = parseProviderKey(entry.key) + const time = readTime(entry.value) + const now = Date.now() + const delta = now - time + const expired = delta > this.provideValidity + this._log('comparing: %d - %d = %d > %d %s', + now, time, delta, this.provideValidity, expired ? 
'(expired)' : '') + if (expired) { + deleteCount++ + batch.delete(entry.key) + const peers = deleted.get(cid) || new Set() + peers.add(peerId) + deleted.set(cid, peers) } + count++ + } catch (err) { + this._log.error(err.message) + } + } + this._log('deleting %d / %d entries', deleteCount, count) - provs.forEach((time, provider) => { - this._log('comparing: %s - %s > %s', Date.now(), time, this.provideValidity) - if (Date.now() - time > this.provideValidity) { - provs.delete(provider) - } - }) + // Commit the deletes to the datastore + if (deleted.size) { + await promisify(cb => batch.commit(cb))() + } + // Clear expired entries from the cache + for (const [cid, peers] of deleted) { + const key = makeProviderKey(cid) + const provs = this.providers.get(key) + if (provs) { + for (const peerId of peers) { + provs.delete(peerId) + } if (provs.size === 0) { - return this._deleteProvidersMap(cid, cb) + this.providers.remove(key) + } else { + this.providers.set(key, provs) } - - cb() - }) - }, (err) => { - if (err) { - return this._log.error('Failed to cleanup', err) } + } - this._log('Cleanup successfull') - }) + this._log('Cleanup successful (%dms)', Date.now() - start) }) } /** - * Get a list of all cids that providers are known for. 
- * - * @param {function(Error, Array)} callback - * @returns {undefined} - * - * @private - */ - _getProviderCids (callback) { - pull( - this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }), - pull.map((entry) => { - const parts = entry.key.toString().split('/') - if (parts.length !== 4) { - this._log.error('incorrectly formatted provider entry in datastore: %s', entry.key) - return - } - - let decoded - try { - decoded = utils.decodeBase32(parts[2]) - } catch (err) { - this._log.error('error decoding base32 provider key: %s', parts[2]) - return - } - - let cid - try { - cid = new CID(decoded) - } catch (err) { - this._log.error('error converting key to cid from datastore: %s', err.message) - } - - return cid - }), - pull.filter(Boolean), - pull.collect(callback) - ) - } - - /** - * Get the currently known provider maps for a given CID. + * Get the currently known provider peer ids for a given CID. * * @param {CID} cid - * @param {function(Error, Map)} callback - * @returns {undefined} + * @returns {Promise>} * * @private */ - _getProvidersMap (cid, callback) { - const provs = this.providers.get(makeProviderKey(cid)) - + async _getProvidersMap (cid) { + const cacheKey = makeProviderKey(cid) + let provs = this.providers.get(cacheKey) if (!provs) { - return loadProviders(this.datastore, cid, callback) + provs = await loadProviders(this.datastore, cid) + this.providers.set(cacheKey, provs) } - - callback(null, provs) - } - - /** - * Completely remove a providers map entry for a given CID. 
- * - * @param {CID} cid - * @param {function(Error)} callback - * @returns {undefined} - * - * @private - */ - _deleteProvidersMap (cid, callback) { - const dsKey = makeProviderKey(cid) - this.providers.set(dsKey, null) - const batch = this.datastore.batch() - - pull( - this.datastore.query({ - keysOnly: true, - prefix: dsKey - }), - pull.through((entry) => batch.delete(entry.key)), - pull.onEnd((err) => { - if (err) { - return callback(err) - } - batch.commit(callback) - }) - ) + return provs } get cleanupInterval () { @@ -219,53 +176,40 @@ class Providers { } /** - * Add a new provider. + * Add a new provider for the given CID. * * @param {CID} cid * @param {PeerId} provider - * @param {function(Error)} callback - * @returns {undefined} + * @returns {Promise} */ - addProvider (cid, provider, callback) { - this._log('addProvider %s', cid.toBaseEncodedString()) - const dsKey = makeProviderKey(cid) - const provs = this.providers.get(dsKey) - - const next = (err, provs) => { - if (err) { - return callback(err) - } + async addProvider (cid, provider) { + return this.syncQueue.add(async () => { + this._log('addProvider %s', cid.toBaseEncodedString()) + const provs = await this._getProvidersMap(cid) this._log('loaded %s provs', provs.size) const now = Date.now() - provs.set(provider, now) + provs.set(utils.encodeBase32(provider.id), now) + const dsKey = makeProviderKey(cid) this.providers.set(dsKey, provs) - writeProviderEntry(this.datastore, cid, provider, now, callback) - } - - if (!provs) { - loadProviders(this.datastore, cid, next) - } else { - next(null, provs) - } + return writeProviderEntry(this.datastore, cid, provider, now) + }) } /** * Get a list of providers for the given CID. 
* * @param {CID} cid - * @param {function(Error, Array)} callback - * @returns {undefined} + * @returns {Promise>} */ - getProviders (cid, callback) { - this._log('getProviders %s', cid.toBaseEncodedString()) - this._getProvidersMap(cid, (err, provs) => { - if (err) { - return callback(err) - } - - callback(null, Array.from(provs.keys())) + async getProviders (cid) { + return this.syncQueue.add(async () => { + this._log('getProviders %s', cid.toBaseEncodedString()) + const provs = await this._getProvidersMap(cid) + return [...provs.keys()].map((base32PeerId) => { + return new PeerId(utils.decodeBase32(base32PeerId)) + }) }) } } @@ -273,13 +217,14 @@ class Providers { /** * Encode the given key its matching datastore key. * - * @param {CID} cid + * @param {CID|string} cid - cid or base32 encoded string * @returns {string} * * @private */ function makeProviderKey (cid) { - return c.PROVIDERS_KEY_PREFIX + utils.encodeBase32(cid.buffer) + cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.buffer) + return c.PROVIDERS_KEY_PREFIX + cid } /** @@ -289,48 +234,59 @@ function makeProviderKey (cid) { * @param {CID} cid * @param {PeerId} peer * @param {number} time - * @param {function(Error)} callback - * @returns {undefined} + * @returns {Promise} * * @private */ -function writeProviderEntry (store, cid, peer, time, callback) { +async function writeProviderEntry (store, cid, peer, time) { const dsKey = [ makeProviderKey(cid), '/', utils.encodeBase32(peer.id) ].join('') - store.put(new Key(dsKey), Buffer.from(varint.encode(time)), callback) + const key = new Key(dsKey) + const buffer = Buffer.from(varint.encode(time)) + return promisify(cb => store.put(key, buffer, cb))() +} + +/** + * Parse the CID and provider peer id from the key + * + * @param {DKey} key + * @returns {Object} object with peer id and cid + * + * @private + */ +function parseProviderKey (key) { + const parts = key.toString().split('/') + if (parts.length !== 4) { + throw new Error('incorrectly 
formatted provider entry key in datastore: ' + key) + } + + return { + cid: parts[2], + peerId: parts[3] + } } /** - * Load providers from the store. + * Load providers for the given CID from the store. * * @param {Datastore} store * @param {CID} cid - * @param {function(Error, Map)} callback - * @returns {undefined} + * @returns {Promise>} * * @private */ -function loadProviders (store, cid, callback) { - pull( - store.query({ prefix: makeProviderKey(cid) }), - pull.map((entry) => { - const parts = entry.key.toString().split('/') - const lastPart = parts[parts.length - 1] - const rawPeerId = utils.decodeBase32(lastPart) - return [new PeerId(rawPeerId), readTime(entry.value)] - }), - pull.collect((err, res) => { - if (err) { - return callback(err) - } - - return callback(null, new Map(res)) - }) - ) +async function loadProviders (store, cid) { + const providers = new Map() + const query = store.query({ prefix: makeProviderKey(cid) }) + for await (const entry of toIterator(query)) { + const { peerId } = parseProviderKey(entry.key) + providers.set(peerId, readTime(entry.value)) + } + return providers } function readTime (buf) { diff --git a/src/rpc/handlers/add-provider.js b/src/rpc/handlers/add-provider.js index 68cb0898..b90894e0 100644 --- a/src/rpc/handlers/add-provider.js +++ b/src/rpc/handlers/add-provider.js @@ -1,8 +1,10 @@ 'use strict' const CID = require('cids') -const utils = require('../../utils') const errcode = require('err-code') +const promiseToCallback = require('promise-to-callback') + +const utils = require('../../utils') module.exports = (dht) => { const log = utils.logger(dht.peerInfo.id, 'rpc:add-provider') @@ -11,7 +13,7 @@ module.exports = (dht) => { * * @param {PeerInfo} peer * @param {Message} msg - * @param {function(Error, Message)} callback + * @param {function(Error)} callback * @returns {undefined} */ return function addProvider (peer, msg, callback) { @@ -48,7 +50,7 @@ module.exports = (dht) => { if (!dht._isSelf(pi.id)) { 
foundProvider = true dht.peerBook.put(pi) - dht.providers.addProvider(cid, pi.id, callback) + promiseToCallback(dht.providers.addProvider(cid, pi.id))(err => callback(err)) } }) @@ -59,7 +61,7 @@ module.exports = (dht) => { // https://github.com/libp2p/js-libp2p-kad-dht/pull/127 // https://github.com/libp2p/js-libp2p-kad-dht/issues/128 if (!foundProvider) { - dht.providers.addProvider(cid, peer.id, callback) + promiseToCallback(dht.providers.addProvider(cid, peer.id))(err => callback(err)) } } } diff --git a/src/rpc/handlers/get-providers.js b/src/rpc/handlers/get-providers.js index 4d067c87..c90912c3 100644 --- a/src/rpc/handlers/get-providers.js +++ b/src/rpc/handlers/get-providers.js @@ -3,7 +3,7 @@ const CID = require('cids') const parallel = require('async/parallel') const PeerInfo = require('peer-info') - +const promiseToCallback = require('promise-to-callback') const errcode = require('err-code') const Message = require('../../message') @@ -41,7 +41,7 @@ module.exports = (dht) => { cb(null, exists) }), - (cb) => dht.providers.getProviders(cid, cb), + (cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), (cb) => dht._betterPeersToQuery(msg, peer, cb) ], (err, res) => { if (err) { diff --git a/test/providers.spec.js b/test/providers.spec.js index 4f7324e6..327e6886 100644 --- a/test/providers.spec.js +++ b/test/providers.spec.js @@ -4,165 +4,145 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect +const promisify = require('promisify-es6') const Store = require('interface-datastore').MemoryDatastore -const parallel = require('async/parallel') -const waterfall = require('async/waterfall') const CID = require('cids') -const multihashing = require('multihashing-async') -const map = require('async/map') -const timesSeries = require('async/timesSeries') -const each = require('async/each') -const eachSeries = require('async/eachSeries') -const range = require('lodash.range') const LevelStore = 
require('datastore-level') const path = require('path') const os = require('os') +const multihashing = promisify(require('multihashing-async')) const Providers = require('../src/providers') -const createPeerInfo = require('./utils/create-peer-info') -const createValues = require('./utils/create-values') +const createPeerInfo = promisify(require('./utils/create-peer-info')) +const createValues = promisify(require('./utils/create-values')) describe('Providers', () => { let infos + let providers - before(function (done) { + before(async function () { this.timeout(10 * 1000) - createPeerInfo(3, (err, peers) => { - if (err) { - return done(err) - } + infos = await createPeerInfo(3) + }) - infos = peers - done() - }) + afterEach(() => { + providers && providers.stop() }) - it('simple add and get of providers', (done) => { - const providers = new Providers(new Store(), infos[2].id) + it('simple add and get of providers', async () => { + providers = new Providers(new Store(), infos[2].id) const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') - parallel([ - (cb) => providers.addProvider(cid, infos[0].id, cb), - (cb) => providers.addProvider(cid, infos[1].id, cb) - ], (err) => { - expect(err).to.not.exist() - providers.getProviders(cid, (err, provs) => { - expect(err).to.not.exist() - expect(provs).to.be.eql([infos[0].id, infos[1].id]) - providers.stop() - - done() - }) - }) + await Promise.all([ + providers.addProvider(cid, infos[0].id), + providers.addProvider(cid, infos[1].id) + ]) + + const provs = await providers.getProviders(cid) + const ids = new Set(provs.map((peerId) => peerId.toB58String())) + expect(ids.has(infos[0].id.toB58String())).to.be.eql(true) + expect(ids.has(infos[1].id.toB58String())).to.be.eql(true) }) - it('more providers than space in the lru cache', (done) => { - const providers = new Providers(new Store(), infos[2].id, 10) - - waterfall([ - (cb) => map( - range(100), - (i, cb) => multihashing(Buffer.from(`hello ${i}`), 'sha2-256', 
cb), - cb - ), - (hashes, cb) => { - const cids = hashes.map((h) => new CID(h)) - - map(cids, (cid, cb) => { - providers.addProvider(cid, infos[0].id, cb) - }, (err) => cb(err, cids)) - }, - (cids, cb) => { - map(cids, (cid, cb) => { - providers.getProviders(cid, cb) - }, (err, provs) => { - expect(err).to.not.exist() - expect(provs).to.have.length(100) - provs.forEach((p) => { - expect(p[0].id).to.be.eql(infos[0].id.id) - }) - providers.stop() - cb() - }) - } - ], done) + it('duplicate add of provider is deduped', async () => { + providers = new Providers(new Store(), infos[2].id) + + const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') + + await Promise.all([ + providers.addProvider(cid, infos[0].id), + providers.addProvider(cid, infos[0].id), + providers.addProvider(cid, infos[1].id), + providers.addProvider(cid, infos[1].id), + providers.addProvider(cid, infos[1].id) + ]) + + const provs = await providers.getProviders(cid) + expect(provs).to.have.length(2) + const ids = new Set(provs.map((peerId) => peerId.toB58String())) + expect(ids.has(infos[0].id.toB58String())).to.be.eql(true) + expect(ids.has(infos[1].id.toB58String())).to.be.eql(true) }) - it('expires', (done) => { - const providers = new Providers(new Store(), infos[2].id) + it('more providers than space in the lru cache', async () => { + providers = new Providers(new Store(), infos[2].id, 10) + + const hashes = await Promise.all([...new Array(100)].map((_, i) => { + return multihashing(Buffer.from(`hello ${i}`), 'sha2-256') + })) + + const cids = hashes.map((h) => new CID(h)) + + await Promise.all(cids.map(cid => providers.addProvider(cid, infos[0].id))) + const provs = await Promise.all(cids.map(cid => providers.getProviders(cid))) + + expect(provs).to.have.length(100) + for (const p of provs) { + expect(p[0].id).to.be.eql(infos[0].id.id) + } + }) + + it('expires', async () => { + providers = new Providers(new Store(), infos[2].id) providers.cleanupInterval = 100 
providers.provideValidity = 200 const cid = new CID('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') - parallel([ - (cb) => providers.addProvider(cid, infos[0].id, cb), - (cb) => providers.addProvider(cid, infos[1].id, cb) - ], (err) => { - expect(err).to.not.exist() - - providers.getProviders(cid, (err, provs) => { - expect(err).to.not.exist() - expect(provs).to.have.length(2) - expect(provs[0].id).to.be.eql(infos[0].id.id) - expect(provs[1].id).to.be.eql(infos[1].id.id) - }) - - setTimeout(() => { - providers.getProviders(cid, (err, provs) => { - expect(err).to.not.exist() - expect(provs).to.have.length(0) - providers.stop() - done() - }) - // TODO: this is a timeout based check, make cleanup monitorable - }, 400) - }) + await Promise.all([ + providers.addProvider(cid, infos[0].id), + providers.addProvider(cid, infos[1].id) + ]) + + const provs = await providers.getProviders(cid) + + expect(provs).to.have.length(2) + expect(provs[0].id).to.be.eql(infos[0].id.id) + expect(provs[1].id).to.be.eql(infos[1].id.id) + + await new Promise(resolve => setTimeout(resolve, 400)) + + const provsAfter = await providers.getProviders(cid) + expect(provsAfter).to.have.length(0) }) // slooow so only run when you need to - it.skip('many', (done) => { + it.skip('many', async function () { const p = path.join( os.tmpdir(), (Math.random() * 100).toString() ) const store = new LevelStore(p) - const providers = new Providers(store, infos[2].id, 10) + providers = new Providers(store, infos[2].id, 10) console.log('starting') - waterfall([ - (cb) => parallel([ - (cb) => createValues(100, cb), - (cb) => createPeerInfo(600, cb) - ], cb), - (res, cb) => { - console.log('got values and peers') - const values = res[0] - const peers = res[1] - let total = Date.now() - eachSeries(values, (v, cb) => { - eachSeries(peers, (p, cb) => { - providers.addProvider(v.cid, p.id, cb) - }, cb) - }, (err) => { - console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - 
total) - expect(err).to.not.exist() - console.log('starting profile with %s peers and %s cids', peers.length, values.length) - timesSeries(3, (i, cb) => { - const start = Date.now() - each(values, (v, cb) => { - providers.getProviders(v.cid, cb) - }, (err) => { - expect(err).to.not.exist() - console.log('query %sms', (Date.now() - start)) - cb() - }) - }, cb) - }) + const res = await Promise.all([ + createValues(100), + createPeerInfo(600) + ]) + + console.log('got values and peers') + const values = res[0] + const peers = res[1] + let total = Date.now() + + for (const v of values) { + for (const p of peers) { + await providers.addProvider(v.cid, p.id) } - ], (err) => { - expect(err).to.not.exist() - store.close(done) - }) + } + + console.log('addProvider %s peers %s cids in %sms', peers.length, values.length, Date.now() - total) + console.log('starting profile with %s peers and %s cids', peers.length, values.length) + + for (let i = 0; i < 3; i++) { + const start = Date.now() + for (const v of values) { + await providers.getProviders(v.cid) + console.log('query %sms', (Date.now() - start)) + } + } + + await store.close() }) }) diff --git a/test/rpc/handlers/add-provider.spec.js b/test/rpc/handlers/add-provider.spec.js index 05cecc51..1219265f 100644 --- a/test/rpc/handlers/add-provider.spec.js +++ b/test/rpc/handlers/add-provider.spec.js @@ -8,6 +8,7 @@ const expect = chai.expect const parallel = require('async/parallel') const waterfall = require('async/waterfall') const _ = require('lodash') +const promiseToCallback = require('promise-to-callback') const Message = require('../../../src/message') const handler = require('../../../src/rpc/handlers/add-provider') @@ -82,7 +83,7 @@ describe('rpc - handlers - AddProvider', () => { waterfall([ (cb) => handler(dht)(sender, msg, cb), - (cb) => dht.providers.getProviders(cid, cb), + (cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), (provs, cb) => { expect(provs).to.have.length(1) 
expect(provs[0].id).to.eql(provider.id.id) @@ -106,7 +107,7 @@ describe('rpc - handlers - AddProvider', () => { waterfall([ (cb) => handler(dht)(sender, msg, cb), - (cb) => dht.providers.getProviders(cid, cb), + (cb) => promiseToCallback(dht.providers.getProviders(cid))(cb), (provs, cb) => { expect(dht.peerBook.has(provider.id)).to.equal(false) expect(provs).to.have.length(1) diff --git a/test/rpc/handlers/get-providers.spec.js b/test/rpc/handlers/get-providers.spec.js index f4db2cda..6fe0934b 100644 --- a/test/rpc/handlers/get-providers.spec.js +++ b/test/rpc/handlers/get-providers.spec.js @@ -6,6 +6,7 @@ chai.use(require('dirty-chai')) const expect = chai.expect const parallel = require('async/parallel') const waterfall = require('async/waterfall') +const promiseToCallback = require('promise-to-callback') const Message = require('../../../src/message') const utils = require('../../../src/utils') @@ -89,7 +90,7 @@ describe('rpc - handlers - GetProviders', () => { waterfall([ (cb) => dht._add(closer, cb), - (cb) => dht.providers.addProvider(v.cid, prov, cb), + (cb) => promiseToCallback(dht.providers.addProvider(v.cid, prov))(err => cb(err)), (cb) => handler(dht)(peers[0], msg, cb) ], (err, response) => { expect(err).to.not.exist()