diff --git a/README.md b/README.md
index d3de26724..8bcb29d41 100644
--- a/README.md
+++ b/README.md
@@ -258,6 +258,9 @@ await producer.send({
| acks | Control the number of required acks. <br> __-1__ = all replicas must acknowledge _(default)_ <br> __0__ = no acknowledgments <br> __1__ = only waits for the leader to acknowledge | `-1` all replicas must acknowledge |
| timeout | The time to await a response in ms | `30000` |
| compression | Compression codec | `CompressionTypes.None` |
+| transactionTimeout | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the __broker__, the request will fail with an `InvalidTransactionTimeout` error | `60000` |
+| idempotent | _Experimental._ If enabled, the producer will ensure each message is written exactly once. Acks _must_ be set to -1 ("all"). Retries will default to `MAX_SAFE_INTEGER` | `false` |
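+
+For example, a producer with the experimental idempotent mode enabled might be configured like this (illustrative values; `acks` must be `-1` on every send):
+
+```javascript
+const producer = kafka.producer({ idempotent: true, transactionTimeout: 30000 })
+
+await producer.send({
+  topic: 'topic-name',
+  acks: -1,
+  messages: [{ key: 'key1', value: 'hello world' }],
+})
+```
+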
By default, the producer is configured to distribute the messages with the following logic:
diff --git a/src/broker/__tests__/produce.spec.js b/src/broker/__tests__/produce.spec.js
index ddfc6b362..0bae0ffa3 100644
--- a/src/broker/__tests__/produce.spec.js
+++ b/src/broker/__tests__/produce.spec.js
@@ -1,5 +1,7 @@
const Broker = require('../index')
+const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes')
const { Types: Compression } = require('../../protocol/message/compression')
+const { KafkaJSProtocolError } = require('../../errors')
const {
secureRandom,
createConnection,
@@ -17,12 +19,13 @@ describe('Broker > Produce', () => {
headers: { [`hkey-${secureRandom()}`]: `hvalue-${secureRandom()}` },
})
- const createTopicData = (headers = false) => [
+ const createTopicData = ({ headers = false, firstSequence = 0 } = {}) => [
{
topic: topicName,
partitions: [
{
partition: 0,
+ firstSequence,
messages: [
{
key: `key-${secureRandom()}`,
@@ -203,6 +206,97 @@ describe('Broker > Produce', () => {
})
})
+ testIfKafka011('request with idempotent producer', async () => {
+ // Get producer id & epoch
+ const {
+ coordinator: { host, port },
+ } = await retryProtocol(
+ 'GROUP_COORDINATOR_NOT_AVAILABLE',
+ async () =>
+ await broker.findGroupCoordinator({
+ groupId: `group-${secureRandom()}`,
+ coordinatorType: COORDINATOR_TYPES.GROUP,
+ })
+ )
+
+ const producerBroker = new Broker({
+ connection: createConnection({ host, port }),
+ logger: newLogger(),
+ })
+
+ await producerBroker.connect()
+ const result = await producerBroker.initProducerId({
+ transactionTimeout: 30000,
+ })
+
+ const producerId = result.producerId
+ const producerEpoch = result.producerEpoch
+
+ const metadata = await retryProtocol(
+ 'LEADER_NOT_AVAILABLE',
+ async () => await broker.metadata([topicName])
+ )
+
+ // Find leader of partition
+ const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader
+ const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker)
+
+    // Connect to the correct broker to produce the message
+ broker2 = new Broker({
+ connection: createConnection(newBrokerData),
+ logger: newLogger(),
+ allowExperimentalV011: true,
+ })
+ await broker2.connect()
+
+ const response1 = await retryProtocol(
+ 'LEADER_NOT_AVAILABLE',
+ async () =>
+ await broker2.produce({
+ producerId,
+ producerEpoch,
+ topicData: createTopicData({ headers: false }),
+ })
+ )
+
+ expect(response1).toEqual({
+ topics: [
+ {
+ topicName,
+ partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
+ },
+ ],
+ throttleTime: 0,
+ })
+
+      // We have to synchronise the sequence number between the producer and the broker:
+      // the broker expects firstSequence to equal the number of messages already appended (3 here)
+ await expect(
+ broker2.produce({
+ producerId,
+ producerEpoch,
+ topicData: createTopicData({ headers: false, firstSequence: 1 }), // Too small
+ })
+ ).rejects.toEqual(
+ new KafkaJSProtocolError('The broker received an out of order sequence number')
+ )
+
+ await expect(
+ broker2.produce({
+ producerId,
+ producerEpoch,
+ topicData: createTopicData({ headers: false, firstSequence: 5 }), // Too big
+ })
+ ).rejects.toEqual(
+ new KafkaJSProtocolError('The broker received an out of order sequence number')
+ )
+
+ await broker2.produce({
+ producerId,
+ producerEpoch,
+ topicData: createTopicData({ headers: false, firstSequence: 3 }), // Just right
+ })
+ })
+
testIfKafka011('request with headers', async () => {
const metadata = await retryProtocol(
'LEADER_NOT_AVAILABLE',
diff --git a/src/broker/index.js b/src/broker/index.js
index 6d5658e23..3850d0b21 100644
--- a/src/broker/index.js
+++ b/src/broker/index.js
@@ -160,6 +160,7 @@ module.exports = class Broker {
* partitions: [
* {
* partition: 0,
+ * firstSequence: 0,
* messages: [
* { key: '1', value: 'A' },
* { key: '2', value: 'B' },
@@ -167,6 +168,7 @@ module.exports = class Broker {
* },
* {
* partition: 1,
+ * firstSequence: 0,
* messages: [
* { key: '3', value: 'C' },
* ]
@@ -178,6 +180,7 @@ module.exports = class Broker {
* partitions: [
* {
* partition: 4,
+ * firstSequence: 0,
* messages: [
* { key: '32', value: 'E' },
* ]
diff --git a/src/producer/createTopicData.js b/src/producer/createTopicData.js
index b8527e635..83f70bf1f 100644
--- a/src/producer/createTopicData.js
+++ b/src/producer/createTopicData.js
@@ -1,9 +1,12 @@
module.exports = topicDataForBroker => {
- return topicDataForBroker.map(({ topic, partitions, messagesPerPartition }) => ({
- topic,
- partitions: partitions.map(partition => ({
- partition,
- messages: messagesPerPartition[partition],
- })),
- }))
+ return topicDataForBroker.map(
+ ({ topic, partitions, messagesPerPartition, sequencePerPartition }) => ({
+ topic,
+ partitions: partitions.map(partition => ({
+ partition,
+ firstSequence: sequencePerPartition[partition],
+ messages: messagesPerPartition[partition],
+ })),
+ })
+ )
}
diff --git a/src/producer/createTopicData.spec.js b/src/producer/createTopicData.spec.js
new file mode 100644
index 000000000..eb4a7b7ab
--- /dev/null
+++ b/src/producer/createTopicData.spec.js
@@ -0,0 +1,38 @@
+const createTopicData = require('./createTopicData')
+
+describe('Producer > createTopicData', () => {
+ let topic, partitions, messagesPerPartition, sequencePerPartition
+
+ beforeEach(() => {
+ topic = 'test-topic'
+ partitions = [1, 2, 3]
+
+ messagesPerPartition = {
+ 1: [{ key: '1' }],
+ 2: [{ key: '2' }],
+ 3: [{ key: '3' }, { key: '4' }],
+ }
+
+ sequencePerPartition = {
+ 1: 0,
+ 2: 5,
+ 3: 10,
+ }
+ })
+
+ test('format data by topic and partition', () => {
+ const result = createTopicData([
+ { topic, partitions, messagesPerPartition, sequencePerPartition },
+ ])
+ expect(result).toEqual([
+ {
+ topic,
+ partitions: [
+ { partition: 1, firstSequence: 0, messages: [{ key: '1' }] },
+ { partition: 2, firstSequence: 5, messages: [{ key: '2' }] },
+ { partition: 3, firstSequence: 10, messages: [{ key: '3' }, { key: '4' }] },
+ ],
+ },
+ ])
+ })
+})
diff --git a/src/producer/index.js b/src/producer/index.js
index 49856b024..d8624b717 100644
--- a/src/producer/index.js
+++ b/src/producer/index.js
@@ -3,6 +3,7 @@ const createDefaultPartitioner = require('./partitioners/default')
const createSendMessages = require('./sendMessages')
const InstrumentationEventEmitter = require('../instrumentation/emitter')
const events = require('./instrumentationEvents')
+const createTransactionManager = require('./transactionManager')
const { CONNECT, DISCONNECT } = require('./instrumentationEvents')
const { KafkaJSNonRetriableError } = require('../errors')
@@ -16,13 +17,29 @@ module.exports = ({
cluster,
logger: rootLogger,
createPartitioner = createDefaultPartitioner,
- retry = { retries: 5 },
+ retry,
+ idempotent = false,
+ transactionTimeout,
}) => {
+ retry = retry || { retries: idempotent ? Number.MAX_SAFE_INTEGER : 5 }
+
+ if (idempotent && retry.retries < 1) {
+ throw new KafkaJSNonRetriableError(
+ 'Idempotent producer must allow retries to protect against transient errors'
+ )
+ }
+
+ const logger = rootLogger.namespace('Producer')
+
+ if (idempotent && retry.retries < Number.MAX_SAFE_INTEGER) {
+ logger.warn('Limiting retries for the idempotent producer may invalidate EoS guarantees')
+ }
+
const partitioner = createPartitioner()
const retrier = createRetry(Object.assign({}, cluster.retry, retry))
const instrumentationEmitter = new InstrumentationEventEmitter()
- const logger = rootLogger.namespace('Producer')
- const sendMessages = createSendMessages({ logger, cluster, partitioner })
+ const transactionManager = createTransactionManager({ logger, cluster, transactionTimeout })
+ const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager })
/**
* @typedef {Object} TopicMessages
@@ -36,6 +53,7 @@ module.exports = ({
* -1 = all replicas must acknowledge
* 0 = no acknowledgments
* 1 = only waits for the leader to acknowledge
+ *
* @property {number} [timeout=30000] The time to await a response in ms
* @property {Compression.Types} [compression=Compression.Types.None] Compression codec
*
@@ -47,6 +65,12 @@ module.exports = ({
throw new KafkaJSNonRetriableError(`Invalid topic`)
}
+ if (idempotent && acks !== -1) {
+ throw new KafkaJSNonRetriableError(
+ `Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees`
+ )
+ }
+
for (let { topic, messages } of topicMessages) {
if (!messages) {
throw new KafkaJSNonRetriableError(
@@ -158,6 +182,10 @@ module.exports = ({
connect: async () => {
await cluster.connect()
instrumentationEmitter.emit(CONNECT)
+
+ if (idempotent && !transactionManager.isInitialized()) {
+ await transactionManager.initProducerId()
+ }
},
/**
diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js
index 372fef065..96252110b 100644
--- a/src/producer/index.spec.js
+++ b/src/producer/index.spec.js
@@ -1,3 +1,22 @@
+let initProducerIdSpy
+let retrySpy
+
+jest.mock('./transactionManager', () => {
+ return (...args) => {
+ const transactionManager = jest.requireActual('./transactionManager')(...args)
+
+ initProducerIdSpy = jest.spyOn(transactionManager, 'initProducerId')
+
+ return transactionManager
+ }
+})
+
+jest.mock('../retry', () => {
+ let spy = jest.fn().mockImplementation(jest.requireActual('../retry'))
+ retrySpy = spy
+ return spy
+})
+
const createProducer = require('./index')
const {
secureRandom,
@@ -14,7 +33,7 @@ const {
createTopic,
} = require('testHelpers')
-const { KafkaJSSASLAuthenticationError } = require('../errors')
+const { KafkaJSSASLAuthenticationError, KafkaJSNonRetriableError } = require('../errors')
describe('Producer', () => {
let topicName, producer
@@ -181,255 +200,304 @@ describe('Producer', () => {
expect(cluster.isConnected()).toEqual(true)
})
- test('produce messages', async () => {
- const cluster = createCluster(
- Object.assign(connectionOpts(), {
- createPartitioner: createModPartitioner,
- })
- )
-
- await createTopic({ topic: topicName })
-
- producer = createProducer({ cluster, logger: newLogger() })
- await producer.connect()
-
- const sendMessages = async () =>
- await producer.send({
- acks: 1,
- topic: topicName,
- messages: new Array(10).fill().map((_, i) => ({
- key: `key-${i}`,
- value: `value-${i}`,
- })),
- })
-
- expect(await sendMessages()).toEqual([
- {
- topicName,
- errorCode: 0,
- offset: '0',
- partition: 0,
- timestamp: '-1',
- },
- ])
-
- expect(await sendMessages()).toEqual([
- {
- topicName,
- errorCode: 0,
- offset: '10',
- partition: 0,
- timestamp: '-1',
- },
- ])
+ test('gives access to its logger', () => {
+ producer = createProducer({ cluster: createCluster(), logger: newLogger() })
+ expect(producer.logger()).toMatchSnapshot()
})
- testIfKafka011('produce messages for Kafka 0.11', async () => {
- const cluster = createCluster(
- Object.assign(connectionOpts(), {
- allowExperimentalV011: true,
- createPartitioner: createModPartitioner,
- })
+ test('on throws an error when provided with an invalid event name', () => {
+ producer = createProducer({ cluster: createCluster(), logger: newLogger() })
+
+ expect(() => producer.on('NON_EXISTENT_EVENT', () => {})).toThrow(
+ /Event name should be one of producer.events./
)
+ })
- await createTopic({ topic: topicName })
+ test('emits connection events', async () => {
+ producer = createProducer({ cluster: createCluster(), logger: newLogger() })
+ const connectListener = jest.fn().mockName('connect')
+ const disconnectListener = jest.fn().mockName('disconnect')
+ producer.on(producer.events.CONNECT, connectListener)
+ producer.on(producer.events.DISCONNECT, disconnectListener)
- producer = createProducer({ cluster, logger: newLogger() })
await producer.connect()
+ expect(connectListener).toHaveBeenCalled()
- const sendMessages = async () =>
- await producer.send({
- acks: 1,
- topic: topicName,
- messages: new Array(10).fill().map((_, i) => ({
- key: `key-${i}`,
- value: `value-${i}`,
- })),
- })
-
- expect(await sendMessages()).toEqual([
- {
- topicName,
- baseOffset: '0',
- errorCode: 0,
- logAppendTime: '-1',
- partition: 0,
- },
- ])
-
- expect(await sendMessages()).toEqual([
- {
- topicName,
- baseOffset: '10',
- errorCode: 0,
- logAppendTime: '-1',
- partition: 0,
- },
- ])
+ await producer.disconnect()
+ expect(disconnectListener).toHaveBeenCalled()
})
- testIfKafka011('produce messages for Kafka 0.11 with headers', async () => {
- const cluster = createCluster(
- Object.assign(connectionOpts(), {
- allowExperimentalV011: true,
+ describe('when acks=0', () => {
+ it('returns immediately', async () => {
+ const cluster = createCluster({
+ ...connectionOpts(),
createPartitioner: createModPartitioner,
})
- )
- await createTopic({ topic: topicName })
+ await createTopic({ topic: topicName })
- producer = createProducer({ cluster, logger: newLogger() })
- await producer.connect()
+ producer = createProducer({ cluster, logger: newLogger() })
+ await producer.connect()
- const sendMessages = async () =>
- await producer.send({
- acks: 1,
- topic: topicName,
- messages: new Array(10).fill().map((_, i) => ({
- key: `key-${i}`,
- value: `value-${i}`,
- headers: {
- [`header-a${i}`]: `header-value-a${i}`,
- [`header-b${i}`]: `header-value-b${i}`,
- [`header-c${i}`]: `header-value-c${i}`,
- },
- })),
- })
+ const sendMessages = async () =>
+ await producer.send({
+ acks: 0,
+ topic: topicName,
+ messages: new Array(10).fill().map((_, i) => ({
+ key: `key-${i}`,
+ value: `value-${i}`,
+ })),
+ })
- expect(await sendMessages()).toEqual([
- {
- topicName,
- baseOffset: '0',
- errorCode: 0,
- logAppendTime: '-1',
- partition: 0,
- },
- ])
-
- expect(await sendMessages()).toEqual([
- {
- topicName,
- baseOffset: '10',
- errorCode: 0,
- logAppendTime: '-1',
- partition: 0,
- },
- ])
+ expect(await sendMessages()).toEqual([])
+ })
})
- test('produce messages to multiple topics', async () => {
- const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
+ function testProduceMessages(idempotent = false) {
+ const acks = idempotent ? -1 : 1
- await createTopic({ topic: topics[0] })
- await createTopic({ topic: topics[1] })
+ test('produce messages', async () => {
+ const cluster = createCluster(
+ Object.assign(connectionOpts(), {
+ createPartitioner: createModPartitioner,
+ })
+ )
- const cluster = createCluster({
- ...connectionOpts(),
- createPartitioner: createModPartitioner,
- })
- const byTopicName = (a, b) => a.topicName.localeCompare(b.topicName)
+ await createTopic({ topic: topicName })
- producer = createProducer({ cluster, logger: newLogger() })
- await producer.connect()
+ producer = createProducer({ cluster, logger: newLogger(), idempotent })
+ await producer.connect()
- const sendBatch = async topics => {
- const topicMessages = topics.map(topic => ({
- acks: 1,
- topic,
- messages: new Array(10).fill().map((_, i) => ({
- key: `key-${i}`,
- value: `value-${i}`,
- })),
- }))
-
- return producer.sendBatch({
- acks: 1,
- topicMessages,
- })
- }
+ const sendMessages = async () =>
+ await producer.send({
+ acks,
+ topic: topicName,
+ messages: new Array(10).fill().map((_, i) => ({
+ key: `key-${i}`,
+ value: `value-${i}`,
+ })),
+ })
- let result = await sendBatch(topics)
- expect(result.sort(byTopicName)).toEqual(
- [
+ expect(await sendMessages()).toEqual([
{
- topicName: topics[0],
+ topicName,
errorCode: 0,
offset: '0',
partition: 0,
timestamp: '-1',
},
+ ])
+
+ expect(await sendMessages()).toEqual([
{
- topicName: topics[1],
+ topicName,
errorCode: 0,
- offset: '0',
+ offset: '10',
partition: 0,
timestamp: '-1',
},
- ].sort(byTopicName)
- )
+ ])
+ })
+
+ test('produce messages to multiple topics', async () => {
+ const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
+
+ await createTopic({ topic: topics[0] })
+ await createTopic({ topic: topics[1] })
+
+ const cluster = createCluster({
+ ...connectionOpts(),
+ createPartitioner: createModPartitioner,
+ })
+ const byTopicName = (a, b) => a.topicName.localeCompare(b.topicName)
+
+ producer = createProducer({ cluster, logger: newLogger(), idempotent })
+ await producer.connect()
+
+ const sendBatch = async topics => {
+ const topicMessages = topics.map(topic => ({
+ acks,
+ topic,
+ messages: new Array(10).fill().map((_, i) => ({
+ key: `key-${i}`,
+ value: `value-${i}`,
+ })),
+ }))
+
+ return producer.sendBatch({
+ acks,
+ topicMessages,
+ })
+ }
+
+ let result = await sendBatch(topics)
+ expect(result.sort(byTopicName)).toEqual(
+ [
+ {
+ topicName: topics[0],
+ errorCode: 0,
+ offset: '0',
+ partition: 0,
+ timestamp: '-1',
+ },
+ {
+ topicName: topics[1],
+ errorCode: 0,
+ offset: '0',
+ partition: 0,
+ timestamp: '-1',
+ },
+ ].sort(byTopicName)
+ )
- result = await sendBatch(topics)
- expect(result.sort(byTopicName)).toEqual(
- [
+ result = await sendBatch(topics)
+ expect(result.sort(byTopicName)).toEqual(
+ [
+ {
+ topicName: topics[0],
+ errorCode: 0,
+ offset: '10',
+ partition: 0,
+ timestamp: '-1',
+ },
+ {
+ topicName: topics[1],
+ errorCode: 0,
+ offset: '10',
+ partition: 0,
+ timestamp: '-1',
+ },
+ ].sort(byTopicName)
+ )
+ })
+
+ testIfKafka011('produce messages for Kafka 0.11', async () => {
+ const cluster = createCluster(
+ Object.assign(connectionOpts(), {
+ allowExperimentalV011: true,
+ createPartitioner: createModPartitioner,
+ })
+ )
+
+ await createTopic({ topic: topicName })
+
+ producer = createProducer({ cluster, logger: newLogger(), idempotent })
+ await producer.connect()
+
+ const sendMessages = async () =>
+ await producer.send({
+ acks,
+ topic: topicName,
+ messages: new Array(10).fill().map((_, i) => ({
+ key: `key-${i}`,
+ value: `value-${i}`,
+ })),
+ })
+
+ expect(await sendMessages()).toEqual([
{
- topicName: topics[0],
+ topicName,
+ baseOffset: '0',
errorCode: 0,
- offset: '10',
+ logAppendTime: '-1',
partition: 0,
- timestamp: '-1',
},
+ ])
+
+ expect(await sendMessages()).toEqual([
{
- topicName: topics[1],
+ topicName,
+ baseOffset: '10',
errorCode: 0,
- offset: '10',
+ logAppendTime: '-1',
partition: 0,
- timestamp: '-1',
},
- ].sort(byTopicName)
- )
- })
+ ])
+ })
- test('gives access to its logger', () => {
- producer = createProducer({ cluster: createCluster(), logger: newLogger() })
- expect(producer.logger()).toMatchSnapshot()
- })
+ testIfKafka011('produce messages for Kafka 0.11 with headers', async () => {
+ const cluster = createCluster(
+ Object.assign(connectionOpts(), {
+ allowExperimentalV011: true,
+ createPartitioner: createModPartitioner,
+ })
+ )
- test('on throws an error when provided with an invalid event name', () => {
- producer = createProducer({ cluster: createCluster(), logger: newLogger() })
+ await createTopic({ topic: topicName })
- expect(() => producer.on('NON_EXISTENT_EVENT', () => {})).toThrow(
- /Event name should be one of producer.events./
- )
- })
+ producer = createProducer({ cluster, logger: newLogger(), idempotent })
+ await producer.connect()
- test('emits connection events', async () => {
- producer = createProducer({ cluster: createCluster(), logger: newLogger() })
- const connectListener = jest.fn().mockName('connect')
- const disconnectListener = jest.fn().mockName('disconnect')
- producer.on(producer.events.CONNECT, connectListener)
- producer.on(producer.events.DISCONNECT, disconnectListener)
+ const sendMessages = async () =>
+ await producer.send({
+ acks,
+ topic: topicName,
+ messages: new Array(10).fill().map((_, i) => ({
+ key: `key-${i}`,
+ value: `value-${i}`,
+ headers: {
+ [`header-a${i}`]: `header-value-a${i}`,
+ [`header-b${i}`]: `header-value-b${i}`,
+ [`header-c${i}`]: `header-value-c${i}`,
+ },
+ })),
+ })
- await producer.connect()
- expect(connectListener).toHaveBeenCalled()
+ expect(await sendMessages()).toEqual([
+ {
+ topicName,
+ baseOffset: '0',
+ errorCode: 0,
+ logAppendTime: '-1',
+ partition: 0,
+ },
+ ])
- await producer.disconnect()
- expect(disconnectListener).toHaveBeenCalled()
- })
+ expect(await sendMessages()).toEqual([
+ {
+ topicName,
+ baseOffset: '10',
+ errorCode: 0,
+ logAppendTime: '-1',
+ partition: 0,
+ },
+ ])
+ })
+ }
- describe('when acks=0', () => {
- it('returns immediately', async () => {
- const cluster = createCluster({
- ...connectionOpts(),
- createPartitioner: createModPartitioner,
- })
+ testProduceMessages(false)
- await createTopic({ topic: topicName })
+ describe('when idempotent=true', () => {
+ testProduceMessages(true)
- producer = createProducer({ cluster, logger: newLogger() })
+ test('throws an error if acks != -1', async () => {
+ const cluster = createCluster(
+ Object.assign(connectionOpts(), {
+ allowExperimentalV011: true,
+ createPartitioner: createModPartitioner,
+ })
+ )
+
+ producer = createProducer({ cluster, logger: newLogger(), idempotent: true })
await producer.connect()
- const sendMessages = async () =>
- await producer.send({
+ await expect(
+ producer.send({
+ acks: 1,
+ topic: topicName,
+ messages: new Array(10).fill().map((_, i) => ({
+ key: `key-${i}`,
+ value: `value-${i}`,
+ })),
+ })
+ ).rejects.toEqual(
+ new KafkaJSNonRetriableError(
+ "Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees"
+ )
+ )
+
+ await expect(
+ producer.send({
acks: 0,
topic: topicName,
messages: new Array(10).fill().map((_, i) => ({
@@ -437,8 +505,56 @@ describe('Producer', () => {
value: `value-${i}`,
})),
})
+ ).rejects.toEqual(
+ new KafkaJSNonRetriableError(
+ "Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees"
+ )
+ )
+ })
- expect(await sendMessages()).toEqual([])
+ test('sets the default retry value to MAX_SAFE_INTEGER', async () => {
+ const cluster = createCluster(
+ Object.assign(connectionOpts(), {
+ allowExperimentalV011: true,
+ createPartitioner: createModPartitioner,
+ })
+ )
+
+ producer = createProducer({ cluster, logger: newLogger(), idempotent: true })
+ expect(retrySpy).toHaveBeenCalledWith({ retries: Number.MAX_SAFE_INTEGER })
+ })
+
+ test('throws an error if retries < 1', async () => {
+ expect(() =>
+ createProducer({
+ cluster: {},
+ logger: newLogger(),
+ idempotent: true,
+ retry: { retries: 0 },
+ })
+ ).toThrowError(
+ new KafkaJSNonRetriableError(
+ 'Idempotent producer must allow retries to protect against transient errors'
+ )
+ )
+ })
+
+    test('only calls initProducerId if uninitialized', async () => {
+ const cluster = createCluster(
+ Object.assign(connectionOpts(), {
+ allowExperimentalV011: true,
+ createPartitioner: createModPartitioner,
+ })
+ )
+
+ producer = createProducer({ cluster, logger: newLogger(), idempotent: true })
+
+ await producer.connect()
+ expect(initProducerIdSpy).toHaveBeenCalledTimes(1)
+
+ initProducerIdSpy.mockClear()
+ await producer.connect()
+ expect(initProducerIdSpy).toHaveBeenCalledTimes(0)
})
})
})
diff --git a/src/producer/sendMessages.js b/src/producer/sendMessages.js
index 9cdeb9d22..5dfb6c170 100644
--- a/src/producer/sendMessages.js
+++ b/src/producer/sendMessages.js
@@ -12,7 +12,7 @@ const staleMetadata = e =>
e.type
)
-module.exports = ({ logger, cluster, partitioner }) => {
+module.exports = ({ logger, cluster, partitioner, transactionManager }) => {
const retrier = createRetry({ retries: TOTAL_INDIVIDUAL_ATTEMPTS })
return async ({ acks, timeout, compression, topicMessages }) => {
@@ -47,10 +47,19 @@ module.exports = ({ logger, cluster, partitioner }) => {
})
const partitions = keys(messagesPerPartition)
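+      // Next expected sequence per partition from the transaction manager (0 when the producer is not idempotent)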
+ const sequencePerPartition = partitions.reduce((result, partition) => {
+ result[partition] = transactionManager.getSequence(topic, partition)
+ return result
+ }, {})
+
const partitionsPerLeader = cluster.findLeaderForPartitions(topic, partitions)
const leaders = keys(partitionsPerLeader)
- topicMetadata.set(topic, { partitionsPerLeader, messagesPerPartition })
+ topicMetadata.set(topic, {
+ partitionsPerLeader,
+ messagesPerPartition,
+ sequencePerPartition,
+ })
for (let nodeId of leaders) {
const broker = await cluster.findBroker({ nodeId })
@@ -67,18 +76,35 @@ module.exports = ({ logger, cluster, partitioner }) => {
const entries = Array.from(topicMetadata.entries())
const topicDataForBroker = entries
.filter(([_, { partitionsPerLeader }]) => !!partitionsPerLeader[broker.nodeId])
- .map(([topic, { partitionsPerLeader, messagesPerPartition }]) => ({
+ .map(([topic, { partitionsPerLeader, messagesPerPartition, sequencePerPartition }]) => ({
topic,
partitions: partitionsPerLeader[broker.nodeId],
+ sequencePerPartition,
messagesPerPartition,
}))
const topicData = createTopicData(topicDataForBroker)
try {
- const response = await broker.produce({ acks, timeout, compression, topicData })
+ const response = await broker.produce({
+ producerId: transactionManager.getProducerId(),
+ producerEpoch: transactionManager.getProducerEpoch(),
+ acks,
+ timeout,
+ compression,
+ topicData,
+ })
+
const expectResponse = acks !== 0
- responsePerBroker.set(broker, expectResponse ? responseSerializer(response) : [])
+ const formattedResponse = expectResponse ? responseSerializer(response) : []
+
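+          // Advance the local sequence by the number of messages just accepted for each partition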
+ formattedResponse.forEach(({ topicName, partition }) => {
+ const increment = topicMetadata.get(topicName).messagesPerPartition[partition].length
+
+ transactionManager.updateSequence(topicName, partition, increment)
+ })
+
+ responsePerBroker.set(broker, formattedResponse)
} catch (e) {
responsePerBroker.delete(broker)
throw e
diff --git a/src/producer/sendMessages.spec.js b/src/producer/sendMessages.spec.js
index 97907042f..99bf8510c 100644
--- a/src/producer/sendMessages.spec.js
+++ b/src/producer/sendMessages.spec.js
@@ -26,7 +26,13 @@ describe('Producer > sendMessages', () => {
3: [2],
}
- let messages, partitioner, brokers, cluster, messagesPerPartition, topicPartitionMetadata
+ let messages,
+ partitioner,
+ brokers,
+ cluster,
+ messagesPerPartition,
+ topicPartitionMetadata,
+ transactionManager
beforeEach(() => {
messages = []
@@ -61,11 +67,23 @@ describe('Producer > sendMessages', () => {
'2': [{ key: '2' }, { key: '5' }, { key: '8' }],
}
+ transactionManager = {
+ getProducerId: jest.fn(() => -1),
+ getProducerEpoch: jest.fn(() => 0),
+ getSequence: jest.fn(() => 0),
+ updateSequence: jest.fn(),
+ }
+
require('./groupMessagesPerPartition').mockImplementation(() => messagesPerPartition)
})
test('only retry failed brokers', async () => {
- const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner })
+ const sendMessages = createSendMessages({
+ logger: newLogger(),
+ cluster,
+ partitioner,
+ transactionManager,
+ })
brokers[1].produce
.mockImplementationOnce(() => {
@@ -111,7 +129,12 @@ describe('Producer > sendMessages', () => {
}
}
- const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner })
+ const sendMessages = createSendMessages({
+ logger: newLogger(),
+ cluster,
+ partitioner,
+ transactionManager,
+ })
brokers[1].produce
.mockImplementationOnce(() => {
throw new FakeError()
@@ -125,7 +148,12 @@ describe('Producer > sendMessages', () => {
}
test('does not re-produce messages to brokers that are no longer leaders after metadata refresh', async () => {
- const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner })
+ const sendMessages = createSendMessages({
+ logger: newLogger(),
+ cluster,
+ partitioner,
+ transactionManager,
+ })
brokers[2].produce
.mockImplementationOnce(() => {
@@ -152,7 +180,12 @@ describe('Producer > sendMessages', () => {
})
test('refreshes metadata if partition metadata is empty', async () => {
- const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner })
+ const sendMessages = createSendMessages({
+ logger: newLogger(),
+ cluster,
+ partitioner,
+ transactionManager,
+ })
cluster.findTopicPartitionMetadata
.mockImplementationOnce(() => ({}))
@@ -162,4 +195,52 @@ describe('Producer > sendMessages', () => {
expect(cluster.refreshMetadata).toHaveBeenCalled()
})
+
+ test('retrieves sequence information from the transaction manager and updates', async () => {
+ const sendMessages = createSendMessages({
+ logger: newLogger(),
+ cluster,
+ partitioner,
+ transactionManager,
+ })
+
+ transactionManager.getSequence.mockReturnValue(5)
+
+ cluster.findTopicPartitionMetadata
+ .mockImplementationOnce(() => ({}))
+ .mockImplementationOnce(() => partitionsPerLeader)
+
+ await sendMessages({
+ topicMessages: [{ topic, messages }],
+ })
+
+ expect(brokers[1].produce.mock.calls[0][0].topicData[0].partitions[0]).toHaveProperty(
+ 'firstSequence',
+ 5
+ )
+ expect(brokers[2].produce.mock.calls[0][0].topicData[0].partitions[0]).toHaveProperty(
+ 'firstSequence',
+ 5
+ )
+ expect(brokers[3].produce.mock.calls[0][0].topicData[0].partitions[0]).toHaveProperty(
+ 'firstSequence',
+ 5
+ )
+
+ expect(transactionManager.updateSequence).toHaveBeenCalledWith(
+ 'topic-name',
+ 0,
+ messagesPerPartition[0].length
+ )
+ expect(transactionManager.updateSequence).toHaveBeenCalledWith(
+ 'topic-name',
+ 1,
+ messagesPerPartition[1].length
+ )
+ expect(transactionManager.updateSequence).toHaveBeenCalledWith(
+ 'topic-name',
+ 2,
+ messagesPerPartition[2].length
+ )
+ })
})
diff --git a/src/producer/transactionManager.js b/src/producer/transactionManager.js
new file mode 100644
index 000000000..498159755
--- /dev/null
+++ b/src/producer/transactionManager.js
@@ -0,0 +1,113 @@
+const NO_PRODUCER_ID = -1
+const SEQUENCE_START = 0
+const INT_32_MAX_VALUE = Math.pow(2, 32)
+
+/**
+ * Manage behavior for an idempotent producer and transactions.
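+ *
+ * Illustrative usage sketch (names refer to the functions defined below):
+ *
+ *   const transactionManager = createTransactionManager({ logger, cluster })
+ *   await transactionManager.initProducerId()
+ *   const firstSequence = transactionManager.getSequence(topic, partition)
+ *   // after a successful produce, advance by the number of messages written:
+ *   transactionManager.updateSequence(topic, partition, messages.length)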
+ */
+module.exports = ({ logger, cluster, transactionTimeout = 60000 }) => {
+ /**
+ * Current producer ID
+ */
+ let producerId = NO_PRODUCER_ID
+
+ /**
+ * Current producer epoch
+ */
+ let producerEpoch = 0
+
+ /**
+ * Idempotent production requires that the producer track the sequence number of messages.
+ *
+ * Sequences are sent with every Record Batch and tracked per Topic-Partition
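+   *
+   * e.g. { 'topic-a': { 0: 5, 1: 0 }, 'topic-b': { 0: 12 } }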
+ */
+ let producerSequence = {}
+
+ const transactionManager = {
+ isInitialized() {
+ return producerId !== NO_PRODUCER_ID
+ },
+
+ /**
+ * Initialize the idempotent producer by making an `InitProducerId` request.
+ * Overwrites any existing state in this transaction manager
+ */
+ initProducerId: async () => {
+ await cluster.refreshMetadataIfNecessary()
+
+    // If non-transactional, the PID can be requested from any broker; we ask the controller here
+ const broker = await cluster.findControllerBroker()
+ const result = await broker.initProducerId({ transactionTimeout })
+
+ producerId = result.producerId
+ producerEpoch = result.producerEpoch
+ producerSequence = {}
+
+ logger.debug('Initialized producer id & epoch', { producerId, producerEpoch })
+ },
+
+ /**
+ * Get the current sequence for a given Topic-Partition. Defaults to 0.
+ *
+ * @param {string} topic
+ * @param {string} partition
+ * @returns {number}
+ */
+ getSequence(topic, partition) {
+ if (!transactionManager.isInitialized()) {
+ return SEQUENCE_START
+ }
+
+ producerSequence[topic] = producerSequence[topic] || {}
+ producerSequence[topic][partition] = producerSequence[topic][partition] || SEQUENCE_START
+
+ return producerSequence[topic][partition]
+ },
+
+ /**
+ * Update the sequence for a given Topic-Partition.
+ *
+ * Do nothing if not yet initialized (not idempotent)
+ * @param {string} topic
+ * @param {string} partition
+ * @param {number} increment
+ */
+ updateSequence(topic, partition, increment) {
+ if (!transactionManager.isInitialized()) {
+ return
+ }
+
+ const previous = transactionManager.getSequence(topic, partition)
+ let sequence = previous + increment
+
+      // The sequence is defined as an Int32 in the Record Batch,
+      // so it has to wrap around to 0 once it exceeds that range
+ if (sequence >= INT_32_MAX_VALUE) {
+ logger.debug(
+ `Sequence for ${topic} ${partition} exceeds max value (${sequence}). Rotating to 0.`
+ )
+ sequence = 0
+ }
+
+ producerSequence[topic][partition] = sequence
+ },
+
+ /**
+ * Get the current producer id
+ * @returns {number}
+ */
+ getProducerId() {
+ return producerId
+ },
+
+ /**
+ * Get the current producer epoch
+ * @returns {number}
+ */
+ getProducerEpoch() {
+ return producerEpoch
+ },
+ }
+
+ return transactionManager
+}
diff --git a/src/producer/transactionManager.spec.js b/src/producer/transactionManager.spec.js
new file mode 100644
index 000000000..5d81d791c
--- /dev/null
+++ b/src/producer/transactionManager.spec.js
@@ -0,0 +1,70 @@
+const { newLogger } = require('testHelpers')
+const createTransactionManager = require('./transactionManager')
+
+describe('Producer > transactionManager', () => {
+ const topic = 'topic-name'
+ const mockInitProducerIdResponse = {
+ producerId: 1000,
+ producerEpoch: 1,
+ }
+
+ let cluster, broker
+
+ beforeEach(() => {
+ broker = {
+ initProducerId: jest.fn().mockReturnValue(mockInitProducerIdResponse),
+ }
+ cluster = {
+ refreshMetadataIfNecessary: jest.fn(),
+ findControllerBroker: jest.fn().mockReturnValue(broker),
+ }
+ })
+
+ test('initializing the producer id and epoch', async () => {
+ const transactionManager = createTransactionManager({
+ logger: newLogger(),
+ cluster,
+ transactionTimeout: 30000,
+ })
+
+ expect(transactionManager.getProducerId()).toEqual(-1)
+ expect(transactionManager.getProducerEpoch()).toEqual(0)
+ expect(transactionManager.getSequence(topic, 1)).toEqual(0)
+ expect(transactionManager.isInitialized()).toEqual(false)
+
+ await transactionManager.initProducerId()
+
+ expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled()
+ expect(broker.initProducerId).toHaveBeenCalledWith({ transactionTimeout: 30000 })
+
+ expect(transactionManager.getProducerId()).toEqual(mockInitProducerIdResponse.producerId)
+ expect(transactionManager.getProducerEpoch()).toEqual(mockInitProducerIdResponse.producerEpoch)
+ expect(transactionManager.isInitialized()).toEqual(true)
+ })
+
+ test('getting & updating the sequence per topic-partition', async () => {
+ const transactionManager = createTransactionManager({ logger: newLogger(), cluster })
+
+ expect(transactionManager.getSequence(topic, 1)).toEqual(0)
+ transactionManager.updateSequence(topic, 1, 10) // No effect if we haven't initialized
+ expect(transactionManager.getSequence(topic, 1)).toEqual(0)
+
+ await transactionManager.initProducerId()
+
+ expect(transactionManager.getSequence(topic, 1)).toEqual(0)
+ transactionManager.updateSequence(topic, 1, 5)
+ transactionManager.updateSequence(topic, 1, 10)
+ expect(transactionManager.getSequence(topic, 1)).toEqual(15)
+
+ expect(transactionManager.getSequence(topic, 2)).toEqual(0) // Different partition
+ expect(transactionManager.getSequence('foobar', 1)).toEqual(0) // Different topic
+
+ transactionManager.updateSequence(topic, 3, Math.pow(2, 32) - 100)
+    expect(transactionManager.getSequence(topic, 3)).toEqual(Math.pow(2, 32) - 100) // Not yet rotated
+    transactionManager.updateSequence(topic, 3, 100)
+    expect(transactionManager.getSequence(topic, 3)).toEqual(0) // Rotates back to 0 once the sequence reaches 2 ^ 32
+
+ await transactionManager.initProducerId()
+ expect(transactionManager.getSequence(topic, 1)).toEqual(0) // Sequences reset by initProducerId
+ })
+})
diff --git a/src/protocol/requests/produce/fixtures/v3_request.json b/src/protocol/requests/produce/fixtures/v3_request.json
index 72d749e18..a9e06152d 100644
--- a/src/protocol/requests/produce/fixtures/v3_request.json
+++ b/src/protocol/requests/produce/fixtures/v3_request.json
@@ -1 +1 @@
-{"type":"Buffer","data":[255,255,255,255,0,0,117,48,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,101,98,98,97,54,56,56,55,57,99,54,102,53,48,56,49,100,56,99,50,0,0,0,1,0,0,0,0,0,0,1,6,0,0,0,0,0,0,0,0,0,0,0,250,0,0,0,0,2,156,66,78,51,0,0,0,0,0,2,0,0,1,95,142,187,58,12,0,0,1,95,142,187,58,12,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,3,130,1,0,0,0,48,107,101,121,45,57,100,48,102,51,52,56,99,98,50,101,55,51,48,101,49,101,100,99,52,62,115,111,109,101,45,118,97,108,117,101,45,97,49,55,98,52,99,56,49,102,57,101,99,100,49,101,56,57,54,101,51,2,2,97,2,98,130,1,0,0,2,48,107,101,121,45,99,55,48,55,51,101,57,54,53,99,51,52,98,52,99,99,54,52,52,50,62,115,111,109,101,45,118,97,108,117,101,45,54,53,100,102,52,50,50,48,55,48,100,55,97,100,55,51,57,49,52,102,2,2,97,2,98,130,1,0,0,4,48,107,101,121,45,49,54,57,51,98,49,56,52,97,57,98,53,50,100,98,101,48,51,98,99,62,115,111,109,101,45,118,97,108,117,101,45,51,102,99,98,54,53,102,102,99,97,48,56,55,99,98,97,50,48,97,100,2,2,97,2,98]}
+{"type":"Buffer","data":[255,255,255,255,0,0,117,48,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,101,98,98,97,54,56,56,55,57,99,54,102,53,48,56,49,100,56,99,50,0,0,0,1,0,0,0,0,0,0,1,6,0,0,0,0,0,0,0,0,0,0,0,250,0,0,0,0,2,224,115,141,85,0,0,0,0,0,2,0,0,1,95,142,187,58,12,0,0,1,95,142,187,58,12,255,255,255,255,255,255,255,255,0,0,0,0,0,10,0,0,0,3,130,1,0,0,0,48,107,101,121,45,57,100,48,102,51,52,56,99,98,50,101,55,51,48,101,49,101,100,99,52,62,115,111,109,101,45,118,97,108,117,101,45,97,49,55,98,52,99,56,49,102,57,101,99,100,49,101,56,57,54,101,51,2,2,97,2,98,130,1,0,0,2,48,107,101,121,45,99,55,48,55,51,101,57,54,53,99,51,52,98,52,99,99,54,52,52,50,62,115,111,109,101,45,118,97,108,117,101,45,54,53,100,102,52,50,50,48,55,48,100,55,97,100,55,51,57,49,52,102,2,2,97,2,98,130,1,0,0,4,48,107,101,121,45,49,54,57,51,98,49,56,52,97,57,98,53,50,100,98,101,48,51,98,99,62,115,111,109,101,45,118,97,108,117,101,45,51,102,99,98,54,53,102,102,99,97,48,56,55,99,98,97,50,48,97,100,2,2,97,2,98]}
diff --git a/src/protocol/requests/produce/v3/request.js b/src/protocol/requests/produce/v3/request.js
index 67726bc36..c661c529c 100644
--- a/src/protocol/requests/produce/v3/request.js
+++ b/src/protocol/requests/produce/v3/request.js
@@ -78,6 +78,7 @@ const partitionsEncoder = compression => async ({
partition,
messages,
transactionalId,
+ firstSequence,
producerId,
producerEpoch,
}) => {
@@ -106,6 +107,7 @@ const partitionsEncoder = compression => async ({
maxTimestamp,
producerId,
producerEpoch,
+ firstSequence,
transactional: !!transactionalId,
lastOffsetDelta: records.length - 1,
})
diff --git a/src/protocol/requests/produce/v3/request.spec.js b/src/protocol/requests/produce/v3/request.spec.js
index 49142b6d0..0b7bf060b 100644
--- a/src/protocol/requests/produce/v3/request.spec.js
+++ b/src/protocol/requests/produce/v3/request.spec.js
@@ -16,6 +16,7 @@ describe('Protocol > Requests > Produce > v3', () => {
partitions: [
{
partition: 0,
+ firstSequence: 0,
messages: [
{
key: 'key-9d0f348cb2e730e1edc4',
@@ -58,6 +59,7 @@ describe('Protocol > Requests > Produce > v3', () => {
partitions: [
{
partition: 0,
+ firstSequence: 10,
messages: [
{
key: 'key-9d0f348cb2e730e1edc4',