Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions examples/search/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const DHT = require('hyperdht')
const vocabulary = require('./vocabulary')
const { SimHash } = require('simhash-vocabulary')
const Hyperdrive = require('hyperdrive')
const Hyperswarm = require('hyperswarm')
const Corestore = require('corestore')
const { spawn } = require('bare-subprocess')
const fs = require('bare-fs')
const process = require('bare-process')
const Hyperbee = require('hyperbee')

// CLI usage: <runtime> client.js <beeKey> [searchToken ...]
// The first two argv slots are skipped; the third is the lookup bee's key and
// everything after it is treated as a search token.
const [, , beeKey, ...searchTokens] = process.argv

/**
 * Searches the local DHT for records matching the command-line tokens, prints
 * every hit that can be resolved through the lookup bee, then downloads the
 * closest resolvable hit into `search-results/` and opens it.
 *
 * Reads the module-level `beeKey` (key of the lookup Hyperbee) and
 * `searchTokens` (tokens hashed together with the fixed 'gif' token).
 */
async function main() {
  const node = new DHT({
    ephemeral: true,
    host: '127.0.0.1',
    // Must match the port the example server starts its testnet on.
    bootstrap: [{ host: '127.0.0.1', port: 49739 }],
    simhash: new SimHash(vocabulary)
  })
  const swarm = new Hyperswarm()
  const store = new Corestore('./client')
  const bee = new Hyperbee(store.get(beeKey), {
    keyEncoding: 'utf-8',
    valueEncoding: 'json'
  })
  await bee.ready()

  swarm.on('connection', (conn) => {
    console.log('connection')
    store.replicate(conn)
  })

  const discovery = swarm.join(bee.discoveryKey, { client: true, server: false })
  await discovery.flushed()

  // Fixed grace period before searching.
  // NOTE(review): presumably gives the lookup bee time to replicate — confirm.
  await new Promise((resolve) => setTimeout(resolve, 2000))

  // `search` resolves to undefined when the node has no simhash configured.
  const res = (await node.search(['gif', ...searchTokens])) ?? []

  // Resolve each hit through the lookup bee once, keeping the search order
  // (closest first) so the first resolvable entry is the best match.
  const matches = []
  for (const r of res) {
    if (r.values.length === 0) continue

    const entry = await bee.get(r.values[0].toString('hex'))
    if (!entry) {
      // The record exists in the DHT but its lookup entry is not available.
      console.log('No lookup entry for result, distance:', r.distance)
      continue
    }

    console.log('Found', entry.value.path, ', distance:', r.distance)
    matches.push(entry.value)
  }

  if (matches.length === 0) return

  const [file] = matches

  const drive = new Hyperdrive(store, Buffer.from(file.key, 'hex'))
  await drive.ready()
  const driveDiscovery = swarm.join(drive.discoveryKey, { client: true, server: false })
  await driveDiscovery.flushed()

  const fileData = await drive.get(file.path)
  if (!fileData) {
    // Nothing to write: the drive has no blob at that path.
    console.log('Could not fetch', file.path)
    return
  }

  fs.writeFileSync(`search-results/${file.path}`, fileData)

  spawn('open', [`search-results/${file.path}`])
}

main()
5 changes: 5 additions & 0 deletions examples/search/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"dependencies": {
"searchable-record-cache": "^0.0.6"
}
}
82 changes: 82 additions & 0 deletions examples/search/server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
const { SimHash } = require('simhash-vocabulary')
const createTestnet = require('hyperdht/testnet')
const path = require('bare-path')
const { randomBytes } = require('bare-crypto')
const MirrorDrive = require('mirror-drive')
const Localdrive = require('localdrive')
const Hyperdrive = require('hyperdrive')
const Corestore = require('corestore')
const Hyperswarm = require('hyperswarm')
const { Transform } = require('streamx')
const vocabulary = require('./vocabulary')
const Hyperbee = require('hyperbee')

/**
 * Starts a local testnet, mirrors `./images` into a Hyperdrive and, while
 * mirroring, registers one searchable DHT record per file plus a lookup entry
 * in a Hyperbee that maps the record's value back to the drive key and path.
 * Finally announces the bee and the drive on the swarm and prints the bee key.
 */
