Skip to content

Commit c3247fd

Browse files
authored
Merge pull request #3 from tilfin/refactor/use-class
Use class define
2 parents 61d0b52 + ef9cbba commit c3247fd

File tree

4 files changed

+106
-116
lines changed

4 files changed

+106
-116
lines changed

lib/json_transform.js

Lines changed: 48 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,70 @@
11
'use strict';
22

33
const Transform = require('stream').Transform;
4-
const util = require('util');
5-
64

75
/**
86
* JSONTransform
9-
*
10-
* @param {Object} opts - Transform options
11-
* @param {Number} opts.countBy - number of block items
12-
* @param {Boolean} opts.expandArray - Flatten an array into items
137
*/
14-
function JSONTransform(opts) {
15-
if (!(this instanceof JSONTransform))
16-
return new JSONTransform(opts);
8+
class JSONTransform extends Transform {
179

18-
const opts_ = opts || {};
19-
this._countBy = opts_.countBy || 1;
20-
this._expanding = opts_.expandArray || false;
21-
delete opts_.countBy;
22-
delete opts_.expandArray;
10+
/**
11+
* Contructor
12+
*
13+
* @param {Object} options - Transform options
14+
* @param {Number} options.countBy - number of block items
15+
* @param {Boolean} options.expandArray - Flatten an array into items
16+
*/
17+
constructor(options) {
18+
const opts = options || {};
19+
const countBy = opts.countBy || 1;
20+
const expanding = opts.expandArray || false;
21+
delete opts.countBy;
22+
delete opts.expandArray;
23+
opts.objectMode = true;
24+
super(opts);
2325

24-
this._items = [];
26+
this._countBy = countBy;
27+
this._expanding = expanding;
28+
this._items = [];
29+
}
2530

26-
opts_.objectMode = true;
27-
Transform.call(this, opts_);
28-
}
29-
util.inherits(JSONTransform, Transform);
31+
_transform(chunk, encoding, cb) {
32+
try {
33+
const data = JSON.parse(chunk);
34+
if (this._expanding && (data instanceof Array)) {
35+
this._items = this._items.concat(data);
36+
} else {
37+
this._items.push(data);
38+
}
39+
} catch (err) {
40+
cb(err, null);
41+
return;
42+
}
3043

31-
JSONTransform.prototype._transform = function(chunk, encoding, cb) {
32-
try {
33-
const data = JSON.parse(chunk);
34-
if (this._expanding && (data instanceof Array)) {
35-
this._items = this._items.concat(data);
36-
} else {
37-
this._items.push(data);
44+
while (this._items.length >= this._countBy) {
45+
if (!this._pushItems(this._shiftItems())) break;
3846
}
39-
} catch (err) {
40-
cb(err, null);
41-
return;
47+
cb();
4248
}
4349

44-
while (this._items.length >= this._countBy) {
45-
if (!this._enqueue(this._shiftItems())) break;
46-
}
47-
cb();
48-
}
50+
_flush(cb) {
51+
const items = this._items;
4952

50-
JSONTransform.prototype._flush = function(cb) {
51-
const items = this._items;
53+
while (items.length >= this._countBy) {
54+
this._pushItems(this._shiftItems());
55+
}
5256

53-
while (items.length >= this._countBy) {
54-
this._enqueue(this._shiftItems());
57+
if (items.length) this._pushItems(items);
58+
cb();
5559
}
5660

57-
if (items.length) this._enqueue(items);
58-
cb();
59-
}
60-
61-
JSONTransform.prototype._enqueue = function(items) {
62-
return this.push(this._countBy > 1 ? items : items[0]);
63-
}
61+
_pushItems(items) {
62+
return this.push(this._countBy > 1 ? items : items[0]);
63+
}
6464

65-
JSONTransform.prototype._shiftItems = function() {
66-
return this._items.splice(0, this._countBy);
65+
_shiftItems() {
66+
return this._items.splice(0, this._countBy);
67+
}
6768
}
6869

6970
module.exports = JSONTransform;

lib/read_stream.js

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,66 @@
11
'use strict';
22

3-
const util = require('util');
43
const Readable = require('stream').Readable;
54

5+
/**
6+
* KLReadStream
7+
*/
8+
class KLReadStream extends Readable {
69

7-
function KLReadStream(event, options) {
8-
if (!(this instanceof KLReadStream))
9-
return new KLReadStream(event, options);
10+
/**
11+
* Contructor
12+
*
13+
* @param {Object} event - Lambda event
14+
* @param {Object} options - Readable stream options
15+
* @param {Boolean} options.isAgg - Use KPL aggregation or not
16+
*/
17+
constructor(event, options) {
18+
const opts = options || {};
19+
const isAgg = opts.isAgg;
20+
delete opts.isAgg;
21+
opts.objectMode = true;
22+
super(opts);
1023

11-
this._records = event.Records;
12-
this._recCnt = this._records.length;
13-
this._recIdx = 0;
14-
15-
const opts = options || {};
16-
if (opts.isAgg) {
17-
this._agg = require('aws-kinesis-agg');
18-
} else {
19-
this._agg = null;
24+
this._records = event.Records;
25+
this._recCnt = this._records.length;
26+
this._recIdx = 0;
27+
this._agg = isAgg ? require('aws-kinesis-agg') : null;
2028
}
21-
delete opts.isAgg;
2229

23-
opts.objectMode = true;
24-
Readable.call(this, opts);
25-
}
26-
util.inherits(KLReadStream, Readable);
27-
KLReadStream.prototype._read = function(m) {
28-
this._nextRecord();
29-
}
30-
KLReadStream.prototype._nextRecord = function() {
31-
if (this._recIdx === this._recCnt) {
32-
this.push(null);
33-
return;
30+
_read(m) {
31+
this._nextRecord();
3432
}
3533

36-
const record = this._records[this._recIdx++];
37-
if (this._agg) {
38-
this._agg.deaggregateSync(record.kinesis, true, (err, userRecords) => {
39-
if (err) {
40-
this.emit('error', err);
41-
} else {
42-
userRecords.forEach((usrRec) => {
43-
this._processRecord(usrRec.data);
44-
});
45-
}
46-
});
47-
} else {
48-
this._processRecord(record.kinesis.data);
34+
_nextRecord() {
35+
if (this._recIdx === this._recCnt) {
36+
this.push(null);
37+
return;
38+
}
39+
40+
const record = this._records[this._recIdx++];
41+
if (this._agg) {
42+
this._agg.deaggregateSync(record.kinesis, true, (err, userRecords) => {
43+
if (err) {
44+
this.emit('error', err);
45+
} else {
46+
userRecords.forEach((usrRec) => {
47+
this._processRecord(usrRec.data);
48+
});
49+
}
50+
});
51+
} else {
52+
this._processRecord(record.kinesis.data);
53+
}
4954
}
50-
}
51-
KLReadStream.prototype._processRecord = function(b64Data) {
52-
try {
53-
const buf = new Buffer(b64Data, 'base64');
54-
this.push(buf);
55-
} catch (err) {
56-
this.emit('error', Error('invalid Base64 data'));
55+
56+
_processRecord(b64Data) {
57+
try {
58+
const buf = new Buffer(b64Data, 'base64');
59+
this.push(buf);
60+
} catch (err) {
61+
this.emit('error', Error('invalid Base64 data'));
62+
}
5763
}
5864
}
5965

60-
6166
module.exports = KLReadStream;

test/json_stream.test.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ describe('JSONTransform', () => {
1313
context('passed valid JSON', () => {
1414
it('raises finish event', (done) => {
1515
const readStream = fs.createReadStream(__dirname + '/fixtures/data/valid_json.txt');
16-
const jsonStream = JSONTransform()
16+
const jsonStream = new JSONTransform()
1717
.on('finish', function() {
1818
assert.isOk(true);
1919
done();
@@ -30,7 +30,7 @@ describe('JSONTransform', () => {
3030
context('passed 9 items JSON with count by 2 and expanding array', () => {
3131
it('flush 5 items', (done) => {
3232
const readStream = fs.createReadStream(__dirname + '/fixtures/data/data_json.txt');
33-
const jsonStream = JSONTransform({ expandArray: true, countBy: 2 })
33+
const jsonStream = new JSONTransform({ expandArray: true, countBy: 2 })
3434
.on('error', function(err) {
3535
assert.ifError(err);
3636
done();
@@ -52,7 +52,7 @@ describe('JSONTransform', () => {
5252
context('highWaterMark is less than item count of 1 JSON line', () => {
5353
it('reads rightly', (done) => {
5454
const readStream = fs.createReadStream(__dirname + '/fixtures/data/data_json.txt');
55-
const jsonStream = JSONTransform({ highWaterMark: 7, expandArray: true, countBy: 1 })
55+
const jsonStream = new JSONTransform({ highWaterMark: 7, expandArray: true, countBy: 1 })
5656
.on('error', function(err) {
5757
assert.ifError(err);
5858
done();
@@ -82,7 +82,7 @@ describe('JSONTransform', () => {
8282
context('passed invalid JSON', () => {
8383
it('raises error event', (done) => {
8484
const readStream = fs.createReadStream(__dirname + '/fixtures/data/invalid_json.txt');
85-
const jsonStream = JSONTransform()
85+
const jsonStream = new JSONTransform()
8686
.on('finish', function() {
8787
assert.isOk(false);
8888
done();

test/read_stream.test.js

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,15 @@ const es = require('event-stream');
44
const chai = require('chai');
55
const assert = chai.assert;
66

7-
const KLReader = require('../lib/read_stream');
7+
const KLReadStream = require('../lib/read_stream');
88

99

10-
describe('KinesisLambda.reader', () => {
11-
context('called function', () => {
12-
it('OK', () => {
13-
const event = require('./fixtures/events/data-0');
14-
const readStream = KLReader(event);
15-
16-
readStream
17-
.pipe(es.writeArray(function(err, array) {
18-
assert.equal(array[0].toString(), 'data');
19-
}))
20-
.on('end', function() {
21-
assert.isOk(true);
22-
done();
23-
});
24-
});
25-
});
26-
10+
describe('KLReadStream', () => {
2711
context('aggregation mode', () => {
2812
context('data is invalid base64', () => {
2913
it('raises error event', (done) => {
3014
const event = require('./fixtures/events/agg-data-invalid');
31-
const readStream = KLReader(event, { isAgg: true });
15+
const readStream = new KLReadStream(event, { isAgg: true });
3216

3317
readStream.on('error', function(err) {
3418
assert.instanceOf(err, Error);

0 commit comments

Comments
 (0)