Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 29 additions & 69 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,22 @@ commands:
- save_cache:
name: Save Cargo Cache
paths:
- /usr/local/cargo/registry
- ~/.cargo/registry
key: cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.toml" }}

install_packages:
description: Install zlib-dev
description: Install packages
steps:
- run:
name: Install zlib-dev
name: Install packages
environment:
# Keep Ubuntu from doing stupid things
DEBIAN_FRONTEND: noninteractive
NEEDRESTART_MODE: l
command: |
sudo apt-get update
sudo apt-get install -y zlib1g-dev
sudo apt-get install -y clang docker-compose zlib1g-dev
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y

jobs:
fmt:
Expand Down Expand Up @@ -186,7 +191,9 @@ jobs:
- --set redpanda.auto_create_topics_enabled=false
- image: serjs/go-socks5-proxy
name: proxy
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
- REQUIRE_AUTH=false
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand All @@ -207,9 +214,9 @@ jobs:
SOCKS_PROXY: "proxy:1080"
steps:
- checkout
- install_packages
- rust_components
- cache_restore
- install_packages
- run:
name: Cargo test
command: cargo test --all-features --all-targets
Expand All @@ -222,59 +229,9 @@ jobs:
path: proptest-regressions

test-kafka:
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
docker:
- image: quay.io/influxdb/rust:ci
- image: docker.io/bitnami/zookeeper:3.7
name: zookeeper
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
- image: docker.io/bitnami/kafka:3.9.0
name: kafka-0
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093,SECURE://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://kafka-0:9093,SECURE://kafka-0:9094
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CLIENT_USERS=admin
- KAFKA_CLIENT_PASSWORDS=admin-secret
- KAFKA_CLIENT_LISTENER_NAME=SECURE
- image: docker.io/bitnami/kafka:3.9.0
name: kafka-1
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093,SECURE://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://kafka-1:9093,SECURE://kafka-1:9094
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CLIENT_USERS=admin
- KAFKA_CLIENT_PASSWORDS=admin-secret
- KAFKA_CLIENT_LISTENER_NAME=SECURE
- image: docker.io/bitnami/kafka:3.9.0
name: kafka-2
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=2
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093,SECURE://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://kafka-2:9093,SECURE://kafka-2:9094
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CLIENT_USERS=admin
- KAFKA_CLIENT_PASSWORDS=admin-secret
- KAFKA_CLIENT_LISTENER_NAME=SECURE
- image: serjs/go-socks5-proxy
name: proxy
resource_class: xlarge # use of a smaller executor tends crashes on link
machine:
image: default
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand All @@ -291,14 +248,18 @@ jobs:
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "invalid:9093,kafka-1:9093"
KAFKA_SASL_CONNECT: kafka-1:9094
SOCKS_PROXY: "proxy:1080"
KAFKA_CONNECT: "invalid:9093,kafka-1:9021"
KAFKA_SASL_CONNECT: kafka-1:9097
SOCKS_PROXY: "localhost:1080"
steps:
- checkout
- install_packages
- rust_components
- cache_restore
- install_packages
- run:
name: start container
command: docker-compose -f docker-compose-kafka.yml up
background: true
- run:
name: Cargo test
command: cargo test --all-features --all-targets
Expand All @@ -314,7 +275,7 @@ jobs:
build-default-features:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand All @@ -336,7 +297,7 @@ jobs:
build-no-default-features:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand All @@ -358,7 +319,7 @@ jobs:
build-all-features:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand All @@ -376,12 +337,11 @@ jobs:
command: cargo build --all-features
- cache_save


# Builds fuzzing.
build-fuzz:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand Down Expand Up @@ -409,7 +369,7 @@ jobs:
run-fuzz:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends crashes on link
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
CARGO_INCREMENTAL: "0"
Expand Down
100 changes: 44 additions & 56 deletions docker-compose-kafka.yml
Original file line number Diff line number Diff line change
@@ -1,96 +1,84 @@
---
version: "3"

services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- zookeeper_data:/bitnami/zookeeper
kafka-0:
image: docker.io/bitnami/kafka:3.9.0
build:
context: docker
dockerfile: Dockerfile.kafka
ports:
- "9010:9010"
- "9096:9096"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- CLUSTER_ID='4L6g3nShT-eMCtK--X86sw'
- KAFKA_NODE_ID=0
- KAFKA_PROCESS_ROLES=broker,controller
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020,SECURE://:9096
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020,SECURE://localhost:9096
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020,SECURE://:9096,CONTROLLER://:9050
- KAFKA_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020,SECURE://localhost:9096
- KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9050,1@kafka-1:9051,2@kafka-2:9052
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CLIENT_USERS=admin
- KAFKA_CLIENT_PASSWORDS=admin-secret
- KAFKA_CLIENT_LISTENER_NAME=SECURE
volumes:
- kafka_0_data:/bitnami/kafka
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
depends_on:
- zookeeper
- KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf
- KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
kafka-1:
image: docker.io/bitnami/kafka:3.9.0
build:
context: docker
dockerfile: Dockerfile.kafka
ports:
- "9011:9011"
- "9097:9097"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=1
- CLUSTER_ID='4L6g3nShT-eMCtK--X86sw'
- KAFKA_NODE_ID=1
- KAFKA_PROCESS_ROLES=broker,controller
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021,SECURE://:9097
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021,SECURE://localhost:9097
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021,SECURE://:9097,CONTROLLER://:9051
- KAFKA_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021,SECURE://localhost:9097
- KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9050,1@kafka-1:9051,2@kafka-2:9052
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CLIENT_USERS=admin
- KAFKA_CLIENT_PASSWORDS=admin-secret
- KAFKA_CLIENT_LISTENER_NAME=SECURE
volumes:
- kafka_1_data:/bitnami/kafka
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
depends_on:
- zookeeper
- KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf
- KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
kafka-2:
image: docker.io/bitnami/kafka:3.9.0
build:
context: docker
dockerfile: Dockerfile.kafka
ports:
- "9012:9012"
- "9098:9098"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=2
- CLUSTER_ID='4L6g3nShT-eMCtK--X86sw'
- KAFKA_NODE_ID=2
- KAFKA_PROCESS_ROLES=broker,controller
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022,SECURE://:9098
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022,SECURE://localhost:9098
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT,SECURE:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022,SECURE://:9098,CONTROLLER://:9052
- KAFKA_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022,SECURE://localhost:9098
- KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9050,1@kafka-1:9051,2@kafka-2:9052
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
- KAFKA_CLIENT_USERS=admin
- KAFKA_CLIENT_PASSWORDS=admin-secret
- KAFKA_CLIENT_LISTENER_NAME=SECURE
volumes:
- kafka_2_data:/bitnami/kafka
- ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf
depends_on:
- zookeeper
- KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf
- KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
proxy:
image: serjs/go-socks5-proxy
ports:
- "1080:1080"
environment:
- REQUIRE_AUTH=false
depends_on:
- kafka-0
- kafka-1
- kafka-2

volumes:
zookeeper_data:
driver: local
kafka_0_data:
driver: local
kafka_1_data:
driver: local
kafka_2_data:
driver: local
3 changes: 3 additions & 0 deletions docker/Dockerfile.kafka
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM docker.io/apache/kafka:4.1.0

COPY ./kafka_jaas.conf /etc/kafka/kafka_jaas.conf
6 changes: 4 additions & 2 deletions kafka_jaas.conf → docker/kafka_jaas.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
user_admin="admin-secret";
user_admin="admin-secret"
serviceName="kafka";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
password="admin-secret"
serviceName="kafka";
};
Client{};
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.88"
channel = "1.90"
components = [ "rustfmt", "clippy" ]
8 changes: 4 additions & 4 deletions src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ impl Iterator for Backoff {
let backoff =
Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff));

if let Some(deadline) = self.deadline {
if self.total >= deadline {
return None;
}
if let Some(deadline) = self.deadline
&& self.total >= deadline
{
return None;
}
Some(backoff)
}
Expand Down
10 changes: 5 additions & 5 deletions src/client/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,11 @@ where
// If this caller is is intending to conditionally flush a specific
// batch, verify this BatchBuilder is the batch it is indenting to
// flush.
if let Some(token) = flusher_token {
if token != self.flush_clock {
debug!(client=?self.client, "spurious batch flush call");
return Ok(());
}
if let Some(token) = flusher_token
&& token != self.flush_clock
{
debug!(client=?self.client, "spurious batch flush call");
return Ok(());
}

debug!(client=?self.client, "flushing batch");
Expand Down
8 changes: 4 additions & 4 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,10 @@ impl BrokerConnector {
// perform multiple requests until the cache is populated. However, the
// Client initialises this cache at construction time, so unless
// invalidated, there will always be a cached entry available.
if matches!(metadata_mode, MetadataLookupMode::CachedArbitrary) {
if let Some((m, r#gen)) = self.cached_metadata.get(&topics) {
return Ok((m, Some(r#gen)));
}
if matches!(metadata_mode, MetadataLookupMode::CachedArbitrary)
&& let Some((m, r#gen)) = self.cached_metadata.get(&topics)
{
return Ok((m, Some(r#gen)));
}

let backoff = Backoff::new(&self.backoff_config);
Expand Down
Loading