Skip to content

Kafka Provider Adds Invalid "timeout" Property Causing KafkaError in Airflow 2.10.5 #49406

@pedrohenriquecordeiro

Description

@pedrohenriquecordeiro

Apache Airflow Provider(s)

apache-kafka

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.8.0

Apache Airflow version

Airflow 2.10.5

Operating System

OS-IMAGE: Container-Optimized OS from Google

Deployment

Official Apache Airflow Helm Chart

Deployment details

Dockerfile :

FROM apache/airflow:2.10.5-python3.12

# install your pip packages
RUN pip install --no-cache-dir \
    apache-airflow-providers-apache-kafka==1.8.0 \
    confluent-kafka==2.8.2 \
    asgiref==3.8.0

RUN pip freeze
RUN airflow providers list

ENV AIRFLOW__CORE__TEST_CONNECTION=Enabled

Helm Chart:

executor: "KubernetesExecutor"
allowPodLaunching: true
webserverSecretKeySecretName: webserver-secret

images:
  airflow:
    repository: phcjesus/apache-airflow-with-extensions # docker hub
    tag: v0.0.4
    pullPolicy: IfNotPresent
  gitSync:
    repository: registry.k8s.io/git-sync/git-sync
    tag: v4.3.0
    pullPolicy: IfNotPresent

affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: node-pool-name
          operator: In
          values:
          - pool-type-1

securityContext:
  runAsUser: 50000
  fsGroup: 0
  runAsGroup: 0

airflowPodAnnotations:
  gke-gcsfuse/volumes: "true"

postgresql:
  enabled: true

webserver:
  replicas: 2
  hpa:
    enabled: true
    minReplicaCount: 1
    maxReplicaCount: 3
  service:
    type: ClusterIP
  serviceAccount:
    create: false
    name: airflow-service-account

scheduler:
  serviceAccount:
    create: false
    name: airflow-service-account

triggerer:
  serviceAccount:
    create: false
    name: airflow-service-account

workers:
  replicas: 2
  serviceAccount:
    create: false
    name: airflow-service-account
  logGroomerSidecar:
    enabled: true
  hpa:
    enabled: true
    minReplicaCount: 1
    maxReplicaCount: 10

statsd:
  enabled: false

logs:
  persistence:
    enabled: true
    size: 50Gi
    existingClaim: gcs-fuse-csi-pvc

What happened

When I try to test the connection in the Apache Airflow 2.10.5 user interface to a Kafka cluster, no matter what instructions I put in the "Config Dict" field, I get the following error:

KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: \"timeout\""}

Here's the evidences when click in Test bottom:

Image

Image

Image

What you think should happen instead

I believe there's an issue in the code that performs the connection test. The system appears to be implicitly injecting the "timeout" parameter, which is not a recognized attribute by the Apache Kafka API, resulting in an error.

How to reproduce

To reproduce the Kafka connection failure in Airflow (v2.10.5), follow these steps:

  1. Access the Airflow user interface (UI) and navigate to Admin > Connections.
  2. Set the connection ID to an arbitrary identifier.
  3. Locate the "Apache Kafka" option within the connection type selection field.
  4. In the "Config Dict" field, enter a JSON string containing valid Kafka properties or leave the field empty.
  5. Execute the connection test.

Anything else

During the execution of the connection test, I monitored the logs and events within the Kubernetes cluster; however, no relevant occurrences were identified throughout the procedure. Consequently, no error logs were detected.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions