Skip to content

Commit 1475dc4

Browse files
committed
support buffer input
1 parent d8afc8b commit 1475dc4

File tree

2 files changed

+67
-8
lines changed

2 files changed

+67
-8
lines changed

src/index.d.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,7 @@ export class Producer extends Client {
225225

226226
constructor(conf?: ProducerConfig, topicConfig?: TopicConfig);
227227

228-
public publish(message: string, topic?: string, partition?: number, key?: string, opaque?: string): Promise<DeliveryReport>;
229-
public publishBuffer(messageBuffer: Buffer, topic?: string, partition?: number, key?: string, opaque?: string): Promise<DeliveryReport>;
228+
public publish(message: string | Buffer | number | object | boolean, topic?: string, partition?: number, key?: string, opaque?: string): Promise<DeliveryReport>;
230229

231230
public report(): Observable<DeliveryReport>;
232231

src/producer.js

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,39 @@ class Producer extends KafkaClient {
6565
* @TODO will delivery report be synchronized with produce?
6666
*/
6767
publish(message, topic = this._config.topics[0], partition = -1, key = null, opaque = null) {
68-
// eslint-disable-next-line new-cap
69-
return this.publishBuffer(new Buffer.from(message), topic, partition, key, opaque);
68+
return new Promise((resolve, reject) => {
69+
70+
try {
71+
this.kafkaProducer.produce(
72+
topic,
73+
partition,
74+
// eslint-disable-next-line new-cap
75+
new Buffer.from(message),
76+
key,
77+
Date.now(),
78+
opaque
79+
);
80+
81+
this.kafkaProducer.prependListener('delivery-report', (err, report) => {
82+
if (err) {
83+
super.emitError(err);
84+
}
85+
console.log('Delivery Report Operation:', new Date(), report);
86+
this._deliveryReportDispatcher.next(report);
87+
resolve(report);
88+
});
89+
90+
} catch (err) {
91+
console.error('Producer Operation (Error)', new Date(), err);
92+
super.emitError(err);
93+
return Promise.reject(err);
94+
}
95+
});
7096
}
7197

7298
/**
7399
* Publish a message
74-
* @param {Buffer} messageBuffer - message buffer to send
100+
* @param {String} message - message to send
75101
* @param {String} [topic=this._config.topics[0]] - topic to send to
76102
* @param {number} [partition=-1] - optionally specify a partition for the message, this defaults to -1 - which will
77103
* use librdkafka's default partitioner (consistent random for keyed messages, random for unkeyed messages)
@@ -80,21 +106,22 @@ class Producer extends KafkaClient {
80106
* @return {Promise<DeliveryReport>}
81107
* @TODO will delivery report be synchronized with produce?
82108
*/
83-
publishBuffer(messageBuffer, topic = this._config.topics[0], partition = -1, key = null, opaque = null) {
109+
publish(message, topic = this._config.topics[0], partition = -1, key = null, opaque = null) {
84110
return new Promise((resolve, reject) => {
85111

86112
try {
87113
this.kafkaProducer.produce(
88114
topic,
89115
partition,
90-
message,
116+
this._createBuffer(message),
91117
key,
92118
Date.now(),
93119
opaque
94120
);
95121

96122
this.kafkaProducer.prependListener('delivery-report', (err, report) => {
97123
if (err) {
124+
// @TODO don't error here?
98125
super.emitError(err);
99126
}
100127
console.log('Delivery Report Operation:', new Date(), report);
@@ -105,7 +132,7 @@ class Producer extends KafkaClient {
105132
} catch (err) {
106133
console.error('Producer Operation (Error)', new Date(), err);
107134
super.emitError(err);
108-
return Promise.reject(err);
135+
return reject(err);
109136
}
110137
});
111138
}
@@ -125,6 +152,39 @@ class Producer extends KafkaClient {
125152
return this._deliveryReportDispatcher.asObservable();
126153
}
127154

155+
/**
156+
* checks to see if obj is a buffer
157+
* @param {object} obj to check
158+
* @return {boolean|*}
159+
* @private
160+
*/
161+
_isBuffer(obj) {
162+
return obj != null && obj.constructor != null &&
163+
typeof obj.constructor.isBuffer === 'function' && obj.constructor.isBuffer(obj);
164+
}
165+
166+
/**
167+
* create buffer from message
168+
* @param {object} message
169+
* @return {*}
170+
* @private {Buffer}
171+
*/
172+
_createBuffer(message) {
173+
if (this._isBuffer(message)) {
174+
// eslint-disable-next-line new-cap
175+
return message;
176+
} else if (typeof message === 'string') {
177+
// eslint-disable-next-line new-cap
178+
return new Buffer.from(message);
179+
} else {
180+
try {
181+
// eslint-disable-next-line new-cap
182+
return new Buffer.from(JSON.stringify(message));
183+
} catch (err) {
184+
throw new Error('Invalid message input ' + err);
185+
}
186+
}
187+
}
128188

129189
/**
130190
* Initializes the events

0 commit comments

Comments
 (0)