Skip to content

Commit 9e66814

Browse files
authored
Merge pull request #4 from tilfin/fix/parse-json
Fix parsing JSON fragments
2 parents c3247fd + 0c6baac commit 9e66814

File tree

6 files changed

+80
-12
lines changed

6 files changed

+80
-12
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
language: node_js
22
node_js:
3-
- "4.3"
4-
- "6.0"
3+
- "6.10"
4+
- "8"
55
script: 'make test-cov'
66
after_success: 'make coveralls; make clean'

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ kinesis-stream-lambda
22
=====================
33

44
[![NPM Version][npm-image]][npm-url]
5+
[![Node](https://img.shields.io/node/v/kinesis-stream-lambda.svg)]()
56
[![Build Status](https://travis-ci.org/tilfin/kinesis-stream-lambda.svg?branch=master)](https://travis-ci.org/tilfin/kinesis-stream-lambda)
67
[![Coverage Status](https://coveralls.io/repos/github/tilfin/kinesis-stream-lambda/badge.svg?branch=master)](https://coveralls.io/github/tilfin/kinesis-stream-lambda?branch=master)
78

@@ -10,7 +11,7 @@ kinesis-stream-lambda
1011
* Easily reads a Lambda event of Kinesis Stream as a stream handling the chunk as Buffer
1112
* Supports KPL aggregation (set opts.isAgg true)
1213
* Provides KSL.parseJSON transform to handle items expanded array data in one record (set opts.expandArray true)
13-
* Node.js 4.3 or Later
14+
* Node.js 6.10 or Later
1415

1516
## How to install
1617

lib/json_transform.js

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
const Transform = require('stream').Transform;
44

5+
const JSONUnexpectedTokenRe = /^Unexpected token (.) in JSON at position (\d+)$/;
6+
57
/**
68
* JSONTransform
79
*/
@@ -26,19 +28,27 @@ class JSONTransform extends Transform {
2628
this._countBy = countBy;
2729
this._expanding = expanding;
2830
this._items = [];
31+
this._buf = '';
2932
}
3033

3134
_transform(chunk, encoding, cb) {
35+
this._buf += chunk.toString();
36+
3237
try {
33-
const data = JSON.parse(chunk);
34-
if (this._expanding && (data instanceof Array)) {
35-
this._items = this._items.concat(data);
36-
} else {
37-
this._items.push(data);
38-
}
38+
const data = JSON.parse(this._buf);
39+
this._pushItemsFromData(data);
40+
this._buf = '';
3941
} catch (err) {
40-
cb(err, null);
41-
return;
42+
if (!(err instanceof SyntaxError)) {
43+
return cb(err);
44+
}
45+
46+
const r = this._rescueJSONError(err);
47+
if (r === 'next') {
48+
return cb();
49+
} else if (r === 'error') {
50+
return cb(err);
51+
}
4252
}
4353

4454
while (this._items.length >= this._countBy) {
@@ -47,6 +57,39 @@ class JSONTransform extends Transform {
4757
cb();
4858
}
4959

60+
_rescueJSONError(err) {
61+
const errMsg = err.message;
62+
if (errMsg === 'Unexpected end of JSON input') {
63+
// JSON unfinished
64+
return 'next';
65+
}
66+
67+
const md = JSONUnexpectedTokenRe.exec(errMsg);
68+
if (md) {
69+
const pos = Number(md[2]);
70+
if (md[1] === '{') {
71+
// another JSON follows
72+
const data = JSON.parse(this._buf.substr(0, pos));
73+
this._pushItemsFromData(data);
74+
this._buf = this._buf.substr(pos);
75+
return 'continue';
76+
} else if (this._buf.substr(pos - 1, 1) === '"') {
77+
// JSON unfinished
78+
return 'next';
79+
}
80+
}
81+
82+
return 'error';
83+
}
84+
85+
_pushItemsFromData(data) {
86+
if (this._expanding && (data instanceof Array)) {
87+
this._items = this._items.concat(data);
88+
} else {
89+
this._items.push(data);
90+
}
91+
}
92+
5093
_flush(cb) {
5194
const items = this._items;
5295

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "kinesis-stream-lambda",
3-
"version": "0.7.2",
3+
"version": "0.8.0",
44
"description": "Readable stream in Lambda for Kinesis Stream",
55
"main": "lib/index.js",
66
"scripts": {
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"color":"red","value":"#f00"}
2+
{"color":"green","value":"#0f0"}
3+
{"color":"blue","value":"#00f"}

test/json_stream.test.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,27 @@ describe('JSONTransform', () => {
7979
});
8080
});
8181

82+
context('passed multiline JSON', () => {
83+
const sharedExamples = function(highWaterMark, errMsg) {
84+
it(`flush valid items through '${errMsg}'`, (done) => {
85+
const readStream = fs.createReadStream(__dirname + '/fixtures/data/multiline_json.txt', { highWaterMark });
86+
const jsonStream = new JSONTransform()
87+
const writeStream = es.writeArray(function (err, array) {
88+
assert.deepEqual(array[0], { color: "red", value: "#f00" });
89+
assert.deepEqual(array[1], { color: "green", value: "#0f0" });
90+
assert.deepEqual(array[2], { color: "blue", value: "#00f" });
91+
done();
92+
});
93+
94+
readStream.pipe(jsonStream).pipe(writeStream);
95+
});
96+
}
97+
98+
sharedExamples(4, 'Unexpected token A in JSON at position');
99+
sharedExamples(25, 'Unexpected end of JSON input');
100+
sharedExamples(36, 'Unexpected token { in JSON at position');
101+
});
102+
82103
context('passed invalid JSON', () => {
83104
it('raises error event', (done) => {
84105
const readStream = fs.createReadStream(__dirname + '/fixtures/data/invalid_json.txt');

0 commit comments

Comments
 (0)