Labels
affected_version:3.0, area:Triggerer, area:core, kind:bug, priority:medium
Description
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When the trigger below was started and a message event arrived, the triggerer job failed with an unexpected keyword argument error.
import json

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger

def apply_function(message, **kwargs):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True

trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/test",
    apply_function="kafka_example_dag.apply_function",
    kafka_config_id="kafka_t1",
)

Error
Trigger ID 12 exited with error apply_function() got an unexpected keyword argument '__var' [airflow.jobs.triggerer_job_runner]
error_detail=[{'exc_type': 'TypeError', 'exc_value': "apply_function() got an unexpected keyword argument '__var'",
'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames': [
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 901, 'name': 'cleanup_finished_triggers'},
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1014, 'name': 'run_trigger'},
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/kafka/triggers/await_message.py', 'lineno': 111, 'name': 'run'},
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 468, 'name': '__call__'}, {'filename': '/usr/local/lib/python3.12/concurrent/futures/thread.py', 'lineno': 59, 'name': 'run'},
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 522, 'name': 'thread_handler'}],
'is_group': False, 'exceptions': []}]
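The failure itself is plain Python: the serializer's __var/__type envelope reaches the callable as literal keyword arguments instead of being decoded first. A minimal self-contained sketch (my own illustration, not Airflow code) that reproduces the same TypeError:

def apply_function(message):
    # Hypothetical stand-in for the callable invoked by the triggerer;
    # it does not accept arbitrary keyword arguments.
    return True

# The un-decoded serialization envelope seen in the 3.0.2 output below:
encoded_kwargs = {"__var": {}, "__type": "dict"}

apply_function("msg", **encoded_kwargs)
# TypeError: apply_function() got an unexpected keyword argument '__var'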
What you think should happen instead?
Version 3.0.1 works fine, so I printed the serialized trigger object under both versions to compare; the outputs follow the sketch below.
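I would expect the __var/__type envelope to be unwrapped back into plain values before the kwargs reach the callable. A rough sketch of that expectation (a hypothetical helper of mine, not Airflow's actual deserializer):

def decode_envelope(value):
    # Hypothetical helper: unwrap a {'__var': ..., '__type': ...}
    # serialization envelope if present, otherwise pass the value through.
    if isinstance(value, dict) and set(value) == {"__var", "__type"}:
        inner = value["__var"]
        return tuple(inner) if value["__type"] == "tuple" else inner
    return value

# The 3.0.2 envelopes shown below decode back to the plain values
# that 3.0.1 passes through:
assert decode_envelope({"__var": [], "__type": "tuple"}) == ()
assert decode_envelope({"__var": {}, "__type": "dict"}) == {}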
# use the trigger defined above
from airflow.serialization import serialized_objects

serialized_objects._encode_trigger(trigger)
""" Airflow 3.0.2 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
'kwargs': {'topics': ['test_topic'],
'apply_function': 'test.noop',
'apply_function_args': {<Encoding.VAR: '__var'>: [],
<Encoding.TYPE: '__type'>: <DagAttributeTypes.TUPLE: 'tuple'>},
'apply_function_kwargs': {<Encoding.VAR: '__var'>: {},
<Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
'kafka_config_id': 'kafka_default',
'poll_timeout': 1,
'poll_interval': 5}}
"""
""" Airflow 3.0.1 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
'kwargs': {'topics': ['test_topic'],
'apply_function': 'test.noop',
'apply_function_args': (),
'apply_function_kwargs': {},
'kafka_config_id': 'kafka_default',
'poll_timeout': 1,
'poll_interval': 5}}
"""
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger

trigger = AwaitMessageTrigger(
    topics=["test_topic"],
    apply_function="test.noop",
    kafka_config_id="kafka_default",
)
serialized_objects._encode_trigger(trigger)
""" Airflow 3.0.1 & 3.0.2 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
'kwargs': {'topics': ['test_topic'],
'apply_function': 'test.noop',
'apply_function_args': (),
'apply_function_kwargs': {},
'kafka_config_id': 'kafka_default',
'poll_timeout': 1,
'poll_interval': 5}}
"""

How to reproduce
- Create kafka_example_dag.py
- Create a connection with connection ID kafka_t1
- Add the code below and send a message to the Kafka topic test (a minimal producer sketch follows the DAG code)
import json
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import dag, Asset, AssetWatcher
def apply_function(message, **kwargs):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True

trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/test",
    apply_function="kafka_example_dag.apply_function",
    kafka_config_id="kafka_t1",
)

asset = Asset("kafka_queue_asset_2", watchers=[AssetWatcher(name="kafka_watcher_2", trigger=trigger)])

@dag(dag_id="example_kafka_watcher_2", schedule=[asset])
def example_kafka_watcher():
    EmptyOperator(task_id="task")

example_kafka_watcher()
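To send the test message for the last step, a minimal producer sketch (my addition; assumes the confluent-kafka client is installed and the broker from the compose file below is reachable on localhost:9092):

import json

from confluent_kafka import Producer

# Publish one JSON message to the `test` topic the trigger watches.
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("test", value=json.dumps({"value": 42}))
producer.flush()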
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
apache-airflow-providers-apache-kafka==1.9.0
apache-airflow-providers-common-messaging==1.0.2
Deployment
Docker-Compose
Deployment details
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider distributions you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.2}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
AIRFLOW_CONN_KAFKA_DEFAULT: 'kafka://kafka:29092'
PYTHONPATH: /opt/airflow/dags:$PYTHONPATH
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
# yamllint enable rule:line-length
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
# WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
# for other purpose (development, test and especially production usage) build/extend Airflow image.
_PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-apache-kafka apache-airflow-providers-git
# The following line can be used to set a custom config file, stored in the local config folder
AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
kafka:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- "5432:5432"
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
redis:
# Redis is limited to 7.2-bookworm due to licencing change
# https://redis.io/blog/redis-adopts-dual-source-available-licensing/
image: redis:7.2-bookworm
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always
kafbat-ui:
container_name: kafbat-ui
image: ghcr.io/kafbat/kafka-ui:latest
ports:
- 8082:8080
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_NAME: airflow_test
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
kafka:
condition: service_healthy
kafka:
image: confluentinc/cp-kafka:latest
hostname: kafka
user: "1000" # Run as non-root user
expose:
- 9092
- 29092
- 29093
ports:
- "9092:9092"
- "29093:29093" # Controller port
- "29092:29092" # Broker port
environment:
KAFKA_KRAFT_MODE: "true" # Enables KRaft mode.
KAFKA_PROCESS_ROLES: broker,controller # Kafka acts as both controller and broker.
KAFKA_NODE_ID: 1 # Unique ID for the Kafka instance.
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093" # Controller quorum.
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" # Enables automatic topic creation.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # Single replica for simplicity.
KAFKA_LOG_RETENTION_HOURS: 168 # Log retention period (7 days).
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 # No rebalance delay.
CLUSTER_ID: "c31d0fa15a5b483bbdf1edd4c8017394" # Unique Kafka cluster ID.
healthcheck:
test: ["CMD", "kafka-topics", "--bootstrap-server", "kafka:9092", "--list"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
volumes:
- ./kafka-data:/var/lib/kafka/data
airflow-apiserver:
<<: *airflow-common
command: api-server
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-dag-processor:
<<: *airflow-common
command: dag-processor
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
# yamllint disable rule:line-length
test:
- "CMD-SHELL"
- 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-apiserver:
condition: service_healthy
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
export AIRFLOW_UID=$(id -u)
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
echo
echo "Creating missing opt dirs if missing:"
echo
mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
echo
echo "Airflow version:"
/entrypoint airflow version
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Running airflow config list to create default config file if missing."
echo
/entrypoint airflow config list >/dev/null
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
echo
chown -R "${AIRFLOW_UID}:0" /opt/airflow/
echo
echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
echo
chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: 'apache-airflow-providers-apache-kafka apache-airflow-providers-git'
user: "0:0"
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
depends_on:
<<: *airflow-common-depends-on
# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
# See: https://docs.docker.com/compose/profiles/
flower:
<<: *airflow-common
command: celery flower
profiles:
- flower
ports:
- "5555:5555"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
networks:
default:
driver: bridge
driver_opts:
com.docker.network.bridge.host_binding_ipv4: "127.0.0.1"
volumes:
postgres-db-volume:

Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct