Skip to content
This repository was archived by the owner on Mar 23, 2023. It is now read-only.

Commit 0c89c78

Browse files
committed
feat: add querying and make all tests pass
License: MIT Signed-off-by: Jacob Heun <jacobheun@gmail.com>
1 parent b710421 commit 0c89c78

File tree

3 files changed

+156
-33
lines changed

3 files changed

+156
-33
lines changed

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,15 @@ $ npm install datastore-s3
2626
```
2727

2828
## Usage
29+
A bucket must be created prior to using datastore-s3. Please see the AWS docs for information on how to configure the S3 instance. A bucket name is required to be set at the s3 instance level, see the below example.
2930

3031
```js
32+
const S3 = require('aws-sdk').S3
33+
const s3Instance = new S3({ params: { Bucket: 'my-ipfs-bucket' } })
3134
const S3Store = require('datastore-s3')
32-
// TODO: How does this need to be configured and used with IPFS
35+
const store = new S3Store('.ipfs/datastore', {
36+
s3: s3Instance
37+
})
3338
```
3439

3540
## Contribute
@@ -40,4 +45,4 @@ Small note: If editing the Readme, please conform to the [standard-readme](https
4045

4146
## License
4247

43-
MIT 2017 © IPFS
48+
MIT 2018 © IPFS

src/index.js

Lines changed: 132 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
const path = require('path')
66
const setImmediate = require('async/setImmediate')
77
const each = require('async/each')
8+
const eachLimit = require('async/eachLimit')
89
const waterfall = require('async/series')
910
const asyncFilter = require('interface-datastore').utils.asyncFilter
1011
const asyncSort = require('interface-datastore').utils.asyncSort
1112
const Key = require('interface-datastore').Key
1213

1314
const Deferred = require('pull-defer')
15+
const pull = require('pull-stream')
1416

1517
/* :: export type S3DSInputOptions = {
1618
s3: S3Instance
@@ -85,11 +87,11 @@ class S3Datastore {
8587
Key: this._getFullKey(key)
8688
}, (err, data) => {
8789
if (err) {
88-
callback(err, null)
89-
return
90+
return callback(err, null)
9091
}
91-
92-
callback(null, data.Body || null)
92+
93+
// If a body was returned, ensure it's a Buffer
94+
callback(null, data.Body ? Buffer.from(data.Body) : null)
9395
})
9496
}
9597

@@ -105,11 +107,9 @@ class S3Datastore {
105107
Key: this._getFullKey(key)
106108
}, (err, data) => {
107109
if (err && err.code === 'NotFound') {
108-
callback(null, false)
109-
return
110+
return callback(null, false)
110111
} else if (err) {
111-
callback(err, false)
112-
return
112+
return callback(err, false)
113113
}
114114

115115
callback(null, true)
@@ -159,48 +159,149 @@ class S3Datastore {
159159
}
160160
}
161161

162+
/**
163+
* Recursively fetches all keys from s3
164+
*/
165+
_listKeys (params, keys, callback) {
166+
if (typeof callback === 'undefined') {
167+
callback = keys
168+
keys = []
169+
}
170+
171+
this.opts.s3.listObjectsV2(params, (err, data) => {
172+
if (err) {
173+
return callback(err)
174+
}
175+
176+
data.Contents.forEach((d) => {
177+
// Remove the path from the key
178+
keys.push(new Key(d.Key.slice(this.path.length), false))
179+
})
180+
181+
// If we didnt get all records, recursively query
182+
if (data.isTruncated) {
183+
// If NextMarker is absent, use the key from the last result
184+
params.StartAfter = data.Contents[data.Contents.length - 1].Key
185+
186+
// recursively fetch keys
187+
return this._listKeys(params, keys, callback)
188+
}
189+
190+
callback(err, keys)
191+
})
192+
}
193+
194+
/**
195+
* Returns an iterator for fetching objects from s3 by their key
196+
* @param {Array<Key>} keys
197+
* @param {Boolean} keysOnly Whether or not only keys should be returned
198+
*/
199+
_getS3Iterator (keys, keysOnly) {
200+
let count = 0
201+
202+
return {
203+
next: (callback) => {
204+
// Check if we're done
205+
if (count >= keys.length) {
206+
return callback(null, null, null)
207+
}
208+
209+
let currentKey = keys[count++]
210+
211+
if (keysOnly) {
212+
return callback(null, currentKey, null)
213+
}
214+
215+
// Fetch the object Buffer from s3
216+
this.get(currentKey, (err, data) => {
217+
callback(err, currentKey, data)
218+
})
219+
}
220+
}
221+
}
222+
162223
/**
163224
* Query the store.
164225
*
165226
* @param {Object} q
166227
* @returns {PullStream}
167228
*/
168229
query (q /* : Query<Buffer> */) /* : QueryResult<Buffer> */ {
169-
230+
231+
const prefix = path.join(this.path, q.prefix || '')
232+
170233
let deferred = Deferred.source()
171-
let prefix = q.prefix
172-
let limit = q.limit || null
173-
let offset = q.offset || 0
174-
let filters = q.filters || []
175-
let orders = q.orders || []
176-
let keysOnly = q.keysOnly || false
234+
let iterator
235+
236+
const params = {
237+
Prefix: prefix
238+
}
177239

178-
// List the objects from s3, with: prefix, limit, offset
240+
// this gets called recursively, the internals need to iterate
241+
const rawStream = (end, callback) => {
242+
if (end) {
243+
return callback(end)
244+
}
245+
246+
iterator.next((err, key, value) => {
247+
if (err) {
248+
return callback(err)
249+
}
250+
251+
// If the iterator is done, declare the stream done
252+
if (err === null && key === null && value === null) {
253+
return callback(true)
254+
}
255+
256+
const res = {
257+
key: key
258+
}
259+
260+
if (value) {
261+
res.value = value
262+
}
179263

180-
// If !keyOnly get each object from s3
264+
callback(null, res)
265+
})
266+
}
267+
268+
// Get all the keys via list object, recursively as needed
269+
this._listKeys(params, (err, keys) => {
270+
if (err) {
271+
return callback(err)
272+
}
181273

182-
// Filter the objects
274+
iterator = this._getS3Iterator(keys, q.keysOnly || false)
183275

184-
// Order the objects
276+
deferred.resolve(rawStream)
277+
})
185278

279+
// Use a deferred pull stream source, as async operations need to occur before the
280+
// pull stream begins
281+
let tasks = [deferred]
186282

283+
if (q.filters != null) {
284+
tasks = tasks.concat(q.filters.map(f => asyncFilter(f)))
285+
}
187286

188-
/*
189-
- `prefix: string` (optional) - only return values where the key starts with this prefix
190-
- `filters: Array<Filter<Value>>` (optional) - filter the results according to the these functions
191-
- `orders: Array<Order<Value>>` (optional) - order the results according to these functions
192-
- `limit: number` (optional) - only return this many records
193-
- `offset: number` (optional) - skip this many records at the beginning
194-
- `keysOnly: bool` (optional) - Only return keys, no values.
195-
*/
287+
if (q.orders != null) {
288+
tasks = tasks.concat(q.orders.map(o => asyncSort(o)))
289+
}
290+
291+
if (q.offset != null) {
292+
let i = 0
293+
tasks.push(pull.filter(() => i++ >= q.offset))
294+
}
295+
296+
if (q.limit != null) {
297+
tasks.push(pull.take(q.limit))
298+
}
196299

197-
// I'll need to return a https://pull-stream.github.io/#pull-defer, since the query to s3 is async
198-
throw new Error('TODO')
199-
return deferred
300+
return pull.apply(null, tasks)
200301
}
201302

202303
/**
203-
* This will check the s3 bucket to ensure permissions are set
304+
* This will check the s3 bucket to ensure access and existence
204305
*
205306
* @param {function(Error)} callback
206307
*/

test/utils/s3-mock.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,23 @@ module.exports = class S3Mock {
4949
callback({ code: 'NotFound' }, null)
5050
}
5151
})
52+
53+
this.mocks['listObjectsV2'] = standin.replace(s3, 'listObjectsV2', (stand, params, callback) => {
54+
expect(params.Prefix).to.be.a('string')
55+
const results = {
56+
Contents: []
57+
}
58+
59+
for (let k in this.storage) {
60+
if (k.startsWith(params.Prefix)) {
61+
results.Contents.push({
62+
Key: k
63+
})
64+
}
65+
}
66+
67+
callback(null, results)
68+
})
5269

5370
this.mocks['upload'] = standin.replace(s3, 'upload', (stand, params, callback) => {
5471
expect(params.Key).to.be.a('string')

0 commit comments

Comments
 (0)