9 changes: 0 additions & 9 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2911,15 +2911,6 @@ components:
       description: |
         The last time the DAG was parsed.
 
         *New in version 2.3.0*
-    last_pickled:
-      type: string
-      format: date-time
-      readOnly: true
-      nullable: true
-      description: |
-        The last time the DAG was pickled.
-
-        *New in version 2.3.0*
     last_expired:
       type: string
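With `last_pickled` removed from the stable REST API schema, clients should stop reading the field. A minimal sketch of the client-visible effect, assuming a local deployment with basic auth enabled; the host, credentials, and DAG id are illustrative placeholders:

```python
# Hedged sketch: DAG resources returned by the stable REST API no longer
# carry ``last_pickled``. Host, credentials and DAG id are placeholders.
import requests

resp = requests.get(
    "http://localhost:8080/api/v1/dags/example_dag_id",
    auth=("admin", "admin"),  # assumes basic-auth is enabled locally
)
resp.raise_for_status()
dag = resp.json()
assert "last_pickled" not in dag    # field removed by this change
print(dag.get("last_parsed_time"))  # neighbouring fields are unchanged
```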
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/dag_schema.py
@@ -54,7 +54,6 @@ class Meta:
     is_paused = auto_field()
     is_active = auto_field(dump_only=True)
     last_parsed_time = auto_field(dump_only=True)
-    last_pickled = auto_field(dump_only=True)
     last_expired = auto_field(dump_only=True)
     default_view = auto_field(dump_only=True)
     fileloc = auto_field(dump_only=True)
42 changes: 0 additions & 42 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2370,24 +2370,12 @@ components:
           format: date-time
         - type: 'null'
       title: Last Parsed Time
-    last_pickled:
-      anyOf:
-        - type: string
-          format: date-time
-        - type: 'null'
-      title: Last Pickled
     last_expired:
       anyOf:
         - type: string
           format: date-time
         - type: 'null'
       title: Last Expired
-    pickle_id:
-      anyOf:
-        - type: string
-          format: date-time
-        - type: 'null'
-      title: Pickle Id
     default_view:
       anyOf:
         - type: string
@@ -2541,9 +2529,7 @@ components:
     - is_paused
     - is_active
     - last_parsed_time
-    - last_pickled
     - last_expired
-    - pickle_id
     - default_view
     - fileloc
     - description
@@ -2606,24 +2592,12 @@ components:
           format: date-time
         - type: 'null'
       title: Last Parsed Time
-    last_pickled:
-      anyOf:
-        - type: string
-          format: date-time
-        - type: 'null'
-      title: Last Pickled
     last_expired:
       anyOf:
         - type: string
           format: date-time
         - type: 'null'
       title: Last Expired
-    pickle_id:
-      anyOf:
-        - type: string
-          format: date-time
-        - type: 'null'
-      title: Pickle Id
     default_view:
       anyOf:
         - type: string
@@ -2710,9 +2684,7 @@ components:
     - is_paused
     - is_active
     - last_parsed_time
-    - last_pickled
     - last_expired
-    - pickle_id
     - default_view
     - fileloc
     - description
@@ -2976,24 +2948,12 @@ components:
           format: date-time
         - type: 'null'
       title: Last Parsed Time
-    last_pickled:
-      anyOf:
-        - type: string
-          format: date-time
-        - type: 'null'
-      title: Last Pickled
     last_expired:
       anyOf:
         - type: string
           format: date-time
         - type: 'null'
       title: Last Expired
-    pickle_id:
-      anyOf:
-        - type: string
-          format: date-time
-        - type: 'null'
-      title: Pickle Id
     default_view:
       anyOf:
         - type: string
@@ -3085,9 +3045,7 @@ components:
     - is_paused
     - is_active
     - last_parsed_time
-    - last_pickled
     - last_expired
-    - pickle_id
     - default_view
     - fileloc
     - description
2 changes: 0 additions & 2 deletions airflow/api_fastapi/core_api/serializers/dags.py
@@ -43,9 +43,7 @@ class DAGResponse(BaseModel):
     is_paused: bool
     is_active: bool
     last_parsed_time: datetime | None
-    last_pickled: datetime | None
     last_expired: datetime | None
-    pickle_id: datetime | None
     default_view: str | None
     fileloc: str
     description: str | None
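For reference, a minimal sketch of the trimmed response model after `last_pickled` and `pickle_id` are dropped; only the fields visible in the hunk above are reproduced, and the real `DAGResponse` carries more:

```python
# Hedged sketch of the trimmed DAGResponse surface, mirroring the
# annotations in the diff above. Not the complete model.
from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel


class DAGResponse(BaseModel):
    is_paused: bool
    is_active: bool
    last_parsed_time: datetime | None
    last_expired: datetime | None
    default_view: str | None
    fileloc: str
    description: str | None
```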
18 changes: 0 additions & 18 deletions airflow/cli/cli_config.py
@@ -576,10 +576,6 @@ def string_lower_type(val):
choices={"check", "ignore", "wait"},
default="check",
)
ARG_SHIP_DAG = Arg(
("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
)
ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true")
@@ -795,16 +791,6 @@ def string_lower_type(val):
     type=int,
     help="Set the number of runs to execute before exiting",
 )
-ARG_DO_PICKLE = Arg(
-    ("-p", "--do-pickle"),
-    default=False,
-    help=(
-        "Attempt to pickle the DAG object to send over "
-        "to the workers, instead of letting workers run their version "
-        "of the code"
-    ),
-    action="store_true",
-)
 
 ARG_WITHOUT_MINGLE = Arg(
     ("--without-mingle",),
@@ -1351,8 +1337,6 @@ class GroupCommand(NamedTuple):
         ARG_IGNORE_ALL_DEPENDENCIES,
         ARG_IGNORE_DEPENDENCIES,
         ARG_DEPENDS_ON_PAST,
-        ARG_SHIP_DAG,
-        ARG_PICKLE,
         ARG_INTERACTIVE,
         ARG_SHUT_DOWN_LOGGING,
         ARG_MAP_INDEX,
@@ -1968,7 +1952,6 @@ class GroupCommand(NamedTuple):
     args=(
         ARG_SUBDIR,
         ARG_NUM_RUNS,
-        ARG_DO_PICKLE,
         ARG_PID,
         ARG_DAEMON,
         ARG_STDOUT,
@@ -2010,7 +1993,6 @@ class GroupCommand(NamedTuple):
         ARG_DAEMON,
         ARG_SUBDIR,
         ARG_NUM_RUNS,
-        ARG_DO_PICKLE,
         ARG_STDOUT,
         ARG_STDERR,
         ARG_LOG_FILE,
1 change: 0 additions & 1 deletion airflow/cli/commands/dag_command.py
@@ -225,7 +225,6 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"is_paused": dag.get_is_paused(),
"is_active": dag.get_is_active(),
"last_parsed_time": None,
"last_pickled": None,
"last_expired": None,
"default_view": dag.default_view,
"fileloc": dag.fileloc,
1 change: 0 additions & 1 deletion airflow/cli/commands/dag_processor_command.py
@@ -48,7 +48,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
             dag_directory=args.subdir,
             max_runs=args.num_runs,
             dag_ids=[],
-            pickle_dags=args.do_pickle,
         ),
     )
 
4 changes: 1 addition & 3 deletions airflow/cli/commands/scheduler_command.py
@@ -39,9 +39,7 @@


 def _run_scheduler_job(args) -> None:
-    job_runner = SchedulerJobRunner(
-        job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
-    )
+    job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs)
     ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
     with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
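The scheduler entrypoint now builds the job runner without `do_pickle`. A minimal sketch of the new call shape; it only runs inside an initialized Airflow environment, and the `subdir` and `num_runs` values are placeholders:

```python
# Hedged sketch: constructing the scheduler job runner after the
# ``do_pickle`` flag was removed. Requires a configured Airflow setup.
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner

job_runner = SchedulerJobRunner(
    job=Job(),
    subdir="/files/dags",  # placeholder DAGs folder
    num_runs=1,            # exit after one scheduling loop
)
```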
27 changes: 3 additions & 24 deletions airflow/cli/commands/task_command.py
@@ -42,7 +42,7 @@
 from airflow.jobs.job import Job, run_job
 from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.listeners.listener import get_listener_manager
-from airflow.models import DagPickle, TaskInstance
+from airflow.models import TaskInstance
 from airflow.models.dag import DAG, _run_inline_trigger
 from airflow.models.dagrun import DagRun
 from airflow.models.param import ParamsDict
@@ -56,7 +56,6 @@
 from airflow.utils.cli import (
     get_dag,
     get_dag_by_file_location,
-    get_dag_by_pickle,
     get_dags,
     should_ignore_depends_on_past,
     suppress_logs_and_warning,
@@ -266,20 +265,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:

     This can result in the task being started by another host if the executor implementation does.
     """
-    pickle_id = None
-    if args.ship_dag:
-        try:
-            # Running remotely, so pickling the DAG
-            with create_session() as session:
-                pickle = DagPickle(dag)
-                session.add(pickle)
-                pickle_id = pickle.id
-                # TODO: This should be written to a log
-                print(f"Pickled dag {dag} as pickle_id: {pickle_id}")
-        except Exception as e:
-            print("Could not pickle the DAG")
-            print(e)
-            raise e
     if ti.executor:
         executor = ExecutorLoader.load_executor(ti.executor)
     else:
@@ -290,7 +275,6 @@ def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
     executor.queue_task_instance(
         ti,
         mark_success=args.mark_success,
-        pickle_id=pickle_id,
         ignore_all_deps=args.ignore_all_dependencies,
         ignore_depends_on_past=should_ignore_depends_on_past(args),
         wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
@@ -311,7 +295,6 @@ def _run_task_by_local_task_job(args, ti: TaskInstance | TaskInstancePydantic) -
         job=Job(dag_id=ti.dag_id),
         task_instance=ti,
         mark_success=args.mark_success,
-        pickle_id=args.pickle,
         ignore_all_deps=args.ignore_all_dependencies,
         ignore_depends_on_past=should_ignore_depends_on_past(args),
         wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
@@ -435,8 +418,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
f"You provided the option {unsupported_flags}. "
"Delete it to execute the command."
)
if dag and args.pickle:
raise AirflowException("You cannot use the --pickle option when using DAG.cli() method.")

if args.cfg_path:
with open(args.cfg_path) as conf_file:
conf_dict = json.load(conf_file)
@@ -451,10 +433,7 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:

     get_listener_manager().hook.on_starting(component=TaskCommandMarker())
 
-    if args.pickle:
-        print(f"Loading pickle id: {args.pickle}")
-        _dag = get_dag_by_pickle(args.pickle)
-    elif not dag:
+    if not dag:
         _dag = get_dag(args.subdir, args.dag_id, args.read_from_db)
     else:
         _dag = dag
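With the pickle branch gone, `task_run` resolves the DAG from exactly two sources: the `dag` argument handed in by `DAG.cli()`, or `get_dag`, which parses the DAG file or reads the serialized DAG from the database. A sketch of the remaining control flow; `resolve_dag` is a hypothetical helper name and `args` stands in for the parsed CLI namespace:

```python
# Hedged sketch of the simplified DAG resolution in ``task_run``.
# ``resolve_dag`` is a hypothetical name for illustration only.
from airflow.models.dag import DAG
from airflow.utils.cli import get_dag


def resolve_dag(dag: DAG | None, args) -> DAG:
    if not dag:
        # Parse the DAG file under ``args.subdir``, or read the
        # serialized DAG from the DB when ``--read-from-db`` is set.
        return get_dag(args.subdir, args.dag_id, args.read_from_db)
    return dag
```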
13 changes: 1 addition & 12 deletions airflow/dag_processing/manager.py
@@ -117,7 +117,6 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         for unlimited.
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :param dag_ids: if specified, only schedule tasks with these DAG IDs
-    :param pickle_dags: whether to pickle DAGs.
     :param async_mode: Whether to start agent in async mode
     """
 
@@ -127,15 +126,13 @@ def __init__(
         max_runs: int,
         processor_timeout: timedelta,
         dag_ids: list[str] | None,
-        pickle_dags: bool,
         async_mode: bool,
     ):
         super().__init__()
         self._dag_directory: os.PathLike = dag_directory
         self._max_runs = max_runs
         self._processor_timeout = processor_timeout
         self._dag_ids = dag_ids
-        self._pickle_dags = pickle_dags
         self._async_mode = async_mode
         # Map from file path to the processor
         self._processors: dict[str, DagFileProcessorProcess] = {}
@@ -163,7 +160,6 @@ def start(self) -> None:
                 self._processor_timeout,
                 child_signal_conn,
                 self._dag_ids,
-                self._pickle_dags,
                 self._async_mode,
             ),
         )
@@ -223,7 +219,6 @@ def _run_processor_manager(
         processor_timeout: timedelta,
         signal_conn: MultiprocessingConnection,
         dag_ids: list[str] | None,
-        pickle_dags: bool,
         async_mode: bool,
     ) -> None:
         # Make this process start as a new process group - that makes it easy
@@ -240,7 +235,6 @@
             max_runs=max_runs,
             processor_timeout=processor_timeout,
             dag_ids=dag_ids,
-            pickle_dags=pickle_dags,
             signal_conn=signal_conn,
             async_mode=async_mode,
         )
@@ -353,7 +347,6 @@ class DagFileProcessorManager(LoggingMixin):
     :param processor_timeout: How long to wait before timing out a DAG file processor
     :param signal_conn: connection to communicate signal with processor agent.
     :param dag_ids: if specified, only schedule tasks with these DAG IDs
-    :param pickle_dags: whether to pickle DAGs.
     :param async_mode: whether to start the manager in async mode
     """
 
@@ -372,7 +365,6 @@
         max_runs: int,
         processor_timeout: timedelta,
         dag_ids: list[str] | None,
-        pickle_dags: bool,
         signal_conn: MultiprocessingConnection | None = None,
         async_mode: bool = True,
     ):
@@ -383,7 +375,6 @@
         self._max_runs = max_runs
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
-        self._pickle_dags = pickle_dags
         self._dag_ids = dag_ids
         self._async_mode = async_mode
         self._parsing_start_time: float | None = None
@@ -1191,11 +1182,10 @@ def collect_results(self) -> None:
         self.log.debug("%s file paths queued for processing", len(self._file_path_queue))
 
     @staticmethod
-    def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests):
+    def _create_process(file_path, dag_ids, dag_directory, callback_requests):
         """Create DagFileProcessorProcess instance."""
         return DagFileProcessorProcess(
             file_path=file_path,
-            pickle_dags=pickle_dags,
             dag_ids=dag_ids,
             dag_directory=dag_directory,
             callback_requests=callback_requests,
@@ -1217,7 +1207,6 @@ def start_new_processes(self):
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._create_process(
                 file_path,
-                self._pickle_dags,
                 self._dag_ids,
                 self.get_dag_directory(),
                 callback_to_execute_for_file,
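Callers of the processing machinery drop `pickle_dags` as well. A minimal sketch of constructing the agent after this change, assuming an initialized Airflow environment; all values are placeholders:

```python
# Hedged sketch: the processor agent is now built without the removed
# ``pickle_dags`` argument. Values are placeholders for a local test.
from datetime import timedelta

from airflow.dag_processing.manager import DagFileProcessorAgent

agent = DagFileProcessorAgent(
    dag_directory="/files/dags",              # placeholder DAGs folder
    max_runs=1,                               # parse each file once, then stop
    processor_timeout=timedelta(seconds=50),
    dag_ids=None,
    async_mode=False,
)
```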