Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
refactor: async iterators
Browse files Browse the repository at this point in the history
Uses async await and async iterators to implement the proposal here ipfs/interface-datastore#25

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
Alan Shaw committed Dec 2, 2018
1 parent 0aa53d3 commit b8a13f9
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 190 deletions.
8 changes: 3 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@
"dependencies": {
"datastore-core": "~0.6.0",
"encoding-down": "^5.0.4",
"interface-datastore": "~0.6.0",
"interface-datastore": "github:ipfs/interface-datastore#refactor/async-iterators",
"level-js": "github:timkuijsten/level.js#idbunwrapper",
"leveldown": "^3.0.2",
"levelup": "^2.0.2",
"pull-stream": "^3.6.9"
"levelup": "^2.0.2"
},
"devDependencies": {
"aegir": "^15.3.1",
"async": "^2.6.1",
"aegir": "^17.1.1",
"chai": "^4.2.0",
"cids": "~0.5.5",
"dirty-chai": "^2.0.1",
Expand Down
183 changes: 85 additions & 98 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

/* :: import type {Callback, Batch, Query, QueryResult, QueryEntry} from 'interface-datastore' */

const pull = require('pull-stream')
const levelup = require('levelup')

const asyncFilter = require('interface-datastore').utils.asyncFilter
const asyncSort = require('interface-datastore').utils.asyncSort
const Key = require('interface-datastore').Key
const Errors = require('interface-datastore').Errors
const { Key, Errors, utils } = require('interface-datastore')
const encode = require('encoding-down')
const { promisify } = require('util')

const { filter, map, take, sortAll } = utils

/**
* A datastore backed by leveldb.
Expand Down Expand Up @@ -50,59 +48,53 @@ class LevelDatastore {
)
}

open (callback /* : Callback<void> */) /* : void */ {
this.db.open((err) => {
if (err) {
return callback(Errors.dbOpenFailedError(err))
}
callback()
})
async open () /* : Promise */ {
try {
await this.db.open()
} catch (err) {
throw Errors.dbOpenFailedError(err)
}
}

put (key /* : Key */, value /* : Buffer */, callback /* : Callback<void> */) /* : void */ {
this.db.put(key.toString(), value, (err) => {
if (err) {
return callback(Errors.dbWriteFailedError(err))
}
callback()
})
async put (key /* : Key */, value /* : Buffer */) /* : Promise */ {
try {
await this.db.put(key.toString(), value)
} catch (err) {
throw Errors.dbWriteFailedError(err)
}
}

get (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ {
this.db.get(key.toString(), (err, data) => {
if (err) {
return callback(Errors.notFoundError(err))
}
callback(null, data)
})
async get (key /* : Key */) /* : Promise */ {
let data
try {
data = await this.db.get(key.toString())
} catch (err) {
if (err.notFound) throw Errors.notFoundError(err)
throw Errors.dbWriteFailedError(err)
}
return data
}

has (key /* : Key */, callback /* : Callback<bool> */) /* : void */ {
this.db.get(key.toString(), (err, res) => {
if (err) {
if (err.notFound) {
callback(null, false)
return
}
callback(err)
return
}

callback(null, true)
})
async has (key /* : Key */) /* : Promise<Boolean> */ {
try {
await this.db.get(key.toString())
} catch (err) {
if (err.notFound) return false
throw err
}
return true
}

delete (key /* : Key */, callback /* : Callback<void> */) /* : void */ {
this.db.del(key.toString(), (err) => {
if (err) {
return callback(Errors.dbDeleteFailedError(err))
}
callback()
})
async delete (key /* : Key */) /* : Promise */ {
try {
await this.db.del(key.toString())
} catch (err) {
throw Errors.dbDeleteFailedError(err)
}
}

close (callback /* : Callback<void> */) /* : void */ {
this.db.close(callback)
async close () /* : Promise */ {
return this.db.close()
}

batch () /* : Batch<Buffer> */ {
Expand All @@ -121,8 +113,8 @@ class LevelDatastore {
key: key.toString()
})
},
commit: (callback /* : Callback<void> */) /* : void */ => {
this.db.batch(ops, callback)
commit: async () /* : Promise */ => {
return this.db.batch(ops)
}
}
}
Expand All @@ -133,70 +125,65 @@ class LevelDatastore {
values = !q.keysOnly
}

const iter = this.db.db.iterator({
keys: true,
values: values,
keyAsBuffer: true
})

const rawStream = (end, cb) => {
if (end) {
return iter.end((err) => {
cb(err || end)
})
}

iter.next((err, key, value) => {
if (err) {
return cb(err)
}

if (err == null && key == null && value == null) {
return iter.end((err) => {
cb(err || true)
})
}

const res /* : QueryEntry<Buffer> */ = {
key: new Key(key, false)
}

if (values) {
res.value = Buffer.from(value)
}

cb(null, res)
let it = levelIteratorToIterator(
this.db.db.iterator({
keys: true,
values: values,
keyAsBuffer: true
})
}
)

let tasks = [rawStream]
let filters = []
it = map(it, ({ key, value }) => {
const res /* : QueryEntry<Buffer> */ = { key: new Key(key, false) }
if (values) {
res.value = Buffer.from(value)
}
return res
})

if (q.prefix != null) {
const prefix = q.prefix
filters.push((e, cb) => cb(null, e.key.toString().startsWith(prefix)))
it = filter(it, e => e.key.toString().startsWith(q.prefix))
}

if (q.filters != null) {
filters = filters.concat(q.filters)
if (Array.isArray(q.filters)) {
it = q.filters.reduce((it, f) => filter(it, f), it)
}

tasks = tasks.concat(filters.map(f => asyncFilter(f)))

if (q.orders != null) {
tasks = tasks.concat(q.orders.map(o => asyncSort(o)))
if (Array.isArray(q.orders)) {
it = q.orders.reduce((it, f) => sortAll(it, f), it)
}

if (q.offset != null) {
let i = 0
tasks.push(pull.filter(() => i++ >= q.offset))
it = filter(it, () => i++ >= q.offset)
}

if (q.limit != null) {
tasks.push(pull.take(q.limit))
it = take(it, q.limit)
}

return pull.apply(null, tasks)
return it
}
}

function levelIteratorToIterator (li) {
return {
next: () => new Promise((resolve, reject) => {
li.next((err, key, value) => {
if (err) return reject(err)
if (key == null) return resolve({ done: true })
resolve({ done: false, value: { key, value } })
})
}),
return: () => new Promise((resolve, reject) => {
li.end(err => {
if (err) return reject(err)
resolve({ done: true })
})
}),
[Symbol.asyncIterator] () {
return this
}
}
}

Expand Down
44 changes: 26 additions & 18 deletions test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
/* eslint-env mocha */
'use strict'

const each = require('async/each')
const MountStore = require('datastore-core').MountDatastore
const Key = require('interface-datastore').Key
const { MountDatastore } = require('datastore-core')
const { Key } = require('interface-datastore')

// leveldown will be swapped for level-js
const leveljs = require('leveldown')
Expand All @@ -14,31 +13,40 @@ const LevelStore = require('../src')
describe('LevelDatastore', () => {
describe('interface-datastore (leveljs)', () => {
require('interface-datastore/src/tests')({
setup (callback) {
callback(null, new LevelStore('hello', {db: leveljs}))
},
teardown (callback) {
leveljs.destroy('hello', callback)
}
setup: () => new LevelStore('hello', { db: leveljs }),
teardown: () => new Promise((resolve, reject) => {
leveljs.destroy('hello', err => {
if (err) return reject(err)
resolve()
})
})
})
})

describe('interface-datastore (mount(leveljs, leveljs, leveljs))', () => {
// TODO: unskip when datastore-core is converted to async/await/iterators
describe.skip('interface-datastore (mount(leveljs, leveljs, leveljs))', () => {
require('interface-datastore/src/tests')({
setup (callback) {
callback(null, new MountStore([{
setup () {
return new MountDatastore([{
prefix: new Key('/a'),
datastore: new LevelStore('one', {db: leveljs})
datastore: new LevelStore('one', { db: leveljs })
}, {
prefix: new Key('/q'),
datastore: new LevelStore('two', {db: leveljs})
datastore: new LevelStore('two', { db: leveljs })
}, {
prefix: new Key('/z'),
datastore: new LevelStore('three', {db: leveljs})
}]))
datastore: new LevelStore('three', { db: leveljs })
}])
},
teardown (callback) {
each(['one', 'two', 'three'], leveljs.destroy.bind(leveljs), callback)
teardown () {
return Promise.all(['one', 'two', 'three'].map(dir => {
return new Promise((resolve, reject) => {
leveljs.destroy(dir, err => {
if (err) return reject(err)
resolve()
})
})
}))
}
})
})
Expand Down
Loading

0 comments on commit b8a13f9

Please sign in to comment.