Summary
- When bootstrap.servers is set to localhost:<port> (localhost:29092 in this report), the client fails to connect to the broker with BrokerTransportFailure / AllBrokersDown.
- Root cause: localhost resolves to ::1 (IPv6) on the host, while the broker is reachable only via IPv4. librdkafka attempts IPv6 and gets “Network is unreachable.”
Environment
- OS: Linux archlinux 6.17.7
- Kafka: confluentinc/cp-kafka:7.4.4 (Docker)
- Port mapping: host 29092 -> container
- App: apps/analytics (Rust, rdkafka)
Steps to Reproduce
Prerequisites
- Kafka + Zookeeper running in Docker, Kafka exposed on host port 29092 (typical Confluent cp-kafka setup).
- Your app runs on the host (not in Docker).
Steps
- Create a .env that points the client at localhost (this is the trigger):
KAFKA_BOOTSTRAP_SERVERS=localhost:29092
KAFKA_GROUP_ID=analytics-consumer
KAFKA_INPUT_TOPIC=events
- Use a consumer config that does NOT force IPv4
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
// Assumed: `info!` and `.instrument(..)` come from the `tracing` crate.
use tracing::{info, Instrument};

pub struct App {
    consumer: StreamConsumer,
}

impl App {
    pub fn new(brokers: &str, group_id: &str, input_topic: &str) -> App {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", brokers)
            // NOTE: intentionally NOT setting "broker.address.family" to reproduce the issue
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .create()
            .expect("Consumer creation failed");

        consumer
            .subscribe(&[input_topic])
            .expect("Can't subscribe to specified topic");

        App { consumer }
    }

    pub async fn run(&self) {
        use futures::TryStreamExt;

        // `span` and `process_message` are defined elsewhere in the app (omitted here).
        let stream_processor = self
            .consumer
            .stream()
            .try_for_each(|borrowed_message| async move {
                // Detach so the message can outlive the borrow and move into the task.
                let owned = borrowed_message.detach();
                tokio::spawn(async move {
                    let result = async {
                        info!("start processing message");
                        process_message(owned).await
                    }
                    .instrument(span)
                    .await;
                    match result {
                        Ok(_) => ...,
                        Err(error) => ...,
                    };
                });
                Ok(())
            });

        info!("starting event loop");
        stream_processor.await.expect("stream processing failed");
        info!("stream processing terminated");
    }
}
- Wire it up in main
mod config;

use config::settings::Settings;
use config::app::App;

#[tokio::main]
async fn main() {
    // `s` is the loaded `Settings` value; how it is loaded is omitted in this report.
    let app = App::new(
        &s.kafka_bootstrap_servers, // "localhost:29092"
        &s.kafka_group_id,
        &s.kafka_input_topic,
    );
    app.run().await;
}
- Run with logs enabled
RUST_LOG=info,rdkafka=debug cargo run
Actual result (example logs)
2025-11-22T13:47:20.858347Z INFO starting event loop
2025-11-22T13:47:21.867736Z ERROR librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:29092: Failed to connect to broker at [localhost]:29092: Network is unreachable (after 0ms in state CONNECT)
2025-11-22T13:47:21.867855Z ERROR librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:29092: Failed to connect to broker at [localhost]:29092: Network is unreachable (after 0ms in state CONNECT)
thread 'main' panicked at src/config/app.rs:..., stream processing failed: KafkaError (Message consumption error: BrokerTransportFailure (Local: Broker transport failure))
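The log line names only [localhost]:29092, not the address that was actually tried. A quick way to see which resolved address is failing is a plain TCP probe per address. The following is a minimal sketch using only the Rust standard library; the `probe` helper and the 2-second timeout are illustrative choices, not part of the app, and a successful TCP connect only shows that something is listening on the mapped port, not that the broker's advertised listeners are correct.

```rust
use std::io;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Try a plain TCP connect to one address; Ok(()) means something accepted the connection.
/// (Hypothetical helper for diagnosis only.)
fn probe(addr: &SocketAddr, timeout: Duration) -> io::Result<()> {
    TcpStream::connect_timeout(addr, timeout).map(|_stream| ())
}

fn main() -> io::Result<()> {
    let timeout = Duration::from_secs(2);
    // Same host:port as KAFKA_BOOTSTRAP_SERVERS in the repro.
    for addr in "localhost:29092".to_socket_addrs()? {
        match probe(&addr, timeout) {
            Ok(()) => println!("{addr}: reachable"),
            Err(e) => println!("{addr}: failed ({e})"),
        }
    }
    Ok(())
}
```

If the IPv6 entry fails while the IPv4 entry succeeds, that matches the behaviour described in this report.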
Notes
- This reproduces the issue where localhost resolves to ::1 (IPv6) and the broker is IPv4-only, causing “Network is unreachable”.
- Workarounds (not applied in the repro to keep it failing by design):
  - Use 127.0.0.1:29092 in KAFKA_BOOTSTRAP_SERVERS.
  - Or add .set("broker.address.family", "v4") to ClientConfig.
Expected Behavior
- Client connects successfully when using localhost:29092.
Root Cause
- On many Linux systems, localhost maps to both 127.0.0.1 and ::1, and the resolver can return ::1 (IPv6) first.
- The Kafka broker (in Docker) is listening/advertising only IPv4, so IPv6 connection attempts fail.
- librdkafka tries IPv6 and reports “Network is unreachable.”
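To check the first point on a given host, it can help to list what the system resolver returns for localhost and in what order. This is a small standard-library sketch; `resolve_all` and `family` are hypothetical helper names, and the output depends on the host's /etc/hosts and resolver configuration, so none is shown here.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Resolve `host:port` and return every address the system resolver offers, in order.
fn resolve_all(host_port: &str) -> io::Result<Vec<SocketAddr>> {
    Ok(host_port.to_socket_addrs()?.collect())
}

/// Label an address with its family for display.
fn family(addr: &SocketAddr) -> &'static str {
    match addr {
        SocketAddr::V4(_) => "IPv4",
        SocketAddr::V6(_) => "IPv6",
    }
}

fn main() -> io::Result<()> {
    // The port matches the repro; adjust to your own mapping.
    for addr in resolve_all("localhost:29092")? {
        println!("{} -> {}", family(&addr), addr);
    }
    Ok(())
}
```

An IPv6 entry in this list, combined with a broker that is reachable only over IPv4, is consistent with the failure above.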
Workarounds
- Use the IPv4 literal 127.0.0.1 instead of localhost:
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:29092 cargo run
- Force IPv4 in the client config:
use rdkafka::config::ClientConfig;

let consumer = ClientConfig::new()
    .set("group.id", group_id)
    .set("bootstrap.servers", brokers)
    .set("broker.address.family", "v4") // force IPv4
    .set("enable.partition.eof", "false")
    .set("session.timeout.ms", "6000")
    .set("enable.auto.commit", "false")
    .create()
    .expect("Consumer creation failed");
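If editing every .env is impractical, the first workaround could also be applied in code before the value reaches ClientConfig. The sketch below is one possible approach, not something the app or rdkafka provides: `pin_localhost_to_ipv4` is a hypothetical helper that rewrites plain `localhost[:port]` entries in a comma-separated bootstrap list and leaves everything else (other hosts, IPv6 literals, protocol-prefixed entries) untouched.

```rust
/// Rewrite every `localhost:<port>` entry in a comma-separated bootstrap list
/// to `127.0.0.1:<port>`; all other entries pass through unchanged.
/// (Hypothetical helper; not part of rdkafka.)
fn pin_localhost_to_ipv4(bootstrap_servers: &str) -> String {
    bootstrap_servers
        .split(',')
        .map(|entry| {
            let entry = entry.trim();
            match entry.rsplit_once(':') {
                // "localhost:29092" -> "127.0.0.1:29092"
                Some((host, port)) if host.eq_ignore_ascii_case("localhost") => {
                    format!("127.0.0.1:{port}")
                }
                // Bare "localhost" without a port
                None if entry.eq_ignore_ascii_case("localhost") => "127.0.0.1".to_string(),
                // Anything else is left as-is
                _ => entry.to_string(),
            }
        })
        .collect::<Vec<_>>()
        .join(",")
}

fn main() {
    // Falls back to the value from the repro when the variable is unset.
    let raw = std::env::var("KAFKA_BOOTSTRAP_SERVERS")
        .unwrap_or_else(|_| "localhost:29092".to_string());
    println!("{}", pin_localhost_to_ipv4(&raw));
}
```

The result would then be passed as the "bootstrap.servers" value. Setting "broker.address.family" to "v4" is the less invasive option when the client config is under your control, since it does not rewrite user-supplied values.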