Skip to content

Commit

Permalink
Make specific Kafka 0.11 broker tests conditional
Browse files Browse the repository at this point in the history
  • Loading branch information
tulios committed Jun 23, 2018
1 parent d102f24 commit bb13c42
Show file tree
Hide file tree
Showing 3 changed files with 299 additions and 13 deletions.
194 changes: 186 additions & 8 deletions src/broker/__tests__/fetch.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
newLogger,
createTopic,
retryProtocol,
testIfKafka011,
} = require('testHelpers')
const { Types: Compression } = require('../../protocol/message/compression')

Expand All @@ -15,12 +16,32 @@ const maxWaitTime = 5
const timestamp = 1509827900073

describe('Broker > Fetch', () => {
let topicName, seedBroker, broker
let topicName, seedBroker, broker, newBrokerData

const createMessages = (n = 0) => [
{ key: `key-${n}`, value: `some-value-${n}`, timestamp },
{ key: `key-${n + 1}`, value: `some-value-${n + 1}`, timestamp },
{ key: `key-${n + 2}`, value: `some-value-${n + 2}`, timestamp },
// Builds the expected fetched-header object for a produced message:
// the broker returns record-batch header values as Buffers, so every
// string header value is converted. The original implementation only
// converted the first key, silently dropping any additional headers;
// this converts them all (behavior is unchanged for the single-header
// messages created by createMessages).
const headerFor = message => {
  const buffered = {}
  for (const key of Object.keys(message.headers)) {
    buffered[key] = Buffer.from(message.headers[key])
  }
  return buffered
}

// Builds three consecutive test messages keyed key-<n> .. key-<n+2>.
// When `headers` is true each message also carries a single
// `header-key-<i>: header-value-<i>` header pair. `timestamp` is the
// module-level fixed timestamp so fetch expectations are deterministic.
// Generated from the offsets instead of three copy-pasted literals.
const createMessages = (n = 0, headers = false) =>
  [n, n + 1, n + 2].map(i => ({
    key: `key-${i}`,
    value: `some-value-${i}`,
    timestamp,
    ...(!headers ? {} : { headers: { [`header-key-${i}`]: `header-value-${i}` } }),
  }))

const createTopicData = (partition, messages) => [
Expand Down Expand Up @@ -51,7 +72,7 @@ describe('Broker > Fetch', () => {

// Find leader of partition
const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader
const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker)
newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker)

// Connect to the correct broker to produce message
broker = new Broker({
Expand Down Expand Up @@ -140,7 +161,6 @@ describe('Broker > Fetch', () => {
],
})

createMessages()
topicData = createTopicData(targetPartition, createMessages(1))
await broker.produce({ topicData })
fetchResponse = await broker.fetch({ maxWaitTime, minBytes, maxBytes, topics })
Expand Down Expand Up @@ -215,10 +235,168 @@ describe('Broker > Fetch', () => {
],
})

createMessages()
topicData = createTopicData(targetPartition, createMessages(1))
await broker.produce({ topicData, compression: Compression.GZIP })
fetchResponse = await broker.fetch({ maxWaitTime, minBytes, maxBytes, topics })
expect(fetchResponse.responses[0].partitions[0].highWatermark).toEqual('6')
})

// Record batches (magicByte 2) were introduced by Kafka 0.11; these tests
// only run when KAFKA_VERSION is 0.11 (see testIfKafka011 in testHelpers).
describe('Record batch', () => {
  beforeEach(async () => {
    // Reconnect to the same partition leader (newBrokerData is assigned in
    // the outer beforeEach) with the experimental 0.11 flag enabled so the
    // broker negotiates the record-batch produce/fetch protocol versions.
    await broker.disconnect()

    broker = new Broker({
      connection: createConnection(newBrokerData),
      logger: newLogger(),
      allowExperimentalV011: true,
    })
    await broker.connect()
  })

  testIfKafka011('request', async () => {
    const targetPartition = 0
    const messages = createMessages()
    let topicData = createTopicData(targetPartition, messages)
    await broker.produce({ topicData })

    const topics = [
      {
        topic: topicName,
        partitions: [
          {
            partition: targetPartition,
            fetchOffset: 0,
            maxBytes: maxBytesPerPartition,
          },
        ],
      },
    ]

    // Expect the 3 produced messages back as a record batch: magicByte 2,
    // string offsets/watermarks, Buffer keys/values, and empty headers.
    let fetchResponse = await broker.fetch({ maxWaitTime, minBytes, maxBytes, topics })
    expect(fetchResponse).toEqual({
      throttleTime: 0,
      responses: [
        {
          topicName,
          partitions: [
            {
              abortedTransactions: [],
              errorCode: 0,
              highWatermark: '3',
              lastStableOffset: '3',
              partition: 0,
              messages: [
                {
                  magicByte: 2,
                  attributes: 0,
                  offset: '0',
                  timestamp: '1509827900073',
                  headers: {},
                  key: Buffer.from(messages[0].key),
                  value: Buffer.from(messages[0].value),
                },
                {
                  magicByte: 2,
                  attributes: 0,
                  offset: '1',
                  timestamp: '1509827900073',
                  headers: {},
                  key: Buffer.from(messages[1].key),
                  value: Buffer.from(messages[1].value),
                },
                {
                  magicByte: 2,
                  attributes: 0,
                  offset: '2',
                  timestamp: '1509827900073',
                  headers: {},
                  key: Buffer.from(messages[2].key),
                  value: Buffer.from(messages[2].value),
                },
              ],
            },
          ],
        },
      ],
    })

    // Producing a second batch of 3 moves the high watermark to 6
    topicData = createTopicData(targetPartition, createMessages(1))
    await broker.produce({ topicData })
    fetchResponse = await broker.fetch({ maxWaitTime, minBytes, maxBytes, topics })
    expect(fetchResponse.responses[0].partitions[0].highWatermark).toEqual('6')
  })

  testIfKafka011('request with headers', async () => {
    const targetPartition = 0
    // Same as 'request' above, but every message carries one header pair;
    // headerFor builds the Buffer-valued header object the broker returns.
    const messages = createMessages(0, true)
    let topicData = createTopicData(targetPartition, messages)
    await broker.produce({ topicData })

    const topics = [
      {
        topic: topicName,
        partitions: [
          {
            partition: targetPartition,
            fetchOffset: 0,
            maxBytes: maxBytesPerPartition,
          },
        ],
      },
    ]

    let fetchResponse = await broker.fetch({ maxWaitTime, minBytes, maxBytes, topics })
    expect(fetchResponse).toEqual({
      throttleTime: 0,
      responses: [
        {
          topicName,
          partitions: [
            {
              abortedTransactions: [],
              errorCode: 0,
              highWatermark: '3',
              lastStableOffset: '3',
              partition: 0,
              messages: [
                {
                  magicByte: 2,
                  attributes: 0,
                  offset: '0',
                  timestamp: '1509827900073',
                  headers: headerFor(messages[0]),
                  key: Buffer.from(messages[0].key),
                  value: Buffer.from(messages[0].value),
                },
                {
                  magicByte: 2,
                  attributes: 0,
                  offset: '1',
                  timestamp: '1509827900073',
                  headers: headerFor(messages[1]),
                  key: Buffer.from(messages[1].key),
                  value: Buffer.from(messages[1].value),
                },
                {
                  magicByte: 2,
                  attributes: 0,
                  offset: '2',
                  timestamp: '1509827900073',
                  headers: headerFor(messages[2]),
                  key: Buffer.from(messages[2].key),
                  value: Buffer.from(messages[2].value),
                },
              ],
            },
          ],
        },
      ],
    })

    // Second batch with headers also advances the high watermark to 6
    topicData = createTopicData(targetPartition, createMessages(1, true))
    await broker.produce({ topicData })
    fetchResponse = await broker.fetch({ maxWaitTime, minBytes, maxBytes, topics })
    expect(fetchResponse.responses[0].partitions[0].highWatermark).toEqual('6')
  })
})
})
111 changes: 106 additions & 5 deletions src/broker/__tests__/produce.spec.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,46 @@
const Broker = require('../index')
const { Types: Compression } = require('../../protocol/message/compression')
const { secureRandom, createConnection, newLogger, createTopic } = require('testHelpers')
const {
secureRandom,
createConnection,
newLogger,
createTopic,
testIfKafka011,
} = require('testHelpers')

