Skip to content

Commit e78cbfe

Browse files
authored
Merge pull request #54 from littlstar/updates
cleaned code, improved documentation, fixed variable naming in reduce
2 parents 60dc872 + 29ef58b commit e78cbfe

2 files changed

Lines changed: 121 additions & 116 deletions

File tree

lib/Request.js

Lines changed: 91 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class Request {
3333

3434
/**
3535
* Enable modifications to the initial context.
36+
*
37+
* @return {Promise}
3638
*/
3739

3840
resolveSources() {
@@ -45,12 +47,29 @@ class Request {
4547
})
4648
}
4749

50+
/**
51+
* Creates a progress bar for the request.
52+
*
53+
* @param {Batch} batch The Batch instance used for the request.
54+
* @param {Number} numSources The number of files being processed.
55+
*/
56+
57+
handleProgress(batch, numSources) {
58+
if (this.showProgress) {
59+
const progress = new ProgressBar('[:bar] :percent', {
60+
total: numSources,
61+
width: 40
62+
})
63+
batch.on('progress', () => progress.tick())
64+
}
65+
}
66+
4867
/**
4968
* Sets the encoding to use when getting s3 objects with
50-
* <code>object.Body.toString(encoding)</code>. If not set, <code>utf8</code>
51-
* is used.
69+
* object.Body.toString(encoding). If not set, utf8 is used.
5270
*
5371
* @param {String} encoding The encoding.
72+
* @return {Request} The instance on which this method was called.
5473
*/
5574

5675
encode(encoding) {
@@ -60,11 +79,12 @@ class Request {
6079

6180
/**
6281
* Sets a transformation function to be used when getting objects from s3.
63-
* Using <code>transform</code> takes precedence over <code>encode</code>.
82+
* Using transform takes precedence over encode.
6483
*
6584
* @param {Function} transformer The function to use to transform the
6685
* object. The transforation function takes an s3 object as a parameter
6786
* and should return the file's contents as a string.
87+
* @return {Request} The instance on which this method was called.
6888
*/
6989

7090
transform(t) {
@@ -77,6 +97,7 @@ class Request {
7797
* the computer can handle). Has no effect with reduce.
7898
*
7999
* @param {Integer} concurrency The concurrency level to use in the request.
100+
* @return {Request} The instance on which this method was called.
80101
*/
81102

82103
concurrency(concurrency) {
@@ -86,7 +107,9 @@ class Request {
86107

87108
/**
88109
* Limits the number of sources being operated on.
110+
*
89111
* @param {Integer} limit
112+
* @return {Request} The instance on which this method was called.
90113
*/
91114

92115
limit(limit) {
@@ -96,6 +119,8 @@ class Request {
96119

97120
/**
98121
* Reverse the sources being operated on.
122+
*
123+
* @return {Request} The instance on which this method was called.
99124
*/
100125

101126
reverse() {
@@ -105,6 +130,8 @@ class Request {
105130

106131
/**
107132
* Enables destructive actions (map, filter) to occur inplace.
133+
*
134+
* @return {Request} The instance on which this method was called.
108135
*/
109136

110137
inplace() {
@@ -119,6 +146,7 @@ class Request {
119146
*
120147
* @param {String} bucket The target bucket.
121148
* @param {String} prefix The target prefix (folder) where the output will go.
149+
* @return {Request} The instance on which this method was called.
122150
*/
123151

124152
output(bucket, prefix) {
@@ -136,6 +164,9 @@ class Request {
136164
* @param {Function} func The function to perform over the working context.
137165
* @param {Boolean} [isasync=false] Set to true if `func` is async (returns a
138166
* Promise).
167+
* @return {Promise<Object>} Resolves after processing has completed,
168+
* returning an object that contains the bucket, prefix, and key of the last
169+
* S3 object that was processed.
139170
*/
140171

141172
forEach(func, isasync) {
@@ -148,6 +179,9 @@ class Request {
148179
* @param {Function} func The function to perform over the working context.
149180
* @param {Boolean} [isAsync=false] Set to true if `func` is async (returns a
150181
* Promise).
182+
* @return {Promise<Object>} Resolves after processing has completed,
183+
* returning an object that contains the bucket, prefix, and key of the last
184+
* S3 object that was processed.
151185
*/
152186

153187
each(func, isAsync, concurrency) {
@@ -158,22 +192,16 @@ class Request {
158192
return new Promise((success, fail) => {
159193
this.resolveSources().then((sources) => {
160194

161-
const progress = new ProgressBar('each [:bar] :percent', {
162-
total: sources.length,
163-
width: 40
164-
})
165-
const last = sources[sources.length - 1]
195+
this.handleProgress(batch, sources.length)
196+
const lastKey = sources[sources.length - 1]
166197

167-
// create functions array
198+
// Loop over sources, apply function to each
168199
sources.forEach((source) => {
169-
200+
const bucket = source.bucket
201+
const key = source.key
202+
const encoding = this.opts.encoding
203+
const transformer = this.opts.transformer
170204
batch.push((done) => {
171-
172-
const bucket = source.bucket
173-
const key = source.key
174-
const encoding = this.opts.encoding
175-
const transformer = this.opts.transformer
176-
177205
this.s3.get(bucket, key, encoding, transformer).then((body) => {
178206
if (isAsync) {
179207
func(body, key).then(done).catch(done)
@@ -185,18 +213,14 @@ class Request {
185213
})
186214
})
187215

188-
if (this.showProgress) {
189-
batch.on('progress', () => progress.tick())
190-
}
191-
216+
// Resolve error or last key processed.
192217
batch.end((err) => {
193218
if (err) {
194219
fail(err)
195220
} else {
196-
success(last)
221+
success(lastKey)
197222
}
198223
})
199-
200224
}).catch(fail)
201225
})
202226
}
@@ -211,6 +235,9 @@ class Request {
211235
* string that will replace the given s3 object.
212236
* @param {Boolean} [isAsync=false] If set to true, this indicates that func
213237
* is async and returns a promise.
238+
* @return {Promise<Object>} Resolves after processing has completed,
239+
* returning an object that contains the bucket, prefix, and key of the last
240+
* S3 object that was processed.
214241
*/
215242

216243
map(func, isAsync) {
@@ -248,20 +275,16 @@ class Request {
248275
return new Promise((success, fail) => {
249276
this.resolveSources().then((sources) => {
250277

251-
const progress = new ProgressBar('map [:bar] :percent', {
252-
total: sources.length,
253-
width: 40
254-
})
278+
this.handleProgress(batch, sources.length)
255279
const lastKey = sources[sources.length - 1]
256280

281+
// Loop over sources, apply mapper function to each
257282
sources.forEach((source) => {
283+
const bucket = source.bucket
284+
const key = source.key
285+
const encoding = this.opts.encoding
286+
const transformer = this.opts.transformer
258287
batch.push((done) => {
259-
260-
const bucket = source.bucket
261-
const key = source.key
262-
const encoding = this.opts.encoding
263-
const transformer = this.opts.transformer
264-
265288
this.s3.get(bucket, key, encoding, transformer).then((val) => {
266289
if (isAsync) {
267290
func(val, source.key).then((newval) => {
@@ -275,10 +298,7 @@ class Request {
275298
})
276299
})
277300

278-
if (this.showProgress) {
279-
batch.on('progress', () => progress.tick())
280-
}
281-
301+
// Resolve error or last key processed
282302
batch.end((err) => {
283303
if (err) {
284304
fail(err)
@@ -294,62 +314,60 @@ class Request {
294314
*
295315
* @param {Function} func Function to execute on each value in the array, taking
296316
* three arguments:
297-
* previousValue - The value previously returned in the last invocation of
298-
* func
299-
* currentValue - The current entry being processed
300-
* key - The key of the current object being processed
317+
* `accumulator`: The accumulated value previously returned from the last
318+
* invocation of func
319+
* `currentValue`: The current entry being processed
320+
* `key`: The key of the current object being processed
301321
* func either returns the updated value, or a promise that resolves to the
302322
* updated value.
303-
* @param {String} value Optional. Initial value to use as the first argument
323+
* @param {String} initialValue Optional. Initial value to use as the first
324+
* `previousValue` in `func`. Defaults to `null`.
304325
* @param {Boolean} isAsync Optional, defaults to false. If set to true, this
305326
* indicates that func returns a promise.
327+
* @return {Promise<Object>} Resolves after processing has completed,
328+
* returning the result of the reducer.
306329
*/
307330

308-
reduce(func, val, isAsync) {
331+
reduce(func, initialValue, isAsync) {
309332

310333
isAsync = isAsync || this.opts.async
334+
initialValue = initialValue || null
311335
const batch = new Batch()
312-
batch.concurrency(1)
336+
batch.concurrency(this.opts.concurrency)
313337

314338
return new Promise((success, fail) => {
315339
this.resolveSources().then((sources) => {
316340

317-
const progress = new ProgressBar('reduce [:bar] :percent', {
318-
total: sources.length,
319-
width: 40
320-
})
341+
this.handleProgress(batch, sources.length)
342+
let accumulator = initialValue
321343

344+
// Loop over sources, update `accumulator`
322345
sources.forEach((source) => {
346+
const bucket = source.bucket
347+
const key = source.key
348+
const encoding = this.opts.encoding
349+
const transformer = this.opts.transformer
323350
batch.push((done) => {
324-
325-
const bucket = source.bucket
326-
const key = source.key
327-
const encoding = this.opts.encoding
328-
const transformer = this.opts.transformer
329-
330351
this.s3.get(bucket, key, encoding, transformer).then((body) => {
331352
if (isAsync) {
332-
func(val, body, key).then((newval) => {
333-
val = newval
353+
func(accumulator, body, key).then((newval) => {
354+
accumulator = newval
334355
done()
335356
}).catch(done)
336357
} else {
337-
val = func(val, body, key)
358+
accumulator = func(accumulator, body, key)
338359
done()
339360
}
340361
}).catch(done)
341362
})
342363
})
343364

344-
if (this.showProgress) {
345-
batch.on('progress', () => progress.tick())
346-
}
347-
365+
// Resolve error or reducer result `accumulator`
348366
batch.end((err) => {
349367
if (err) {
350368
fail(err)
351369
} else {
352-
success(val)
370+
success(accumulator)
353371
}
354372
})
355373
}).catch(fail)
@@ -365,6 +383,9 @@ class Request {
365383
* false.
366384
* @param {Boolean} isAsync Optional, defaults to false. If set to true, this
367385
* indicates that func returns a promise.
386+
* @return {Promise<Object>} Resolves after processing has completed,
387+
* returning an object that contains the bucket, prefix, and key of the last
388+
* S3 object that was processed.
368389
*/
369390

370391
filter(func, isAsync) {
@@ -419,22 +440,17 @@ class Request {
419440
return new Promise((success, fail) => {
420441
this.resolveSources().then((sources) => {
421442

422-
const progress = new ProgressBar('filter [:bar] :percent', {
423-
total: sources.length,
424-
width: 40
425-
})
443+
this.handleProgress(batch, sources.length)
444+
const lastKey = sources[sources.length - 1]
426445

427-
// loop over every key and run the filter function on each object. keep
446+
// Loop over every key and run the filter function on each object. keep
428447
// track of files to keep and remove.
429448
sources.forEach((source) => {
430-
449+
const bucket = source.bucket
450+
const key = source.key
451+
const encoding = this.opts.encoding
452+
const transformer = this.opts.transformer
431453
batch.push((done) => {
432-
433-
const bucket = source.bucket
434-
const key = source.key
435-
const encoding = this.opts.encoding
436-
const transformer = this.opts.transformer
437-
438454
this.s3.get(bucket, key, encoding, transformer).then((body) => {
439455
if (isAsync) {
440456
func(body, source).then((result) => {
@@ -459,15 +475,12 @@ class Request {
459475
})
460476
})
461477

462-
if (this.showProgress) {
463-
batch.on('progress', () => progress.tick())
464-
}
465-
478+
// Resolve error or last source processed
466479
batch.end((err) => {
467480
if (err) {
468481
fail(err)
469482
} else {
470-
success()
483+
success(lastKey)
471484
}
472485
})
473486
}).catch(fail)

0 commit comments

Comments
 (0)