Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
feat: add querying and make all tests pass
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
jacobheun committed Apr 4, 2018
1 parent b710421 commit 0c89c78
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 33 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ $ npm install datastore-s3
```

## Usage
A bucket must be created prior to using datastore-s3. Please see the AWS docs for information on how to configure the S3 instance. The bucket name must be set at the S3 instance level; see the example below.

```js
const S3 = require('aws-sdk').S3
const s3Instance = new S3({ params: { Bucket: 'my-ipfs-bucket' } })
const S3Store = require('datastore-s3')
// TODO: How does this need to be configured and used with IPFS
const store = new S3Store('.ipfs/datastore', {
s3: s3Instance
})
```

## Contribute
Expand All @@ -40,4 +45,4 @@ Small note: If editing the Readme, please conform to the [standard-readme](https

## License

MIT 2017 © IPFS
MIT 2018 © IPFS
163 changes: 132 additions & 31 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
const path = require('path')
const setImmediate = require('async/setImmediate')
const each = require('async/each')
const eachLimit = require('async/eachLimit')
const waterfall = require('async/series')
const asyncFilter = require('interface-datastore').utils.asyncFilter
const asyncSort = require('interface-datastore').utils.asyncSort
const Key = require('interface-datastore').Key

const Deferred = require('pull-defer')
const pull = require('pull-stream')

/* :: export type S3DSInputOptions = {
s3: S3Instance
Expand Down Expand Up @@ -85,11 +87,11 @@ class S3Datastore {
Key: this._getFullKey(key)
}, (err, data) => {
if (err) {
callback(err, null)
return
return callback(err, null)
}

callback(null, data.Body || null)

// If a body was returned, ensure it's a Buffer
callback(null, data.Body ? Buffer.from(data.Body) : null)
})
}

Expand All @@ -105,11 +107,9 @@ class S3Datastore {
Key: this._getFullKey(key)
}, (err, data) => {
if (err && err.code === 'NotFound') {
callback(null, false)
return
return callback(null, false)
} else if (err) {
callback(err, false)
return
return callback(err, false)
}

callback(null, true)
Expand Down Expand Up @@ -159,48 +159,149 @@ class S3Datastore {
}
}

/**
* Recursively fetches all keys from s3
*/
_listKeys (params, keys, callback) {
if (typeof callback === 'undefined') {
callback = keys
keys = []
}

this.opts.s3.listObjectsV2(params, (err, data) => {
if (err) {
return callback(err)
}

data.Contents.forEach((d) => {
// Remove the path from the key
keys.push(new Key(d.Key.slice(this.path.length), false))
})

// If we didnt get all records, recursively query
if (data.isTruncated) {
// If NextMarker is absent, use the key from the last result
params.StartAfter = data.Contents[data.Contents.length - 1].Key

// recursively fetch keys
return this._listKeys(params, keys, callback)
}

callback(err, keys)
})
}

/**
* Returns an iterator for fetching objects from s3 by their key
* @param {Array<Key>} keys
* @param {Boolean} keysOnly Whether or not only keys should be returned
*/
_getS3Iterator (keys, keysOnly) {
let count = 0

return {
next: (callback) => {
// Check if we're done
if (count >= keys.length) {
return callback(null, null, null)
}

let currentKey = keys[count++]

if (keysOnly) {
return callback(null, currentKey, null)
}

// Fetch the object Buffer from s3
this.get(currentKey, (err, data) => {
callback(err, currentKey, data)
})
}
}
}

/**
* Query the store.
*
* @param {Object} q
* @returns {PullStream}
*/
query (q /* : Query<Buffer> */) /* : QueryResult<Buffer> */ {


const prefix = path.join(this.path, q.prefix || '')

let deferred = Deferred.source()
let prefix = q.prefix
let limit = q.limit || null
let offset = q.offset || 0
let filters = q.filters || []
let orders = q.orders || []
let keysOnly = q.keysOnly || false
let iterator

const params = {
Prefix: prefix
}

// List the objects from s3, with: prefix, limit, offset
// this gets called recursively, the internals need to iterate
const rawStream = (end, callback) => {
if (end) {
return callback(end)
}

iterator.next((err, key, value) => {
if (err) {
return callback(err)
}

// If the iterator is done, declare the stream done
if (err === null && key === null && value === null) {
return callback(true)
}

const res = {
key: key
}

if (value) {
res.value = value
}

// If !keyOnly get each object from s3
callback(null, res)
})
}

// Get all the keys via list object, recursively as needed
this._listKeys(params, (err, keys) => {
if (err) {
return callback(err)
}

// Filter the objects
iterator = this._getS3Iterator(keys, q.keysOnly || false)

// Order the objects
deferred.resolve(rawStream)
})

// Use a deferred pull stream source, as async operations need to occur before the
// pull stream begins
let tasks = [deferred]

if (q.filters != null) {
tasks = tasks.concat(q.filters.map(f => asyncFilter(f)))
}

/*
- `prefix: string` (optional) - only return values where the key starts with this prefix
- `filters: Array<Filter<Value>>` (optional) - filter the results according to the these functions
- `orders: Array<Order<Value>>` (optional) - order the results according to these functions
- `limit: number` (optional) - only return this many records
- `offset: number` (optional) - skip this many records at the beginning
- `keysOnly: bool` (optional) - Only return keys, no values.
*/
if (q.orders != null) {
tasks = tasks.concat(q.orders.map(o => asyncSort(o)))
}

if (q.offset != null) {
let i = 0
tasks.push(pull.filter(() => i++ >= q.offset))
}

if (q.limit != null) {
tasks.push(pull.take(q.limit))
}

// I'll need to return a https://pull-stream.github.io/#pull-defer, since the query to s3 is async
throw new Error('TODO')
return deferred
return pull.apply(null, tasks)
}

/**
* This will check the s3 bucket to ensure permissions are set
* This will check the s3 bucket to ensure access and existence
*
* @param {function(Error)} callback
*/
Expand Down
17 changes: 17 additions & 0 deletions test/utils/s3-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ module.exports = class S3Mock {
callback({ code: 'NotFound' }, null)
}
})

// Replace s3.listObjectsV2 with a stand-in that answers from this.storage.
// The reply always carries every matching key in a single `Contents` list.
this.mocks['listObjectsV2'] = standin.replace(s3, 'listObjectsV2', (stand, params, callback) => {
// The datastore is expected to always pass a string Prefix
expect(params.Prefix).to.be.a('string')
const results = {
Contents: []
}

// Collect every stored key that begins with the requested prefix
// (presumably this.storage maps object keys to their stored data; verify against the rest of the mock)
for (let k in this.storage) {
if (k.startsWith(params.Prefix)) {
results.Contents.push({
Key: k
})
}
}

// Report success with the collected entries
callback(null, results)
})

this.mocks['upload'] = standin.replace(s3, 'upload', (stand, params, callback) => {
expect(params.Key).to.be.a('string')
Expand Down

0 comments on commit 0c89c78

Please sign in to comment.