Skip to content

Commit 0f654ca

Browse files
committed
API Switches
1 parent 66b8c39 commit 0f654ca

File tree

7 files changed

+27
-17
lines changed

7 files changed

+27
-17
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,21 @@ function connect() {
3030

3131
function events() {
3232
// Consumer Events
33-
consumer.message()
33+
consumer.onMessage()
3434
.subscribe((data) => {
3535
console.log('Data', data);
3636
});
37-
consumer.error()
37+
consumer.onError()
3838
.subscribe((err) => {
3939
console.log('Consumer Error', err);
4040
});
4141

4242
// Producer Events
43-
producer.report()
43+
producer.onReport()
4444
.subscribe((report) => {
4545
console.log('Producer Delivery Report', report);
4646
});
47-
producer.error()
47+
producer.onError()
4848
.subscribe((err) => {
4949
console.log('Producer Error', err);
5050
});

sample-override.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ class SampleOverride {
7575
this._startConsuming(1);
7676

7777
// Listen to Messages
78-
this.consumer.message().subscribe(async (message) => {
78+
this.consumer.onMessage().subscribe(async (message) => {
7979
this._stopConsuming(); // Got a message stop consuming (this will not cancel other consumed messages)
8080
await this._process(message); // Process Message
8181
this._startConsuming(1); // Consume the next message
8282
});
8383

8484
// Logging
85-
this.consumer.error().subscribe((err) => {
85+
this.consumer.onError().subscribe((err) => {
8686
console.error(err);
8787
});
8888

sample.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,18 @@ const producer = new Producer();
99
*/
1010
function consumerEvent(con) {
1111
return new Promise((resolve, reject) => {
12-
con.message()
12+
con.onMessage()
1313
.subscribe((data) => {
1414
resolve(data);
1515
console.log('SAMPLE Consumer Data:', data);
1616
});
17-
con.error()
17+
con.onError()
1818
.subscribe((err) => {
1919
reject(err);
2020
console.log('SAMPLE Consumer Error:', err);
2121
});
2222
console.log('SAMPLE Consumer Connected');
23-
con.log()
23+
con.onLog()
2424
.subscribe((log) => {
2525
console.log('SAMPLE log', log);
2626
});

src/client.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,26 @@ class KafkaClient {
2020
* Listen to log stream
2121
* @return {Observable<log>} - log message
2222
*/
23-
log() {
23+
onLog() {
2424
return this._logDispatcher.asObservable();
2525
}
2626

2727
/**
2828
* Listen to error stream
2929
* @return {Observable<Error>}
3030
*/
31-
error() {
31+
onError() {
3232
return this._errorDispatcher.asObservable();
3333
}
3434

35+
/**
36+
* Listen to disconnect event
37+
* @return {Observable<void>}
38+
*/
39+
onDisconnected() {
40+
return this._disconnectDispatcher.asObservable();
41+
}
42+
3543
/**
3644
* Connect to Kafka
3745
* @protected

src/consumer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class Consumer extends KafkaClient {
5656
* Message stream to listen to
5757
* @return {Observable<T>} - message stream
5858
*/
59-
message() {
59+
onMessage() {
6060
return this._messageDispatcher.asObservable();
6161
}
6262

src/index.d.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,11 @@ export abstract class Client {
184184

185185
constructor();
186186

187-
public log(): Observable<Log>;
187+
public onLog(): Observable<Log>;
188188

189-
public error(): Observable<Error>;
189+
public onError(): Observable<Error>;
190+
191+
public onDisconnected(): Observable<void>;
190192

191193
protected connectClient(kafkaClient?: KafkaClient): Promise<{ name: string }>;
192194

@@ -205,7 +207,7 @@ export class Consumer extends Client {
205207

206208
constructor(conf?: ConsumerConfig, topicConfig?: TopicConfig);
207209

208-
public message(): Observable<MessagePayload>;
210+
public onMessage(): Observable<MessagePayload>;
209211

210212
public connect(): Promise<{ name: string }>;
211213

@@ -227,7 +229,7 @@ export class Producer extends Client {
227229

228230
public publish(message: string | Buffer | number | object | boolean, topic?: string, partition?: number, key?: string, opaque?: string): Promise<DeliveryReport>;
229231

230-
public report(): Observable<DeliveryReport>;
232+
public onReport(): Observable<DeliveryReport>;
231233

232234
public connect(): Promise<{ name: string }>;
233235

src/producer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class Producer extends KafkaClient {
148148
* Stream delivery report from the kafka producer
149149
* @return {Observable<DeliveryReport>}
150150
*/
151-
report() {
151+
onReport() {
152152
return this._deliveryReportDispatcher.asObservable();
153153
}
154154

0 commit comments

Comments
 (0)