Closed
Labels
area:Scheduler (including HA (high availability) scheduler)
area:core
kind:bug (This is clearly a bug)
priority:critical (Showstopper bug that should be patched immediately)
Description
Apache Airflow version
3.0.0b1
If "Other Airflow 2 version" selected, which one?
No response
What happened?
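The scheduler crashes while using PythonVirtualenvOperator. In the reproduction below, the task's executor_config contains a Kubernetes pod_override (a V1Pod object), and the CeleryExecutor fails to serialize it when sending the task to Celery.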
Error:
  File "/opt/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 926, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1051, in _run_scheduler_loop
    executor.heartbeat()
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 252, in heartbeat
    self.trigger_tasks(open_slots)
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 409, in trigger_tasks
    self._process_workloads(workloads)  # type: ignore[attr-defined]
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 281, in _process_workloads
    self._send_tasks(tasks)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 290, in _send_tasks
    key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 329, in _send_tasks_to_celery
    return list(map(send_task_to_executor, task_tuples_to_send))
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py", line 266, in send_task_to_executor
    args = (args.model_dump_json(),)
  File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 477, in model_dump_json
    return self.__pydantic_serializer__.to_json(
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'kubernetes.client.models.v1_pod.V1Pod'>
What do you think should happen instead?
PythonVirtualenvOperator should not cause the scheduler to crash.
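The traceback shows send_task_to_executor calling pydantic's model_dump_json on the workload. The call fails because executor_config holds a kubernetes V1Pod object that has no JSON rule, and the exception propagates up through executor.heartbeat() and out of _run_scheduler_loop. The sketch below is a minimal, stdlib-only illustration of that failure class and of two possible mitigations. It is not Airflow's code and not the actual fix. FakeV1Pod, strict_dumps, lenient_dumps and send_all are hypothetical names. json.dumps raising TypeError stands in for PydanticSerializationError. The two mitigations are: (a) serialize each workload separately so one bad executor_config fails only its own task, and (b) fall back to an object's to_dict() method, which the generated kubernetes models provide.

    import json
    from typing import Any


    class FakeV1Pod:
        """Hypothetical stand-in for kubernetes.client.models.V1Pod (only to_dict is mimicked)."""

        def __init__(self, name: str) -> None:
            self.name = name

        def to_dict(self) -> dict[str, Any]:
            return {"metadata": {"name": self.name}}


    def strict_dumps(payload: dict[str, Any]) -> str:
        # Like model_dump_json in the traceback: unknown object types raise an error.
        return json.dumps(payload)


    def lenient_dumps(payload: dict[str, Any]) -> str:
        # Mitigation (b): convert objects that offer to_dict() into plain dicts.
        def default(obj: Any) -> Any:
            if hasattr(obj, "to_dict"):
                return obj.to_dict()
            raise TypeError(f"Unable to serialize unknown type: {type(obj)}")

        return json.dumps(payload, default=default)


    def send_all(workloads: dict[str, dict[str, Any]]) -> tuple[dict[str, str], dict[str, str]]:
        """Mitigation (a): serialize per task; a failure is recorded, not raised."""
        sent: dict[str, str] = {}
        failed: dict[str, str] = {}
        for task_id, payload in workloads.items():
            try:
                sent[task_id] = strict_dumps(payload)
            except (TypeError, ValueError) as exc:
                # Only this task is affected; the loop (the "scheduler") keeps going.
                failed[task_id] = str(exc)
        return sent, failed


    workloads = {
        "plain_task": {"executor_config": {}},
        "virtualenv_python": {"executor_config": {"pod_override": FakeV1Pod("base")}},
    }
    sent, failed = send_all(workloads)
    print(sorted(sent), sorted(failed))  # ['plain_task'] ['virtualenv_python']
    print(lenient_dumps(workloads["virtualenv_python"]))
    # {"executor_config": {"pod_override": {"metadata": {"name": "base"}}}}

Isolating each task's serialization keeps a single unserializable payload from stopping every other task. Converting via to_dict() instead keeps the pod override's content in the payload. Either approach, or both, is a design choice for the maintainers.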
How to reproduce
Run the following DAG:
from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonVirtualenvOperator
from pendulum import today
from kubernetes.client import models as k8s


def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.
    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(10):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(10)
    print("Finished")


with DAG(
    dag_id="virtualenv_python_operator",
    default_args={"owner": "airflow"},
    schedule=None,
    start_date=today('UTC').add(days=-2),
    tags=["core"],
) as dag:
    task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
        # pod_override is a kubernetes V1Pod object; this is the type the
        # CeleryExecutor fails to serialize in the traceback above.
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={
                                    "cpu": "100m",
                                    "memory": "384Mi",
                                },
                                limits={
                                    "cpu": 1,
                                    "memory": "500Mi",
                                },
                            ),
                        )
                    ]
                )
            )
        },
    )
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct