Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
feat: adds interface-datastore streaming api
Browse files Browse the repository at this point in the history
Uses the Adapter from interface-datastore to support the new
streaming api for puts/get/etc.
  • Loading branch information
achingbrain authored and jacobheun committed May 8, 2020
1 parent 4251456 commit 6c74394
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 49 deletions.
9 changes: 4 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,18 @@
},
"homepage": "https://github.com/ipfs/js-datastore-s3#readme",
"dependencies": {
"datastore-core": "^0.7.0",
"interface-datastore": "^0.7.0",
"streaming-iterables": "^4.1.0",
"datastore-core": "^1.1.0",
"interface-datastore": "^1.0.2",
"upath": "^1.1.0"
},
"devDependencies": {
"aegir": "^20.4.1",
"aegir": "^22.0.0",
"aws-sdk": "^2.579.0",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"flow-bin": "^0.93.0",
"flow-typed": "^2.5.1",
"ipfs-repo": "^0.29.2",
"ipfs-repo": "^2.1.1",
"stand-in": "^4.2.0"
},
"peerDependencies": {
Expand Down
59 changes: 15 additions & 44 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
/* :: import type {Batch, Query, QueryResult, Callback} from 'interface-datastore' */
const assert = require('assert')
const path = require('upath')
const {
filter,
map,
take
} = require('streaming-iterables')

const IDatastore = require('interface-datastore')
const sortAll = IDatastore.utils.sortAll
const Key = IDatastore.Key
const Errors = IDatastore.Errors
const {
Adapter,
Key,
Errors,
utils: {
filter
}
} = require('interface-datastore')
const createRepo = require('./s3-repo')

/* :: export type S3DSInputOptions = {
Expand Down Expand Up @@ -42,13 +41,15 @@ declare type S3Instance = {
* Keys need to be sanitized before use, as they are written
* to the file system as is.
*/
class S3Datastore {
class S3Datastore extends Adapter {
/* :: path: string */
/* :: opts: S3DSInputOptions */
/* :: bucket: string */
/* :: createIfMissing: boolean */

constructor (path /* : string */, opts /* : S3DSInputOptions */) {
super()

this.path = path
this.opts = opts
const {
Expand Down Expand Up @@ -209,13 +210,7 @@ class S3Datastore {
}
}

/**
* Query the store.
*
* @param {Object} q
* @returns {Iterable}
*/
query (q /* : Query<Buffer> */) /* : QueryResult<Buffer> */ {
async * _all (q, options) {
const prefix = path.join(this.path, q.prefix || '')

let values = true
Expand All @@ -230,36 +225,18 @@ class S3Datastore {
let it = this._listKeys(params)

if (q.prefix != null) {
it = filter(k => k.toString().startsWith(q.prefix), it)
it = filter(it, k => k.toString().startsWith(q.prefix))
}

it = map(async (key) => {
for await (const key of it) {
const res /* : QueryEntry<Buffer> */ = { key }
if (values) {
// Fetch the object Buffer from s3
res.value = await this.get(key)
}
return res
}, it)

if (Array.isArray(q.filters)) {
it = q.filters.reduce((it, f) => filter(f, it), it)
}

if (Array.isArray(q.orders)) {
it = q.orders.reduce((it, f) => sortAll(it, f), it)
yield res
}

if (q.offset != null) {
let i = 0
it = filter(() => i++ >= q.offset, it)
}

if (q.limit != null) {
it = take(q.limit, it)
}

return it
}

/**
Expand All @@ -280,12 +257,6 @@ class S3Datastore {
throw Errors.dbOpenFailedError(err)
}
}

/**
* Close the store.
*/
close () {
}
}

module.exports = S3Datastore
Expand Down

0 comments on commit 6c74394

Please sign in to comment.