Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 52 additions & 64 deletions src/kafka-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface IKafkaOptions {
globalConfig?: object,
topicConfig?: object,
useHeaders?: boolean,
keyFun?: (any) => Buffer
keyFun?: (any) => Buffer
}

const defaultLogger = Logger.createLogger({
Expand All @@ -28,7 +28,7 @@ export class KafkaPubSub extends PubSubEngine {
protected producer: Kafka.HighLevelProducer // ProducerStream not exported
protected consumer: Kafka.KafkaConsumer // ConsumerStream not exported
protected options: any

private ee: EventEmitter;
private subscriptions: { [key: string]: [string, (...args: any[]) => void] }
private subIdCounter: number;
Expand All @@ -39,16 +39,16 @@ export class KafkaPubSub extends PubSubEngine {
super()
this.options = options
this.logger = createChildLogger(this.options.logger || defaultLogger, 'KafkaPubSub')

this.ee = new EventEmitter();
this.subscriptions = {};
this.subIdCounter = 0;
}

public async publish(channel: string, payload: any): Promise<void> {
public async publish(channel: string, payload: any): Promise<void> {
// only create producer if we actually publish something
this.producer = this.producer || await this.createProducer()

let kafkaPayload = payload
if (!this.options.useHeaders) {
kafkaPayload = {
Expand All @@ -59,16 +59,16 @@ export class KafkaPubSub extends PubSubEngine {

if (this.logger.debug) {
this.logger.debug("Publish %s", JSON.stringify(kafkaPayload))
}
}

return new Promise((resolve, reject) => {
this.producer.produce(
this.options.topic,
null,
this.serialiseMessage(kafkaPayload),
this.options.keyFun ? this.options.keyFun(kafkaPayload) : null,
this.options.topic,
null,
this.serialiseMessage(kafkaPayload),
this.options.keyFun ? this.options.keyFun(kafkaPayload) : null,
Date.now(),
this.options.useHeaders ? [ {channel: Buffer.from(channel)} ] : null,
this.options.useHeaders ? [{ channel: Buffer.from(channel) }] : null,
(err) => {
if (err) {
reject(err)
Expand All @@ -95,7 +95,7 @@ export class KafkaPubSub extends PubSubEngine {
return Promise.resolve(this.subIdCounter)
}

public unsubscribe(index: number) {
public unsubscribe(index: number) {
const [channel, onMessage] = this.subscriptions[index];
this.logger.info("Unsubscribing from %s", channel)
delete this.subscriptions[index]
Expand Down Expand Up @@ -130,7 +130,7 @@ export class KafkaPubSub extends PubSubEngine {
return Promise.all([producerPromise, consumerPromise]).then()
}

brokerList(){
brokerList() {
return this.options.port ? `${this.options.host}:${this.options.port}` : this.options.host
}

Expand All @@ -144,66 +144,60 @@ export class KafkaPubSub extends PubSubEngine {

private async createProducer(): Promise<Kafka.HighLevelProducer> {
const producer = new Kafka.HighLevelProducer(
Object.assign(
{},
{
'metadata.broker.list': this.brokerList(),
},
this.options.globalConfig),
Object.assign(
{},
{},
this.options.topicConfig
)
Object.assign(
{},
{
'metadata.broker.list': this.brokerList(),
},
this.options.globalConfig),
Object.assign(
{},
{},
this.options.topicConfig
)
);
producer.on('event.error', (err) => {
this.logger.error(err)
})
return new Promise((resolve, reject) => {
producer.on('ready', (data, metadata) => {
let topics = metadata.topics.map(topic => topic.name);
let topics = metadata.topics.map(topic => topic.name);
this.logger.info('Connected, found topics: %s', topics);

if (topics.includes(this.options.topic)) {
resolve(producer);
} else {
this.logger.error('Could not find requested topic %s', this.options.topic);
producer.disconnect()
reject('Could not find requested topic %s')
}

resolve(producer);
})

this.logger.info("Connecting producer ...")
producer.connect();
})
})
}

private async createConsumer(topic: string): Promise<Kafka.KafkaConsumer> {
private async createConsumer(topic: string): Promise<Kafka.KafkaConsumer> {
// Create a group for each instance. The consumer will receive all messages from the topic
const groupId = this.options.groupId || uuidv4()

const consumer = new Kafka.KafkaConsumer(
Object.assign(
{},
{
'group.id': `kafka-pubsub-${groupId}`,
'metadata.broker.list': this.brokerList()
},
this.options.globalConfig,
),
Object.assign(
{},
{"auto.offset.reset": "latest"},
this.options.topicConfig
));
Object.assign(
{},
{
'group.id': `kafka-pubsub-${groupId}`,
'metadata.broker.list': this.brokerList()
},
this.options.globalConfig,
),
Object.assign(
{},
{ "auto.offset.reset": "latest" },
this.options.topicConfig
));

consumer.on('data', (message) => {
if (this.logger.debug) {
this.logger.debug("Received %s", message.value.toString())
}

if (this.options.useHeaders && message.headers) {
const channelHeader = message.headers.find((header: Kafka.MessageHeader) => { return "channel" in header})
const channelHeader = message.headers.find((header: Kafka.MessageHeader) => { return "channel" in header })
if (channelHeader) {
const channel = channelHeader.channel.toString()
this.ee.emit(channel, message.value) // do not parse yet
Expand All @@ -214,38 +208,32 @@ export class KafkaPubSub extends PubSubEngine {
const parsedMessage = this.deserialiseMessage(message.value)
if (parsedMessage.channel) {
// Using channel abstraction
this.ee.emit(parsedMessage.channel, parsedMessage.payload)
this.ee.emit(parsedMessage.channel, parsedMessage.payload)
} else {
// No channel abstraction, publish over the whole topic
this.ee.emit(topic, parsedMessage)
}
}

})

consumer.on('event.log', (event) => {
this.logger.debug(event);
});

return new Promise((resolve, reject) => {
consumer.on('ready', (data, metadata) => {
let topics = metadata.topics.map(topic => topic.name);
let topics = metadata.topics.map(topic => topic.name);
this.logger.info('Connected, found topics: %s', topics);

if (topics.includes(topic)) {
this.logger.info("Subscribing to %s", topic)
consumer.subscribe([topic]);
consumer.consume();
resolve(consumer);
} else {
this.logger.error('Could not find requested topic %s', topic);
consumer.disconnect()
reject('Could not find requested topic %s')
}

this.logger.info("Subscribing to %s", topic)
consumer.subscribe([topic]);
consumer.consume();
resolve(consumer);
})

this.logger.info("Connecting consumer ...")
consumer.connect();
consumer.connect();
})
}
}