Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow calling heartbeat from eachMessage handler #1255

Merged
merged 4 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The `eachMessage` handler provides a convenient and easy to use API, feeding you

```javascript
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
eachMessage: async ({ topic, partition, message, heartbeat }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
Expand All @@ -53,6 +53,8 @@ await consumer.run({
})
```

Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload.

## <a name="each-batch"></a> eachBatch

Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, and `isStale`. All resolved offsets will be automatically committed after the function is executed.
Expand Down
272 changes: 168 additions & 104 deletions src/consumer/__tests__/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,25 +79,29 @@ describe('Consumer', () => {

expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled()

expect(messagesConsumed[0]).toEqual({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[0].key),
value: Buffer.from(messages[0].value),
offset: '0',
}),
})
expect(messagesConsumed[0]).toEqual(
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[0].key),
value: Buffer.from(messages[0].value),
offset: '0',
}),
})
)

expect(messagesConsumed[messagesConsumed.length - 1]).toEqual({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[messages.length - 1].key),
value: Buffer.from(messages[messages.length - 1].value),
offset: '99',
}),
})
expect(messagesConsumed[messagesConsumed.length - 1]).toEqual(
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[messages.length - 1].key),
value: Buffer.from(messages[messages.length - 1].value),
offset: '99',
}),
})
)

// check if all offsets are present
expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`))
Expand Down Expand Up @@ -205,6 +209,54 @@ describe('Consumer', () => {
}
})

it('heartbeats are exposed in the eachMessage handler', async () => {
consumer = createConsumer({
cluster,
groupId,
heartbeatInterval: 50,
logger: newLogger(),
})

topicName = `test-topic-${secureRandom()}`
await createTopic({
topic: topicName,
partitions: 1,
})

await consumer.connect()
await producer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })

const messagesConsumed = []

let heartbeats = 0
consumer.on(consumer.events.HEARTBEAT, () => {
heartbeats++
})

consumer.run({
eachMessage: async payload => {
await new Promise(resolve => {
setTimeout(resolve, 100)
})

await payload.heartbeat()
messagesConsumed.push(payload.message)

await new Promise(resolve => {
setTimeout(resolve, 100)
})
},
})

await waitForConsumerToJoinGroup(consumer)

await producer.send({ acks: 1, topic: topicName, messages: [{ key: 'value', value: 'value' }] })
await waitForMessages(messagesConsumed, { number: 1 })

expect(heartbeats).toBe(1)
})

it('consume GZIP messages', async () => {
await consumer.connect()
await producer.connect()
Expand All @@ -227,24 +279,24 @@ describe('Consumer', () => {
})

await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([
{
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(message1.key),
value: Buffer.from(message1.value),
offset: '0',
}),
},
{
}),
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(message2.key),
value: Buffer.from(message2.value),
offset: '1',
}),
},
}),
])
})

Expand Down Expand Up @@ -335,25 +387,29 @@ describe('Consumer', () => {

expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled()

expect(messagesConsumed[0]).toEqual({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[0].key),
value: Buffer.from(messages[0].value),
offset: '0',
}),
})
expect(messagesConsumed[0]).toEqual(
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[0].key),
value: Buffer.from(messages[0].value),
offset: '0',
}),
})
)

expect(messagesConsumed[messagesConsumed.length - 1]).toEqual({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[messages.length - 1].key),
value: Buffer.from(messages[messages.length - 1].value),
offset: '99',
}),
})
expect(messagesConsumed[messagesConsumed.length - 1]).toEqual(
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages[messages.length - 1].key),
value: Buffer.from(messages[messages.length - 1].value),
offset: '99',
}),
})
)

// check if all offsets are present
expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`))
Expand Down Expand Up @@ -418,71 +474,79 @@ describe('Consumer', () => {
const messagesFromTopic1 = messagesConsumed.filter(m => m.topic === topicName)
const messagesFromTopic2 = messagesConsumed.filter(m => m.topic === topicName2)

expect(messagesFromTopic1[0]).toEqual({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages1[0].key),
value: Buffer.from(messages1[0].value),
headers: {
'header-keyA': Buffer.from(messages1[0].headers['header-keyA']),
'header-keyB': Buffer.from(messages1[0].headers['header-keyB']),
'header-keyC': Buffer.from(messages1[0].headers['header-keyC']),
},
magicByte: 2,
offset: '0',
}),
})
expect(messagesFromTopic1[0]).toEqual(
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages1[0].key),
value: Buffer.from(messages1[0].value),
headers: {
'header-keyA': Buffer.from(messages1[0].headers['header-keyA']),
'header-keyB': Buffer.from(messages1[0].headers['header-keyB']),
'header-keyC': Buffer.from(messages1[0].headers['header-keyC']),
},
magicByte: 2,
offset: '0',
}),
})
)

