Skip to content

Commit

Permalink
Merge pull request #388 from tulios/test-kafka-2.2
Browse files Browse the repository at this point in the history
Add Kafka 2.2 and use as default in pipeline
  • Loading branch information
tulios authored Jun 12, 2019
2 parents b5a576d + f40ed1f commit eae5ea4
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 20 deletions.
4 changes: 2 additions & 2 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pr:
variables:
- group: website_secrets
- name: KAFKA_VERSION
value: 1.1
value: 2.2
- name: COMPOSE_FILE
value: docker-compose.1_1.yml
value: docker-compose.2_2.yml

####### Linter
jobs:
Expand Down
146 changes: 146 additions & 0 deletions docker-compose.2_2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Kafka 2.2 test cluster: one ZooKeeper node and three brokers (kafka1-3),
# each exposing PLAINTEXT, SSL and SASL_SSL listeners on both the compose
# network and localhost. Indentation reconstructed to standard 2-space
# docker-compose layout (the scraped source had lost all nesting).
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      # Enable SASL authentication for broker -> zookeeper connections
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/server-jaas.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider"
    volumes:
      - ./testHelpers/kafka/server-jaas.conf:/etc/kafka/server-jaas.conf

  kafka1:
    # cp-kafka 5.2.1 ships Apache Kafka 2.2
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka1
    container_name: kafka1
    labels:
      - "custom.project=kafkajs"
      - "custom.service=kafka1"
    depends_on:
      - zookeeper
    ports:
      # PLAINTEXT (internal/host), SSL (internal/host), SASL_SSL (internal/host)
      - "29092:29092"
      - "9092:9092"
      - "29093:29093"
      - "9093:9093"
      - "29094:29094"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092,SSL://kafka1:29093,SSL_HOST://localhost:9093,SASL_SSL://kafka1:29094,SASL_SSL_HOST://localhost:9094
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
      # suppress verbosity
      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf

  kafka2:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka2
    container_name: kafka2
    labels:
      - "custom.project=kafkajs"
      - "custom.service=kafka2"
    depends_on:
      - zookeeper
    ports:
      - "29095:29095"
      - "9095:9095"
      - "29096:29096"
      - "9096:9096"
      - "29097:29097"
      - "9097:9097"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29095,PLAINTEXT_HOST://localhost:9095,SSL://kafka2:29096,SSL_HOST://localhost:9096,SASL_SSL://kafka2:29097,SASL_SSL_HOST://localhost:9097
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
      # suppress verbosity
      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf

  kafka3:
    image: confluentinc/cp-kafka:5.2.1
    hostname: kafka3
    container_name: kafka3
    labels:
      - "custom.project=kafkajs"
      - "custom.service=kafka3"
    depends_on:
      - zookeeper
    ports:
      - "29098:29098"
      - "9098:9098"
      - "29099:29099"
      - "9099:9099"
      - "29100:29100"
      - "9100:9100"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29098,PLAINTEXT_HOST://localhost:9098,SSL://kafka3:29099,SSL_HOST://localhost:9099,SASL_SSL://kafka3:29100,SASL_SSL_HOST://localhost:9100
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_DELETE_TOPIC_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_SSL_KEYSTORE_FILENAME: "kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "keystore_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "sslkey_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "truststore_creds"
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN,SCRAM-SHA-256,SCRAM-SHA-512"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/server-jaas.conf"
      # suppress verbosity
      # https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/log4j.properties.template
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./testHelpers/certs/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks
      - ./testHelpers/certs/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks
      - ./testHelpers/certs/keystore_creds:/etc/kafka/secrets/keystore_creds
      - ./testHelpers/certs/sslkey_creds:/etc/kafka/secrets/sslkey_creds
      - ./testHelpers/certs/truststore_creds:/etc/kafka/secrets/truststore_creds
      - ./testHelpers/kafka/server-jaas.conf:/opt/kafka/config/server-jaas.conf
