Skip to content

Commit b1552e6

Browse files
authored
Merge pull request #6 from tilfin/feature/update-modules
Update modules
2 parents 70bc24e + 14e75db commit b1552e6

File tree

10 files changed

+206
-91
lines changed

10 files changed

+206
-91
lines changed

.npmignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
coverage
22
example
33
test
4+
tmp
45
.gitignore
56
.travis.yml
67
Makefile

README.md

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ kinesis-stream-lambda
1010

1111
* Easily reads a Lambda event of Kinesis Stream as a stream handling the chunk as Buffer
1212
* Supports KPL aggregation (set opts.isAgg true)
13-
* Provides KSL.parseJSON transform to handle items expanded array data in one record (set opts.expandArray true)
13+
* Provides KSL.parseJSON transform to handle items expanded array data in one record (set opts.flatArray true)
1414
* Node.js 6.10 or Later
1515

1616
## How to install
@@ -27,33 +27,59 @@ furthermore,
2727
$ npm install -save aws-kinesis-agg
2828
```
2929

30-
## Lambda handler example
30+
## Lambda handler examples
31+
32+
### async/await style
3133

3234
```javascript
33-
const es = require('event-stream');
35+
const StreamUtils = require('@tilfin/stream-utils');
3436
const KSL = require('kinesis-stream-lambda');
37+
const PromisedLife = require('promised-lifestream');
38+
39+
exports.handler = async function (event) {
40+
console.log('event: ', JSON.stringify(event, null, 2));
41+
42+
const result = [];
43+
44+
await PromisedLife([
45+
KSL.reader(event, { isAgg: false }),
46+
KSL.parseJSON({ flatArray: false }),
47+
StreamUtils.map(function(data, cb) {
48+
result.push(data);
49+
cb(null, data)
50+
})
51+
])
3552

53+
console.dir(result);
54+
}
55+
```
56+
57+
### normal style
58+
59+
```javascript
60+
const StreamUtils = require('@tilfin/stream-utils');
61+
const KSL = require('kinesis-stream-lambda');
3662

