
Message queue trigger run fails when an event arrives, due to serialization #51809

@dingo4dev

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When the trigger below was started and a message event arrived, the trigger job failed with an unexpected keyword argument error.

def apply_function(message, **kwargs):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True

trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/test",
    apply_function="kafka_example_dag.apply_function",
    kafka_config_id="kafka_t1"
)

Error

Trigger ID 12 exited with error apply_function() got an unexpected keyword argument '__var' [airflow.jobs.triggerer_job_runner] 
error_detail=[{'exc_type': 'TypeError', 'exc_value': "apply_function() got an unexpected keyword argument '__var'", 
'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames': [
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 901, 'name': 'cleanup_finished_triggers'}, 
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py', 'lineno': 1014, 'name': 'run_trigger'}, 
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/apache/kafka/triggers/await_message.py', 'lineno': 111, 'name': 'run'}, 
{'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 468, 'name': '__call__'}, {'filename': '/usr/local/lib/python3.12/concurrent/futures/thread.py', 'lineno': 59, 'name': 'run'},
 {'filename': '/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py', 'lineno': 522, 'name': 'thread_handler'}],
 'is_group': False, 'exceptions': []}]
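
The error message itself is plain Python mechanics: in 3.0.2 the kwargs stored for the trigger still carry the serialization envelope, and once a dict like {'__var': ..., '__type': ...} is splatted into a callable as keyword arguments, any name the callable does not declare raises exactly this TypeError. A minimal, simplified sketch (the callable below is a hypothetical stand-in to show the mechanics only, not the provider's actual call site):

# Hypothetical stand-in illustrating the error message only.
encoded_kwargs = {"__var": {}, "__type": "dict"}  # shape stored by 3.0.2, see the outputs below

def apply_function(message):
    return True

apply_function("{}", **encoded_kwargs)
# TypeError: apply_function() got an unexpected keyword argument '__var'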

What you think should happen instead?

Since version 3.0.1 works fine, I printed out the serialized trigger object to compare the two versions.

# use the trigger defined above

from airflow.serialization import serialized_objects


serialized_objects._encode_trigger(trigger)

""" Airflow 3.0.2 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
 'kwargs': {'topics': ['test_topic'],
  'apply_function': 'test.noop',
  'apply_function_args': {<Encoding.VAR: '__var'>: [],
   <Encoding.TYPE: '__type'>: <DagAttributeTypes.TUPLE: 'tuple'>},
  'apply_function_kwargs': {<Encoding.VAR: '__var'>: {},
   <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>},
  'kafka_config_id': 'kafka_default',
  'poll_timeout': 1,
  'poll_interval': 5}}
"""


""" Airflow 3.0.1 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
 'kwargs': {'topics': ['test_topic'],
  'apply_function': 'test.noop',
  'apply_function_args': (),
  'apply_function_kwargs': {},
  'kafka_config_id': 'kafka_default',
  'poll_timeout': 1,
  'poll_interval': 5}}
"""

from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
trigger = AwaitMessageTrigger(
    topics=["test_topic"],
    apply_function="test.noop",
    kafka_config_id="kafka_default",
)
serialized_objects._encode_trigger(trigger)

""" Airflow 3.0.1 & 3.0.2 outputs:
{'classpath': 'airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger',
 'kwargs': {'topics': ['test_topic'],
  'apply_function': 'test.noop',
  'apply_function_args': (),
  'apply_function_kwargs': {},
  'kafka_config_id': 'kafka_default',
  'poll_timeout': 1,
  'poll_interval': 5}}
"""

How to reproduce

  1. Create kafka_example_dag.py
  2. Create a connection with the ID kafka_t1
  3. Add the code below and send a message to the Kafka topic test (a producer sketch follows the DAG code)
import json

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import dag, Asset, AssetWatcher


def apply_function(message, **kwargs):
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return True

trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/test",
    apply_function="kafka_example_dag.apply_function",
    kafka_config_id="kafka_t1"
)

asset = Asset("kafka_queue_asset_2", watchers=[AssetWatcher(name="kafka_watcher_2", trigger=trigger)])

@dag(dag_id="example_kafka_watcher_2", schedule=[asset])
def example_kafka_watcher():
    EmptyOperator(task_id="task")
    
example_kafka_watcher()
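
For step 2, the kafka_t1 connection can follow the same pattern the compose file below uses for kafka_default (for example an AIRFLOW_CONN_KAFKA_T1 environment variable set to 'kafka://localhost:9092'). For step 3, one way to publish a test message is sketched here, assuming the confluent-kafka client used by the Kafka provider and a broker reachable on localhost:9092:

# Sketch: publish one JSON message to the "test" topic so the trigger fires.
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("test", value=json.dumps({"hello": "airflow"}))
producer.flush()  # block until delivery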

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.9.0
apache-airflow-providers-common-messaging==1.0.2

Deployment

Docker-Compose

Deployment details

x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider distributions you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.2}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
    AIRFLOW_CONN_KAFKA_DEFAULT: 'kafka://kafka:29092'
    PYTHONPATH: /opt/airflow/dags:$PYTHONPATH
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-apache-kafka apache-airflow-providers-git
    # The following line can be used to set a custom config file, stored in the local config folder
    AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
    kafka:
      condition: service_healthy
    

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    # Redis is limited to 7.2-bookworm due to licencing change
    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  kafbat-ui:
    container_name: kafbat-ui
    image: ghcr.io/kafbat/kafka-ui:latest
    ports:
      - 8082:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: airflow_test
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      kafka:
        condition: service_healthy
  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    user: "1000"  # Run as non-root user
    expose:
      - 9092
      - 29092
      - 29093
    ports:
      - "9092:9092"
      - "29093:29093"  # Controller port
      - "29092:29092"  # Broker port

    environment:
      KAFKA_KRAFT_MODE: "true"  # Enables KRaft mode.
      KAFKA_PROCESS_ROLES: broker,controller  # Kafka acts as both controller and broker.
      KAFKA_NODE_ID: 1  # Unique ID for the Kafka instance.
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"  # Controller quorum.
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"  # Enables automatic topic creation.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # Single replica for simplicity.
      KAFKA_LOG_RETENTION_HOURS: 168  # Log retention period (7 days).
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0  # No rebalance delay.
      CLUSTER_ID: "c31d0fa15a5b483bbdf1edd4c8017394"  # Unique Kafka cluster ID.
    healthcheck:
      test: ["CMD", "kafka-topics", "--bootstrap-server", "kafka:9092", "--list"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    volumes:
      - ./kafka-data:/var/lib/kafka/data

  airflow-apiserver:
    <<: *airflow-common
    command: api-server
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-dag-processor:
    <<: *airflow-common
    command: dag-processor
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-apiserver:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
          export AIRFLOW_UID=$(id -u)
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        echo
        echo "Creating missing opt dirs if missing:"
        echo
        mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Airflow version:"
        /entrypoint airflow version
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Running airflow config list to create default config file if missing."
        echo
        /entrypoint airflow config list >/dev/null
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
        echo
        chown -R "${AIRFLOW_UID}:0" /opt/airflow/
        echo
        echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
        echo
        chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}

    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: 'apache-airflow-providers-apache-kafka apache-airflow-providers-git'
    user: "0:0"

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow
    depends_on:
      <<: *airflow-common-depends-on

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

networks:
  default:
    driver: bridge
    driver_opts:
      com.docker.network.bridge.host_binding_ipv4: "127.0.0.1"

volumes:
  postgres-db-volume:

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
