Merge pull request #367 from JaapRood/fix/seek-stale-batches
Discard messages for toppars that saw a seek operation during a fetch or batch processing
tulios authored Jun 6, 2019
2 parents d1d5dd3 + 0516eb6 commit c06a362
Showing 5 changed files with 176 additions and 14 deletions.
11 changes: 7 additions & 4 deletions docs/Consuming.md
@@ -59,7 +59,7 @@ Some use cases require dealing with batches directly. This handler will feed you
```javascript
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
for (let message of batch.messages) {
console.log({
topic: batch.topic,
@@ -85,15 +85,16 @@ await consumer.run({
* `resolveOffset()` is used to mark a message in the batch as processed. In case of errors, the consumer will automatically commit the resolved offsets.
* `commitOffsetsIfNecessary(offsets?)` is used to commit offsets based on the autoCommit configurations (`autoCommitInterval` and `autoCommitThreshold`). Note that auto commit won't happen in `eachBatch` if `commitOffsetsIfNecessary` is not invoked. Take a look at [autoCommit](#auto-commit) for more information.
* `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed.
* `isStale()` returns whether the messages in the batch have been rendered stale through some other operation and should be discarded. For example, when [`consumer.seek`](#seek) is called, the messages in the batch should be discarded, as they are not at the offset we sought.

### Example

```javascript
consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
for (let message of batch.messages) {
if (!isRunning()) break
if (!isRunning() || isStale()) break
await processMessage(message)
await resolveOffset(message.offset)
await heartbeat()
@@ -102,7 +103,7 @@ consumer.run({
})
```

In the example above, if the consumer is shutting down in the middle of the batch, the remaining messages won't be resolved and therefore not committed. This way, you can quickly shut down the consumer without losing/skipping any messages.
In the example above, if the consumer is shutting down in the middle of the batch, the remaining messages won't be resolved and therefore not committed. This way, you can quickly shut down the consumer without losing/skipping any messages. If the batch goes stale for some other reason (like calling `consumer.seek`), none of the remaining messages are processed either.

## <a name="concurrent-processing"></a> Partition-aware concurrency

@@ -235,6 +236,8 @@ consumer.run({ eachMessage: async ({ topic, message }) => true })
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })
```

Upon seeking to an offset, any messages in active batches are marked as stale and discarded, ensuring that the next message read for the partition is the one at the requested offset. When using [the `eachBatch` interface](#each-batch) of `consumer.run`, make sure to check `isStale()` before processing a message.
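
For illustration, here is a minimal sketch combining the two, mirroring the example earlier in this document (the topic name, partition, offset, and `processMessage` helper are placeholders):

```javascript
consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (let message of batch.messages) {
      // A seek (or a shutdown) invalidates the rest of this batch; stop resolving
      // offsets so the consumer can refetch from the new position.
      if (!isRunning() || isStale()) break

      await processMessage(message)
      await resolveOffset(message.offset)
      await heartbeat()
    }
  },
})

// Elsewhere in the application: rewind the partition. Any batch currently being
// fetched or processed for this topic-partition is now considered stale.
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })
```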

## <a name="custom-partition-assigner"></a> Custom partition assigner

It's possible to configure the strategy the consumer will use to distribute partitions amongst the consumer group. KafkaJS has a round robin assigner configured by default.
152 changes: 150 additions & 2 deletions src/consumer/__tests__/consumeMessages.spec.js
@@ -188,9 +188,16 @@ describe('Consumer', () => {
const batchesConsumed = []
const functionsExposed = []
consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, uncommittedOffsets }) => {
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
isRunning,
isStale,
uncommittedOffsets,
}) => {
batchesConsumed.push(batch)
functionsExposed.push(resolveOffset, heartbeat, isRunning, uncommittedOffsets)
functionsExposed.push(resolveOffset, heartbeat, isRunning, isStale, uncommittedOffsets)
},
})

@@ -228,6 +235,7 @@ describe('Consumer', () => {
expect.any(Function),
expect.any(Function),
expect.any(Function),
expect.any(Function),
])
})

@@ -461,6 +469,146 @@ describe('Consumer', () => {
expect(calls).toEqual(1)
})

describe('discarding messages after seeking', () => {
it('stops consuming messages when fetched batch has gone stale', async () => {
consumer = createConsumer({
cluster: createCluster(),
groupId,
logger: newLogger(),

// make sure we fetch a batch of messages
minBytes: 1024,
maxWaitTimeInMs: 500,
})

const messages = Array(10)
.fill()
.map(() => {
const value = secureRandom()
return { key: `key-${value}`, value: `value-${value}` }
})

await consumer.connect()
await producer.connect()
await producer.send({ acks: 1, topic: topicName, messages })
await consumer.subscribe({ topic: topicName, fromBeginning: true })

let offsetsConsumed = []

consumer.run({
eachMessage: async ({ message }) => {
offsetsConsumed.push(message.offset)

if (offsetsConsumed.length === 1) {
consumer.seek({ topic: topicName, partition: 0, offset: message.offset })
}
},
})

await waitFor(() => offsetsConsumed.length >= 2, { delay: 50 })

expect(offsetsConsumed[0]).toEqual(offsetsConsumed[1])
})

it('resolves a batch as stale when seek was called while processing it', async () => {
consumer = createConsumer({
cluster: createCluster(),
groupId,
logger: newLogger(),

// make sure we fetch a batch of messages
minBytes: 1024,
maxWaitTimeInMs: 500,
})

const messages = Array(10)
.fill()
.map(() => {
const value = secureRandom()
return { key: `key-${value}`, value: `value-${value}` }
})

await consumer.connect()
await producer.connect()
await producer.send({ acks: 1, topic: topicName, messages })
await consumer.subscribe({ topic: topicName, fromBeginning: true })

let offsetsConsumed = []

consumer.run({
eachBatch: async ({ batch, isStale, heartbeat, resolveOffset }) => {
for (let message of batch.messages) {
if (isStale()) break

offsetsConsumed.push(message.offset)

if (offsetsConsumed.length === 1) {
consumer.seek({ topic: topicName, partition: 0, offset: message.offset })
}

await resolveOffset(message.offset)
await heartbeat()
}
},
})

await waitFor(() => offsetsConsumed.length >= 2, { delay: 50 })

expect(offsetsConsumed[0]).toEqual(offsetsConsumed[1])
})

it('skips messages fetched while seek was called', async () => {
consumer = createConsumer({
cluster: createCluster(),
groupId,
maxWaitTimeInMs: 1000,
logger: newLogger(),
})

const messages = Array(10)
.fill()
.map(() => {
const value = secureRandom()
return { key: `key-${value}`, value: `value-${value}` }
})
await producer.connect()
await producer.send({ acks: 1, topic: topicName, messages })

await consumer.connect()

await consumer.subscribe({ topic: topicName, fromBeginning: true })

const sleep = value => waitFor(delay => delay >= value)
let offsetsConsumed = []

const eachBatch = async ({ batch, heartbeat }) => {
for (const message of batch.messages) {
offsetsConsumed.push(message.offset)
}

await heartbeat()
}

consumer.run({
eachBatch,
})

await waitForConsumerToJoinGroup(consumer)

await waitFor(() => offsetsConsumed.length === messages.length, { delay: 50 })
await sleep(50)

// Hope that we're now in an active fetch state? Something like FETCH_START might help
const seekedOffset = offsetsConsumed[Math.floor(messages.length / 2)]
consumer.seek({ topic: topicName, partition: 0, offset: seekedOffset })
await producer.send({ acks: 1, topic: topicName, messages }) // trigger completion of fetch

await waitFor(() => offsetsConsumed.length > messages.length, { delay: 50 })

expect(offsetsConsumed[messages.length]).toEqual(seekedOffset)
})
})

describe('transactions', () => {
testIfKafka_0_11('accepts messages from an idempotent producer', async () => {
cluster = createCluster({ allowExperimentalV011: true })
20 changes: 13 additions & 7 deletions src/consumer/consumerGroup.js
@@ -374,15 +374,17 @@ module.exports = class ConsumerGroup {
({ topic }) => topic === topicName
)

return partitions.map(partitionData => {
const partitionRequestData = topicRequestData.partitions.find(
({ partition }) => partition === partitionData.partition
)
return partitions
.filter(partitionData => !this.seekOffset.has(topicName, partitionData.partition))
.map(partitionData => {
const partitionRequestData = topicRequestData.partitions.find(
({ partition }) => partition === partitionData.partition
)

const fetchedOffset = partitionRequestData.fetchOffset
const fetchedOffset = partitionRequestData.fetchOffset

return new Batch(topicName, fetchedOffset, partitionData)
})
return new Batch(topicName, fetchedOffset, partitionData)
})
})

return flatten(batchesPerPartition)
@@ -484,4 +486,8 @@
}
}
}

hasSeekOffset({ topic, partition }) {
return this.seekOffset.has(topic, partition)
}
}
3 changes: 2 additions & 1 deletion src/consumer/runner.js
@@ -135,7 +135,7 @@ module.exports = class Runner {
const { topic, partition } = batch

for (let message of batch.messages) {
if (!this.running) {
if (!this.running || this.consumerGroup.hasSeekOffset({ topic, partition })) {
break
}

@@ -187,6 +187,7 @@
},
uncommittedOffsets: () => this.consumerGroup.uncommittedOffsets(),
isRunning: () => this.running,
isStale: () => this.consumerGroup.hasSeekOffset({ topic, partition }),
})
} catch (e) {
if (!isKafkaJSError(e)) {
4 changes: 4 additions & 0 deletions src/consumer/seekOffsets.js
@@ -3,6 +3,10 @@ module.exports = class SeekOffsets extends Map {
super.set([topic, partition], offset)
}

has(topic, partition) {
return Array.from(this.keys()).some(([t, p]) => t === topic && p === partition)
}

pop() {
if (this.size === 0) {
return
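A note on the `has(topic, partition)` method added above: `SeekOffsets` stores each key as a `[topic, partition]` array (via `super.set([topic, partition], offset)`), and JavaScript `Map` keys are compared by reference, so the built-in `Map#has` called with a freshly created array would never find a match. The linear scan over the stored keys is what makes the lookup work. A standalone sketch of the difference (illustration only, not KafkaJS code):

```javascript
const offsets = new Map()
offsets.set(['example-topic', 0], '42')

// A new array literal is a different object, so reference-based lookup fails.
console.log(offsets.has(['example-topic', 0])) // false

// Comparing the elements of each stored key finds the entry.
const found = Array.from(offsets.keys()).some(([t, p]) => t === 'example-topic' && p === 0)
console.log(found) // true
```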
