5
5
const path = require ( 'path' )
6
6
const setImmediate = require ( 'async/setImmediate' )
7
7
const each = require ( 'async/each' )
8
+ const eachLimit = require ( 'async/eachLimit' )
8
9
const waterfall = require ( 'async/series' )
9
10
const asyncFilter = require ( 'interface-datastore' ) . utils . asyncFilter
10
11
const asyncSort = require ( 'interface-datastore' ) . utils . asyncSort
11
12
const Key = require ( 'interface-datastore' ) . Key
12
13
13
14
const Deferred = require ( 'pull-defer' )
15
+ const pull = require ( 'pull-stream' )
14
16
15
17
/* :: export type S3DSInputOptions = {
16
18
s3: S3Instance
@@ -85,11 +87,11 @@ class S3Datastore {
85
87
Key : this . _getFullKey ( key )
86
88
} , ( err , data ) => {
87
89
if ( err ) {
88
- callback ( err , null )
89
- return
90
+ return callback ( err , null )
90
91
}
91
-
92
- callback ( null , data . Body || null )
92
+
93
+ // If a body was returned, ensure it's a Buffer
94
+ callback ( null , data . Body ? Buffer . from ( data . Body ) : null )
93
95
} )
94
96
}
95
97
@@ -105,11 +107,9 @@ class S3Datastore {
105
107
Key : this . _getFullKey ( key )
106
108
} , ( err , data ) => {
107
109
if ( err && err . code === 'NotFound' ) {
108
- callback ( null , false )
109
- return
110
+ return callback ( null , false )
110
111
} else if ( err ) {
111
- callback ( err , false )
112
- return
112
+ return callback ( err , false )
113
113
}
114
114
115
115
callback ( null , true )
@@ -159,48 +159,149 @@ class S3Datastore {
159
159
}
160
160
}
161
161
162
+ /**
163
+ * Recursively fetches all keys from s3
164
+ */
165
+ _listKeys ( params , keys , callback ) {
166
+ if ( typeof callback === 'undefined' ) {
167
+ callback = keys
168
+ keys = [ ]
169
+ }
170
+
171
+ this . opts . s3 . listObjectsV2 ( params , ( err , data ) => {
172
+ if ( err ) {
173
+ return callback ( err )
174
+ }
175
+
176
+ data . Contents . forEach ( ( d ) => {
177
+ // Remove the path from the key
178
+ keys . push ( new Key ( d . Key . slice ( this . path . length ) , false ) )
179
+ } )
180
+
181
+ // If we didnt get all records, recursively query
182
+ if ( data . isTruncated ) {
183
+ // If NextMarker is absent, use the key from the last result
184
+ params . StartAfter = data . Contents [ data . Contents . length - 1 ] . Key
185
+
186
+ // recursively fetch keys
187
+ return this . _listKeys ( params , keys , callback )
188
+ }
189
+
190
+ callback ( err , keys )
191
+ } )
192
+ }
193
+
194
+ /**
195
+ * Returns an iterator for fetching objects from s3 by their key
196
+ * @param {Array<Key> } keys
197
+ * @param {Boolean } keysOnly Whether or not only keys should be returned
198
+ */
199
+ _getS3Iterator ( keys , keysOnly ) {
200
+ let count = 0
201
+
202
+ return {
203
+ next : ( callback ) => {
204
+ // Check if we're done
205
+ if ( count >= keys . length ) {
206
+ return callback ( null , null , null )
207
+ }
208
+
209
+ let currentKey = keys [ count ++ ]
210
+
211
+ if ( keysOnly ) {
212
+ return callback ( null , currentKey , null )
213
+ }
214
+
215
+ // Fetch the object Buffer from s3
216
+ this . get ( currentKey , ( err , data ) => {
217
+ callback ( err , currentKey , data )
218
+ } )
219
+ }
220
+ }
221
+ }
222
+
162
223
/**
163
224
* Query the store.
164
225
*
165
226
* @param {Object } q
166
227
* @returns {PullStream }
167
228
*/
168
229
query ( q /* : Query<Buffer> */ ) /* : QueryResult<Buffer> */ {
169
-
230
+
231
+ const prefix = path . join ( this . path , q . prefix || '' )
232
+
170
233
let deferred = Deferred . source ( )
171
- let prefix = q . prefix
172
- let limit = q . limit || null
173
- let offset = q . offset || 0
174
- let filters = q . filters || [ ]
175
- let orders = q . orders || [ ]
176
- let keysOnly = q . keysOnly || false
234
+ let iterator
235
+
236
+ const params = {
237
+ Prefix : prefix
238
+ }
177
239
178
- // List the objects from s3, with: prefix, limit, offset
240
+ // this gets called recursively, the internals need to iterate
241
+ const rawStream = ( end , callback ) => {
242
+ if ( end ) {
243
+ return callback ( end )
244
+ }
245
+
246
+ iterator . next ( ( err , key , value ) => {
247
+ if ( err ) {
248
+ return callback ( err )
249
+ }
250
+
251
+ // If the iterator is done, declare the stream done
252
+ if ( err === null && key === null && value === null ) {
253
+ return callback ( true )
254
+ }
255
+
256
+ const res = {
257
+ key : key
258
+ }
259
+
260
+ if ( value ) {
261
+ res . value = value
262
+ }
179
263
180
- // If !keyOnly get each object from s3
264
+ callback ( null , res )
265
+ } )
266
+ }
267
+
268
+ // Get all the keys via list object, recursively as needed
269
+ this . _listKeys ( params , ( err , keys ) => {
270
+ if ( err ) {
271
+ return callback ( err )
272
+ }
181
273
182
- // Filter the objects
274
+ iterator = this . _getS3Iterator ( keys , q . keysOnly || false )
183
275
184
- // Order the objects
276
+ deferred . resolve ( rawStream )
277
+ } )
185
278
279
+ // Use a deferred pull stream source, as async operations need to occur before the
280
+ // pull stream begins
281
+ let tasks = [ deferred ]
186
282
283
+ if ( q . filters != null ) {
284
+ tasks = tasks . concat ( q . filters . map ( f => asyncFilter ( f ) ) )
285
+ }
187
286
188
- /*
189
- - `prefix: string` (optional) - only return values where the key starts with this prefix
190
- - `filters: Array<Filter<Value>>` (optional) - filter the results according to the these functions
191
- - `orders: Array<Order<Value>>` (optional) - order the results according to these functions
192
- - `limit: number` (optional) - only return this many records
193
- - `offset: number` (optional) - skip this many records at the beginning
194
- - `keysOnly: bool` (optional) - Only return keys, no values.
195
- */
287
+ if ( q . orders != null ) {
288
+ tasks = tasks . concat ( q . orders . map ( o => asyncSort ( o ) ) )
289
+ }
290
+
291
+ if ( q . offset != null ) {
292
+ let i = 0
293
+ tasks . push ( pull . filter ( ( ) => i ++ >= q . offset ) )
294
+ }
295
+
296
+ if ( q . limit != null ) {
297
+ tasks . push ( pull . take ( q . limit ) )
298
+ }
196
299
197
- // I'll need to return a https://pull-stream.github.io/#pull-defer, since the query to s3 is async
198
- throw new Error ( 'TODO' )
199
- return deferred
300
+ return pull . apply ( null , tasks )
200
301
}
201
302
202
303
/**
203
- * This will check the s3 bucket to ensure permissions are set
304
+ * This will check the s3 bucket to ensure access and existence
204
305
*
205
306
* @param {function(Error) } callback
206
307
*/
0 commit comments