6 changes: 3 additions & 3 deletions docs/DevelopmentEnvironment.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ For testing KafkaJS we use a multi-broker Kafka cluster as well as Zookeeper for
This boots the Kafka cluster using the default docker-compose.yml file described in [scripts/dockerComposeUp.sh](https://github.com/tulios/kafkajs/blob/master/scripts/dockerComposeUp.sh). If you want to run a different version of Kafka, specify a different compose file using the `COMPOSE_FILE` environment variable:

```sh
COMPOSE_FILE="docker-compose.1_1.yml" ./scripts/dockerComposeUp.sh
COMPOSE_FILE="docker-compose.2_2.yml" ./scripts/dockerComposeUp.sh
```

If you run `docker-compose -f docker-compose.1_1.yml ps` (specify whichever compose file you used in the step above), you should see something like:
If you run `docker-compose -f docker-compose.2_2.yml ps` (specify whichever compose file you used in the step above), you should see something like:

```sh
$ docker-compose -f docker-compose.1_1.yml ps
$ docker-compose -f docker-compose.2_2.yml ps
WARNING: The HOST_IP variable is not set. Defaulting to a blank string.
Name Command State Ports
----------------------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion docs/DockerLocal.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ You will now be able to connect to your Kafka broker at `$(HOST_IP):9092`. See t

## SSL & authentication methods

To configure Kafka to use SSL and/or authentication methods such as SASL, see [docker-compose.yml](https://github.com/tulios/kafkajs/blob/master/docker-compose.1_1.yml). This configuration is used while developing KafkaJS, and is more complicated to set up, but may give you a more production-like development environment.
To configure Kafka to use SSL and/or authentication methods such as SASL, see [docker-compose.yml](https://github.com/tulios/kafkajs/blob/master/docker-compose.2_2.yml). This configuration is used while developing KafkaJS, and is more complicated to set up, but may give you a more production-like development environment.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
},
"homepage": "https://kafka.js.org",
"scripts": {
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='1.1'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
"jest": "export KAFKA_VERSION=${KAFKA_VERSION:='2.2'} && NODE_ENV=test echo \"KAFKA_VERSION: ${KAFKA_VERSION}\" && KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 ./node_modules/.bin/jest",
"test:local": "yarn jest --detectOpenHandles",
"test:debug": "NODE_ENV=test KAFKAJS_DEBUG_PROTOCOL_BUFFERS=1 node --inspect-brk node_modules/.bin/jest --detectOpenHandles --runInBand --watch",
"test:local:watch": "yarn test:local --watch",
Expand Down
2 changes: 1 addition & 1 deletion scripts/dockerComposeUp.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash -e

COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.1_1.yml"}
COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_2.yml"}

echo "Running compose file: ${COMPOSE_FILE}:"
docker-compose -f "${COMPOSE_FILE}" up --force-recreate -d
Expand Down
2 changes: 1 addition & 1 deletion scripts/testWithKafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
testCommand="$1"
extraArgs="$2"

export COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.1_1.yml"}
export COMPOSE_FILE=${COMPOSE_FILE:="docker-compose.2_2.yml"}
export KAFKAJS_DEBUG_PROTOCOL_BUFFERS=${KAFKAJS_DEBUG_PROTOCOL_BUFFERS:=1}

find_container_id() {
Expand Down
2 changes: 1 addition & 1 deletion src/broker/__tests__/describeConfigs.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('Broker > describeConfigs', () => {
},
{
configName: 'message.format.version',
configValue: expect.stringMatching(/^(0\.11\.0-IV2|1\.1-IV0)$/),
configValue: expect.stringMatching(/^(0\.11\.0-IV2|1\.1-IV0|2\.2-IV1)$/),
isDefault: false,
isSensitive: false,
readOnly: false,
Expand Down
10 changes: 8 additions & 2 deletions src/broker/__tests__/fetch.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,11 @@ describe('Broker > Fetch', () => {
abortedTransactions: [], // None of these messages have been aborted yet
errorCode: 0,
highWatermark: '3', // Number of produced messages
lastStableOffset: '-1', // None
// The end offset of a partition for a read_committed consumer would be the
// offset of the first message in the partition belonging to an open transaction.
//
// Note: In version < 2 this was '-1'
lastStableOffset: '0',
lastStartOffset: '0',
partition: 0,
messages: [
Expand Down Expand Up @@ -802,6 +806,8 @@ describe('Broker > Fetch', () => {

await txn.abort()

// Although the messages are aborted, they will still be there in the log,
// so the offset will still increase
topics[0].partitions[0].fetchOffset = 3

// It appears there can be a delay between the EndTxn response
Expand Down Expand Up @@ -829,7 +835,7 @@ describe('Broker > Fetch', () => {
abortedTransactions: [],
errorCode: 0,
highWatermark: '4',
lastStableOffset: '-1',
lastStableOffset: '4',
lastStartOffset: '0',
partition: 0,
messages: [
Expand Down
60 changes: 53 additions & 7 deletions src/producer/eosManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ const INIT_PRODUCER_RETRIABLE_PROTOCOL_ERRORS = [
*/
'CONCURRENT_TRANSACTIONS',
]
const COMMIT_RETRIABLE_PROTOCOL_ERRORS = [
'UNKNOWN_TOPIC_OR_PARTITION',
'COORDINATOR_LOAD_IN_PROGRESS',
]
const COMMIT_STALE_COORDINATOR_PROTOCOL_ERRORS = ['COORDINATOR_NOT_AVAILABLE', 'NOT_COORDINATOR']

/**
* Manage behavior for an idempotent producer and transactions.
Expand Down Expand Up @@ -323,17 +328,58 @@ module.exports = ({
groupId: consumerGroupId,
})

const groupCoordinator = await cluster.findGroupCoordinator({
let groupCoordinator = await cluster.findGroupCoordinator({
groupId: consumerGroupId,
coordinatorType: COORDINATOR_TYPES.GROUP,
})

await groupCoordinator.txnOffsetCommit({
transactionalId,
producerId,
producerEpoch,
groupId: consumerGroupId,
topics,
return retrier(async (bail, retryCount, retryTime) => {
try {
await groupCoordinator.txnOffsetCommit({
transactionalId,
producerId,
producerEpoch,
groupId: consumerGroupId,
topics,
})
} catch (e) {
if (COMMIT_RETRIABLE_PROTOCOL_ERRORS.includes(e.type)) {
logger.debug('Group coordinator is not ready yet, retrying', {
error: e.message,
stack: e.stack,
transactionalId,
retryCount,
retryTime,
})

throw e
}

if (
COMMIT_STALE_COORDINATOR_PROTOCOL_ERRORS.includes(e.type) ||
e.code === 'ECONNREFUSED'
) {
logger.debug(
'Invalid group coordinator, finding new group coordinator and retrying',
{
error: e.message,
stack: e.stack,
transactionalId,
retryCount,
retryTime,
}
)

groupCoordinator = await cluster.findGroupCoordinator({
groupId: consumerGroupId,
coordinatorType: COORDINATOR_TYPES.GROUP,
})

throw e
}

bail(e)
}
})
},
},
Expand Down
2 changes: 1 addition & 1 deletion src/producer/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ describe('Producer', () => {

const markOffsetAsCommittedSpy = jest.spyOn(cluster, 'markOffsetAsCommitted')

await createTopic({ topic: topicName })
await createTopic({ topic: topicName, partitions: 2 })

producer = createProducer({
cluster,
Expand Down

0 comments on commit eae5ea4

Please sign in to comment.