4
4
/* :: import type {Batch, Query, QueryResult, Callback} from 'interface-datastore' */
5
5
const assert = require ( 'assert' )
6
6
const path = require ( 'upath' )
7
- const {
8
- filter,
9
- map,
10
- take
11
- } = require ( 'streaming-iterables' )
12
7
13
- const IDatastore = require ( 'interface-datastore' )
14
- const sortAll = IDatastore . utils . sortAll
15
- const Key = IDatastore . Key
16
- const Errors = IDatastore . Errors
8
+ const {
9
+ Adapter,
10
+ Key,
11
+ Errors,
12
+ utils : {
13
+ filter
14
+ }
15
+ } = require ( 'interface-datastore' )
17
16
const createRepo = require ( './s3-repo' )
18
17
19
18
/* :: export type S3DSInputOptions = {
@@ -42,13 +41,15 @@ declare type S3Instance = {
42
41
* Keys need to be sanitized before use, as they are written
43
42
* to the file system as is.
44
43
*/
45
- class S3Datastore {
44
+ class S3Datastore extends Adapter {
46
45
/* :: path: string */
47
46
/* :: opts: S3DSInputOptions */
48
47
/* :: bucket: string */
49
48
/* :: createIfMissing: boolean */
50
49
51
50
constructor ( path /* : string */ , opts /* : S3DSInputOptions */ ) {
51
+ super ( )
52
+
52
53
this . path = path
53
54
this . opts = opts
54
55
const {
@@ -209,13 +210,7 @@ class S3Datastore {
209
210
}
210
211
}
211
212
212
- /**
213
- * Query the store.
214
- *
215
- * @param {Object } q
216
- * @returns {Iterable }
217
- */
218
- query ( q /* : Query<Buffer> */ ) /* : QueryResult<Buffer> */ {
213
+ async * _all ( q , options ) {
219
214
const prefix = path . join ( this . path , q . prefix || '' )
220
215
221
216
let values = true
@@ -230,36 +225,18 @@ class S3Datastore {
230
225
let it = this . _listKeys ( params )
231
226
232
227
if ( q . prefix != null ) {
233
- it = filter ( k => k . toString ( ) . startsWith ( q . prefix ) , it )
228
+ it = filter ( it , k => k . toString ( ) . startsWith ( q . prefix ) )
234
229
}
235
230
236
- it = map ( async ( key ) => {
231
+ for await ( const key of it ) {
237
232
const res /* : QueryEntry<Buffer> */ = { key }
238
233
if ( values ) {
239
234
// Fetch the object Buffer from s3
240
235
res . value = await this . get ( key )
241
236
}
242
- return res
243
- } , it )
244
-
245
- if ( Array . isArray ( q . filters ) ) {
246
- it = q . filters . reduce ( ( it , f ) => filter ( f , it ) , it )
247
- }
248
237
249
- if ( Array . isArray ( q . orders ) ) {
250
- it = q . orders . reduce ( ( it , f ) => sortAll ( it , f ) , it )
238
+ yield res
251
239
}
252
-
253
- if ( q . offset != null ) {
254
- let i = 0
255
- it = filter ( ( ) => i ++ >= q . offset , it )
256
- }
257
-
258
- if ( q . limit != null ) {
259
- it = take ( q . limit , it )
260
- }
261
-
262
- return it
263
240
}
264
241
265
242
/**
@@ -280,12 +257,6 @@ class S3Datastore {
280
257
throw Errors . dbOpenFailedError ( err )
281
258
}
282
259
}
283
-
284
- /**
285
- * Close the store.
286
- */
287
- close ( ) {
288
- }
289
260
}
290
261
291
262
module . exports = S3Datastore
0 commit comments