Skip to content

Commit

Permalink
Merge pull request #203 from tulios/eos-200-idempotent-producer
Browse files Browse the repository at this point in the history
EoS #200 idempotent producer
  • Loading branch information
tulios authored Nov 15, 2018
2 parents 095b6e0 + 413dc2f commit 5cf2fff
Show file tree
Hide file tree
Showing 14 changed files with 799 additions and 220 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ await producer.send({
| acks | Control the number of required acks. <br> __-1__ = all replicas must acknowledge _(default)_ <br> __0__ = no acknowledgments <br> __1__ = only waits for the leader to acknowledge | `-1` all replicas must acknowledge |
| timeout | The time to await a response in ms | `30000` |
| compression | Compression codec | `CompressionTypes.None` |
| transactionTimeout | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. If this value is larger than the `transaction.max.timeout.ms` setting in the __broker__, the request will fail with an `InvalidTransactionTimeout` error | `60000` |
| ----------- |-------------------------------------------------------------------------------------------------- | ------- |
| idempotent | _Experimental._ If enabled, the producer will ensure each message is written exactly once. Acks _must_ be set to -1 ("all"). Retries will default to `Number.MAX_SAFE_INTEGER`. | `false` |

By default, the producer is configured to distribute the messages with the following logic:

Expand Down
96 changes: 95 additions & 1 deletion src/broker/__tests__/produce.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const Broker = require('../index')
const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes')
const { Types: Compression } = require('../../protocol/message/compression')
const { KafkaJSProtocolError } = require('../../errors')
const {
secureRandom,
createConnection,
Expand All @@ -17,12 +19,13 @@ describe('Broker > Produce', () => {
headers: { [`hkey-${secureRandom()}`]: `hvalue-${secureRandom()}` },
})

const createTopicData = (headers = false) => [
const createTopicData = ({ headers = false, firstSequence = 0 } = {}) => [
{
topic: topicName,
partitions: [
{
partition: 0,
firstSequence,
messages: [
{
key: `key-${secureRandom()}`,
Expand Down Expand Up @@ -203,6 +206,97 @@ describe('Broker > Produce', () => {
})
})

testIfKafka011('request with idempotent producer', async () => {
  // A producer id and epoch are needed for the produce requests below; they are
  // requested from the broker returned by the coordinator lookup.
  const { coordinator } = await retryProtocol(
    'GROUP_COORDINATOR_NOT_AVAILABLE',
    async () =>
      await broker.findGroupCoordinator({
        groupId: `group-${secureRandom()}`,
        coordinatorType: COORDINATOR_TYPES.GROUP,
      })
  )

  const producerBroker = new Broker({
    connection: createConnection({ host: coordinator.host, port: coordinator.port }),
    logger: newLogger(),
  })
  await producerBroker.connect()

  const { producerId, producerEpoch } = await producerBroker.initProducerId({
    transactionTimeout: 30000,
  })

  // Produce requests have to go to the leader of the partition, so look it up
  const { topicMetadata, brokers } = await retryProtocol(
    'LEADER_NOT_AVAILABLE',
    async () => await broker.metadata([topicName])
  )
  const leaderNodeId = topicMetadata[0].partitionMetadata[0].leader
  const leaderBroker = brokers.find(({ nodeId }) => nodeId === leaderNodeId)

  broker2 = new Broker({
    connection: createConnection(leaderBroker),
    logger: newLogger(),
    allowExperimentalV011: true,
  })
  await broker2.connect()

  // First batch: relies on the default first sequence of the fixture
  const firstResponse = await retryProtocol(
    'LEADER_NOT_AVAILABLE',
    async () =>
      await broker2.produce({
        producerId,
        producerEpoch,
        topicData: createTopicData({ headers: false }),
      })
  )

  const expectedPartitions = [
    { baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 },
  ]
  expect(firstResponse).toEqual({
    topics: [{ topicName, partitions: expectedPartitions }],
    throttleTime: 0,
  })

  // Sends another batch for the same producer id/epoch starting at the given sequence
  const produceFromSequence = firstSequence =>
    broker2.produce({
      producerId,
      producerEpoch,
      topicData: createTopicData({ headers: false, firstSequence }),
    })

  // Asserts that the broker refuses a batch starting at the given sequence
  const expectOutOfOrderSequence = firstSequence =>
    expect(produceFromSequence(firstSequence)).rejects.toEqual(
      new KafkaJSProtocolError('The broker received an out of order sequence number')
    )

  // The sequence number has to stay synchronised between the producer and the broker.
  // NOTE(review): 3 being the accepted value presumably reflects the number of
  // messages in the fixture, which is not fully visible here - confirm.
  await expectOutOfOrderSequence(1) // Too small
  await expectOutOfOrderSequence(5) // Too big
  await produceFromSequence(3) // Just right
})

testIfKafka011('request with headers', async () => {
const metadata = await retryProtocol(
'LEADER_NOT_AVAILABLE',
Expand Down
3 changes: 3 additions & 0 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ module.exports = class Broker {
* partitions: [
* {
* partition: 0,
* firstSequence: 0,
* messages: [
* { key: '1', value: 'A' },
* { key: '2', value: 'B' },
* ]
* },
* {
* partition: 1,
* firstSequence: 0,
* messages: [
* { key: '3', value: 'C' },
* ]
Expand All @@ -178,6 +180,7 @@ module.exports = class Broker {
* partitions: [
* {
* partition: 4,
* firstSequence: 0,
* messages: [
* { key: '32', value: 'E' },
* ]
Expand Down
17 changes: 10 additions & 7 deletions src/producer/createTopicData.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
module.exports = topicDataForBroker => {
return topicDataForBroker.map(({ topic, partitions, messagesPerPartition }) => ({
topic,
partitions: partitions.map(partition => ({
partition,
messages: messagesPerPartition[partition],
})),
}))
return topicDataForBroker.map(
({ topic, partitions, messagesPerPartition, sequencePerPartition }) => ({
topic,
partitions: partitions.map(partition => ({
partition,
firstSequence: sequencePerPartition[partition],
messages: messagesPerPartition[partition],
})),
})
)
}
38 changes: 38 additions & 0 deletions src/producer/createTopicData.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
const createTopicData = require('./createTopicData')

describe('Producer > createTopicData', () => {
  // Fixture pieces; reassigned before every test
  let topic
  let partitions
  let messagesPerPartition
  let sequencePerPartition

  beforeEach(() => {
    topic = 'test-topic'
    partitions = [1, 2, 3]

    // Both lookup tables are keyed by partition number
    messagesPerPartition = { 1: [{ key: '1' }], 2: [{ key: '2' }], 3: [{ key: '3' }, { key: '4' }] }
    sequencePerPartition = { 1: 0, 2: 5, 3: 10 }
  })

  test('format data by topic and partition', () => {
    const topicDataForBroker = [{ topic, partitions, messagesPerPartition, sequencePerPartition }]

    // Every partition entry should pick up its own messages and first sequence
    const expectedPartitions = [
      { partition: 1, firstSequence: 0, messages: [{ key: '1' }] },
      { partition: 2, firstSequence: 5, messages: [{ key: '2' }] },
      { partition: 3, firstSequence: 10, messages: [{ key: '3' }, { key: '4' }] },
    ]

    expect(createTopicData(topicDataForBroker)).toEqual([
      { topic, partitions: expectedPartitions },
    ])
  })
})
34 changes: 31 additions & 3 deletions src/producer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const createDefaultPartitioner = require('./partitioners/default')
const createSendMessages = require('./sendMessages')
const InstrumentationEventEmitter = require('../instrumentation/emitter')
const events = require('./instrumentationEvents')
const createTransactionManager = require('./transactionManager')
const { CONNECT, DISCONNECT } = require('./instrumentationEvents')
const { KafkaJSNonRetriableError } = require('../errors')

Expand All @@ -16,13 +17,29 @@ module.exports = ({
cluster,
logger: rootLogger,
createPartitioner = createDefaultPartitioner,
retry = { retries: 5 },
retry,
idempotent = false,
transactionTimeout,
}) => {
retry = retry || { retries: idempotent ? Number.MAX_SAFE_INTEGER : 5 }

if (idempotent && retry.retries < 1) {
throw new KafkaJSNonRetriableError(
'Idempotent producer must allow retries to protect against transient errors'
)
}

const logger = rootLogger.namespace('Producer')

if (idempotent && retry.retries < Number.MAX_SAFE_INTEGER) {
logger.warn('Limiting retries for the idempotent producer may invalidate EoS guarantees')
}

const partitioner = createPartitioner()
const retrier = createRetry(Object.assign({}, cluster.retry, retry))
const instrumentationEmitter = new InstrumentationEventEmitter()
const logger = rootLogger.namespace('Producer')
const sendMessages = createSendMessages({ logger, cluster, partitioner })
const transactionManager = createTransactionManager({ logger, cluster, transactionTimeout })
const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager })

/**
* @typedef {Object} TopicMessages
Expand All @@ -36,6 +53,7 @@ module.exports = ({
* -1 = all replicas must acknowledge
* 0 = no acknowledgments
* 1 = only waits for the leader to acknowledge
*
* @property {number} [timeout=30000] The time to await a response in ms
* @property {Compression.Types} [compression=Compression.Types.None] Compression codec
*
Expand All @@ -47,6 +65,12 @@ module.exports = ({
throw new KafkaJSNonRetriableError(`Invalid topic`)
}

if (idempotent && acks !== -1) {
throw new KafkaJSNonRetriableError(
`Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees`
)
}

for (let { topic, messages } of topicMessages) {
if (!messages) {
throw new KafkaJSNonRetriableError(
Expand Down Expand Up @@ -158,6 +182,10 @@ module.exports = ({
connect: async () => {
await cluster.connect()
instrumentationEmitter.emit(CONNECT)

if (idempotent && !transactionManager.isInitialized()) {
await transactionManager.initProducerId()
}
},

/**
Expand Down
Loading

0 comments on commit 5cf2fff

Please sign in to comment.