-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Labels
Description
Apache Airflow Provider(s)
apache-kafka
Versions of Apache Airflow Providers
apache-airflow-providers-apache-kafka==1.6.0
Apache Airflow version
2.10.2
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Docker-Compose
Deployment details
No response
What happened
Since Apache Airflow 2.10.0 and introduction of the feature: callable for template_fields (#37028) operators do not longer work as it just fails at execution.
Indeed callable field producer_function of the ProduceToTopicOperator is part of template_fields which make fail DAG execution.
template_fields = (
"topic",
"producer_function",
"producer_function_args",
"producer_function_kwargs",
"kafka_config_id",
)Here's the execution log :
[2024-09-25, 18:13:51 CEST] {abstractoperator.py:778} ERROR - Exception rendering Jinja template for task 'produce_treats', field 'producer_function'. Template: <function prod_function at 0x7ff0cd915d00>
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 768, in _do_render_template_fields
rendered_content = value(context=context, jinja_env=jinja_env)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: prod_function() got an unexpected keyword argument 'context'
[2024-09-25, 18:13:51 CEST] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 273, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3114, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context, jinja_env=jinja_env)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 3533, in render_templates
original_task.render_template_fields(context, jinja_env)
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1419, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/abstractoperator.py", line 768, in _do_render_template_fields
rendered_content = value(context=context, jinja_env=jinja_env)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: prod_function() got an unexpected keyword argument 'context'
The same happens with apply_function field in ConsumeFromTopicOperator :
template_fields = (
"topics",
"apply_function",
"apply_function_args",
"apply_function_kwargs",
"kafka_config_id",
)What you think should happen instead
No response
How to reproduce
Just use the DAG example available here in an Apache Airflow instance: https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/stable/_modules/tests/system/providers/apache/kafka/example_dag_hello_kafka.html
produce_treats = ProduceToTopicOperator(
task_id="produce_treats",
kafka_config_id="kafka_default",
topic=KAFKA_TOPIC,
producer_function=prod_function,
producer_function_args=["{{ ti.xcom_pull(task_ids='get_number_of_treats')}}"],
producer_function_kwargs={
"pet_name": "{{ ti.xcom_pull(task_ids='get_your_pet_name')}}"
},
poll_timeout=10,
)Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct