Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions airflow/cli/commands/config_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,30 @@ class ConfigChange:
:param config: The configuration parameter being changed.
:param suggestion: A suggestion for replacing or handling the removed configuration.
:param renamed_to: The new section and option if the configuration is renamed.
:param was_deprecated: If the config is removed, whether the old config was deprecated.
"""

config: ConfigParameter
suggestion: str = ""
renamed_to: ConfigParameter | None = None
was_deprecated: bool = True

@property
def message(self) -> str:
"""Generate a message for this configuration change."""
if self.renamed_to:
if self.config.section != self.renamed_to.section:
return (
f"`{self.config.option}` configuration parameter moved from `{self.config.section}` section to `"
f"{self.renamed_to.section}` section as `{self.renamed_to.option}`."
f"`{self.config.option}` configuration parameter moved from `{self.config.section}` section to "
f"`{self.renamed_to.section}` section as `{self.renamed_to.option}`."
)
return (
f"`{self.config.option}` configuration parameter renamed to `{self.renamed_to.option}` "
f"in the `{self.config.section}` section."
)
return (
f"Removed deprecated `{self.config.option}` configuration parameter from `{self.config.section}` section. "
f"Removed{' deprecated' if self.was_deprecated else ''} `{self.config.option}` configuration parameter "
f"from `{self.config.section}` section. "
f"{self.suggestion}"
)

Expand Down Expand Up @@ -199,6 +202,16 @@ def message(self) -> str:
),
ConfigChange(config=ConfigParameter("core", "task_runner")),
ConfigChange(config=ConfigParameter("core", "enable_xcom_pickling")),
ConfigChange(
config=ConfigParameter("core", "dag_file_processor_timeout"),
renamed_to=ConfigParameter("dag_processor", "dag_file_processor_timeout"),
),
ConfigChange(
config=ConfigParameter("core", "dag_processor_manager_log_location"),
),
ConfigChange(
config=ConfigParameter("core", "log_processor_filename_template"),
),
# api
ConfigChange(
config=ConfigParameter("api", "access_control_allow_origin"),
Expand All @@ -214,6 +227,18 @@ def message(self) -> str:
suggestion="Remove TaskContextLogger: Replaced by the Log table for better handling of task log "
"messages outside the execution context.",
),
ConfigChange(
config=ConfigParameter("logging", "dag_processor_manager_log_location"),
was_deprecated=False,
),
ConfigChange(
config=ConfigParameter("logging", "dag_processor_manager_log_stdout"),
was_deprecated=False,
),
ConfigChange(
config=ConfigParameter("logging", "log_processor_filename_template"),
was_deprecated=False,
),
# metrics
ConfigChange(
config=ConfigParameter("metrics", "metrics_use_pattern_match"),
Expand Down Expand Up @@ -276,6 +301,9 @@ def message(self) -> str:
ConfigChange(
config=ConfigParameter("scheduler", "dependency_detector"),
),
ConfigChange(
config=ConfigParameter("scheduler", "allow_trigger_in_future"),
),
ConfigChange(
config=ConfigParameter("scheduler", "processor_poll_interval"),
renamed_to=ConfigParameter("scheduler", "scheduler_idle_sleep_time"),
Expand All @@ -289,7 +317,7 @@ def message(self) -> str:
),
ConfigChange(
config=ConfigParameter("scheduler", "max_threads"),
renamed_to=ConfigParameter("scheduler", "parsing_processes"),
renamed_to=ConfigParameter("dag_processor", "parsing_processes"),
),
ConfigChange(
config=ConfigParameter("scheduler", "statsd_host"),
Expand Down Expand Up @@ -327,6 +355,34 @@ def message(self) -> str:
config=ConfigParameter("scheduler", "statsd_custom_client_path"),
renamed_to=ConfigParameter("metrics", "statsd_custom_client_path"),
),
ConfigChange(
config=ConfigParameter("scheduler", "parsing_processes"),
renamed_to=ConfigParameter("dag_processor", "parsing_processes"),
),
ConfigChange(
config=ConfigParameter("scheduler", "file_parsing_sort_mode"),
renamed_to=ConfigParameter("dag_processor", "file_parsing_sort_mode"),
),
ConfigChange(
config=ConfigParameter("scheduler", "max_callbacks_per_loop"),
renamed_to=ConfigParameter("dag_processor", "max_callbacks_per_loop"),
),
ConfigChange(
config=ConfigParameter("scheduler", "min_file_process_interval"),
renamed_to=ConfigParameter("dag_processor", "min_file_process_interval"),
),
ConfigChange(
config=ConfigParameter("scheduler", "stale_dag_threshold"),
renamed_to=ConfigParameter("dag_processor", "stale_dag_threshold"),
),
ConfigChange(
config=ConfigParameter("scheduler", "print_stats_interval"),
renamed_to=ConfigParameter("dag_processor", "print_stats_interval"),
),
ConfigChange(
config=ConfigParameter("scheduler", "dag_dir_list_interval"),
renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
),
# celery
ConfigChange(
config=ConfigParameter("celery", "stalled_task_timeout"),
Expand Down
Loading