Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1308,15 +1308,13 @@ jobs:
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "Quarantined"
TEST_TYPE: "Quarantined"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "sqlite"
BACKEND_VERSION: ""
JOB_ID: "quarantined-${{needs.build-info.outputs.default-python-version}}"
COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
if: needs.build-info.outputs.run-tests == 'true'
if: needs.build-info.outputs.run-tests == 'true' && needs.build-info.outputs.runs-on == 'self-hosted'
steps:
- name: Cleanup repo
shell: bash
Expand All @@ -1329,8 +1327,45 @@ jobs:
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Tests: ${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests --run-in-parallel || true
Tests: postgres:${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests || true
env:
BACKEND: "postgres"
BACKEND_VERSION: "11"
POSTGRES_VERSION: "11"
- name: >
Cleaning up ${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze down
- name: >
Tests: mysql:${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests || true
env:
BACKEND: "mysql"
BACKEND_VERSION: ${{needs.build-info.outputs.default-mysql-version}}
MYSQL_VERSION: ${{needs.build-info.outputs.default-mysql-version}}
- name: >
Cleaning up ${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze down
- name: >
Tests: mssql:${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests || true
env:
BACKEND: "mssql"
BACKEND_VERSION: ${{needs.build-info.outputs.default-mssql-version}}
MSSQL_VERSION: ${{needs.build-info.outputs.default-mssql-version}}
- name: >
Cleaning up ${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze down
- name: >
Tests: sqlite:${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze testing tests || true
env:
BACKEND: "sqlite"
BACKEND_VERSION: ""
MSSQL_VERSION: ""
- name: >
Cleaning up ${{needs.build-info.outputs.default-python-version}}:Quarantined
run: breeze down
- name: >
Post Tests: ${{needs.build-info.outputs.default-python-version}}:Quarantined
uses: ./.github/actions/post_tests
Expand Down
10 changes: 5 additions & 5 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ def string_lower_type(val):
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.getint("celery", "worker_concurrency", fallback=16),
default=conf.getint("celery", "worker_concurrency"),
)
ARG_CELERY_HOSTNAME = Arg(
("-H", "--celery-hostname"),
Expand All @@ -870,24 +870,24 @@ def string_lower_type(val):
ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
ARG_FLOWER_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get("celery", "FLOWER_HOST", fallback="0.0.0.0"),
default=conf.get("celery", "FLOWER_HOST"),
help="Set the hostname on which to run the server",
)
ARG_FLOWER_PORT = Arg(
("-p", "--port"),
default=conf.getint("celery", "FLOWER_PORT", fallback=5555),
default=conf.getint("celery", "FLOWER_PORT"),
type=int,
help="The port on which to run the server",
)
ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
ARG_FLOWER_URL_PREFIX = Arg(
("-u", "--url-prefix"),
default=conf.get("celery", "FLOWER_URL_PREFIX", fallback=""),
default=conf.get("celery", "FLOWER_URL_PREFIX"),
help="URL prefix for Flower",
)
ARG_FLOWER_BASIC_AUTH = Arg(
("-A", "--basic-auth"),
default=conf.get("celery", "FLOWER_BASIC_AUTH", fallback=""),
default=conf.get("celery", "FLOWER_BASIC_AUTH"),
help=(
"Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
Expand Down
93 changes: 93 additions & 0 deletions airflow/config_templates/pre_2_7_defaults.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This file contains pre Airflow 2.7, provider defaults for Airflow configuration.
# They are provided as a fallback option for older versions of the
# providers that might expect them to be present.
#
# NOTE !!!! Please DO NOT modify values in the file even if they change in corresponding
# providers. The values here should be treated as "read only" and should not be modified
# even if defaults in newer versions of corresponding Providers change.
# They are only here so that backwards compatible behaviour for old provider
# versions can be maintained.

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[hive]
default_hive_mapred_queue =

[local_kubernetes_executor]
kubernetes_queue = kubernetes

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = redis://redis:6379/0
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_publish_max_retries = 3
worker_precheck = False

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file =
worker_container_repository =
worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
multi_namespace_mode_namespace_list =
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_queued_check_interval = 60
ssl_ca_cert =
27 changes: 26 additions & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def retrieve_configuration_description(
:param include_airflow: Include Airflow configs
:param include_providers: Include provider configs
:param selected_provider: If specified, include selected provider only
:param config_file_name: name of the file in "config_templates" directory to read default config from
:return: Python dictionary containing configs & their info
"""
base_configuration_description: dict[str, dict[str, Any]] = {}
Expand Down Expand Up @@ -208,6 +207,7 @@ def __init__(
# interpolation placeholders. The _default_values config parser will interpolate them
# properly when we call get() on it.
self._default_values = create_default_config_parser(self.configuration_description)
self._pre_2_7_default_values = create_pre_2_7_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
Expand Down Expand Up @@ -287,6 +287,10 @@ def get_default_value(self, section: str, key: str, fallback: Any = None, raw=Fa
return value.replace("%", "%%")
return value

def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
    """Get pre 2.7 default config values.

    Looks the option up in the hard-coded pre-Airflow-2.7 defaults parser
    (``self._pre_2_7_default_values``) and returns ``None`` when the
    section/key is not present there. Extra ``**kwargs`` are passed through
    to ``ConfigParser.get`` (e.g. ``raw=True``).
    """
    return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs)

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
Expand Down Expand Up @@ -1043,6 +1047,11 @@ def get( # type: ignore[override,misc]
# ...then the default config
if self.get_default_value(section, key) is not None or "fallback" in kwargs:
return expand_env_var(self.get_default_value(section, key, **kwargs))

if self.get_default_pre_2_7_value(section, key) is not None:
# no expansion needed
return self.get_default_pre_2_7_value(section, key, **kwargs)

if not suppress_warnings:
log.warning("section/key [%s/%s] not found in config", section, key)

Expand Down Expand Up @@ -1402,7 +1411,10 @@ def as_dict(
)

config_sources: ConfigSourcesType = {}

# We check sequentially all those sources and the last one we saw it in will "win"
configs: Iterable[tuple[str, ConfigParser]] = [
("default-pre-2-7", self._pre_2_7_default_values),
("default", self._default_values),
("airflow.cfg", self),
]
Expand Down Expand Up @@ -1735,6 +1747,7 @@ def _replace_section_config_with_display_sources(
if display_source:
updated_source_name = source_name
if source_name == "default":
# defaults can come from other sources (default-<PROVIDER>) that should be used here
source_description_section = configuration_description.get(section, {})
source_description_key = source_description_section.get("options", {}).get(k, {})
if source_description_key is not None:
Expand Down Expand Up @@ -1929,6 +1942,18 @@ def create_default_config_parser(configuration_description: dict[str, dict[str,
return parser


def create_pre_2_7_defaults() -> ConfigParser:
    """
    Create a parser holding the hard-coded defaults of Airflow < 2.7.0.

    These values act as a fallback for older provider versions that do not
    support "config contribution" when installed together with Airflow 2.7.0+.
    Variable expansion is deliberately unsupported: the entries are fixed,
    legacy defaults and nothing more.
    """
    parser = ConfigParser()
    parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
    return parser


def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
if not os.path.isfile(AIRFLOW_CONFIG):
log.debug("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def _get_pod_namespace(ti: TaskInstance):
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
return namespace or conf.get("kubernetes_executor", "namespace")

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
messages = []
Expand Down
2 changes: 1 addition & 1 deletion airflow/kubernetes/kubernetes_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:

@cache
def get_logs_task_metadata() -> bool:
return conf.getboolean("kubernetes_executor", "logs_task_metadata", fallback=False)
return conf.getboolean("kubernetes_executor", "logs_task_metadata")


def annotations_for_logging_task_metadata(annotation_set):
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/celery/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(self):
self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism)
self.tasks = {}
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries", fallback=3)
self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries")

def start(self) -> None:
self.log.debug("Starting Celery Executor using %s processes for syncing", self._sync_parallelism)
Expand Down
24 changes: 3 additions & 21 deletions airflow/providers/celery/executors/celery_executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,36 +57,18 @@

TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task]

# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it performs initialization when
# it is imported, so we need fallbacks here in order to be able to import the class directly without
# having configuration initialized before. Do not remove those fallbacks!
#
# This is not strictly needed for production:
#
# * for Airflow 2.6 and before the defaults will come from the core defaults
# * for Airflow 2.7+ the defaults will be loaded via ProvidersManager
#
# But it helps in our tests to import the executor class and validate if the celery code can be imported
# in the current and older versions of Airflow.

OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout", fallback=1.0)
OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout")

# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"

if conf.has_option("celery", "celery_config_options"):
celery_configuration = conf.getimport(
"celery",
"celery_config_options",
fallback="airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG",
)
celery_configuration = conf.getimport("celery", "celery_config_options")

else:
celery_configuration = DEFAULT_CELERY_CONFIG

celery_app_name = conf.get(
"celery", "CELERY_APP_NAME", fallback="airflow.providers.celery.executors.celery_executor"
)
celery_app_name = conf.get("celery", "CELERY_APP_NAME")
if celery_app_name == "airflow.executors.celery_executor":
warnings.warn(
"The celery.CELERY_APP_NAME configuration uses deprecated package name: "
Expand Down
2 changes: 1 addition & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def validate_session():
"""Validate ORM Session."""
global engine

worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
worker_precheck = conf.getboolean("celery", "worker_precheck")
if not worker_precheck:
return True
else:
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def _get_pod_namespace(ti: TaskInstance):
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
return namespace or conf.get("kubernetes_executor", "namespace")

def _get_log_retrieval_url(
self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
Expand Down
2 changes: 2 additions & 0 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,14 @@ def run_tests_in_parallel(
"should be run: `Providers[airbyte,http]` or "
"excluded from the full test suite: `Providers[-amazon,google]`",
default="All",
envvar="TEST_TYPE",
type=NotVerifiedBetterChoice(ALLOWED_TEST_TYPE_CHOICES),
)
@click.option(
"--test-timeout",
help="Test timeout. Set the pytest setup, execution and teardown timeouts to this value",
default=60,
envvar="TEST_TIMEOUT",
type=IntRange(min=0),
show_default=True,
)
Expand Down
4 changes: 2 additions & 2 deletions images/breeze/output-commands-hash.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ static-checks:f9ec0d7edaba84180403d95469d94ea0
testing:docker-compose-tests:0c810047fc66a0cfe91119e2d08b3507
testing:helm-tests:8e491da2e01ebd815322c37562059d77
testing:integration-tests:486e4d91449ecdb7630ef2a470d705a3
testing:tests:e18fbd845ca5783879244bf0f9f9c51e
testing:f360a66d2b1b4c39880117ae6166007f
testing:tests:3c202e65824e405269e78f58936980e0
testing:68a089bc30e0e60f834f205df1e9f086
Loading