Skip to content

Commit bf68b5f

Browse files
committed
Defitions, Configuration, Disconnect, etc
1 parent ea096a3 commit bf68b5f

File tree

8 files changed

+79
-5
lines changed

8 files changed

+79
-5
lines changed

.env.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"Topic1Name":"kafka-test-topic",
3+
"Topic1Partition":"1",
4+
"Topic1Replicas":"1",
5+
"ZookeeperPort":"2181",
6+
"KafkaProtocol":"kafka",
7+
"KafkaPort":"9092",
8+
"KafkaIP":"localhost",
9+
"Throttle":"250"
10+
}

API.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,35 @@
1+
<a name="top"></a>
2+
13
# API
24

5+
* [Configuration](#Configuration)
6+
* [Consumer](#Consumer)
7+
* [Producer](#Producer)
8+
9+
10+
<a name="Configuration"></a>
11+
12+
## Configuration
13+
14+
### All
15+
16+
Configurations custom to this wrapper
17+
18+
| Field | Description| Type | Default |
19+
|------------|------------|------------|-----------|
20+
| throttle | Throttle interval time (ms) | Number | 500|
21+
| topics | Topics to subscribe to | String[] | \['kafka-test-topic'\] |
22+
| autoInterval | Allow auto intervals for polling (producer) and consuming (consumer). | boolean | true |
23+
24+
### Consumer
25+
26+
Configurations custom to this wrapper's Consumer class.
27+
28+
| Field | Description| Type | Default |
29+
|------------|------------|------------|-----------|
30+
| consumeMax | Number of messages to consume for each interval. | Number | 1 |
31+
32+
[back to top](#top)
333

434
<a name="Consumer"></a>
535

src/client.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ class KafkaClient {
7272
}
7373
return new Promise(
7474
(resolve, reject) => {
75-
kafkaClient.disconnect();
7675

7776
kafkaClient.prependListener('event.error', (err) => {
7877
console.error('Disconnect Operation (Error)', `${new Date()}: Error:`, err);
@@ -84,6 +83,14 @@ class KafkaClient {
8483
console.log('Disconnect Operation', `${new Date()}: Client Disconnected: ${JSON.stringify(arg)}`);
8584
return resolve();
8685
});
86+
87+
kafkaClient.disconnect();
88+
89+
setTimeout(() => {
90+
const err = new Error(`Disconnect Operation (Error) ${new Date()}: Failed to Disconnect`);
91+
console.log(err.message);
92+
return reject(err);
93+
}, 20000); // Wait 20 seconds
8794
})
8895
.then(() => {
8996
KAFKA_EVENTS.forEach((event) => {
@@ -113,6 +120,10 @@ class KafkaClient {
113120
console.error(`Error Log: ${new Date()}:`, err);
114121
this._errorDispatcher.next(err);
115122
});
123+
124+
kafkaClient.on('disconnected', (arg) => {
125+
126+
})
116127
}
117128

118129
/**

src/consumer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class Consumer extends KafkaClient {
110110
this._consumeLoop = setInterval(() => {
111111
// start consuming messages
112112
this.kafkaConsumer.consume(this._config.consumeMax);
113-
}, this._config.throttle);
113+
}, this._config.e);
114114
}
115115

116116
}

src/default-config.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
1-
const ENV = require('dotenv').config().parsed; // Environment from docker .ENV
1+
// const ENV = require('dotenv').config().parsed; // Environment from docker .ENV
2+
// @TODO fix ENV
3+
const ENV = require('../.env.json');
24

5+
/*
6+
Topic1Name=kafka-test-topic
7+
Topic1Partition=1
8+
Topic1Replicas=1
9+
ZookeeperPort=2181
10+
KafkaProtocol=kafka
11+
KafkaPort=9092
12+
KafkaIP=localhost
13+
Throttle=250
14+
*/
315
/**
416
* KafkaConfig
517
*/
@@ -8,7 +20,7 @@ module.exports = {
820
client: {
921
// debug: 'all',
1022
'group.id': 'kafka',
11-
'metadata.broker.list': `${ENV.KafkaIP}:${ENV.KafkaPort}`,
23+
'metadata.broker.list': 'localhost:9092',
1224
'enable.auto.commit': false
1325
},
1426
topics: [ENV.Topic1Name],

src/index.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ export interface MessagePayload {
166166
}
167167

168168
export interface DeliveryReport {
169+
topic: string;
170+
partition: number;
171+
offset: number;
172+
key: string;
173+
opaque: string;
174+
size: number;
169175

170176
}
171177

src/producer.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class Producer extends KafkaClient {
6262
* @param {String} [key=null] - keyed message (optional)
6363
* @param {String} [opaque=null] - opaque token which gets passed along to your delivery reports
6464
* @return {Promise<DeliveryReport>}
65+
* @TODO will delivery report be synchronized with produce?
6566
*/
6667
publish(message, topic = this._config.topics[0], partition = -1, key = null, opaque = null) {
6768
return new Promise((resolve, reject) => {

test.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ describe('Kafka Test', () => {
1010
});
1111

1212
afterEach(() => {
13-
return Promise.all([producer.disconnect(), consumer.disconnect()]);
13+
console.log('Disconnect');
14+
return Promise.all([producer.disconnect(), consumer.disconnect()])
15+
.then((data) => {
16+
console.log('Disconnected', data);
17+
});
1418
});
1519

1620
it('Pub Sub', () => {

0 commit comments

Comments
 (0)