Skip to content

Commit 26da00f

Browse files
author
Jayakrishnan JR
committed
codes
1 parent 08a72f8 commit 26da00f

File tree

6 files changed

+4054
-0
lines changed

6 files changed

+4054
-0
lines changed

config.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
module.exports = {
2+
kafka_topic: 'example',
3+
kafka_server: 'localhost:2181',
4+
};

consumer.app.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
2+
const kafka = require('kafka-node');
3+
const bp = require('body-parser');
4+
const config = require('./config');
5+
6+
try {
7+
const Consumer = kafka.HighLevelConsumer;
8+
const client = new kafka.Client(config.kafka_server);
9+
let consumer = new Consumer(
10+
client,
11+
[{ topic: config.kafka_topic, partition: 0 }],
12+
{
13+
autoCommit: true,
14+
fetchMaxWaitMs: 1000,
15+
fetchMaxBytes: 1024 * 1024,
16+
encoding: 'utf8',
17+
fromOffset: false
18+
}
19+
);
20+
consumer.on('message', async function(message) {
21+
console.log('here');
22+
console.log(
23+
'kafka-> ',
24+
message.value
25+
);
26+
})
27+
consumer.on('error', function(err) {
28+
console.log('error', err);
29+
});
30+
}
31+
catch(e) {
32+
console.log(e);
33+
}

0 commit comments

Comments
 (0)