Listens to Kafka topics and runs asynchronous functions (jobs) on each message,
committing a message's offset only after all of its processing has succeeded.
Installation
Usage
Documentation
- Simple API
- Ensures that all jobs have run successfully before a message is committed
- Retry strategy for jobs that fail
- Graceful shutdown
Install with yarn or npm
yarn add kafka-executor
#or
npm install kafka-executor --save
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
  brokerList: '0.0.0.0:9092,0.0.0.0:9091',
  topics: ['topic1', 'topic2'],
  groupId: 'groupId',
});

executor.init();

// Register a job that receives each Kafka message and returns a promise.
executor.addJob('myJobId', new Job((kafkaMessage) => {
  console.log(kafkaMessage);
  return Promise.resolve();
}));
import { Job } from 'kafka-executor';
new Job(() => Promise.resolve(), {
  maxRetries?: number,
  retryDelay?: number | (retryNumber: number) => number,
  shouldRetry?: boolean | (err: Error) => boolean,
})
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| maxRetries: number | no | 3 | How many times a job is retried before it is considered failed |
| retryDelay: number \| (retryIndex) => number | no | 60000 ms | The delay between retries, in ms |
| shouldRetry: boolean \| (error) => boolean | no | true | Determines whether a job is retried after a failure |
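Since `retryDelay` and `shouldRetry` both accept functions, a retry policy can be expressed in code. The following is a minimal sketch, not part of kafka-executor: the helper names, the backoff constants, the assumption that the retry index starts at 0, and the assumption that errors may carry an HTTP-like `status` field are all illustrative.

```typescript
// Hypothetical helpers; only the option names (maxRetries, retryDelay,
// shouldRetry) come from the table above.
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60000;

// Exponential backoff: 1s, 2s, 4s, ... capped at 60s.
// Assumption: retryNumber is 0 for the first retry.
function backoffDelay(retryNumber: number): number {
  return Math.min(BASE_DELAY_MS * 2 ** retryNumber, MAX_DELAY_MS);
}

// Skip retries for 4xx-style failures, which are unlikely to succeed later.
// Assumption: the error may expose an optional `status` field.
function retryUnlessClientError(err: Error & { status?: string | number }): boolean {
  const status = Number(err.status); // NaN when status is absent
  const isClientError = status >= 400 && status < 500;
  return !isClientError;
}

// Options object shaped like the second argument of `new Job(fn, options)`.
const jobOptions = {
  maxRetries: 5,
  retryDelay: backoffDelay,
  shouldRetry: retryUnlessClientError,
};
```

An object like `jobOptions` would be passed as the second argument to `new Job(...)`.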
import KafkaExecutor from 'kafka-executor';
new KafkaExecutor({
  brokerList: string;
  groupId: string;
  topics: string[];
  connectionTimeout: string[];
  checkInterval?: number;
  batchSize?: number;
  errorHandler?: (err: Error[], message: KafkaMessage, commit: Function) => void;
  logger?: (message: string, type: LogType, code?: string) => void;
  maxRetries?: number;
  retryDelay?: number;
  consumer?: object;
})
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| brokerList: string | yes | - | Initial list of brokers, as a comma-separated list of host or host:port entries |
| topics: [string] | yes | - | The topics that the consumer will listen to |
| groupId: string | yes | - | Client group id string. All clients sharing the same group.id belong to the same group |
| checkInterval: number | no | 2000 | How long to wait before checking for new messages during an idle period |
| batchSize: number | no | 1 | How many messages to process concurrently. Change this according to your error tolerance |
| errorHandler: (error, kafkaMessage, commit: Function) => void | no | built-in | A function responsible for handling job errors. By default, the process exits with code 1 |
| logger: (message: string, type: 'info' \| 'warn' \| 'error', code) => void | no | console | A function responsible for logging |
| consumer: object | no | - | Options for the consumer; see the rdkafka configuration options |
| maxRetries: number | no | 3 | Global configuration for all jobs |
| retryDelay: number | no | 60000 ms | Global configuration for all jobs |
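As an alternative to the default behaviour of exiting the process, a custom `errorHandler` could record the failed message and move on. The sketch below is illustrative only: `makeErrorHandler`, the dead-letter sink, and the local types are hypothetical, and it assumes that calling `commit()` commits the failed message so that consumption continues.

```typescript
// Local stand-ins for the shapes described in this README.
interface KafkaMessageLike {
  topic: string;
  partition: number;
  offset: number;
}
type JobError = Error & { jobId?: string; status?: string };

// Hypothetical sink; in practice this might write to another topic or a table.
interface DeadLetterEntry {
  message: KafkaMessageLike;
  reasons: string[];
}
type DeadLetterSink = (entry: DeadLetterEntry) => void;

// Builds a handler matching (errors, message, commit) => void.
function makeErrorHandler(sink: DeadLetterSink) {
  return (errors: JobError[], message: KafkaMessageLike, commit: Function): void => {
    // Summarise each failed job as "jobId: message".
    const reasons = errors.map((e) => `${e.jobId ?? 'unknown'}: ${e.message}`);
    sink({ message, reasons });
    // Assumption: commit() marks the message as handled so processing continues.
    commit();
  };
}
```

The returned function would be passed as the `errorHandler` option. Whether skipping a failed message is acceptable depends on the error tolerance of the application.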
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
  brokerList: '0.0.0.0:9092',
  groupId: 'group',
  topics: ['topic'],
});

executor.addJob('myJobId', new Job(...));
executor.init();
executor.removeJob('myJobId');
executor.on('event', Function);
executor.shutdown();
| Name | Description |
| --- | --- |
| init: () => Promise | Initializes kafka-executor and connects the consumer to Kafka. |
| addJob: (jobId: string, job: Job) => void | Adds a job to the processing flow. |
| removeJob: (jobId: string) => void | Removes a job. |
| on: (event: string, listener: Function) => void | Listens to a variety of events emitted by kafka-executor and rdkafka. |
| shutdown: () => Promise | Shuts down the process gracefully, ensuring that pending jobs finish before exit. |
| Event | Arguments | Description |
| --- | --- | --- |
| message.received | kafkaMessage[] | Fires when the consumer gets a message |
| message.committed | kafkaMessage | Fires when the consumer commits a message |
| processing.error | kafkaMessage, error | Fires when one or more jobs fail |
| shutdown | - | Fires when kafka-executor shuts down |
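These events can feed simple metrics. The sketch below counts received, committed, and failed messages; `ExecutorEvents`, `Counters`, and `attachCounters` are hypothetical names, and the only thing assumed about the executor is an `on(event, listener)` method with the event names and arguments listed above.

```typescript
// Minimal view of the executor: just the `on` method.
interface ExecutorEvents {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

interface Counters {
  received: number;
  committed: number;
  failed: number;
}

// Registers listeners and returns a live counters object.
function attachCounters(executor: ExecutorEvents): Counters {
  const counters: Counters = { received: 0, committed: 0, failed: 0 };
  // message.received delivers an array of messages.
  executor.on('message.received', (messages: unknown[]) => {
    counters.received += messages.length;
  });
  // message.committed delivers a single message.
  executor.on('message.committed', () => {
    counters.committed += 1;
  });
  // processing.error delivers the message and the error.
  executor.on('processing.error', () => {
    counters.failed += 1;
  });
  return counters;
}
```

Calling `attachCounters(executor)` once, before or right after `executor.init()`, would be enough; the returned object is updated in place as events fire.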
| Event | Description |
| --- | --- |
| data | When using the Standard API, consumed messages are emitted in this event. |
| disconnected | The disconnected event is emitted when the broker disconnects. This event is only emitted when .disconnect is called. The wrapper will always try to reconnect otherwise. |
| ready | The ready event is emitted when the Consumer is ready to read messages. |
| event | The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
| event.log | The event.log event is emitted when logging events occur (if you opted in for logging via the event_cb option). You will need to set a value for debug if you want information to send. |
| event.stats | The event.stats event is emitted when librdkafka reports stats (if you opted in by setting the statistics.interval.ms to a non-zero value). |
| event.throttle | The event.throttle event is emitted when librdkafka reports throttling. |
{
  value: Buffer,
  size: number,
  topic: string,
  offset: number,
  partition: number,
  key: string,
  timestamp: number
}
| Name | Type | Description |
| --- | --- | --- |
| value | Buffer | message contents as a Buffer |
| size | number | size of the message, in bytes |
| topic | string | topic the message comes from |
| offset | number | offset the message was read from |
| partition | number | partition the message was on |
| key | string | key of the message, if present |
| timestamp | number | timestamp of message creation |
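Because `value` arrives as a Buffer, a job usually decodes it before doing any work. The sketch below assumes producers publish UTF-8 JSON, which this README does not state; `KafkaMessageLike`, `parseJsonValue`, and `describePosition` are hypothetical names, and `value` is typed as anything with `toString()` so the example runs without Node.js typings.

```typescript
// Stand-in for the message shape above; a Node.js Buffer satisfies `value`.
interface KafkaMessageLike {
  value: { toString(): string };
  topic: string;
  partition: number;
  offset: number;
}

// Assumption: the payload is UTF-8 encoded JSON. Throws on invalid JSON.
function parseJsonValue<T>(message: KafkaMessageLike): T {
  return JSON.parse(message.value.toString()) as T;
}

// Builds a position string such as "orders[0]@42", useful in logs
// or as a deduplication key.
function describePosition(message: KafkaMessageLike): string {
  return `${message.topic}[${message.partition}]@${message.offset}`;
}
```

Inside a job, a parse failure would reject the job's promise, so the retry options of the job decide what happens next.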
{
  ...Error,
  jobId: string,
  status?: string,
}
| Name | Type | Description |
| --- | --- | --- |
| jobId | string | the ID of the failed job |
| status | string | the HTTP status, if present |
| Name | Description |
| --- | --- |
| kafkaError | Log produced by Kafka |
| connectionError | Log produced when trying to connect to Kafka |
| jobFailed | Log produced by a job |
| jobRetry | Log produced by a job when it retries |
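The log codes above make it possible to filter or route log lines in a custom `logger`. The following sketch wraps a writer function and drops selected codes; `makeLogger`, `LogEntry`, and the choice to mute `jobRetry` by default are hypothetical, and only the `(message, type, code)` signature comes from the options described earlier.

```typescript
type LogType = 'info' | 'warn' | 'error';

interface LogEntry {
  level: LogType;
  code: string;
  message: string;
}

// Returns a function matching (message, type, code?) => void that forwards
// structured entries to `write`, skipping any code listed in `muted`.
function makeLogger(
  write: (entry: LogEntry) => void,
  muted: string[] = ['jobRetry'],
) {
  return (message: string, type: LogType, code?: string): void => {
    if (code !== undefined && muted.indexOf(code) !== -1) {
      return; // drop noisy codes
    }
    // 'none' is a placeholder chosen here for log lines without a code.
    write({ level: type, code: code ?? 'none', message });
  };
}
```

For example, `makeLogger((entry) => console.log(JSON.stringify(entry)))` would produce one JSON line per log entry and could be passed as the `logger` option.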