describe('Broker > Produce', () => {
let topicName, broker, broker2
const timestamp = 1509928155660

const createTopicData = () => [
// Builds a single random header pair for a produced message, so each
// message's headers are unique across test runs.
const createHeader = () => {
  const headerKey = `hkey-${secureRandom()}`
  return { headers: { [headerKey]: `hvalue-${secureRandom()}` } }
}

const createTopicData = (headers = false) => [
{
topic: topicName,
partitions: [
{
partition: 0,
messages: [
{ key: `key-${secureRandom()}`, value: `some-value-${secureRandom()}`, timestamp },
{ key: `key-${secureRandom()}`, value: `some-value-${secureRandom()}`, timestamp },
{ key: `key-${secureRandom()}`, value: `some-value-${secureRandom()}`, timestamp },
{
key: `key-${secureRandom()}`,
value: `some-value-${secureRandom()}`,
timestamp,
...(headers ? createHeader() : {}),
},
{
key: `key-${secureRandom()}`,
value: `some-value-${secureRandom()}`,
timestamp,
...(headers ? createHeader() : {}),
},
{
key: `key-${secureRandom()}`,
value: `some-value-${secureRandom()}`,
timestamp,
...(headers ? createHeader() : {}),
},
],
},
],
Expand Down Expand Up @@ -115,4 +140,80 @@ describe('Broker > Produce', () => {
throttleTime: 0,
})
})

// Record-batch (magicByte 2) produce requests, Kafka 0.11 only.
describe('Record batch', () => {
  // Produce requests must go to the partition leader: look it up via
  // metadata and connect broker2 there with the experimental 0.11 flag
  // enabled. Shared by both tests (was duplicated inline).
  const connectToPartitionLeader = async () => {
    const metadata = await broker.metadata([topicName])
    const partitionBroker = metadata.topicMetadata[0].partitionMetadata[0].leader
    const newBrokerData = metadata.brokers.find(b => b.nodeId === partitionBroker)

    broker2 = new Broker({
      connection: createConnection(newBrokerData),
      logger: newLogger(),
      allowExperimentalV011: true,
    })
    await broker2.connect()
  }

  testIfKafka011('request', async () => {
    await connectToPartitionLeader()

    const response1 = await broker2.produce({ topicData: createTopicData() })
    expect(response1).toEqual({
      responses: [
        {
          topicName,
          partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
        },
      ],
      throttleTime: 0,
    })

    // A second batch of 3 messages lands right after the first one
    const response2 = await broker2.produce({ topicData: createTopicData() })
    expect(response2).toEqual({
      responses: [
        {
          topicName,
          partitions: [{ baseOffset: '3', errorCode: 0, logAppendTime: '-1', partition: 0 }],
        },
      ],
      throttleTime: 0,
    })
  })

  testIfKafka011('request with headers', async () => {
    await connectToPartitionLeader()

    const response1 = await broker2.produce({ topicData: createTopicData(true) })
    expect(response1).toEqual({
      responses: [
        {
          topicName,
          partitions: [{ baseOffset: '0', errorCode: 0, logAppendTime: '-1', partition: 0 }],
        },
      ],
      throttleTime: 0,
    })

    // FIX: the second produce must also send messages WITH headers — the
    // original called createTopicData() without `true`, so the second
    // batch of this "with headers" test carried no headers at all.
    const response2 = await broker2.produce({ topicData: createTopicData(true) })
    expect(response2).toEqual({
      responses: [
        {
          topicName,
          partitions: [{ baseOffset: '3', errorCode: 0, logAppendTime: '-1', partition: 0 }],
        },
      ],
      throttleTime: 0,
    })
  })
})
})
7 changes: 7 additions & 0 deletions testHelpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ const createTopic = async ({ topic, partitions = 1 }) => {
}
}

// Registers the test only when the suite targets Kafka 0.11 (via the
// KAFKA_VERSION env var); otherwise registers it as skipped so the test
// still shows up in the report instead of disappearing.
const testIfKafka011 = (description, callback) => {
  const runner = process.env.KAFKA_VERSION === '0.11' ? test : test.skip
  return runner(description, callback)
}

module.exports = {
secureRandom,
connectionOpts,
Expand All @@ -150,4 +156,5 @@ module.exports = {
createTopic,
waitFor: testWaitFor,
waitForMessages,
testIfKafka011,
}

0 comments on commit bb13c42

Please sign in to comment.