const lastMessage1 = messages1[messages1.length - 1]
expect(messagesFromTopic1[messagesFromTopic1.length - 1]).toEqual({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(lastMessage1.key),
value: Buffer.from(lastMessage1.value),
headers: {
'header-keyA': Buffer.from(lastMessage1.headers['header-keyA']),
'header-keyB': Buffer.from(lastMessage1.headers['header-keyB']),
'header-keyC': Buffer.from(lastMessage1.headers['header-keyC']),
},
magicByte: 2,
offset: '102',
}),
})
expect(messagesFromTopic1[messagesFromTopic1.length - 1]).toEqual(
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(lastMessage1.key),
value: Buffer.from(lastMessage1.value),
headers: {
'header-keyA': Buffer.from(lastMessage1.headers['header-keyA']),
'header-keyB': Buffer.from(lastMessage1.headers['header-keyB']),
'header-keyC': Buffer.from(lastMessage1.headers['header-keyC']),
},
magicByte: 2,
offset: '102',
}),
})
)

expect(messagesFromTopic2[0]).toEqual({
topic: topicName2,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages2[0].key),
value: Buffer.from(messages2[0].value),
headers: {
'header-keyA': Buffer.from(messages2[0].headers['header-keyA']),
'header-keyB': Buffer.from(messages2[0].headers['header-keyB']),
'header-keyC': Buffer.from(messages2[0].headers['header-keyC']),
},
magicByte: 2,
offset: '0',
}),
})
expect(messagesFromTopic2[0]).toEqual(
expect.objectContaining({
topic: topicName2,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(messages2[0].key),
value: Buffer.from(messages2[0].value),
headers: {
'header-keyA': Buffer.from(messages2[0].headers['header-keyA']),
'header-keyB': Buffer.from(messages2[0].headers['header-keyB']),
'header-keyC': Buffer.from(messages2[0].headers['header-keyC']),
},
magicByte: 2,
offset: '0',
}),
})
)

const lastMessage2 = messages2[messages2.length - 1]
expect(messagesFromTopic2[messagesFromTopic2.length - 1]).toEqual({
topic: topicName2,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(lastMessage2.key),
value: Buffer.from(lastMessage2.value),
headers: {
'header-keyA': Buffer.from(lastMessage2.headers['header-keyA']),
'header-keyB': Buffer.from(lastMessage2.headers['header-keyB']),
'header-keyC': Buffer.from(lastMessage2.headers['header-keyC']),
},
magicByte: 2,
offset: '102',
}),
})
expect(messagesFromTopic2[messagesFromTopic2.length - 1]).toEqual(
expect.objectContaining({
topic: topicName2,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(lastMessage2.key),
value: Buffer.from(lastMessage2.value),
headers: {
'header-keyA': Buffer.from(lastMessage2.headers['header-keyA']),
'header-keyB': Buffer.from(lastMessage2.headers['header-keyB']),
'header-keyC': Buffer.from(lastMessage2.headers['header-keyC']),
},
magicByte: 2,
offset: '102',
}),
})
)

// check if all offsets are present
expect(messagesFromTopic1.map(m => m.message.offset)).toEqual(messages1.map((_, i) => `${i}`))
Expand Down Expand Up @@ -533,7 +597,7 @@ describe('Consumer', () => {
})

await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([
{
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
Expand All @@ -545,8 +609,8 @@ describe('Consumer', () => {
magicByte: 2,
offset: '0',
}),
},
{
}),
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
Expand All @@ -558,7 +622,7 @@ describe('Consumer', () => {
magicByte: 2,
offset: '1',
}),
},
}),
])
})

Expand Down
4 changes: 2 additions & 2 deletions src/consumer/__tests__/errorRecovery.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ describe('Consumer', () => {
await waitForConsumerToJoinGroup(consumer)

await expect(waitForMessages(messagesConsumed)).resolves.toEqual([
{
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from(message1.key),
value: Buffer.from(message1.value),
offset: '0',
}),
},
}),
])
})

Expand Down
Loading