37-
exports.handler = function(event, context) {
63+
exports.handler = function (event, context, callback) {
3864
console.log('event: ', JSON.stringify(event, null, 2));
3965

4066
const result = [];
4167
const stream = KSL.reader(event, { isAgg: false });
4268

43-
stream.on('end', function() {
69+
stream.on('end', () => {
4470
console.dir(result);
45-
context.done();
71+
callback();
4672
});
4773

48-
stream.on('error', function(err) {
49-
context.fail(err);
74+
stream.on('error', err => {
75+
callback(err);
5076
});
5177

5278
stream
53-
.pipe(KSL.parseJSON({ expandArray: false }))
54-
.pipe(es.map(function(data, callback) {
79+
.pipe(KSL.parseJSON({ flatArray: false }))
80+
.pipe(StreamUtils.map(function(data, cb) {
5581
result.push(data);
56-
callback(null, data)
82+
cb(null, data)
5783
}));
5884
}
5985
```

example/async.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
const StreamUtils = require('@tilfin/stream-utils');
2+
const KSL = require('../lib');
3+
const PromisedLife = require('promised-lifestream');
4+
5+
exports.handler = async function (event) {
6+
console.log('event: ', JSON.stringify(event, null, 2));
7+
8+
const result = [];
9+
10+
await PromisedLife([
11+
KSL.reader(event, { isAgg: false }),
12+
KSL.parseJSON({ flatArray: false }),
13+
StreamUtils.map(function(data, cb) {
14+
result.push(data);
15+
cb(null, data)
16+
})
17+
])
18+
19+
console.dir(result);
20+
}
21+
22+
exports.handler(require('./data'))
23+
.then(() => {
24+
console.info('<<< END >>>')
25+
})
26+
.catch(err => {
27+
console.error('Failed:', err)
28+
})

example/data.json

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"Records": [
3+
{
4+
"kinesis": {
5+
"kinesisSchemaVersion": "1.0",
6+
"partitionKey": "485bd0f3c7f8b6ec3ddc6ada18abcf8d",
7+
"sequenceNumber": "49565441425403787852263465772329791873704550161163747330",
8+
"data": "W3siaWQiOjEsInZhbHVlIjoxMH0seyJpZCI6MiwidmFsdWUiOjIwfSx7ImlkIjozLCJ2YWx1ZSI6MzB9LHsiaWQiOjQsInZhbHVlIjoxNX0seyJpZCI6NSwidmFsdWUiOjI1fV0=",
9+
"approximateArrivalTimestamp": 1473002069.264
10+
},
11+
"eventSource": "aws:kinesis",
12+
"eventVersion": "1.0",
13+
"eventID": "shardId-000000000000:49565441425403787852263465772329791873704550161163747330",
14+
"eventName": "aws:kinesis:record",
15+
"invokeIdentityArn": "arn:aws:iam::986450800000:role/lambda_basic_execution",
16+
"awsRegion": "ap-northeast-1",
17+
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:986450800000:stream/kpl-stream"
18+
},
19+
{
20+
"kinesis": {
21+
"kinesisSchemaVersion": "1.0",
22+
"partitionKey": "485bd0f3c7f8b6ec3ddc6ada18abcf8d",
23+
"sequenceNumber": "49565441425403787852263465772329791873704550161163747330",
24+
"data": "W3siaWQiOjEsInZhbHVlIjoxMH0seyJpZCI6MiwidmFsdWUiOjIwfSx7ImlkIjozLCJ2YWx1ZSI6MzB9LHsiaWQiOjQsInZhbHVlIjoxNX0seyJpZCI6NSwidmFsdWUiOjI1fV0=",
25+
"approximateArrivalTimestamp": 1473002069.264
26+
},
27+
"eventSource": "aws:kinesis",
28+
"eventVersion": "1.0",
29+
"eventID": "shardId-000000000000:49565441425403787852263465772329791873704550161163747330",
30+
"eventName": "aws:kinesis:record",
31+
"invokeIdentityArn": "arn:aws:iam::986450800000:role/lambda_basic_execution",
32+
"awsRegion": "ap-northeast-1",
33+
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:986450800000:stream/kpl-stream"
34+
},
35+
{
36+
"kinesis": {
37+
"kinesisSchemaVersion": "1.0",
38+
"partitionKey": "485bd0f3c7f8b6ec3ddc6ada18abcf8d",
39+
"sequenceNumber": "49565441425403787852263465772329791873704550161163747330",
40+
"data": "W3siaWQiOjEsInZhbHVlIjoxMH0seyJpZCI6MiwidmFsdWUiOjIwfSx7ImlkIjozLCJ2YWx1ZSI6MzB9LHsiaWQiOjQsInZhbHVlIjoxNX0seyJpZCI6NSwidmFsdWUiOjI1fV0=",
41+
"approximateArrivalTimestamp": 1473002069.264
42+
},
43+
"eventSource": "aws:kinesis",
44+
"eventVersion": "1.0",
45+
"eventID": "shardId-000000000000:49565441425403787852263465772329791873704550161163747330",
46+
"eventName": "aws:kinesis:record",
47+
"invokeIdentityArn": "arn:aws:iam::986450800000:role/lambda_basic_execution",
48+
"awsRegion": "ap-northeast-1",
49+
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:986450800000:stream/kpl-stream"
50+
}
51+
]
52+
}

example/normal.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
const StreamUtils = require('@tilfin/stream-utils');
2+
const KSL = require('../lib');
3+
4+
exports.handler = function (event, context, callback) {
5+
console.log('event: ', JSON.stringify(event, null, 2));
6+
7+
const result = [];
8+
const stream = KSL.reader(event, { isAgg: false });
9+
10+
stream.on('end', () => {
11+
console.dir(result);
12+
callback();
13+
});
14+
15+
stream.on('error', err => {
16+
callback(err);
17+
});
18+
19+
stream
20+
.pipe(KSL.parseJSON({ flatArray: false }))
21+
.pipe(StreamUtils.map(function(data, cb) {
22+
result.push(data);
23+
cb(null, data)
24+
}));
25+
}
26+
27+
exports.handler(require('./data'), null, function (err) {
28+
if (err) console.error('Failed:', err)
29+
else console.info('<<< END >>>')
30+
})

lib/json_transform.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ class JSONTransform extends Transform {
1414
*
1515
* @param {Object} options - Transform options
1616
* @param {Number} options.countBy - number of block items
17-
* @param {Boolean} options.expandArray - Flatten an array into items
17+
* @param {Boolean} options.flatArray - Flatten an array into items
18+
* @param {Boolean} options.expandArray - Deprecated. use flatArray
1819
*/
1920
constructor(options) {
2021
const opts = options || {};
2122
const countBy = opts.countBy || 1;
22-
const expanding = opts.expandArray || false;
23+
const expanding = opts.flatArray || opts.expandArray || false;
2324
delete opts.countBy;
25+
delete opts.flatArray;
2426
delete opts.expandArray;
2527
opts.objectMode = true;
2628
super(opts);

package.json

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
{
22
"name": "kinesis-stream-lambda",
3-
"version": "0.8.1",
3+
"version": "0.9.0",
44
"description": "Readable stream in Lambda for Kinesis Stream",
55
"main": "lib/index.js",
66
"scripts": {
77
"test": "mocha test/*.test.js",
8-
"coverage": "./node_modules/.bin/istanbul cover _mocha -r test/*.test.js"
8+
"coverage": "istanbul cover _mocha -r test/*.test.js"
99
},
1010
"keywords": [
1111
"kinesis",
@@ -19,16 +19,19 @@
1919
"url": "https://github.com/tilfin/kinesis-stream-lambda"
2020
},
2121
"devDependencies": {
22-
"aws-kinesis-agg": "^2.2.2",
23-
"chai": "^3.5.0",
24-
"coveralls": "^2.11.12",
25-
"event-stream": "=3.3.4",
22+
"@tilfin/stream-utils": "^0.6.0",
23+
"aws-kinesis-agg": "^4.0.2",
24+
"chai": "^4.2.0",
25+
"coveralls": "^3.0.2",
2626
"istanbul": "^0.4.5",
27-
"mocha": "^3.0.2",
28-
"mocha-lcov-reporter": "^1.2.0",
29-
"through2": "^2.0.1"
27+
"mocha": "^5.2.0",
28+
"mocha-lcov-reporter": "^1.3.0",
29+
"through2": "^3.0.0"
3030
},
3131
"engines": {
32-
"node": ">=6"
32+
"node": ">=6.10"
33+
},
34+
"dependencies": {
35+
"promised-lifestream": "^0.7.0"
3336
}
3437
}

test/json_stream.test.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
'use strict';
1+
'use strict'
2+
3+
const StreamUtils = require('@tilfin/stream-utils')
4+
const { assert } = require('chai')
25

36
const fs = require('fs');
4-
const es = require('event-stream');
57
const through2 = require('through2');
6-
const chai = require('chai');
7-
const assert = chai.assert;
88

99
const JSONTransform = require('../lib/json_transform');
1010

@@ -30,13 +30,13 @@ describe('JSONTransform', () => {
3030
context('passed 9 items JSON with count by 2 and expanding array', () => {
3131
it('flush 5 items', (done) => {
3232
const readStream = fs.createReadStream(__dirname + '/fixtures/data/data_json.txt');
33-
const jsonStream = new JSONTransform({ expandArray: true, countBy: 2 })
33+
const jsonStream = new JSONTransform({ flatArray: true, countBy: 2 })
3434
.on('error', function(err) {
3535
assert.ifError(err);
3636
done();
3737
});
3838

39-
const writeStream = es.writeArray(function (err, array) {
39+
const writeStream = StreamUtils.writeArray(function (err, array) {
4040
assert.deepEqual(array[0], [{ id: 1 }, { id: 2 }]);
4141
assert.deepEqual(array[1], [{ id: 3 }, { id: 4 }]);
4242
assert.deepEqual(array[2], [{ id: 5 }, { id: 6 }]);
@@ -52,7 +52,7 @@ describe('JSONTransform', () => {
5252
context('highWaterMark is less than item count of 1 JSON line', () => {
5353
it('reads rightly', (done) => {
5454
const readStream = fs.createReadStream(__dirname + '/fixtures/data/data_json.txt');
55-
const jsonStream = new JSONTransform({ highWaterMark: 7, expandArray: true, countBy: 1 })
55+
const jsonStream = new JSONTransform({ highWaterMark: 7, flatArray: true, countBy: 1 })
5656
.on('error', function(err) {
5757
assert.ifError(err);
5858
done();
@@ -70,7 +70,7 @@ describe('JSONTransform', () => {
7070
cb();
7171
});
7272

73-
const writeStream = es.writeArray(function (err, array) {
73+
const writeStream = StreamUtils.writeArray(function (err, array) {
7474
assert.equal(array.length, 9);
7575
done();
7676
});
@@ -84,7 +84,7 @@ describe('JSONTransform', () => {
8484
it(`flush valid items through '${errMsg}'`, (done) => {
8585
const readStream = fs.createReadStream(__dirname + '/fixtures/data/multiline_json.txt', { highWaterMark });
8686
const jsonStream = new JSONTransform()
87-
const writeStream = es.writeArray(function (err, array) {
87+
const writeStream = StreamUtils.writeArray(function (err, array) {
8888
assert.deepEqual(array[0], { color: "red", value: "#f00" });
8989
assert.deepEqual(array[1], { color: "green", value: "#0f0" });
9090
assert.deepEqual(array[2], { color: "blue", value: "#00f" });

0 commit comments

Comments
 (0)