async function main() {
  const testnet = await createTestnet(10, {
    port: 49739
  })

  // The search handlers need a simhash instance on every node.
  for (const n of testnet) {
    n._simhash = new SimHash(vocabulary)
  }

  /**
   * Builds a pass-through transform for one mirrored file. The file is
   * announced to the DHT and indexed in the bee exactly once, on the first
   * chunk; every chunk is forwarded unchanged.
   */
  function pushDHT(file) {
    // A file can arrive in several chunks; without this guard each chunk
    // would create another random record for the same file.
    let announced = false

    return new Transform({
      async transform(chunk, cb) {
        try {
          if (!announced) {
            announced = true

            // Tokens come from the file name: extension stripped, split on '_'.
            const tokens = path.basename(file).replace(/\..+$/, '').split('_').filter(Boolean)

            const key = randomBytes(32)
            await testnet.nodes[0].searchableRecordPut(['gif', ...tokens], key)
            await bee.put(key.toString('hex'), {
              path: file,
              key: dst.key.toString('hex')
            })
          }

          this.push(chunk)
          cb(null)
        } catch (err) {
          // Surface the failure on the stream instead of leaving it hanging.
          cb(err)
        }
      }
    })
  }

  const swarm = new Hyperswarm()
  const store = new Corestore('./server')
  const bee = new Hyperbee(store.get({ name: 'lookup' }), {
    keyEncoding: 'utf-8',
    valueEncoding: 'json'
  })
  await bee.ready()

  swarm.on('connection', (conn) => {
    console.log('connection')
    store.replicate(conn)
  })
  const src = new Localdrive('./images')
  const dst = new Hyperdrive(store)

  const mirror = new MirrorDrive(src, dst, {
    transformers: [
      (file) => {
        return pushDHT(file)
      }
    ]
  })

  await mirror.done()

  {
    const discovery = swarm.join(bee.discoveryKey)
    await discovery.flushed()
  }

  {
    const discovery = swarm.join(dst.discoveryKey)
    await discovery.flushed()
  }

  for await (const file of dst.list('.')) {
    console.log('list', file) // => { key, value }
  }

  console.log('Serving DHT on', bee.key.toString('hex'), testnet.nodes[0].port)
}

main()
76 changes: 76 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const connect = require('./lib/connect')
const { FIREWALL, BOOTSTRAP_NODES, KNOWN_NODES, COMMANDS } = require('./lib/constants')
const { hash, createKeyPair } = require('./lib/crypto')
const { decode } = require('hypercore-id-encoding')
const { hammingDistance } = require('searchable-record-cache')
const RawStreamSet = require('./lib/raw-stream-set')
const ConnectionPool = require('./lib/connection-pool')
const { STREAM_NOT_CONNECTED } = require('./lib/errors')
Expand Down Expand Up @@ -52,6 +53,7 @@ class HyperDHT extends DHT {
this._randomPunchInterval = opts.randomPunchInterval || 20000 // min 20s between random punches...
this._randomPunches = 0
this._randomPunchLimit = 1 // set to one for extra safety for now
this._simhash = opts.simhash

this.once('persistent', () => {
this._persistent = new Persistent(this, persistent)
Expand Down Expand Up @@ -184,6 +186,68 @@ class HyperDHT extends DHT {
return this.query({ target, command: COMMANDS.LOOKUP, value: null }, opts)
}

async searchableRecordPut(tokens, value, opts = {}) {
if (!this._simhash) return

const target = this._simhash.hash(tokens)
const query = this.query({ target, command: COMMANDS.SEARCH, value: null }, opts)
await query.finished()

for (const closest of query.closestReplies) {
await this.request(
{
target,
command: COMMANDS.SEARCHABLE_RECORD_PUT,
value: c.encode(m.searchableRecord, { value, key: target })
},
closest.from
)
}

return target
}

async search(tokens, opts = {}) {
if (!this._simhash) return

const target = this._simhash.hash(tokens)

const query = this.query(
{
target,
command: COMMANDS.SEARCH,
value: c.encode(m.searchOptions, {
closest: opts.closest || 5,
values: opts.values || 5
})
},
opts
)

const results = []
const seen = new Set()

for await (const reply of query) {
if (reply.value) {
const res = c.decode(m.searchResponse, reply.value)

for (const r of res) {
const key = r.key.toString('hex')
if (seen.has(key)) continue

const distance = hammingDistance(r.key, target)
seen.add(key)
results.push({ ...r, distance, from: reply.from })
}
}
}

results.sort((a, b) => a.distance - b.distance)
while (results.length > 5) results.pop()

return results
}

lookupAndUnannounce(target, keyPair, opts = {}) {
const unannounces = []
const dht = this
Expand Down Expand Up @@ -426,6 +490,14 @@ class HyperDHT extends DHT {
this._persistent.onimmutableget(req)
return true
}
case COMMANDS.SEARCH: {
this._persistent.onsearch(req)
return true
}
case COMMANDS.SEARCHABLE_RECORD_PUT: {
this._persistent.onsearchablerecordput(req)
return true
}
}

return false
Expand Down Expand Up @@ -604,6 +676,10 @@ function defaultCacheOpts(opts) {
maxSize: (maxSize / 2) | 0,
maxAge: opts.maxAge || 48 * 60 * 60 * 1000 // 48 hours
},
searchableRecords: {
maxSize: (maxSize / 2) | 0,
maxAge: opts.maxAge || 48 * 60 * 60 * 1000 // 48 hours
},
bumps: { maxSize, maxAge }
}
}
Expand Down
4 changes: 3 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ const COMMANDS = (exports.COMMANDS = {
MUTABLE_PUT: 6,
MUTABLE_GET: 7,
IMMUTABLE_PUT: 8,
IMMUTABLE_GET: 9
IMMUTABLE_GET: 9,
SEARCHABLE_RECORD_PUT: 10,
SEARCH: 11
})

exports.BOOTSTRAP_NODES = global.Pear?.config.dht?.bootstrap || [
Expand Down
53 changes: 53 additions & 0 deletions lib/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,56 @@ exports.mutableGetResponse = {
}
}
}

