Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

refactor: convert dht API to async/await #1156

Merged
merged 10 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/dht/find-peer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = configure(({ ky }) => {
return (peerId, options) => (async function * () {
options = options || {}

const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${peerId}`)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

const res = await ky.get('dht/findpeer', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

for await (const message of ndjson(toIterable(res.body))) {
// 2 = FinalPeer
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18
if (message.Type === 2 && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
yield peerInfo
}
}
}
})()
})
38 changes: 38 additions & 0 deletions src/dht/find-provs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = configure(({ ky }) => {
return (cid, options) => (async function * () {
options = options || {}

const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${cid}`)
if (options.numProviders) searchParams.set('num-providers', options.numProviders)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

const res = await ky.get('dht/findprovs', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

for await (const message of ndjson(toIterable(res.body))) {
// 4 = Provider
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20
if (message.Type === 4 && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
yield peerInfo
}
}
}
})()
})
63 changes: 0 additions & 63 deletions src/dht/findpeer.js

This file was deleted.

63 changes: 0 additions & 63 deletions src/dht/findprovs.js

This file was deleted.

62 changes: 22 additions & 40 deletions src/dht/get.js
Original file line number Diff line number Diff line change
@@ -1,48 +1,30 @@
'use strict'

const promisify = require('promisify-es6')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = (send) => {
return promisify((key, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return (key, options) => (async function * () {
options = options || {}

function handleResult (done, err, res) {
if (err) {
return done(err)
}
if (!res) {
return done(new Error('empty response'))
}
if (res.length === 0) {
return done(new Error('no value returned for key'))
}
const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${key}`)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

// Inconsistent return values in the browser vs node
if (Array.isArray(res)) {
res = res[0]
}
const res = await ky.get('dht/get', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

if (res.Type === 5) {
done(null, res.Extra)
} else {
done(new Error('key was not found (type 6)'))
for await (const message of ndjson(toIterable(res.body))) {
// 5 = Value
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21
if (message.Type === 5) {
yield message.Extra
}
}

send({
path: 'dht/get',
args: key,
qs: opts
}, handleResult.bind(null, callback))
})
}
})()
})
31 changes: 22 additions & 9 deletions src/dht/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
'use strict'

const moduleConfig = require('../utils/module-config')
const callbackify = require('callbackify')
const errCode = require('err-code')
const { collectify } = require('../lib/converters')

module.exports = (arg) => {
const send = moduleConfig(arg)
module.exports = config => {
const get = require('./get')(config)
const findPeer = require('./find-peer')(config)

return {
get: require('./get')(send),
put: require('./put')(send),
findProvs: require('./findprovs')(send),
findPeer: require('./findpeer')(send),
provide: require('./provide')(send),
get: callbackify.variadic(async (key, options) => {
for await (const value of get(key, options)) {
return value
}
throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND')
}),
put: callbackify.variadic(collectify(require('./put')(config))),
findProvs: callbackify.variadic(collectify(require('./find-provs')(config))),
findPeer: callbackify.variadic(async (peerId, options) => {
for await (const peerInfo of findPeer(peerId, options)) {
return peerInfo
}
throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND')
}),
provide: callbackify.variadic(collectify(require('./provide')(config))),
// find closest peerId to given peerId
query: require('./query')(send)
query: callbackify.variadic(collectify(require('./query')(config)))
}
}
63 changes: 33 additions & 30 deletions src/dht/provide.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
'use strict'

const promisify = require('promisify-es6')
const CID = require('cids')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')
const toCamel = require('../lib/object-to-camel')

module.exports = (send) => {
return promisify((cids, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return (cids, options) => (async function * () {
cids = Array.isArray(cids) ? cids : [cids]
options = options || {}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
const searchParams = new URLSearchParams(options.searchParams)
cids.forEach(cid => searchParams.append('arg', `${cid}`))
if (options.recursive != null) searchParams.set('recursive', options.recursive)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

if (!Array.isArray(cids)) {
cids = [cids]
}
const res = await ky.get('dht/provide', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

// Validate CID(s) and serialize
try {
cids = cids.map(cid => new CID(cid).toBaseEncodedString('base58btc'))
} catch (err) {
return callback(err)
for await (let message of ndjson(toIterable(res.body))) {
message = toCamel(message)
if (message.responses) {
message.responses = message.responses.map(({ ID, Addrs }) => {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
return peerInfo
})
}
yield message
}

send({
path: 'dht/provide',
args: cids,
qs: opts
}, callback)
})
}
})()
})
Loading