Skip to content

Apache Kafka operators ProduceToTopicOperator and ConsumeFromTopicOperator unusable since Airflow >= 2.10.0 #42502

@mxmrlt

Description

@mxmrlt

Apache Airflow Provider(s)

apache-kafka

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.6.0

Apache Airflow version

2.10.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

No response

What happened

Since Apache Airflow 2.10.0 and introduction of the feature: callable for template_fields (#37028) operators do not longer work as it just fails at execution.

Indeed callable field producer_function of the ProduceToTopicOperator is part of template_fields which make fail DAG execution.

template_fields = (
    "topic",
    "producer_function",
    "producer_function_args",
    "producer_function_kwargs",
    "kafka_config_id",
)

Here's the execution log :

[2024-09-25, 18:13:51 CEST] {abstractoperator.py:778} ERROR - Exception rendering Jinja template for task 'produce_treats', field 'producer_function'. Template: <function prod_function at 0x7ff0cd915d00>
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 768, in _do_render_template_fields
    rendered_content = value(context=context, jinja_env=jinja_env)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: prod_function() got an unexpected keyword argument 'context'
[2024-09-25, 18:13:51 CEST] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3114, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3533, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1419, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 768, in _do_render_template_fields
    rendered_content = value(context=context, jinja_env=jinja_env)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: prod_function() got an unexpected keyword argument 'context'

The same happens with apply_function field in ConsumeFromTopicOperator :

template_fields = (
    "topics",
    "apply_function",
    "apply_function_args",
    "apply_function_kwargs",
    "kafka_config_id",
)

What you think should happen instead

No response

How to reproduce

Just use the DAG example available here in an Apache Airflow instance: https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/stable/_modules/tests/system/providers/apache/kafka/example_dag_hello_kafka.html

produce_treats = ProduceToTopicOperator(
    task_id="produce_treats",
    kafka_config_id="kafka_default",
    topic=KAFKA_TOPIC,
    producer_function=prod_function,
    producer_function_args=["{{ ti.xcom_pull(task_ids='get_number_of_treats')}}"],
    producer_function_kwargs={
        "pet_name": "{{ ti.xcom_pull(task_ids='get_your_pet_name')}}"
    },
    poll_timeout=10,
)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions