Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: random walk #104

Merged
merged 9 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
Expand Down
7 changes: 3 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ class KadDHT extends EventEmitter {
*/
stop (callback) {
this._running = false
this.randomWalk.stop(() => { // guarantee that random walk is stopped if it was started
this.providers.stop()
this.network.stop(callback)
})
this.randomWalk.stop()
this.providers.stop()
this._queryManager.stop()
this.network.stop(callback)
}

/**
Expand Down
121 changes: 63 additions & 58 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
const times = require('async/times')
const crypto = require('libp2p-crypto')
const waterfall = require('async/waterfall')
const timeout = require('async/timeout')
const multihashing = require('multihashing-async')
const PeerId = require('peer-id')
const assert = require('assert')
const c = require('./constants')
const { logger } = require('./utils')
const AbortController = require('abort-controller')

const errcode = require('err-code')

Expand All @@ -25,9 +25,8 @@ class RandomWalk {
* @param {DHT} options.dht
*/
constructor (dht, options) {
this._options = { ...c.defaultRandomWalk, ...options }
assert(dht, 'Random Walk needs an instance of the Kademlia DHT')
this._runningHandle = null
this._options = { ...c.defaultRandomWalk, ...options }
this._kadDHT = dht
this.log = logger(dht.peerInfo.id, 'random-walk')
}
Expand All @@ -41,64 +40,46 @@ class RandomWalk {
*/
start () {
// Don't run twice
if (this._running || !this._options.enabled) { return }

// Create running handle
const runningHandle = {
_onCancel: null,
_timeoutId: null,
runPeriodically: (walk, period) => {
runningHandle._timeoutId = setTimeout(() => {
runningHandle._timeoutId = null

walk((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(walk, nextPeriod)
})
}, period)
},
cancel: (cb) => {
// Not currently running, can callback immediately
if (runningHandle._timeoutId) {
clearTimeout(runningHandle._timeoutId)
return cb()
}
// Wait to finish and then call callback
runningHandle._onCancel = cb
}
}
if (this._timeoutId || !this._options.enabled) { return }

// Start doing random walks after `this._options.delay`
runningHandle._timeoutId = setTimeout(() => {
this._timeoutId = setTimeout(() => {
// Start runner immediately
runningHandle.runPeriodically((done) => {
this._runPeriodically((done) => {
// Each subsequent walk should run on a `this._options.interval` interval
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
}, 0)
}, this._options.delay)

this._runningHandle = runningHandle
}

/**
* Stop the random-walk process.
* @param {function(Error)} callback
* Stop the random-walk process. Any active
* queries will be aborted.
*
* @returns {void}
*/
stop (callback) {
const runningHandle = this._runningHandle
stop () {
clearTimeout(this._timeoutId)
this._timeoutId = null
this._controller && this._controller.abort()
}

if (!runningHandle) {
return callback()
}
/**
* Run function `walk` on every `interval` ms
* @param {function(callback)} walk The function to execute on `interval`
* @param {number} interval The interval to run on in ms
*
* @private
*/
_runPeriodically (walk, interval) {
this._timeoutId = setTimeout(() => {
this._timeoutId = null
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

this._runningHandle = null
runningHandle.cancel(callback)
walk((nextInterval) => {
// Schedule next
this._runPeriodically(walk, nextInterval)
})
}, interval)
}

/**
Expand All @@ -113,39 +94,63 @@ class RandomWalk {
*/
_walk (queries, walkTimeout, callback) {
this.log('start')
this._controller = new AbortController()

times(queries, (i, cb) => {
times(queries, (i, next) => {
this.log('running query %d', i)

// Perform the walk
waterfall([
(cb) => this._randomPeerId(cb),
(id, cb) => timeout((cb) => {
this._query(id, cb)
}, walkTimeout)(cb)
(id, cb) => {
// Check if we've happened to already abort
if (!this._controller) return cb()

this._query(id, {
timeout: walkTimeout,
signal: this._controller.signal
}, cb)
}
], (err) => {
if (err) {
this.log.error('query finished with error', err)
return callback(err)
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query %d finished with error', i, err)
return next(err)
}

this.log('done')
callback(null)
this.log('finished query %d', i)
next(null)
})
}, (err) => {
this._controller = null
this.log('finished queries')
callback(err)
})
}

/**
* The query run during a random walk request.
*
* TODO: While query currently supports an abort controller, it is not
* yet supported by `DHT.findPeer`. Once https://github.com/libp2p/js-libp2p-kad-dht/pull/82
* is complete, and AbortController support has been added to the
* DHT query functions, the abort here will just work, provided the
* functions support `options.signal`. Once done, this todo should be
* removed.
*
* @param {PeerId} id
* @param {object} options
* @param {number} options.timeout
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
* @param {AbortControllerSignal} options.signal
* @param {function(Error)} callback
* @returns {void}
*
* @private
*/
_query (id, callback) {
_query (id, options, callback) {
this.log('query:%s', id.toB58String())

this._kadDHT.findPeer(id, (err, peer) => {
if (err.code === 'ERR_NOT_FOUND') {
this._kadDHT.findPeer(id, options, (err, peer) => {
if (err && err.code === 'ERR_NOT_FOUND') {
// expected case, we asked for random stuff after all
return callback()
}
Expand Down
4 changes: 2 additions & 2 deletions test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function connect (a, b, callback) {

function bootstrap (dhts) {
dhts.forEach((dht) => {
dht.randomWalk._walk(1, 1000, () => {})
dht.randomWalk._walk(1, 10000, () => {})
Copy link
Contributor Author

@jacobheun jacobheun Apr 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that timeout is actually being passed to findPeer I bumped this to 10seconds so the random walk doesn't get killed after 1s. This was causing test failures because routing tables weren't doing enough connecting when it was depending on the walk.

})
}

Expand Down Expand Up @@ -574,7 +574,7 @@ describe('KadDHT', () => {
})

it('random-walk', function (done) {
this.timeout(10 * 1000)
this.timeout(20 * 1000)

const nDHTs = 20
const tdht = new TestDHT()
Expand Down
Loading