Scheduler crash while using PythonVirtualenvOperator #47349

@atul-astronomer

Description

Apache Airflow version

3.0.0b1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

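The scheduler crashes while running a DAG that uses PythonVirtualenvOperator. The crash happens when the CeleryExecutor serializes the task workload, whose executor_config contains a kubernetes V1Pod.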

Error:

  File "/opt/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 926, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1051, in _run_scheduler_loop
    executor.heartbeat()
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 252, in heartbeat
    self.trigger_tasks(open_slots)
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 409, in trigger_tasks
    self._process_workloads(workloads)  # type: ignore[attr-defined]
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 281, in _process_workloads
    self._send_tasks(tasks)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 290, in _send_tasks
    key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 329, in _send_tasks_to_celery
    return list(map(send_task_to_executor, task_tuples_to_send))
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py", line 266, in send_task_to_executor
    args = (args.model_dump_json(),)
  File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 477, in model_dump_json
    return self.__pydantic_serializer__.to_json(
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'kubernetes.client.models.v1_pod.V1Pod'>
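The last frames show the CeleryExecutor calling model_dump_json() on the workload and pydantic rejecting the V1Pod stored under executor_config["pod_override"]. The same class of failure can be illustrated with the standard-library json module. This is only an illustration: it uses neither pydantic nor the kubernetes client, and FakeV1Pod is a hypothetical stand-in. It assumes, as the generated kubernetes client models do, that the object exposes a to_dict() method, and shows one possible mitigation, a json.dumps default hook that converts such objects to plain dicts.

```python
from __future__ import annotations

import json
from typing import Any


class FakeV1Pod:
    """Hypothetical stand-in for kubernetes.client.models.V1Pod.

    Real generated kubernetes models expose to_dict(); this class mimics
    only that part of their interface.
    """

    def __init__(self, cpu_request: str, memory_limit: str) -> None:
        self.cpu_request = cpu_request
        self.memory_limit = memory_limit

    def to_dict(self) -> dict[str, Any]:
        return {
            "spec": {
                "containers": [
                    {
                        "name": "base",
                        "resources": {
                            "requests": {"cpu": self.cpu_request},
                            "limits": {"memory": self.memory_limit},
                        },
                    }
                ]
            }
        }


def to_jsonable(obj: Any) -> Any:
    """json.dumps `default` hook: convert objects that provide to_dict()."""
    to_dict = getattr(obj, "to_dict", None)
    if callable(to_dict):
        return to_dict()
    # Anything else is still rejected, like pydantic's "unknown type" error.
    raise TypeError(f"Unable to serialize unknown type: {type(obj)!r}")


executor_config = {"pod_override": FakeV1Pod("100m", "500Mi")}

# Without a hook, serialization fails, analogous to the scheduler crash.
try:
    json.dumps({"executor_config": executor_config})
except TypeError as exc:
    print("plain json.dumps failed:", exc)

# With the hook, the pod is emitted as a nested dict.
payload = json.dumps({"executor_config": executor_config}, default=to_jsonable)
print(payload)
```

On real kubernetes models, to_dict() returns Python attribute names (snake_case); kubernetes.client.ApiClient().sanitize_for_serialization() produces the API's camelCase form, which may matter if the pod has to be rebuilt on the worker side.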

What you think should happen instead?

PythonVirtualenvOperator should not cause the scheduler to crash.
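One way to meet this expectation is to contain serialization errors per task instead of letting them escape the scheduler loop. In the traceback, the exception propagates out of send_task_to_executor through list(map(...)) and up into _run_scheduler_loop. The sketch below is hypothetical: send_tasks and serialize_workload are illustrative names, not Airflow APIs, and Airflow's actual fix may differ. It returns, for each task key, either the serialized payload or the exception, so a caller could fail the offending task and keep scheduling the rest.

```python
from __future__ import annotations

import json
from typing import Any, Callable


def serialize_workload(workload: dict[str, Any]) -> str:
    """Hypothetical stand-in for workload.model_dump_json()."""
    return json.dumps(workload)


def send_tasks(
    tasks: list[tuple[str, dict[str, Any]]],
    serialize: Callable[[dict[str, Any]], str] = serialize_workload,
) -> list[tuple[str, str | Exception]]:
    """Serialize each task independently, capturing failures per task.

    A failure in one workload is recorded as that task's result instead of
    propagating and aborting the whole batch.
    """
    results: list[tuple[str, str | Exception]] = []
    for key, workload in tasks:
        try:
            results.append((key, serialize(workload)))
        except Exception as exc:  # the caller decides how to fail this task
            results.append((key, exc))
    return results


tasks = [
    ("ok_task", {"executor_config": {}}),
    ("bad_task", {"executor_config": {"pod_override": object()}}),
]
for key, result in send_tasks(tasks):
    status = "failed" if isinstance(result, Exception) else "sent"
    print(key, status)
```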

How to reproduce

Run the DAG below:

from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonVirtualenvOperator
from pendulum import today
from kubernetes.client import models as k8s

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that the library is not imported
    before it has been installed in the virtual environment.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(10):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(10)
    print("Finished")


with DAG(
    dag_id="virtualenv_python_operator",
    default_args={"owner": "airflow"},
    schedule=None,
    start_date=today('UTC').add(days=-2),
    tags=["core"],
) as dag:

    task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
        # The V1Pod in pod_override is the object that pydantic fails to
        # serialize in the error above.
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={
                                    "cpu": "100m",
                                    "memory": "384Mi",
                                },
                                limits={
                                    "cpu": 1,
                                    "memory": "500Mi",
                                }
                            )
                        )
                    ]
                )
            )
        }
    )
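When a payload like this fails to serialize, it can help to locate the exact value that has no JSON representation. The following sketch walks nested dicts, lists, and tuples and returns the key path of the first value that json.dumps rejects. The helper name find_unserializable is hypothetical, not an Airflow utility, and object() stands in for the V1Pod because the kubernetes client may not be installed.

```python
from __future__ import annotations

import json
from typing import Any


def find_unserializable(obj: Any, path: str = "$") -> str | None:
    """Return the path of the first value json.dumps cannot encode, or None."""
    if isinstance(obj, dict):
        for key, value in obj.items():
            found = find_unserializable(value, f"{path}[{key!r}]")
            if found is not None:
                return found
        return None
    if isinstance(obj, (list, tuple)):
        for index, value in enumerate(obj):
            found = find_unserializable(value, f"{path}[{index}]")
            if found is not None:
                return found
        return None
    # Leaf value: try to encode it on its own.
    try:
        json.dumps(obj)
    except (TypeError, ValueError):
        return f"{path} ({type(obj).__name__})"
    return None


executor_config = {"pod_override": object(), "queue": "default"}
print(find_unserializable({"executor_config": executor_config}))
# → $['executor_config']['pod_override'] (object)
```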

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

Metadata

Labels

area:Scheduler, area:core, kind:bug, priority:critical

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests