Skip to content

Commit c2cd81c

Browse files
authored
Merge pull request #1 from tilfin/fix/throttle
Update JSONTransform
2 parents 6809441 + 8814f6e commit c2cd81c

File tree

5 files changed

+94
-42
lines changed

5 files changed

+94
-42
lines changed

lib/index.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
'use strict';
22

33
const KLReadStream = require('./read_stream');
4-
4+
const JSONTransform = require('./json_transform');
55

66
exports.reader = function(event, opts) {
77
return new KLReadStream(event, opts);
88
}
99

10-
exports.parseJSON = require('./json_transform');
10+
exports.parseJSON = function(opts) {
11+
return new JSONTransform(opts);
12+
}

lib/json_transform.js

Lines changed: 66 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,69 @@
11
'use strict';
22

3-
const through2 = require('through2');
4-
5-
6-
module.exports = function(options) {
7-
const opts = options || {};
8-
const countBy = opts.countBy || 1;
9-
const expanding = opts.expandArray || false;
10-
11-
let items = [];
12-
13-
const stream = through2({ objectMode: true },
14-
function(chunk, enc, cb) {
15-
try {
16-
const data = JSON.parse(chunk);
17-
if (expanding && (data instanceof Array)) {
18-
items = items.concat(data);
19-
} else {
20-
items.push(data);
21-
}
22-
} catch (err) {
23-
cb(err, null);
24-
return;
25-
}
26-
27-
while (items.length >= countBy) {
28-
const enqItems = items.splice(0, countBy);
29-
this.push(countBy > 1 ? enqItems : enqItems[0]);
30-
}
31-
cb();
32-
}, function(cb) {
33-
if (items.length) {
34-
this.push(items);
35-
}
36-
cb();
37-
});
38-
39-
return stream;
3+
const Transform = require('stream').Transform;
4+
const util = require('util');
5+
6+
7+
/**
8+
* JSONTransform
9+
*
10+
* @param {Object} opts - Transform options
11+
* @param {Number} opts.countBy - number of block items
12+
* @param {Boolean} opts.expandArray - Flatten an array into items
13+
*/
14+
function JSONTransform(opts) {
15+
if (!(this instanceof JSONTransform))
16+
return new JSONTransform(opts);
17+
18+
const opts_ = opts || {};
19+
this._countBy = opts_.countBy || 1;
20+
this._expanding = opts_.expandArray || false;
21+
delete opts_.countBy;
22+
delete opts_.expandArray;
23+
24+
this._items = [];
25+
26+
opts_.objectMode = true;
27+
Transform.call(this, opts_);
4028
}
29+
util.inherits(JSONTransform, Transform);
30+
31+
JSONTransform.prototype._transform = function(chunk, encoding, cb) {
32+
try {
33+
const data = JSON.parse(chunk);
34+
if (this._expanding && (data instanceof Array)) {
35+
this._items = this._items.concat(data);
36+
} else {
37+
this._items.push(data);
38+
}
39+
} catch (err) {
40+
cb(err, null);
41+
return;
42+
}
43+
44+
while (this._items.length >= this._countBy) {
45+
if (!this._enqueue(this._shiftItems())) break;
46+
}
47+
cb();
48+
}
49+
50+
JSONTransform.prototype._flush = function(cb) {
51+
const items = this._items;
52+
53+
while (items.length >= this._countBy) {
54+
this._enqueue(this._shiftItems());
55+
}
56+
57+
if (items.length) this._enqueue(items);
58+
cb();
59+
}
60+
61+
JSONTransform.prototype._enqueue = function(items) {
62+
return this.push(this._countBy > 1 ? items : items[0]);
63+
}
64+
65+
JSONTransform.prototype._shiftItems = function() {
66+
return this._items.splice(0, this._countBy);
67+
}
68+
69+
module.exports = JSONTransform;

package.json

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
"type": "git",
1919
"url": "https://github.com/tilfin/kinesis-stream-lambda"
2020
},
21-
"dependencies": {
22-
"through2": "^2.0.1"
23-
},
2421
"devDependencies": {
2522
"aws-kinesis-agg": "^2.2.2",
2623
"chai": "^3.5.0",

test/fixtures/data/data_json.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":6},{"id":7},{"id":8},{"id":9}]

test/json_stream.test.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const fs = require('fs');
4+
const es = require('event-stream');
45
const chai = require('chai');
56
const assert = chai.assert;
67

@@ -25,6 +26,28 @@ describe('KinesisLambda.parseJSON', () => {
2526
});
2627
});
2728

29+
context('passed 9 items JSON with count by 2 and expanding array', () => {
30+
it('flush 5 items', (done) => {
31+
const readStream = fs.createReadStream(__dirname + '/fixtures/data/data_json.txt');
32+
const jsonStream = JSONTransform({ expandArray: true, countBy: 2 })
33+
.on('error', function(err) {
34+
assert.ifError(err);
35+
done();
36+
});
37+
38+
const writeStream = es.writeArray(function (err, array) {
39+
assert.deepEqual(array[0], [{ id: 1 }, { id: 2 }]);
40+
assert.deepEqual(array[1], [{ id: 3 }, { id: 4 }]);
41+
assert.deepEqual(array[2], [{ id: 5 }, { id: 6 }]);
42+
assert.deepEqual(array[3], [{ id: 7 }, { id: 8 }]);
43+
assert.deepEqual(array[4], [{ id: 9 }]);
44+
done();
45+
});
46+
47+
readStream.pipe(jsonStream).pipe(writeStream);
48+
});
49+
});
50+
2851
context('passed invalid JSON', () => {
2952
it('raises error event', (done) => {
3053
const readStream = fs.createReadStream(__dirname + '/fixtures/data/invalid_json.txt');

0 commit comments

Comments
 (0)