exports.searchOptions = {
preencode(state, m) {
c.uint.preencode(state, m.closest)
c.uint.preencode(state, m.values)
},
encode(state, m) {
c.uint.encode(state, m.closest)
c.uint.encode(state, m.values)
},
decode(state) {
return {
closest: c.uint.decode(state),
values: c.uint.decode(state)
}
}
}

exports.searchableRecord = {
preencode(state, m) {
c.fixed32.preencode(state, m.key)
c.fixed32.preencode(state, m.value)
},
encode(state, m) {
c.fixed32.encode(state, m.key)
c.fixed32.encode(state, m.value)
},
decode(state) {
return {
key: c.fixed32.decode(state),
value: c.fixed32.decode(state)
}
}
}

// Codec for one entry of a search response: a `fixed32` record key followed
// by an array of `fixed32` values stored under that key.
const searchResponseItem = {
  preencode(state, { key, values }) {
    c.fixed32.preencode(state, key)
    c.array(c.fixed32).preencode(state, values)
  },
  encode(state, { key, values }) {
    c.fixed32.encode(state, key)
    c.array(c.fixed32).encode(state, values)
  },
  decode(state) {
    // Fields are read in the same order they were written.
    const key = c.fixed32.decode(state)
    const values = c.array(c.fixed32).decode(state)
    return { key, values }
  }
}

// Codec for the value of a SEARCH reply: an array of `searchResponseItem` entries.
exports.searchResponse = c.array(searchResponseItem)
38 changes: 38 additions & 0 deletions lib/persistent.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const c = require('compact-encoding')
const sodium = require('sodium-universal')
const RecordCache = require('record-cache')
const { SearchableRecordCache } = require('searchable-record-cache')
const Cache = require('xache')
const b4a = require('b4a')
const unslab = require('unslab')
Expand All @@ -21,6 +22,7 @@ module.exports = class Persistent {
this.refreshes = new Cache(opts.refreshes)
this.mutables = new Cache(opts.mutables)
this.immutables = new Cache(opts.immutables)
this.searchableRecords = new SearchableRecordCache(opts.searchableRecords)
}

onlookup(req) {
Expand Down Expand Up @@ -226,11 +228,47 @@ module.exports = class Persistent {
req.reply(null)
}

async onsearchablerecordput(req) {
if (!req.target) return

const doc = c.decode(m.searchableRecord, req.value)
const { key, value } = doc

this.searchableRecords.add(key, value)

req.reply(null)
}

onsearch(req) {
if (!req.target || !req.value) {
// allow searchable record put to do an empty search
req.reply(null)
return
}

const opts = c.decode(m.searchOptions, req.value)
const results = []

const res = this.searchableRecords.search(req.target, opts)

for (const r of res) {
results.push({
values: r.values,
key: r.key,
distance: r.distance
})
}

results.sort((a, b) => a.distance - b.distance)
req.reply(c.encode(m.searchResponse, results.slice(0, 12))) // max we can fit
}

destroy() {
this.records.destroy()
this.refreshes.destroy()
this.mutables.destroy()
this.immutables.destroy()
this.searchableRecords.destroy()
}

static signMutable(seq, value, keyPair) {
Expand Down
Loading