Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent the consumption of messages for topics paused while fetch is in-flight #397

Merged
merged 9 commits into from
Jun 18, 2019
5 changes: 4 additions & 1 deletion docs/InstrumentationEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ Instrumentation Event:
* consumer.events.GROUP_JOIN
payload: {`groupId`, `memberId`, `leaderId`, `isLeader`, `memberAssignment`, `duration`}

* consumer.events.FETCH
* consumer.events.FETCH_START
payload: {}

* consumer.events.FETCH
payload: {`numberOfBatches`, `duration`}

* consumer.events.START_BATCH_PROCESS
Expand Down
52 changes: 49 additions & 3 deletions src/consumer/__tests__/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
newLogger,
waitFor,
waitForMessages,
waitForNextEvent,
testIfKafka_0_11,
waitForConsumerToJoinGroup,
generateMessages,
Expand Down Expand Up @@ -578,7 +579,6 @@ describe('Consumer', () => {

await consumer.subscribe({ topic: topicName, fromBeginning: true })

const sleep = value => waitFor(delay => delay >= value)
let offsetsConsumed = []

const eachBatch = async ({ batch, heartbeat }) => {
Expand All @@ -596,9 +596,8 @@ describe('Consumer', () => {
await waitForConsumerToJoinGroup(consumer)

await waitFor(() => offsetsConsumed.length === messages.length, { delay: 50 })
await sleep(50)
await waitForNextEvent(consumer, consumer.events.FETCH_START)

// Hope that we're now in an active fetch state? Something like FETCH_START might help
const seekedOffset = offsetsConsumed[Math.floor(messages.length / 2)]
consumer.seek({ topic: topicName, partition: 0, offset: seekedOffset })
await producer.send({ acks: 1, topic: topicName, messages }) // trigger completion of fetch
Expand All @@ -609,6 +608,53 @@ describe('Consumer', () => {
})
})

it('discards messages received when pausing while fetch is in-flight', async () => {
consumer = createConsumer({
cluster: createCluster(),
groupId,
maxWaitTimeInMs: 200,
logger: newLogger(),
})

const messages = Array(10)
.fill()
.map(() => {
const value = secureRandom()
return { key: `key-${value}`, value: `value-${value}` }
})
await producer.connect()
await producer.send({ acks: 1, topic: topicName, messages })

await consumer.connect()

await consumer.subscribe({ topic: topicName, fromBeginning: true })

let offsetsConsumed = []

const eachBatch = async ({ batch, heartbeat }) => {
for (const message of batch.messages) {
offsetsConsumed.push(message.offset)
}

await heartbeat()
}

consumer.run({
eachBatch,
})

await waitForConsumerToJoinGroup(consumer)
await waitFor(() => offsetsConsumed.length === messages.length, { delay: 50 })
await waitForNextEvent(consumer, consumer.events.FETCH_START)

consumer.pause([{ topic: topicName }])
await producer.send({ acks: 1, topic: topicName, messages }) // trigger completion of fetch

await waitForNextEvent(consumer, consumer.events.FETCH)

expect(offsetsConsumed.length).toEqual(messages.length)
})

describe('transactions', () => {
testIfKafka_0_11('accepts messages from an idempotent producer', async () => {
cluster = createCluster({ allowExperimentalV011: true })
Expand Down
22 changes: 22 additions & 0 deletions src/consumer/__tests__/instrumentationEvents.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,28 @@ describe('Consumer > Instrumentation Events', () => {
})
})

it('emits fetch start', async () => {
const onFetchStart = jest.fn()
let fetch = 0
consumer.on(consumer.events.FETCH_START, async event => {
onFetchStart(event)
fetch++
})

await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })

await consumer.run({ eachMessage: () => true })

await waitFor(() => fetch > 0)
expect(onFetchStart).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'consumer.fetch_start',
payload: {},
})
})

it('emits start batch process', async () => {
const onStartBatchProcess = jest.fn()
let startBatchProcess = 0
Expand Down
38 changes: 21 additions & 17 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,23 +369,27 @@ module.exports = class ConsumerGroup {
topics: requestsPerLeader[nodeId],
})

const batchesPerPartition = responses.map(({ topicName, partitions }) => {
const topicRequestData = requestsPerLeader[nodeId].find(
({ topic }) => topic === topicName
)

return partitions
.filter(partitionData => !this.seekOffset.has(topicName, partitionData.partition))
.map(partitionData => {
const partitionRequestData = topicRequestData.partitions.find(
({ partition }) => partition === partitionData.partition
)

const fetchedOffset = partitionRequestData.fetchOffset

return new Batch(topicName, fetchedOffset, partitionData)
})
})
const pausedAtResponse = this.subscriptionState.paused()

const batchesPerPartition = responses
.filter(({ topicName }) => !pausedAtResponse.includes(topicName))
.map(({ topicName, partitions }) => {
const topicRequestData = requestsPerLeader[nodeId].find(
({ topic }) => topic === topicName
)

return partitions
.filter(partitionData => !this.seekOffset.has(topicName, partitionData.partition))
.map(partitionData => {
const partitionRequestData = topicRequestData.partitions.find(
({ partition }) => partition === partitionData.partition
)

const fetchedOffset = partitionRequestData.fetchOffset

return new Batch(topicName, fetchedOffset, partitionData)
})
})

return flatten(batchesPerPartition)
})
Expand Down
1 change: 1 addition & 0 deletions src/consumer/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const events = {
COMMIT_OFFSETS: consumerType('commit_offsets'),
GROUP_JOIN: consumerType('group_join'),
FETCH: consumerType('fetch'),
FETCH_START: consumerType('fetch_start'),
START_BATCH_PROCESS: consumerType('start_batch_process'),
END_BATCH_PROCESS: consumerType('end_batch_process'),
CONNECT: consumerType('connect'),
Expand Down
5 changes: 4 additions & 1 deletion src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const createRetry = require('../retry')
const limitConcurrency = require('../utils/concurrency')
const { KafkaJSError } = require('../errors')
const {
events: { GROUP_JOIN, FETCH, START_BATCH_PROCESS, END_BATCH_PROCESS },
events: { GROUP_JOIN, FETCH, FETCH_START, START_BATCH_PROCESS, END_BATCH_PROCESS },
} = require('./instrumentationEvents')

const isTestMode = process.env.NODE_ENV === 'test'
Expand Down Expand Up @@ -214,6 +214,9 @@ module.exports = class Runner {

async fetch() {
const startFetch = Date.now()

this.instrumentationEmitter.emit(FETCH_START, {})

const batches = await this.consumerGroup.fetch()

this.instrumentationEmitter.emit(FETCH, {
Expand Down
17 changes: 17 additions & 0 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,22 @@ const retryProtocol = (errorType, fn) =>
const waitForMessages = (buffer, { number = 1, delay = 50 } = {}) =>
waitFor(() => (buffer.length >= number ? buffer : false), { delay, ignoreTimeout: true })

const waitForNextEvent = (consumer, eventName, { maxWait = 10000 } = {}) =>
new Promise((resolve, reject) => {
const timeoutId = setTimeout(
() => reject(new Error(`Timeout waiting for '${eventName}'`)),
maxWait
)
consumer.on(eventName, event => {
clearTimeout(timeoutId)
resolve(event)
})
consumer.on(consumer.events.CRASH, event => {
clearTimeout(timeoutId)
reject(event.payload.error)
})
})

const waitForConsumerToJoinGroup = (consumer, { maxWait = 10000 } = {}) =>
new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => reject(new Error('Timeout')), maxWait)
Expand Down Expand Up @@ -225,6 +241,7 @@ module.exports = {
createTopic,
waitFor: testWaitFor,
waitForMessages,
waitForNextEvent,
waitForConsumerToJoinGroup,
testIfKafka_0_11,
testIfKafka_1_1_0,
Expand Down