From ee21e6fba3aafbe6153410e34bada6c002191d07 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Thu, 8 Nov 2018 14:36:34 -0600 Subject: [PATCH 01/16] EoS #200 Create a transaction manager and call initProducerId for idempotent producer --- src/producer/index.js | 22 +++++++++++++++++-- src/producer/index.spec.js | 14 ++++++++++++ src/producer/transactionManager.js | 34 ++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 src/producer/transactionManager.js diff --git a/src/producer/index.js b/src/producer/index.js index 49856b024..c1fac4a37 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -3,6 +3,7 @@ const createDefaultPartitioner = require('./partitioners/default') const createSendMessages = require('./sendMessages') const InstrumentationEventEmitter = require('../instrumentation/emitter') const events = require('./instrumentationEvents') +const createTransactionManager = require('./transactionManager') const { CONNECT, DISCONNECT } = require('./instrumentationEvents') const { KafkaJSNonRetriableError } = require('../errors') @@ -16,13 +17,20 @@ module.exports = ({ cluster, logger: rootLogger, createPartitioner = createDefaultPartitioner, - retry = { retries: 5 }, + retry, + idempotent = false, }) => { const partitioner = createPartitioner() const retrier = createRetry(Object.assign({}, cluster.retry, retry)) const instrumentationEmitter = new InstrumentationEventEmitter() const logger = rootLogger.namespace('Producer') - const sendMessages = createSendMessages({ logger, cluster, partitioner }) + const transactionManager = createTransactionManager({ logger, cluster }) + const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager }) + retry = retry || idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 } + + if (idempotent && retry.retries < Number.MAX_SAFE_INTEGER) { + logger.warn('Limiting retries for the idempotent producer may invalidate EoS guarantees') + } /** * @typedef {Object} TopicMessages @@ -47,6 +55,12 @@ module.exports = ({ throw new KafkaJSNonRetriableError(`Invalid topic`) } + if (idempotent && acks !== -1) { + throw new KafkaJSNonRetriableError( + `Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees` + ) + } + for (let { topic, messages } of topicMessages) { if (!messages) { throw new KafkaJSNonRetriableError( @@ -158,6 +172,10 @@ module.exports = ({ connect: async () => { await cluster.connect() instrumentationEmitter.emit(CONNECT) + + if (idempotent) { + await transactionManager.initProducerId() + } }, /** diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 372fef065..84ebc1bcb 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -441,4 +441,18 @@ describe('Producer', () => { expect(await sendMessages()).toEqual([]) }) }) + + describe('when idempotent=true', () => { + test('it fetches a producer id', async () => { + const cluster = createCluster( + Object.assign(connectionOpts(), { + allowExperimentalV011: true, + createPartitioner: createModPartitioner, + }) + ) + + producer = createProducer({ cluster, logger: newLogger(), idempotent: true }) + await producer.connect() + }) + }) }) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js new file mode 100644 index 000000000..23876379d --- /dev/null +++ b/src/producer/transactionManager.js @@ -0,0 +1,34 @@ +module.exports = ({ logger, cluster }) => { + let producerId = -1 + let producerEpoch = 0 + let sequences = {} + + return { + getSequence(topic, partition) { + sequences[topic] = sequences[topic] || {} + sequences[topic][partition] = sequences[topic][partition] || 0 + + return sequences[topic][partition] + }, + + updateSequence(topic, partition, sequence) { + sequences[topic] = sequences[topic] || {} + sequences[topic][partition] = sequence + }, + + initProducerId: async () => { + await cluster.refreshMetadata() + // If non-transactional we can request the PID from any broker + const broker = await cluster.findControllerBroker() + const result = await broker.initProducerId({ + transactionTimeout: 30000, + }) + + producerId = result.producerId + producerEpoch = result.producerEpoch + sequences = {} + + logger.debug('Initialized producer id & epoch', producerId, producerEpoch) + }, + } +} From 154aee7d258f38f0f5f6d00fe94d6fd61358e174 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Thu, 8 Nov 2018 15:46:07 -0600 Subject: [PATCH 02/16] EoS #200 Support passing "firstSequence" to record batches --- src/protocol/requests/produce/fixtures/v3_request.json | 2 +- src/protocol/requests/produce/v3/request.js | 2 ++ src/protocol/requests/produce/v3/request.spec.js | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/protocol/requests/produce/fixtures/v3_request.json b/src/protocol/requests/produce/fixtures/v3_request.json index 72d749e18..a9e06152d 100644 --- a/src/protocol/requests/produce/fixtures/v3_request.json +++ b/src/protocol/requests/produce/fixtures/v3_request.json @@ -1 +1 @@ -{"type":"Buffer","data":[255,255,255,255,0,0,117,48,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,101,98,98,97,54,56,56,55,57,99,54,102,53,48,56,49,100,56,99,50,0,0,0,1,0,0,0,0,0,0,1,6,0,0,0,0,0,0,0,0,0,0,0,250,0,0,0,0,2,156,66,78,51,0,0,0,0,0,2,0,0,1,95,142,187,58,12,0,0,1,95,142,187,58,12,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,3,130,1,0,0,0,48,107,101,121,45,57,100,48,102,51,52,56,99,98,50,101,55,51,48,101,49,101,100,99,52,62,115,111,109,101,45,118,97,108,117,101,45,97,49,55,98,52,99,56,49,102,57,101,99,100,49,101,56,57,54,101,51,2,2,97,2,98,130,1,0,0,2,48,107,101,121,45,99,55,48,55,51,101,57,54,53,99,51,52,98,52,99,99,54,52,52,50,62,115,111,109,101,45,118,97,108,117,101,45,54,53,100,102,52,50,50,48,55,48,100,55,97,100,55,51,57,49,52,102,2,2,97,2,98,130,1,0,0,4,48,107,101,121,45,49,54,57,51,98,49,56,52,97,57,98,53,50,100,98,101,48,51,98,99,62,115,111,109,101,45,118,97,108,117,101,45,51,102,99,98,54,53,102,102,99,97,48,56,55,99,98,97,50,48,97,100,2,2,97,2,98]} +{"type":"Buffer","data":[255,255,255,255,0,0,117,48,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,101,98,98,97,54,56,56,55,57,99,54,102,53,48,56,49,100,56,99,50,0,0,0,1,0,0,0,0,0,0,1,6,0,0,0,0,0,0,0,0,0,0,0,250,0,0,0,0,2,224,115,141,85,0,0,0,0,0,2,0,0,1,95,142,187,58,12,0,0,1,95,142,187,58,12,255,255,255,255,255,255,255,255,0,0,0,0,0,10,0,0,0,3,130,1,0,0,0,48,107,101,121,45,57,100,48,102,51,52,56,99,98,50,101,55,51,48,101,49,101,100,99,52,62,115,111,109,101,45,118,97,108,117,101,45,97,49,55,98,52,99,56,49,102,57,101,99,100,49,101,56,57,54,101,51,2,2,97,2,98,130,1,0,0,2,48,107,101,121,45,99,55,48,55,51,101,57,54,53,99,51,52,98,52,99,99,54,52,52,50,62,115,111,109,101,45,118,97,108,117,101,45,54,53,100,102,52,50,50,48,55,48,100,55,97,100,55,51,57,49,52,102,2,2,97,2,98,130,1,0,0,4,48,107,101,121,45,49,54,57,51,98,49,56,52,97,57,98,53,50,100,98,101,48,51,98,99,62,115,111,109,101,45,118,97,108,117,101,45,51,102,99,98,54,53,102,102,99,97,48,56,55,99,98,97,50,48,97,100,2,2,97,2,98]} diff --git a/src/protocol/requests/produce/v3/request.js b/src/protocol/requests/produce/v3/request.js index 67726bc36..c661c529c 100644 --- a/src/protocol/requests/produce/v3/request.js +++ b/src/protocol/requests/produce/v3/request.js @@ -78,6 +78,7 @@ const partitionsEncoder = compression => async ({ partition, messages, transactionalId, + firstSequence, producerId, producerEpoch, }) => { @@ -106,6 +107,7 @@ const partitionsEncoder = compression => async ({ maxTimestamp, producerId, producerEpoch, + firstSequence, transactional: !!transactionalId, lastOffsetDelta: records.length - 1, }) diff --git a/src/protocol/requests/produce/v3/request.spec.js b/src/protocol/requests/produce/v3/request.spec.js index 49142b6d0..0b7bf060b 100644 --- a/src/protocol/requests/produce/v3/request.spec.js +++ b/src/protocol/requests/produce/v3/request.spec.js @@ -16,6 +16,7 @@ describe('Protocol > Requests > Produce > v3', () => { partitions: [ { partition: 0, + firstSequence: 0, messages: [ { key: 'key-9d0f348cb2e730e1edc4', @@ -58,6 +59,7 @@ describe('Protocol > Requests > Produce > v3', () => { partitions: [ { partition: 0, + firstSequence: 10, messages: [ { key: 'key-9d0f348cb2e730e1edc4', From b426f49ab061aec90e0a168a7ce4856ab0237ffc Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Thu, 8 Nov 2018 16:03:42 -0600 Subject: [PATCH 03/16] EoS #200 Increment sequence for idempotent producers --- src/broker/index.js | 3 ++ src/producer/createTopicData.js | 17 ++++++----- src/producer/index.js | 1 + src/producer/sendMessages.js | 33 ++++++++++++++++---- src/producer/sendMessages.spec.js | 48 ++++++++++++++++++++++++++---- src/producer/transactionManager.js | 8 +++++ 6 files changed, 93 insertions(+), 17 deletions(-) diff --git a/src/broker/index.js b/src/broker/index.js index 6d5658e23..3850d0b21 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -160,6 +160,7 @@ module.exports = class Broker { * partitions: [ * { * partition: 0, + * firstSequence: 0, * messages: [ * { key: '1', value: 'A' }, * { key: '2', value: 'B' }, @@ -167,6 +168,7 @@ module.exports = class Broker { * }, * { * partition: 1, + * firstSequence: 0, * messages: [ * { key: '3', value: 'C' }, * ] @@ -178,6 +180,7 @@ module.exports = class Broker { * partitions: [ * { * partition: 4, + * firstSequence: 0, * messages: [ * { key: '32', value: 'E' }, * ] diff --git a/src/producer/createTopicData.js b/src/producer/createTopicData.js index b8527e635..83f70bf1f 100644 --- a/src/producer/createTopicData.js +++ b/src/producer/createTopicData.js @@ -1,9 +1,12 @@ module.exports = topicDataForBroker => { - return topicDataForBroker.map(({ topic, partitions, messagesPerPartition }) => ({ - topic, - partitions: partitions.map(partition => ({ - partition, - messages: messagesPerPartition[partition], - })), - })) + return topicDataForBroker.map( + ({ topic, partitions, messagesPerPartition, sequencePerPartition }) => ({ + topic, + partitions: partitions.map(partition => ({ + partition, + firstSequence: sequencePerPartition[partition], + messages: messagesPerPartition[partition], + })), + }) + ) } diff --git a/src/producer/index.js b/src/producer/index.js index c1fac4a37..efbf0d15e 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -174,6 +174,7 @@ module.exports = ({ instrumentationEmitter.emit(CONNECT) if (idempotent) { + // Do we want to init if we already have a producer id? await transactionManager.initProducerId() } }, diff --git a/src/producer/sendMessages.js b/src/producer/sendMessages.js index 9cdeb9d22..8b481cd5c 100644 --- a/src/producer/sendMessages.js +++ b/src/producer/sendMessages.js @@ -12,7 +12,7 @@ const staleMetadata = e => e.type ) -module.exports = ({ logger, cluster, partitioner }) => { +module.exports = ({ logger, cluster, partitioner, transactionManager }) => { const retrier = createRetry({ retries: TOTAL_INDIVIDUAL_ATTEMPTS }) return async ({ acks, timeout, compression, topicMessages }) => { @@ -47,10 +47,18 @@ module.exports = ({ logger, cluster, partitioner }) => { }) const partitions = keys(messagesPerPartition) + const sequencePerPartition = partitions.reduce((result, partition) => { + result[partition] = transactionManager.getSequence(topic, partition) + return result + }, {}) const partitionsPerLeader = cluster.findLeaderForPartitions(topic, partitions) const leaders = keys(partitionsPerLeader) - topicMetadata.set(topic, { partitionsPerLeader, messagesPerPartition }) + topicMetadata.set(topic, { + partitionsPerLeader, + messagesPerPartition, + sequencePerPartition, + }) for (let nodeId of leaders) { const broker = await cluster.findBroker({ nodeId }) @@ -67,18 +75,33 @@ module.exports = ({ logger, cluster, partitioner }) => { const entries = Array.from(topicMetadata.entries()) const topicDataForBroker = entries .filter(([_, { partitionsPerLeader }]) => !!partitionsPerLeader[broker.nodeId]) - .map(([topic, { partitionsPerLeader, messagesPerPartition }]) => ({ + .map(([topic, { partitionsPerLeader, messagesPerPartition, sequencePerPartition }]) => ({ topic, partitions: partitionsPerLeader[broker.nodeId], + sequencePerPartition, messagesPerPartition, })) const topicData = createTopicData(topicDataForBroker) try { - const response = await broker.produce({ acks, timeout, compression, topicData }) + const response = await broker.produce({ + producerId: transactionManager.getProducerId(), + producerEpoch: transactionManager.getProducerEpoch(), + acks, + timeout, + compression, + topicData, + }) const expectResponse = acks !== 0 - responsePerBroker.set(broker, expectResponse ? responseSerializer(response) : []) + const formattedResponse = expectResponse ? responseSerializer(response) : [] + formattedResponse.forEach(({ topicName, partition }) => { + const size = topicMetadata.get(topicName).messagesPerPartition[partition].length + const previous = topicMetadata.get(topicName).sequencePerPartition[partition] + + transactionManager.updateSequence(topicName, partition, previous + size) + }) + responsePerBroker.set(broker, formattedResponse) } catch (e) { responsePerBroker.delete(broker) throw e diff --git a/src/producer/sendMessages.spec.js b/src/producer/sendMessages.spec.js index 97907042f..97671affe 100644 --- a/src/producer/sendMessages.spec.js +++ b/src/producer/sendMessages.spec.js @@ -26,7 +26,13 @@ describe('Producer > sendMessages', () => { 3: [2], } - let messages, partitioner, brokers, cluster, messagesPerPartition, topicPartitionMetadata + let messages, + partitioner, + brokers, + cluster, + messagesPerPartition, + topicPartitionMetadata, + transactionManager beforeEach(() => { messages = [] @@ -62,10 +68,27 @@ describe('Producer > sendMessages', () => { } require('./groupMessagesPerPartition').mockImplementation(() => messagesPerPartition) + transactionManager = { + getProducerId() { + return -1 + }, + getProducerEpoch() { + return 0 + }, + getSequence() { + return 0 + }, + updateSequence() {}, + } }) test('only retry failed brokers', async () => { - const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner }) + const sendMessages = createSendMessages({ + logger: newLogger(), + cluster, + partitioner, + transactionManager, + }) brokers[1].produce .mockImplementationOnce(() => { @@ -111,7 +134,12 @@ describe('Producer > sendMessages', () => { } } - const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner }) + const sendMessages = createSendMessages({ + logger: newLogger(), + cluster, + partitioner, + transactionManager, + }) brokers[1].produce .mockImplementationOnce(() => { throw new FakeError() @@ -125,7 +153,12 @@ describe('Producer > sendMessages', () => { } test('does not re-produce messages to brokers that are no longer leaders after metadata refresh', async () => { - const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner }) + const sendMessages = createSendMessages({ + logger: newLogger(), + cluster, + partitioner, + transactionManager, + }) brokers[2].produce .mockImplementationOnce(() => { @@ -152,7 +185,12 @@ describe('Producer > sendMessages', () => { }) test('refreshes metadata if partition metadata is empty', async () => { - const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner }) + const sendMessages = createSendMessages({ + logger: newLogger(), + cluster, + partitioner, + transactionManager, + }) cluster.findTopicPartitionMetadata .mockImplementationOnce(() => ({})) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index 23876379d..59c2a3fa5 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -4,6 +4,14 @@ module.exports = ({ logger, cluster }) => { let sequences = {} return { + getProducerId() { + return producerId + }, + + getProducerEpoch() { + return producerEpoch + }, + getSequence(topic, partition) { sequences[topic] = sequences[topic] || {} sequences[topic][partition] = sequences[topic][partition] || 0 From 807619e7c80055af97beca1c87c5afa10741ea34 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Mon, 12 Nov 2018 13:58:19 -0800 Subject: [PATCH 04/16] EoS #200 Test firstSequence is incremented in sendMessages --- src/producer/index.js | 1 - src/producer/index.spec.js | 36 +++++++++++++---- src/producer/sendMessages.spec.js | 65 +++++++++++++++++++++++++----- src/producer/transactionManager.js | 30 +++++++++----- 4 files changed, 103 insertions(+), 29 deletions(-) diff --git a/src/producer/index.js b/src/producer/index.js index efbf0d15e..c1fac4a37 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -174,7 +174,6 @@ module.exports = ({ instrumentationEmitter.emit(CONNECT) if (idempotent) { - // Do we want to init if we already have a producer id? await transactionManager.initProducerId() } }, diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 84ebc1bcb..3ee3a29f1 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -443,16 +443,38 @@ describe('Producer', () => { }) describe('when idempotent=true', () => { - test('it fetches a producer id', async () => { - const cluster = createCluster( - Object.assign(connectionOpts(), { - allowExperimentalV011: true, - createPartitioner: createModPartitioner, - }) - ) + test('sends messages', async () => { + const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`] + + await createTopic({ topic: topics[0] }) + await createTopic({ topic: topics[1] }) + + const cluster = createCluster({ + ...connectionOpts(), + createPartitioner: createModPartitioner, + }) producer = createProducer({ cluster, logger: newLogger(), idempotent: true }) await producer.connect() + + const sendBatch = async topics => { + const topicMessages = topics.map(topic => ({ + acks: -1, + topic, + messages: new Array(10).fill().map((_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + })), + })) + + return producer.sendBatch({ + acks: -1, + topicMessages, + }) + } + + await sendBatch(topics) + await sendBatch(topics) }) }) }) diff --git a/src/producer/sendMessages.spec.js b/src/producer/sendMessages.spec.js index 97671affe..2bfa16a38 100644 --- a/src/producer/sendMessages.spec.js +++ b/src/producer/sendMessages.spec.js @@ -67,19 +67,14 @@ describe('Producer > sendMessages', () => { '2': [{ key: '2' }, { key: '5' }, { key: '8' }], } - require('./groupMessagesPerPartition').mockImplementation(() => messagesPerPartition) transactionManager = { - getProducerId() { - return -1 - }, - getProducerEpoch() { - return 0 - }, - getSequence() { - return 0 - }, - updateSequence() {}, + getProducerId: jest.fn(() => -1), + getProducerEpoch: jest.fn(() => 0), + getSequence: jest.fn(() => 0), + updateSequence: jest.fn(), } + + require('./groupMessagesPerPartition').mockImplementation(() => messagesPerPartition) }) test('only retry failed brokers', async () => { @@ -200,4 +195,52 @@ describe('Producer > sendMessages', () => { expect(cluster.refreshMetadata).toHaveBeenCalled() }) + + test('retrieves sequence information from the transaction manager and updates', async () => { + const sendMessages = createSendMessages({ + logger: newLogger(), + cluster, + partitioner, + transactionManager, + }) + + transactionManager.getSequence.mockReturnValue(5) + + cluster.findTopicPartitionMetadata + .mockImplementationOnce(() => ({})) + .mockImplementationOnce(() => partitionsPerLeader) + + await sendMessages({ + topicMessages: [{ topic, messages }], + }) + + expect(brokers[1].produce.mock.calls[0][0].topicData[0].partitions[0]).toHaveProperty( + 'firstSequence', + 5 + ) + expect(brokers[2].produce.mock.calls[0][0].topicData[0].partitions[0]).toHaveProperty( + 'firstSequence', + 5 + ) + expect(brokers[3].produce.mock.calls[0][0].topicData[0].partitions[0]).toHaveProperty( + 'firstSequence', + 5 + ) + + expect(transactionManager.updateSequence).toHaveBeenCalledWith( + 'topic-name', + 0, + 5 + messagesPerPartition[0].length + ) + expect(transactionManager.updateSequence).toHaveBeenCalledWith( + 'topic-name', + 1, + 5 + messagesPerPartition[1].length + ) + expect(transactionManager.updateSequence).toHaveBeenCalledWith( + 'topic-name', + 2, + 5 + messagesPerPartition[2].length + ) + }) }) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index 59c2a3fa5..d8bbfb676 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -1,7 +1,13 @@ +const NO_PRODUCER_ID = -1 +const NO_PRODUCER_EPOCH = 0 +const SEQUENCE_START = 0 + module.exports = ({ logger, cluster }) => { - let producerId = -1 - let producerEpoch = 0 - let sequences = {} + let producerId = NO_PRODUCER_ID + let producerEpoch = NO_PRODUCER_EPOCH + let producerSequence = {} // Track sequence by topic-partition + + const isInitialized = () => producerId !== NO_PRODUCER_ID || producerEpoch !== NO_PRODUCER_EPOCH return { getProducerId() { @@ -13,15 +19,19 @@ module.exports = ({ logger, cluster }) => { }, getSequence(topic, partition) { - sequences[topic] = sequences[topic] || {} - sequences[topic][partition] = sequences[topic][partition] || 0 + if (!isInitialized()) return SEQUENCE_START - return sequences[topic][partition] + producerSequence[topic] = producerSequence[topic] || {} + producerSequence[topic][partition] = producerSequence[topic][partition] || SEQUENCE_START + + return producerSequence[topic][partition] }, updateSequence(topic, partition, sequence) { - sequences[topic] = sequences[topic] || {} - sequences[topic][partition] = sequence + if (!isInitialized()) return + + producerSequence[topic] = producerSequence[topic] || {} + producerSequence[topic][partition] = sequence }, initProducerId: async () => { @@ -34,9 +44,9 @@ module.exports = ({ logger, cluster }) => { producerId = result.producerId producerEpoch = result.producerEpoch - sequences = {} + producerSequence = {} - logger.debug('Initialized producer id & epoch', producerId, producerEpoch) + logger.debug('Initialized producer id & epoch', { producerId, producerEpoch }) }, } } From c48b9009f6c6eb6c16201d11a97917cf6b95e294 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Mon, 12 Nov 2018 15:04:50 -0800 Subject: [PATCH 05/16] EoS #200 Test broker produce with idempotent producer --- src/broker/__tests__/produce.spec.js | 96 +++++++++++++++++++++++++++- 1 file changed, 95 insertions(+), 1 deletion(-) diff --git a/src/broker/__tests__/produce.spec.js b/src/broker/__tests__/produce.spec.js index ddfc6b362..8e409acb2 100644 --- a/src/broker/__tests__/produce.spec.js +++ b/src/broker/__tests__/produce.spec.js @@ -1,5 +1,7 @@ const Broker = require('../index') +const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes') const { Types: Compression } = require('../../protocol/message/compression') +const { KafkaJSProtocolError } = require('../../errors') const { secureRandom, createConnection, @@ -17,12 +19,13 @@ describe('Broker > Produce', () => { headers: { [`hkey-${secureRandom()}`]: `hvalue-${secureRandom()}` }, }) - const createTopicData = (headers = false) => [ + const createTopicData = ({ headers, firstSequence } = { headers: false, firstSequence: 0 }) => [ { topic: topicName, partitions: [ { partition: 0, + firstSequence, messages: [ { key: `key-${secureRandom()}`, @@ -203,6 +206,97 @@ describe('Broker > Produce', () => { }) }) + testIfKafka011('request with idempotent producer', async () => { + // Get producer id & epoch + const { + coordinator: { host, port }, + } = await retryProtocol( + 'GROUP_COORDINATOR_NOT_AVAILABLE', + async () => + await broker.findGroupCoordinator({ + groupId: `group-${secureRandom()}`, + coordinatorType: COORDINATOR_TYPES.GROUP, + }) + ) + + const producerBroker = new Broker({ + connection: createConnection({ host, port }), + logger: newLogger(), + }) + + await producerBroker.connect() + const result = await producerBroker.initProducerId({ + transactionTimeout: 30000, + }) + + const producerId = result.producerId + const producerEpoch = result.producerEpoch + + const metadata = await retryProtocol( + 'LEADER_NOT_AVAILABLE', + async () => await broker.metadata([topicName]) + ) + + // Find leader of partition + const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader + const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker) + + // Connect to the correct broker to produce message + broker2 = new Broker({ + connection: createConnection(newBrokerData), + logger: newLogger(), + allowExperimentalV011: true, + }) + await broker2.connect() + + const response1 = await retryProtocol( + 'LEADER_NOT_AVAILABLE', + async () => + await broker2.produce({ + producerId, + producerEpoch, + topicData: createTopicData({ headers: false }), + }) + ) + + expect(response1).toEqual({ + topics: [ + { + topicName, + partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }], + }, + ], + throttleTime: 0, + }) + + // We have to syncronise the sequence number between the producer and the broker + await expect( + broker2.produce({ + producerId, + producerEpoch, + topicData: createTopicData({ headers: false, firstSequence: 1 }), // Too small + }) + ).rejects.toEqual( + new KafkaJSProtocolError('The broker received an out of order sequence number') + ) + + await expect( + broker2.produce({ + producerId, + producerEpoch, + topicData: createTopicData({ headers: false, firstSequence: 5 }), // Too big + }) + ).rejects.toEqual( + new KafkaJSProtocolError('The broker received an out of order sequence number') + ) + + await broker2.produce({ + producerId, + producerEpoch, + topicData: createTopicData({ headers: false, firstSequence: 3 }), // Just right + }) + }) + testIfKafka011('request with headers', async () => { const metadata = await retryProtocol( 'LEADER_NOT_AVAILABLE', From bd321cdb9ed8c43efcf1336d73cff5093ece5fde Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Mon, 12 Nov 2018 15:42:59 -0800 Subject: [PATCH 06/16] EoS #200 Document transaction manager --- src/producer/transactionManager.js | 82 ++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 21 deletions(-) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index d8bbfb676..3ed88b844 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -1,23 +1,54 @@ const NO_PRODUCER_ID = -1 -const NO_PRODUCER_EPOCH = 0 const SEQUENCE_START = 0 +/** + * Manage behavior for an idempotent producer and transactions. + */ module.exports = ({ logger, cluster }) => { + /** + * Current producer ID + */ let producerId = NO_PRODUCER_ID - let producerEpoch = NO_PRODUCER_EPOCH - let producerSequence = {} // Track sequence by topic-partition + /** + * Current producer epoch + */ + let producerEpoch = 0 + /** + * Idempotent production requires that the producer track the sequence number of messages. + * + * Sequences are sent with every Record Batch and tracked per Topic-Partition + */ + let producerSequence = {} - const isInitialized = () => producerId !== NO_PRODUCER_ID || producerEpoch !== NO_PRODUCER_EPOCH + const isInitialized = () => producerId !== NO_PRODUCER_ID return { - getProducerId() { - return producerId - }, + /** + * Initialize the idempotent producer by making an `InitProducerId` request. + * + * Overwrites any existing state in this transaction manager + */ + initProducerId: async () => { + await cluster.refreshMetadata() + // If non-transactional we can request the PID from any broker + const broker = await cluster.findControllerBroker() + const result = await broker.initProducerId({ + transactionTimeout: 30000, + }) - getProducerEpoch() { - return producerEpoch + producerId = result.producerId + producerEpoch = result.producerEpoch + producerSequence = {} + + logger.debug('Initialized producer id & epoch', { producerId, producerEpoch }) }, + /** + * Get the current sequence for a given Topic-Partition. Defaults to 0. + * @param {string} topic + * @param {string} partition + * @returns {number} + */ getSequence(topic, partition) { if (!isInitialized()) return SEQUENCE_START @@ -27,6 +58,14 @@ module.exports = ({ logger, cluster }) => { return producerSequence[topic][partition] }, + /** + * Update the sequence for a given Topic-Partition. + * + * Do nothing if not yet initialized (not idempotent) + * @param {string} topic + * @param {string} partition + * @param {number} sequence + */ updateSequence(topic, partition, sequence) { if (!isInitialized()) return @@ -34,19 +73,20 @@ module.exports = ({ logger, cluster }) => { producerSequence[topic][partition] = sequence }, - initProducerId: async () => { - await cluster.refreshMetadata() - // If non-transactional we can request the PID from any broker - const broker = await cluster.findControllerBroker() - const result = await broker.initProducerId({ - transactionTimeout: 30000, - }) - - producerId = result.producerId - producerEpoch = result.producerEpoch - producerSequence = {} + /** + * Get the current producer id + * @returns {number} + */ + getProducerId() { + return producerId + }, - logger.debug('Initialized producer id & epoch', { producerId, producerEpoch }) + /** + * Get the current producer epoch + * @returns {number} + */ + getProducerEpoch() { + return producerEpoch }, } } From 7f87ce3120473b9e95a57dea32517c941f060e4b Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Mon, 12 Nov 2018 17:04:21 -0800 Subject: [PATCH 07/16] EoS #200 Transaction manager should refresh metadata only if necessary --- src/producer/transactionManager.js | 2 +- src/producer/transactionManager.spec.js | 59 +++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 src/producer/transactionManager.spec.js diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index 3ed88b844..abc9b4e20 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -29,7 +29,7 @@ module.exports = ({ logger, cluster }) => { * Overwrites any existing state in this transaction manager */ initProducerId: async () => { - await cluster.refreshMetadata() + await cluster.refreshMetadataIfNecessary() // If non-transactional we can request the PID from any broker const broker = await cluster.findControllerBroker() const result = await broker.initProducerId({ diff --git a/src/producer/transactionManager.spec.js b/src/producer/transactionManager.spec.js new file mode 100644 index 000000000..d02fda988 --- /dev/null +++ b/src/producer/transactionManager.spec.js @@ -0,0 +1,59 @@ +const { newLogger } = require('testHelpers') +const createTransactionManager = require('./transactionManager') + +describe('Producer > transactionManager', () => { + const topic = 'topic-name' + const mockInitProducerIdResponse = { + producerId: 1000, + producerEpoch: 1, + } + + let cluster, broker + + beforeEach(() => { + broker = { + initProducerId: jest.fn().mockReturnValue(mockInitProducerIdResponse), + } + cluster = { + refreshMetadataIfNecessary: jest.fn(), + findControllerBroker: jest.fn().mockReturnValue(broker), + } + }) + + test('initializing the producer id and epoch', async () => { + const transactionManager = createTransactionManager({ logger: newLogger(), cluster }) + + expect(transactionManager.getProducerId()).toEqual(-1) + expect(transactionManager.getProducerEpoch()).toEqual(0) + expect(transactionManager.getSequence(topic, 1)).toEqual(0) + + await transactionManager.initProducerId() + + expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled() + expect(broker.initProducerId).toHaveBeenCalledWith({ transactionTimeout: 30000 }) + + expect(transactionManager.getProducerId()).toEqual(mockInitProducerIdResponse.producerId) + expect(transactionManager.getProducerEpoch()).toEqual(mockInitProducerIdResponse.producerEpoch) + }) + + test('getting & updating the sequence per topic-partition', async () => { + const transactionManager = createTransactionManager({ logger: newLogger(), cluster }) + + expect(transactionManager.getSequence(topic, 1)).toEqual(0) + transactionManager.updateSequence(topic, 1, 10) // No effect if we haven't initialized + expect(transactionManager.getSequence(topic, 1)).toEqual(0) + + await transactionManager.initProducerId() + + expect(transactionManager.getSequence(topic, 1)).toEqual(0) + transactionManager.updateSequence(topic, 1, 5) + transactionManager.updateSequence(topic, 1, 10) // Updates, not increments + expect(transactionManager.getSequence(topic, 1)).toEqual(10) + + expect(transactionManager.getSequence(topic, 2)).toEqual(0) // Different partition + expect(transactionManager.getSequence('foobar', 1)).toEqual(0) // Different topic + + await transactionManager.initProducerId() + expect(transactionManager.getSequence(topic, 1)).toEqual(0) // Sequences reset by initProducerId + }) +}) From 83584948ebba74c8cb89353501972b2991d91d89 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Mon, 12 Nov 2018 17:04:55 -0800 Subject: [PATCH 08/16] EoS #200 Run producer send message tests with idempotent flag as well --- src/producer/createTopicData.spec.js | 38 +++ src/producer/index.js | 3 +- src/producer/index.spec.js | 448 ++++++++++++++------------- 3 files changed, 272 insertions(+), 217 deletions(-) create mode 100644 src/producer/createTopicData.spec.js diff --git a/src/producer/createTopicData.spec.js b/src/producer/createTopicData.spec.js new file mode 100644 index 000000000..eb4a7b7ab --- /dev/null +++ b/src/producer/createTopicData.spec.js @@ -0,0 +1,38 @@ +const createTopicData = require('./createTopicData') + +describe('Producer > createTopicData', () => { + let topic, partitions, messagesPerPartition, sequencePerPartition + + beforeEach(() => { + topic = 'test-topic' + partitions = [1, 2, 3] + + messagesPerPartition = { + 1: [{ key: '1' }], + 2: [{ key: '2' }], + 3: [{ key: '3' }, { key: '4' }], + } + + sequencePerPartition = { + 1: 0, + 2: 5, + 3: 10, + } + }) + + test('format data by topic and partition', () => { + const result = createTopicData([ + { topic, partitions, messagesPerPartition, sequencePerPartition }, + ]) + expect(result).toEqual([ + { + topic, + partitions: [ + { partition: 1, firstSequence: 0, messages: [{ key: '1' }] }, + { partition: 2, firstSequence: 5, messages: [{ key: '2' }] }, + { partition: 3, firstSequence: 10, messages: [{ key: '3' }, { key: '4' }] }, + ], + }, + ]) + }) +}) diff --git a/src/producer/index.js b/src/producer/index.js index c1fac4a37..bb36b4f80 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -20,13 +20,14 @@ module.exports = ({ retry, idempotent = false, }) => { + retry = retry || idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 } + const partitioner = createPartitioner() const retrier = createRetry(Object.assign({}, cluster.retry, retry)) const instrumentationEmitter = new InstrumentationEventEmitter() const logger = rootLogger.namespace('Producer') const transactionManager = createTransactionManager({ logger, cluster }) const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager }) - retry = retry || idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 } if (idempotent && retry.retries < Number.MAX_SAFE_INTEGER) { logger.warn('Limiting retries for the idempotent producer may invalidate EoS guarantees') diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 3ee3a29f1..2cfadd80e 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -14,7 +14,7 @@ const { createTopic, } = require('testHelpers') -const { KafkaJSSASLAuthenticationError } = require('../errors') +const { KafkaJSSASLAuthenticationError, KafkaJSNonRetriableError } = require('../errors') describe('Producer', () => { let topicName, producer @@ -181,214 +181,6 @@ describe('Producer', () => { expect(cluster.isConnected()).toEqual(true) }) - test('produce messages', async () => { - const cluster = createCluster( - Object.assign(connectionOpts(), { - createPartitioner: createModPartitioner, - }) - ) - - await createTopic({ topic: topicName }) - - producer = createProducer({ cluster, logger: newLogger() }) - await producer.connect() - - const sendMessages = async () => - await producer.send({ - acks: 1, - topic: topicName, - messages: new Array(10).fill().map((_, i) => ({ - key: `key-${i}`, - value: `value-${i}`, - })), - }) - - expect(await sendMessages()).toEqual([ - { - topicName, - errorCode: 0, - offset: '0', - partition: 0, - timestamp: '-1', - }, - ]) - - expect(await sendMessages()).toEqual([ - { - topicName, - errorCode: 0, - offset: '10', - partition: 0, - timestamp: '-1', - }, - ]) - }) - - testIfKafka011('produce messages for Kafka 0.11', async () => { - const cluster = createCluster( - Object.assign(connectionOpts(), { - allowExperimentalV011: true, - createPartitioner: createModPartitioner, - }) - ) - - await createTopic({ topic: topicName }) - - producer = createProducer({ cluster, logger: newLogger() }) - await producer.connect() - - const sendMessages = async () => - await producer.send({ - acks: 1, - topic: topicName, - messages: new Array(10).fill().map((_, i) => ({ - key: `key-${i}`, - value: `value-${i}`, - })), - }) - - expect(await sendMessages()).toEqual([ - { - topicName, - baseOffset: '0', - errorCode: 0, - logAppendTime: '-1', - partition: 0, - }, - ]) - - expect(await sendMessages()).toEqual([ - { - topicName, - baseOffset: '10', - errorCode: 0, - logAppendTime: '-1', - partition: 0, - }, - ]) - }) - - testIfKafka011('produce messages for Kafka 0.11 with headers', async () => { - const cluster = createCluster( - Object.assign(connectionOpts(), { - allowExperimentalV011: true, - createPartitioner: createModPartitioner, - }) - ) - - await createTopic({ topic: topicName }) - - producer = createProducer({ cluster, logger: newLogger() }) - await producer.connect() - - const sendMessages = async () => - await producer.send({ - acks: 1, - topic: topicName, - messages: new Array(10).fill().map((_, i) => ({ - key: `key-${i}`, - value: `value-${i}`, - headers: { - [`header-a${i}`]: `header-value-a${i}`, - [`header-b${i}`]: `header-value-b${i}`, - [`header-c${i}`]: `header-value-c${i}`, - }, - })), - }) - - expect(await sendMessages()).toEqual([ - { - topicName, - baseOffset: '0', - errorCode: 0, - logAppendTime: '-1', - partition: 0, - }, - ]) - - expect(await sendMessages()).toEqual([ - { - topicName, - baseOffset: '10', - errorCode: 0, - logAppendTime: '-1', - partition: 0, - }, - ]) - }) - - test('produce messages to multiple topics', async () => { - const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`] - - await createTopic({ topic: topics[0] }) - await createTopic({ topic: topics[1] }) - - const cluster = createCluster({ - ...connectionOpts(), - createPartitioner: createModPartitioner, - }) - const byTopicName = (a, b) => a.topicName.localeCompare(b.topicName) - - producer = createProducer({ cluster, logger: newLogger() }) - await producer.connect() - - const sendBatch = async topics => { - const topicMessages = topics.map(topic => ({ - acks: 1, - topic, - messages: new Array(10).fill().map((_, i) => ({ - key: `key-${i}`, - value: `value-${i}`, - })), - })) - - return producer.sendBatch({ - acks: 1, - topicMessages, - }) - } - - let result = await sendBatch(topics) - expect(result.sort(byTopicName)).toEqual( - [ - { - topicName: topics[0], - errorCode: 0, - offset: '0', - partition: 0, - timestamp: '-1', - }, - { - topicName: topics[1], - errorCode: 0, - offset: '0', - partition: 0, - timestamp: '-1', - }, - ].sort(byTopicName) - ) - - result = await sendBatch(topics) - expect(result.sort(byTopicName)).toEqual( - [ - { - topicName: topics[0], - errorCode: 0, - offset: '10', - partition: 0, - timestamp: '-1', - }, - { - topicName: topics[1], - errorCode: 0, - offset: '10', - partition: 0, - timestamp: '-1', - }, - ].sort(byTopicName) - ) - }) - test('gives access to its logger', () => { producer = createProducer({ cluster: createCluster(), logger: newLogger() }) expect(producer.logger()).toMatchSnapshot() @@ -442,8 +234,53 @@ describe('Producer', () => { }) }) - describe('when idempotent=true', () => { - test('sends messages', async () => { + function testProduceMessages(idempotent = false) { + const acks = idempotent ? -1 : 1 + + test('produce messages', async () => { + const cluster = createCluster( + Object.assign(connectionOpts(), { + createPartitioner: createModPartitioner, + }) + ) + + await createTopic({ topic: topicName }) + + producer = createProducer({ cluster, logger: newLogger(), idempotent }) + await producer.connect() + + const sendMessages = async () => + await producer.send({ + acks, + topic: topicName, + messages: new Array(10).fill().map((_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + })), + }) + + expect(await sendMessages()).toEqual([ + { + topicName, + errorCode: 0, + offset: '0', + partition: 0, + timestamp: '-1', + }, + ]) + + expect(await sendMessages()).toEqual([ + { + topicName, + errorCode: 0, + offset: '10', + partition: 0, + timestamp: '-1', + }, + ]) + }) + + test('produce messages to multiple topics', async () => { const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`] await createTopic({ topic: topics[0] }) @@ -453,13 +290,14 @@ describe('Producer', () => { ...connectionOpts(), createPartitioner: createModPartitioner, }) + const byTopicName = (a, b) => a.topicName.localeCompare(b.topicName) - producer = createProducer({ cluster, logger: newLogger(), idempotent: true }) + producer = createProducer({ cluster, logger: newLogger(), idempotent }) await producer.connect() const sendBatch = async topics => { const topicMessages = topics.map(topic => ({ - acks: -1, + acks, topic, messages: new Array(10).fill().map((_, i) => ({ key: `key-${i}`, @@ -468,13 +306,191 @@ describe('Producer', () => { })) return producer.sendBatch({ - acks: -1, + acks, topicMessages, }) } - await sendBatch(topics) - await sendBatch(topics) + let result = await sendBatch(topics) + expect(result.sort(byTopicName)).toEqual( + [ + { + topicName: topics[0], + errorCode: 0, + offset: '0', + partition: 0, + timestamp: '-1', + }, + { + topicName: topics[1], + errorCode: 0, + offset: '0', + partition: 0, + timestamp: '-1', + }, + ].sort(byTopicName) + ) + + result = await sendBatch(topics) + expect(result.sort(byTopicName)).toEqual( + [ + { + topicName: topics[0], + errorCode: 0, + offset: '10', + partition: 0, + timestamp: '-1', + }, + { + topicName: topics[1], + errorCode: 0, + offset: '10', + partition: 0, + timestamp: '-1', + }, + ].sort(byTopicName) + ) + }) + + testIfKafka011('produce messages for Kafka 0.11', async () => { + const cluster = createCluster( + Object.assign(connectionOpts(), { + allowExperimentalV011: true, + createPartitioner: createModPartitioner, + }) + ) + + await createTopic({ topic: topicName }) + + producer = createProducer({ cluster, logger: newLogger(), idempotent }) + await producer.connect() + + const sendMessages = async () => + await producer.send({ + acks, + topic: topicName, + messages: new Array(10).fill().map((_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + })), + }) + + expect(await sendMessages()).toEqual([ + { + topicName, + baseOffset: '0', + errorCode: 0, + logAppendTime: '-1', + partition: 0, + }, + ]) + + expect(await sendMessages()).toEqual([ + { + topicName, + baseOffset: '10', + errorCode: 0, + logAppendTime: '-1', + partition: 0, + }, + ]) + }) + + testIfKafka011('produce messages for Kafka 0.11 with headers', async () => { + const cluster = createCluster( + Object.assign(connectionOpts(), { + allowExperimentalV011: true, + createPartitioner: createModPartitioner, + }) + ) + + await createTopic({ topic: topicName }) + + producer = createProducer({ cluster, logger: newLogger(), idempotent }) + await producer.connect() + + const sendMessages = async () => + await producer.send({ + acks, + topic: topicName, + messages: new Array(10).fill().map((_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + headers: { + [`header-a${i}`]: `header-value-a${i}`, + [`header-b${i}`]: `header-value-b${i}`, + [`header-c${i}`]: `header-value-c${i}`, + }, + })), + }) + + expect(await sendMessages()).toEqual([ + { + topicName, + baseOffset: '0', + errorCode: 0, + logAppendTime: '-1', + partition: 0, + }, + ]) + + expect(await sendMessages()).toEqual([ + { + topicName, + baseOffset: '10', + errorCode: 0, + logAppendTime: '-1', + partition: 0, + }, + ]) + }) + } + + testProduceMessages(false) + + describe('when idempotent=true', () => { + testProduceMessages(true) + + test('throws an error if acks != -1', async () => { + const cluster = createCluster( + Object.assign(connectionOpts(), { + allowExperimentalV011: true, + createPartitioner: createModPartitioner, + }) + ) + + producer = createProducer({ cluster, logger: newLogger(), idempotent: true }) + await producer.connect() + + await expect( + producer.send({ + acks: 1, + topic: topicName, + messages: new Array(10).fill().map((_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + })), + }) + ).rejects.toEqual( + new KafkaJSNonRetriableError( + "Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees" + ) + ) + + await expect( + producer.send({ + acks: 0, + topic: topicName, + messages: new Array(10).fill().map((_, i) => ({ + key: `key-${i}`, + value: `value-${i}`, + })), + }) + ).rejects.toEqual( + new KafkaJSNonRetriableError( + "Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees" + ) + ) }) }) }) From 4c2db540615f29d6033af267a88a2b09da31fc01 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Tue, 13 Nov 2018 09:48:41 -0800 Subject: [PATCH 09/16] EoS #200 Test default retry value for idempotent producer --- src/producer/index.spec.js | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 2cfadd80e..1684e0244 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -492,5 +492,26 @@ describe('Producer', () => { ) ) }) + + test('sets the default retry value to MAX_SAFE_INTEGER', async () => { + jest.resetModules() + jest.mock('../retry') + const createRetryMock = require('../retry') + const createProducerMockedRetry = require('./index') + + const cluster = createCluster( + Object.assign(connectionOpts(), { + allowExperimentalV011: true, + createPartitioner: createModPartitioner, + }) + ) + + producer = createProducerMockedRetry({ cluster, logger: newLogger(), idempotent: true }) + expect(createRetryMock).toHaveBeenCalledWith({ retries: Number.MAX_SAFE_INTEGER }) + + try { + await producer.connect() + } catch (e) {} // Jest will complain about "open handles" if we don't connect. Ignore result. + }) }) }) From 3e4faabb4f53abc8cad6df43e91d63857a4a4bb8 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Tue, 13 Nov 2018 10:35:08 -0800 Subject: [PATCH 10/16] EoS #200 Throw error if retries < 1 --- src/producer/index.js | 19 +++++++++++++------ src/producer/index.spec.js | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/producer/index.js b/src/producer/index.js index bb36b4f80..5417d0710 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -20,19 +20,26 @@ module.exports = ({ retry, idempotent = false, }) => { - retry = retry || idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 } + retry = retry || (idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 }) + + if (idempotent && retry.retries < 1) { + throw new KafkaJSNonRetriableError( + 'Idempotent producer must allow retries to protect against transient errors' + ) + } - const partitioner = createPartitioner() - const retrier = createRetry(Object.assign({}, cluster.retry, retry)) - const instrumentationEmitter = new InstrumentationEventEmitter() const logger = rootLogger.namespace('Producer') - const transactionManager = createTransactionManager({ logger, cluster }) - const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager }) if (idempotent && retry.retries < Number.MAX_SAFE_INTEGER) { logger.warn('Limiting retries for the idempotent producer may invalidate EoS guarantees') } + const partitioner = createPartitioner() + const retrier = createRetry(Object.assign({}, cluster.retry, retry)) + const instrumentationEmitter = new InstrumentationEventEmitter() + const transactionManager = createTransactionManager({ logger, cluster }) + const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager }) + /** * @typedef {Object} TopicMessages * @property {string} topic diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 1684e0244..7ed59d81e 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -513,5 +513,20 @@ describe('Producer', () => { await producer.connect() } catch (e) {} // Jest will complain about "open handles" if we don't connect. Ignore result. }) + + test('throws an error if retries < 1', async () => { + expect(() => + createProducer({ + cluster: {}, + logger: newLogger(), + idempotent: true, + retry: { retries: 0 }, + }) + ).toThrowError( + new KafkaJSNonRetriableError( + 'Idempotent producer must allow retries to protect against transient errors' + ) + ) + }) }) }) From 6fd6a21bdd79e73211d91612bf00be5ba63dd042 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Tue, 13 Nov 2018 10:41:57 -0800 Subject: [PATCH 11/16] EoS #200 Document "idempotent" experimental flag in README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index d3de26724..c82fb0a62 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,8 @@ await producer.send({ | acks | Control the number of required acks.
__-1__ = all replicas must acknowledge _(default)_
__0__ = no acknowledgments
__1__ = only waits for the leader to acknowledge | `-1` all replicas must acknowledge | | timeout | The time to await a response in ms | `30000` | | compression | Compression codec | `CompressionTypes.None` | +| ----------- |-------------------------------------------------------------------------------------------------- | ------- | +| idempotent | _Experimental._ If enabled producer will ensure each message is written exactly once. Acks _must_ be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER. | `false` | By default, the producer is configured to distribute the messages with the following logic: From 80a49477ce4845d342edf2b822df7c540c189135 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 14 Nov 2018 10:50:56 +0100 Subject: [PATCH 12/16] Cosmetic changes --- src/broker/__tests__/produce.spec.js | 2 +- src/producer/index.js | 6 ++++-- src/producer/sendMessages.js | 4 ++++ src/producer/transactionManager.js | 19 ++++++++++++------- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/broker/__tests__/produce.spec.js b/src/broker/__tests__/produce.spec.js index 8e409acb2..8bf3154ee 100644 --- a/src/broker/__tests__/produce.spec.js +++ b/src/broker/__tests__/produce.spec.js @@ -19,7 +19,7 @@ describe('Broker > Produce', () => { headers: { [`hkey-${secureRandom()}`]: `hvalue-${secureRandom()}` }, }) - const createTopicData = ({ headers, firstSequence } = { headers: false, firstSequence: 0 }) => [ + const createTopicData = ({ headers = false, firstSequence = 0 }) => [ { topic: topicName, partitions: [ diff --git a/src/producer/index.js b/src/producer/index.js index 5417d0710..e3fa9f40d 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -19,8 +19,9 @@ module.exports = ({ createPartitioner = createDefaultPartitioner, retry, idempotent = false, + transactionTimeout, }) => { - retry = retry || (idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 }) + retry = retry || { retries: idempotent ? Number.MAX_SAFE_INTEGER : 5 } if (idempotent && retry.retries < 1) { throw new KafkaJSNonRetriableError( @@ -37,7 +38,7 @@ module.exports = ({ const partitioner = createPartitioner() const retrier = createRetry(Object.assign({}, cluster.retry, retry)) const instrumentationEmitter = new InstrumentationEventEmitter() - const transactionManager = createTransactionManager({ logger, cluster }) + const transactionManager = createTransactionManager({ logger, cluster, transactionTimeout }) const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager }) /** @@ -52,6 +53,7 @@ module.exports = ({ * -1 = all replicas must acknowledge * 0 = no acknowledgments * 1 = only waits for the leader to acknowledge + * * @property {number} [timeout=30000] The time to await a response in ms * @property {Compression.Types} [compression=Compression.Types.None] Compression codec * diff --git a/src/producer/sendMessages.js b/src/producer/sendMessages.js index 8b481cd5c..e12c6c62c 100644 --- a/src/producer/sendMessages.js +++ b/src/producer/sendMessages.js @@ -51,6 +51,7 @@ module.exports = ({ logger, cluster, partitioner, transactionManager }) => { result[partition] = transactionManager.getSequence(topic, partition) return result }, {}) + const partitionsPerLeader = cluster.findLeaderForPartitions(topic, partitions) const leaders = keys(partitionsPerLeader) @@ -93,14 +94,17 @@ module.exports = ({ logger, cluster, partitioner, transactionManager }) => { compression, topicData, }) + const expectResponse = acks !== 0 const formattedResponse = expectResponse ? responseSerializer(response) : [] + formattedResponse.forEach(({ topicName, partition }) => { const size = topicMetadata.get(topicName).messagesPerPartition[partition].length const previous = topicMetadata.get(topicName).sequencePerPartition[partition] transactionManager.updateSequence(topicName, partition, previous + size) }) + responsePerBroker.set(broker, formattedResponse) } catch (e) { responsePerBroker.delete(broker) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index abc9b4e20..b1b886c24 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -4,15 +4,17 @@ const SEQUENCE_START = 0 /** * Manage behavior for an idempotent producer and transactions. */ -module.exports = ({ logger, cluster }) => { +module.exports = ({ logger, cluster, transactionTimeout = 30000 }) => { /** * Current producer ID */ let producerId = NO_PRODUCER_ID + /** * Current producer epoch */ let producerEpoch = 0 + /** * Idempotent production requires that the producer track the sequence number of messages. * @@ -25,16 +27,14 @@ module.exports = ({ logger, cluster }) => { return { /** * Initialize the idempotent producer by making an `InitProducerId` request. - * * Overwrites any existing state in this transaction manager */ initProducerId: async () => { await cluster.refreshMetadataIfNecessary() + // If non-transactional we can request the PID from any broker const broker = await cluster.findControllerBroker() - const result = await broker.initProducerId({ - transactionTimeout: 30000, - }) + const result = await broker.initProducerId({ transactionTimeout }) producerId = result.producerId producerEpoch = result.producerEpoch @@ -45,12 +45,15 @@ module.exports = ({ logger, cluster }) => { /** * Get the current sequence for a given Topic-Partition. Defaults to 0. + * * @param {string} topic * @param {string} partition * @returns {number} */ getSequence(topic, partition) { - if (!isInitialized()) return SEQUENCE_START + if (!isInitialized()) { + return SEQUENCE_START + } producerSequence[topic] = producerSequence[topic] || {} producerSequence[topic][partition] = producerSequence[topic][partition] || SEQUENCE_START @@ -67,7 +70,9 @@ module.exports = ({ logger, cluster }) => { * @param {number} sequence */ updateSequence(topic, partition, sequence) { - if (!isInitialized()) return + if (!isInitialized()) { + return + } producerSequence[topic] = producerSequence[topic] || {} producerSequence[topic][partition] = sequence From 3b719d9777092d81958a2b8d7a7d6f13634e27fb Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 14 Nov 2018 11:13:47 +0100 Subject: [PATCH 13/16] Add default object --- src/broker/__tests__/produce.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/broker/__tests__/produce.spec.js b/src/broker/__tests__/produce.spec.js index 8bf3154ee..0bae0ffa3 100644 --- a/src/broker/__tests__/produce.spec.js +++ b/src/broker/__tests__/produce.spec.js @@ -19,7 +19,7 @@ describe('Broker > Produce', () => { headers: { [`hkey-${secureRandom()}`]: `hvalue-${secureRandom()}` }, }) - const createTopicData = ({ headers = false, firstSequence = 0 }) => [ + const createTopicData = ({ headers = false, firstSequence = 0 } = {}) => [ { topic: topicName, partitions: [ From b3897726510773184212149aa71a4c33d4165f9b Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 14 Nov 2018 16:26:42 +0100 Subject: [PATCH 14/16] Document and align transaction timeout with the Java client --- README.md | 1 + src/producer/transactionManager.js | 2 +- src/producer/transactionManager.spec.js | 6 +++++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c82fb0a62..8bcb29d41 100644 --- a/README.md +++ b/README.md @@ -258,6 +258,7 @@ await producer.send({ | acks | Control the number of required acks.
__-1__ = all replicas must acknowledge _(default)_
__0__ = no acknowledgments
__1__ = only waits for the leader to acknowledge | `-1` all replicas must acknowledge | | timeout | The time to await a response in ms | `30000` | | compression | Compression codec | `CompressionTypes.None` | +| transactionTimeout | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the __broker__, the request will fail with a `InvalidTransactionTimeout` error | `60000` | | ----------- |-------------------------------------------------------------------------------------------------- | ------- | | idempotent | _Experimental._ If enabled producer will ensure each message is written exactly once. Acks _must_ be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER. | `false` | diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index b1b886c24..b11d46687 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -4,7 +4,7 @@ const SEQUENCE_START = 0 /** * Manage behavior for an idempotent producer and transactions. */ -module.exports = ({ logger, cluster, transactionTimeout = 30000 }) => { +module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { /** * Current producer ID */ diff --git a/src/producer/transactionManager.spec.js b/src/producer/transactionManager.spec.js index d02fda988..6f40d7e1f 100644 --- a/src/producer/transactionManager.spec.js +++ b/src/producer/transactionManager.spec.js @@ -21,7 +21,11 @@ describe('Producer > transactionManager', () => { }) test('initializing the producer id and epoch', async () => { - const transactionManager = createTransactionManager({ logger: newLogger(), cluster }) + const transactionManager = createTransactionManager({ + logger: newLogger(), + cluster, + transactionTimeout: 30000, + }) expect(transactionManager.getProducerId()).toEqual(-1) expect(transactionManager.getProducerEpoch()).toEqual(0) From df65f8a38bc6109688d9575b3774ffc368e6cf6d Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Wed, 14 Nov 2018 11:39:08 -0800 Subject: [PATCH 15/16] #203 Defensive coding for max signed int32 --- src/producer/sendMessages.js | 5 ++--- src/producer/sendMessages.spec.js | 6 +++--- src/producer/transactionManager.js | 22 ++++++++++++++++++---- src/producer/transactionManager.spec.js | 9 +++++++-- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/producer/sendMessages.js b/src/producer/sendMessages.js index e12c6c62c..5dfb6c170 100644 --- a/src/producer/sendMessages.js +++ b/src/producer/sendMessages.js @@ -99,10 +99,9 @@ module.exports = ({ logger, cluster, partitioner, transactionManager }) => { const formattedResponse = expectResponse ? responseSerializer(response) : [] formattedResponse.forEach(({ topicName, partition }) => { - const size = topicMetadata.get(topicName).messagesPerPartition[partition].length - const previous = topicMetadata.get(topicName).sequencePerPartition[partition] + const increment = topicMetadata.get(topicName).messagesPerPartition[partition].length - transactionManager.updateSequence(topicName, partition, previous + size) + transactionManager.updateSequence(topicName, partition, increment) }) responsePerBroker.set(broker, formattedResponse) diff --git a/src/producer/sendMessages.spec.js b/src/producer/sendMessages.spec.js index 2bfa16a38..99bf8510c 100644 --- a/src/producer/sendMessages.spec.js +++ b/src/producer/sendMessages.spec.js @@ -230,17 +230,17 @@ describe('Producer > sendMessages', () => { expect(transactionManager.updateSequence).toHaveBeenCalledWith( 'topic-name', 0, - 5 + messagesPerPartition[0].length + messagesPerPartition[0].length ) expect(transactionManager.updateSequence).toHaveBeenCalledWith( 'topic-name', 1, - 5 + messagesPerPartition[1].length + messagesPerPartition[1].length ) expect(transactionManager.updateSequence).toHaveBeenCalledWith( 'topic-name', 2, - 5 + messagesPerPartition[2].length + messagesPerPartition[2].length ) }) }) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index b11d46687..89df9e2c6 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -1,5 +1,6 @@ const NO_PRODUCER_ID = -1 const SEQUENCE_START = 0 +const INT_32_MAX_VALUE = Math.pow(2, 32) /** * Manage behavior for an idempotent producer and transactions. @@ -24,7 +25,7 @@ module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { const isInitialized = () => producerId !== NO_PRODUCER_ID - return { + const transactionManager = { /** * Initialize the idempotent producer by making an `InitProducerId` request. * Overwrites any existing state in this transaction manager @@ -67,14 +68,25 @@ module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { * Do nothing if not yet initialized (not idempotent) * @param {string} topic * @param {string} partition - * @param {number} sequence + * @param {number} increment */ - updateSequence(topic, partition, sequence) { + updateSequence(topic, partition, increment) { if (!isInitialized()) { return } - producerSequence[topic] = producerSequence[topic] || {} + const previous = transactionManager.getSequence(topic, partition) + let sequence = previous + increment + + // Sequence is defined as Int32 in the Record Batch, + // so theoretically should need to rotate here + if (sequence >= INT_32_MAX_VALUE) { + logger.debug( + `Sequence for ${topic} ${partition} exceeds max value (${sequence}). Rotating to 0.` + ) + sequence = 0 + } + producerSequence[topic][partition] = sequence }, @@ -94,4 +106,6 @@ module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { return producerEpoch }, } + + return transactionManager } diff --git a/src/producer/transactionManager.spec.js b/src/producer/transactionManager.spec.js index 6f40d7e1f..07d9bc117 100644 --- a/src/producer/transactionManager.spec.js +++ b/src/producer/transactionManager.spec.js @@ -51,12 +51,17 @@ describe('Producer > transactionManager', () => { expect(transactionManager.getSequence(topic, 1)).toEqual(0) transactionManager.updateSequence(topic, 1, 5) - transactionManager.updateSequence(topic, 1, 10) // Updates, not increments - expect(transactionManager.getSequence(topic, 1)).toEqual(10) + transactionManager.updateSequence(topic, 1, 10) + expect(transactionManager.getSequence(topic, 1)).toEqual(15) expect(transactionManager.getSequence(topic, 2)).toEqual(0) // Different partition expect(transactionManager.getSequence('foobar', 1)).toEqual(0) // Different topic + transactionManager.updateSequence(topic, 3, Math.pow(2, 32) - 100) + expect(transactionManager.getSequence(topic, 3)).toEqual(Math.pow(2, 32) - 100) // Rotates once we reach 2 ^ 32 (max Int32) + transactionManager.updateSequence(topic, 3, 100) + expect(transactionManager.getSequence(topic, 3)).toEqual(0) // Rotates once we reach 2 ^ 32 (max Int32) + await transactionManager.initProducerId() expect(transactionManager.getSequence(topic, 1)).toEqual(0) // Sequences reset by initProducerId }) From 413dc2f5eabc850992dae2bb975d08952714e0b5 Mon Sep 17 00:00:00 2001 From: Ian Walker-Sperber Date: Wed, 14 Nov 2018 14:43:57 -0800 Subject: [PATCH 16/16] EoS #173 Only init producer id once --- src/producer/index.js | 2 +- src/producer/index.spec.js | 50 +++++++++++++++++++------ src/producer/transactionManager.js | 10 +++-- src/producer/transactionManager.spec.js | 2 + 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/src/producer/index.js b/src/producer/index.js index e3fa9f40d..d8624b717 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -183,7 +183,7 @@ module.exports = ({ await cluster.connect() instrumentationEmitter.emit(CONNECT) - if (idempotent) { + if (idempotent && !transactionManager.isInitialized()) { await transactionManager.initProducerId() } }, diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js index 7ed59d81e..96252110b 100644 --- a/src/producer/index.spec.js +++ b/src/producer/index.spec.js @@ -1,3 +1,22 @@ +let initProducerIdSpy +let retrySpy + +jest.mock('./transactionManager', () => { + return (...args) => { + const transactionManager = jest.requireActual('./transactionManager')(...args) + + initProducerIdSpy = jest.spyOn(transactionManager, 'initProducerId') + + return transactionManager + } +}) + +jest.mock('../retry', () => { + let spy = jest.fn().mockImplementation(jest.requireActual('../retry')) + retrySpy = spy + return spy +}) + const createProducer = require('./index') const { secureRandom, @@ -494,11 +513,6 @@ describe('Producer', () => { }) test('sets the default retry value to MAX_SAFE_INTEGER', async () => { - jest.resetModules() - jest.mock('../retry') - const createRetryMock = require('../retry') - const createProducerMockedRetry = require('./index') - const cluster = createCluster( Object.assign(connectionOpts(), { allowExperimentalV011: true, @@ -506,12 +520,8 @@ describe('Producer', () => { }) ) - producer = createProducerMockedRetry({ cluster, logger: newLogger(), idempotent: true }) - expect(createRetryMock).toHaveBeenCalledWith({ retries: Number.MAX_SAFE_INTEGER }) - - try { - await producer.connect() - } catch (e) {} // Jest will complain about "open handles" if we don't connect. Ignore result. + producer = createProducer({ cluster, logger: newLogger(), idempotent: true }) + expect(retrySpy).toHaveBeenCalledWith({ retries: Number.MAX_SAFE_INTEGER }) }) test('throws an error if retries < 1', async () => { @@ -528,5 +538,23 @@ describe('Producer', () => { ) ) }) + + test('only calls initProducerId if unitialized', async () => { + const cluster = createCluster( + Object.assign(connectionOpts(), { + allowExperimentalV011: true, + createPartitioner: createModPartitioner, + }) + ) + + producer = createProducer({ cluster, logger: newLogger(), idempotent: true }) + + await producer.connect() + expect(initProducerIdSpy).toHaveBeenCalledTimes(1) + + initProducerIdSpy.mockClear() + await producer.connect() + expect(initProducerIdSpy).toHaveBeenCalledTimes(0) + }) }) }) diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js index 89df9e2c6..498159755 100644 --- a/src/producer/transactionManager.js +++ b/src/producer/transactionManager.js @@ -23,9 +23,11 @@ module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { */ let producerSequence = {} - const isInitialized = () => producerId !== NO_PRODUCER_ID - const transactionManager = { + isInitialized() { + return producerId !== NO_PRODUCER_ID + }, + /** * Initialize the idempotent producer by making an `InitProducerId` request. * Overwrites any existing state in this transaction manager @@ -52,7 +54,7 @@ module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { * @returns {number} */ getSequence(topic, partition) { - if (!isInitialized()) { + if (!transactionManager.isInitialized()) { return SEQUENCE_START } @@ -71,7 +73,7 @@ module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => { * @param {number} increment */ updateSequence(topic, partition, increment) { - if (!isInitialized()) { + if (!transactionManager.isInitialized()) { return } diff --git a/src/producer/transactionManager.spec.js b/src/producer/transactionManager.spec.js index 07d9bc117..5d81d791c 100644 --- a/src/producer/transactionManager.spec.js +++ b/src/producer/transactionManager.spec.js @@ -30,6 +30,7 @@ describe('Producer > transactionManager', () => { expect(transactionManager.getProducerId()).toEqual(-1) expect(transactionManager.getProducerEpoch()).toEqual(0) expect(transactionManager.getSequence(topic, 1)).toEqual(0) + expect(transactionManager.isInitialized()).toEqual(false) await transactionManager.initProducerId() @@ -38,6 +39,7 @@ describe('Producer > transactionManager', () => { expect(transactionManager.getProducerId()).toEqual(mockInitProducerIdResponse.producerId) expect(transactionManager.getProducerEpoch()).toEqual(mockInitProducerIdResponse.producerEpoch) + expect(transactionManager.isInitialized()).toEqual(true) }) test('getting & updating the sequence per topic-partition', async () => {