EoS #200 idempotent producer #203

Merged · 16 commits · Nov 15, 2018
Changes from 11 commits
2 changes: 2 additions & 0 deletions README.md
@@ -258,6 +258,8 @@ await producer.send({
| acks | Control the number of required acks. <br> __-1__ = all replicas must acknowledge _(default)_ <br> __0__ = no acknowledgments <br> __1__ = only waits for the leader to acknowledge | `-1` all replicas must acknowledge |
| timeout | The time to await a response in ms | `30000` |
| compression | Compression codec | `CompressionTypes.None` |
| idempotent | _Experimental._ If enabled, the producer will ensure that each message is written exactly once. Acks _must_ be set to `-1` ("all"). Retries will default to `Number.MAX_SAFE_INTEGER`. | `false` |

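For illustration, a minimal sketch of enabling the option (assuming an already-configured `kafka` client instance):

```js
// Idempotent producer: retries default to Number.MAX_SAFE_INTEGER
const producer = kafka.producer({ idempotent: true })

await producer.connect() // obtains a producer id & epoch under the hood
await producer.send({
  topic: 'topic-name',
  acks: -1, // required; any other value throws a KafkaJSNonRetriableError
  messages: [{ key: 'key1', value: 'hello' }],
})
```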
By default, the producer is configured to distribute the messages with the following logic:

96 changes: 95 additions & 1 deletion src/broker/__tests__/produce.spec.js
@@ -1,5 +1,7 @@
const Broker = require('../index')
const COORDINATOR_TYPES = require('../../protocol/coordinatorTypes')
const { Types: Compression } = require('../../protocol/message/compression')
const { KafkaJSProtocolError } = require('../../errors')
const {
secureRandom,
createConnection,
@@ -17,12 +19,13 @@ describe('Broker > Produce', () => {
headers: { [`hkey-${secureRandom()}`]: `hvalue-${secureRandom()}` },
})

const createTopicData = ({ headers = false, firstSequence = 0 } = {}) => [
{
topic: topicName,
partitions: [
{
partition: 0,
firstSequence,
messages: [
{
key: `key-${secureRandom()}`,
@@ -203,6 +206,97 @@ describe('Broker > Produce', () => {
})
})

testIfKafka011('request with idempotent producer', async () => {
// Get producer id & epoch
const {
coordinator: { host, port },
} = await retryProtocol(
'GROUP_COORDINATOR_NOT_AVAILABLE',
async () =>
await broker.findGroupCoordinator({
groupId: `group-${secureRandom()}`,
coordinatorType: COORDINATOR_TYPES.GROUP,
})
)

const producerBroker = new Broker({
connection: createConnection({ host, port }),
logger: newLogger(),
})

await producerBroker.connect()
const result = await producerBroker.initProducerId({
transactionTimeout: 30000,
})

const producerId = result.producerId
const producerEpoch = result.producerEpoch

const metadata = await retryProtocol(
'LEADER_NOT_AVAILABLE',
async () => await broker.metadata([topicName])
)

// Find leader of partition
const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader
const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker)

// Connect to the correct broker to produce message
broker2 = new Broker({
connection: createConnection(newBrokerData),
logger: newLogger(),
allowExperimentalV011: true,
})
await broker2.connect()

const response1 = await retryProtocol(
'LEADER_NOT_AVAILABLE',
async () =>
await broker2.produce({
producerId,
producerEpoch,
topicData: createTopicData({ headers: false }),
})
)

expect(response1).toEqual({
topics: [
{
topicName,
partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
},
],
throttleTime: 0,
})

// We have to synchronise the sequence number between the producer and the broker:
// the broker expects the next firstSequence to equal the number of messages it has
// already accepted for this partition (3, after the batch produced above)
await expect(
broker2.produce({
producerId,
producerEpoch,
topicData: createTopicData({ headers: false, firstSequence: 1 }), // Too small
})
).rejects.toEqual(
new KafkaJSProtocolError('The broker received an out of order sequence number')
)

await expect(
broker2.produce({
producerId,
producerEpoch,
topicData: createTopicData({ headers: false, firstSequence: 5 }), // Too big
})
).rejects.toEqual(
new KafkaJSProtocolError('The broker received an out of order sequence number')
)

await broker2.produce({
producerId,
producerEpoch,
topicData: createTopicData({ headers: false, firstSequence: 3 }), // Just right
})
})
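// Editorial sketch, not part of this diff: the test above hinges on sequence
// bookkeeping. The broker tracks, per producer id and partition, how many
// messages it has accepted, and rejects any batch whose firstSequence does not
// match that count. A hypothetical client-side mirror of the bookkeeping:
//
//   const sequences = {}
//   const getSequence = (topic, partition) => (sequences[topic] || {})[partition] || 0
//   const updateSequence = (topic, partition, batchSize) => {
//     sequences[topic] = sequences[topic] || {}
//     sequences[topic][partition] = getSequence(topic, partition) + batchSize
//   }
//
// After the first 3-message batch, updateSequence(topicName, 0, 3) means the
// next produce must use firstSequence: 3, exactly as asserted above.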

testIfKafka011('request with headers', async () => {
const metadata = await retryProtocol(
'LEADER_NOT_AVAILABLE',
3 changes: 3 additions & 0 deletions src/broker/index.js
@@ -160,13 +160,15 @@ module.exports = class Broker {
* partitions: [
* {
* partition: 0,
* firstSequence: 0,
* messages: [
* { key: '1', value: 'A' },
* { key: '2', value: 'B' },
* ]
* },
* {
* partition: 1,
* firstSequence: 0,
* messages: [
* { key: '3', value: 'C' },
* ]
@@ -178,6 +180,7 @@ module.exports = class Broker {
* partitions: [
* {
* partition: 4,
* firstSequence: 0,
* messages: [
* { key: '32', value: 'E' },
* ]
17 changes: 10 additions & 7 deletions src/producer/createTopicData.js
@@ -1,9 +1,12 @@
module.exports = topicDataForBroker => {
return topicDataForBroker.map(
({ topic, partitions, messagesPerPartition, sequencePerPartition }) => ({
topic,
partitions: partitions.map(partition => ({
partition,
firstSequence: sequencePerPartition[partition],
messages: messagesPerPartition[partition],
})),
})
)
}
38 changes: 38 additions & 0 deletions src/producer/createTopicData.spec.js
@@ -0,0 +1,38 @@
const createTopicData = require('./createTopicData')

describe('Producer > createTopicData', () => {
let topic, partitions, messagesPerPartition, sequencePerPartition

beforeEach(() => {
topic = 'test-topic'
partitions = [1, 2, 3]

messagesPerPartition = {
1: [{ key: '1' }],
2: [{ key: '2' }],
3: [{ key: '3' }, { key: '4' }],
}

sequencePerPartition = {
1: 0,
2: 5,
3: 10,
}
})

test('format data by topic and partition', () => {
const result = createTopicData([
{ topic, partitions, messagesPerPartition, sequencePerPartition },
])
expect(result).toEqual([
{
topic,
partitions: [
{ partition: 1, firstSequence: 0, messages: [{ key: '1' }] },
{ partition: 2, firstSequence: 5, messages: [{ key: '2' }] },
{ partition: 3, firstSequence: 10, messages: [{ key: '3' }, { key: '4' }] },
],
},
])
})
})
32 changes: 29 additions & 3 deletions src/producer/index.js
@@ -3,6 +3,7 @@ const createDefaultPartitioner = require('./partitioners/default')
const createSendMessages = require('./sendMessages')
const InstrumentationEventEmitter = require('../instrumentation/emitter')
const events = require('./instrumentationEvents')
const createTransactionManager = require('./transactionManager')
const { CONNECT, DISCONNECT } = require('./instrumentationEvents')
const { KafkaJSNonRetriableError } = require('../errors')

@@ -16,13 +17,28 @@ module.exports = ({
cluster,
logger: rootLogger,
createPartitioner = createDefaultPartitioner,
retry,
idempotent = false,
}) => {
retry = retry || (idempotent ? { retries: Number.MAX_SAFE_INTEGER } : { retries: 5 })

if (idempotent && retry.retries < 1) {
throw new KafkaJSNonRetriableError(
'Idempotent producer must allow retries to protect against transient errors'
)
}

const logger = rootLogger.namespace('Producer')

if (idempotent && retry.retries < Number.MAX_SAFE_INTEGER) {
logger.warn('Limiting retries for the idempotent producer may invalidate EoS guarantees')
}
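// Illustrative examples, not part of the diff (assuming the exported factory is
// invoked as createProducer(options)): how the retry defaults above resolve —
//   createProducer({ cluster, logger })                   -> retry = { retries: 5 }
//   createProducer({ cluster, logger, idempotent: true }) -> retry = { retries: Number.MAX_SAFE_INTEGER }
//   createProducer({ cluster, logger, idempotent: true, retry: { retries: 0 } })
//                                                         -> throws KafkaJSNonRetriableError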

const partitioner = createPartitioner()
const retrier = createRetry(Object.assign({}, cluster.retry, retry))
const instrumentationEmitter = new InstrumentationEventEmitter()
const transactionManager = createTransactionManager({ logger, cluster })
const sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager })
@ianwsperber (Contributor, Author) commented on Nov 13, 2018:

Currently I pass transactionManager to the factory method. I do wonder whether it will ultimately make more sense to provide a transaction manager in the sendMessages call, which could also allow us to make the transaction manager less stateful. Currently it doesn't really matter, but it may be something to refactor in #173.

@Nevon (Collaborator) commented:

In what way would it make the transaction manager less stateful? Maybe I'm missing something, but passing it into the factory method seems much more appropriate than passing it with each sendMessages call.

@ianwsperber (Contributor, Author) commented on Nov 14, 2018:

You're probably right @Nevon. It's just that once we introduce transactions, the transaction manager will become a bit of a state machine, and I was trying to think of strategies to minimize the state we have to manage.

One way of minimizing that would be to have the transaction manager fetch the producer id on instantiation, so that there is no ambiguity about whether the idempotent producer has already been initialized. Since that fetch must occur after connecting, it would probably mean passing the transaction manager into the send messages method, or lazily creating the sendMessages instance.

Ultimately I do think what I've implemented is the most straightforward solution, with the benefit that it closely mirrors how transactions are managed in other Kafka libraries. In practice I haven't had an issue working with the state machine so far (see preview here); I'm just thinking through the alternatives.
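For context, a rough sketch of the lazy-initialization alternative discussed above (hypothetical code, not what this PR implements; the helper name is invented):

```js
// Hypothetical: fetch the producer id while creating the transaction manager,
// so an instance always carries a valid { producerId, producerEpoch }.
const createInitializedTransactionManager = async ({ logger, cluster }) => {
  const transactionManager = createTransactionManager({ logger, cluster })
  await transactionManager.initProducerId() // must run after cluster.connect()
  return transactionManager
}

// connect() would then create sendMessages lazily:
//   const transactionManager = await createInitializedTransactionManager({ logger, cluster })
//   sendMessages = createSendMessages({ logger, cluster, partitioner, transactionManager })
```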


/**
* @typedef {Object} TopicMessages
@@ -47,6 +63,12 @@ module.exports = ({
throw new KafkaJSNonRetriableError(`Invalid topic`)
}

if (idempotent && acks !== -1) {
throw new KafkaJSNonRetriableError(
`Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees`
)
}

for (let { topic, messages } of topicMessages) {
if (!messages) {
throw new KafkaJSNonRetriableError(
@@ -158,6 +180,10 @@ module.exports = ({
connect: async () => {
await cluster.connect()
instrumentationEmitter.emit(CONNECT)

if (idempotent) {
await transactionManager.initProducerId()
}
},
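// Illustrative note, not part of the diff: for an idempotent producer, connect()
// now performs a one-time InitProducerId round trip; the resulting producerId and
// producerEpoch are attached to every subsequent produce request, as exercised
// in the broker test above.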

/**