4 changes: 2 additions & 2 deletions airflow/cli/cli_config.py
@@ -622,7 +622,7 @@ def string_lower_type(val):
 # database
 ARG_MIGRATION_TIMEOUT = Arg(
     ("-t", "--migration-wait-timeout"),
-    help="timeout to wait for db to migrate ",
+    help="timeout to wait for db to migrate",
     type=int,
     default=60,
 )
@@ -954,7 +954,7 @@ def string_lower_type(val):
     ("--limit",),
     default=1,
     type=positive_int(allow_zero=True),
-    help="The number of recent jobs that will be checked. To disable limit, set 0. ",
+    help="The number of recent jobs that will be checked. To disable limit, set 0.",
 )
 
 ARG_ALLOW_MULTIPLE = Arg(
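For context, a minimal sketch of the behavior these edits target (illustrative, not part of the diff): Python's logging machinery renders the format string verbatim, so a trailing space inside the template survives into every emitted record. The brackets in the format below are only there to make the stray space visible.

import logging

logging.basicConfig(format="[%(message)s]", level=logging.INFO)
log = logging.getLogger("demo")

log.info("timeout to wait for db to migrate ")  # prints: [timeout to wait for db to migrate ]
log.info("timeout to wait for db to migrate")   # prints: [timeout to wait for db to migrate]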
2 changes: 1 addition & 1 deletion airflow/dag_processing/manager.py
@@ -487,7 +487,7 @@ def start(self):
 
         set_new_process_group()
 
-        self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
+        self.log.info("Processing files using up to %s processes at a time", self._parallelism)
         self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
         self.log.info(
             "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval
4 changes: 2 additions & 2 deletions airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py
@@ -169,7 +169,7 @@ def __init__(
 
     def gen_pod(self) -> k8s.V1Pod:
         """Generate pod."""
-        warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning)
+        warnings.warn("This function is deprecated.", RemovedInAirflow3Warning)
         result = self.ud_pod
 
         result.metadata.name = add_pod_suffix(pod_name=result.metadata.name)
@@ -220,7 +220,7 @@ def from_obj(obj) -> dict | k8s.V1Pod | None:
             warnings.warn(
                 "Using a dictionary for the executor_config is deprecated and will soon be removed."
                 'please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key'
-                " instead. ",
+                " instead.",
                 category=RemovedInAirflow3Warning,
             )
             return PodGenerator.from_legacy_obj(obj)
6 changes: 3 additions & 3 deletions airflow/models/dag.py
@@ -552,14 +552,14 @@ def __init__(
         if schedule_interval is not NOTSET:
             warnings.warn(
                 "Param `schedule_interval` is deprecated and will be removed in a future release. "
-                "Please use `schedule` instead. ",
+                "Please use `schedule` instead.",
                 RemovedInAirflow3Warning,
                 stacklevel=2,
             )
         if timetable is not None:
             warnings.warn(
                 "Param `timetable` is deprecated and will be removed in a future release. "
-                "Please use `schedule` instead. ",
+                "Please use `schedule` instead.",
                 RemovedInAirflow3Warning,
                 stacklevel=2,
             )
@@ -3680,7 +3680,7 @@ def deactivate_deleted_dags(
         :param processor_subdir: dag processor subdir
         :param session: ORM Session
         """
-        log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
+        log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table", cls.__tablename__)
         dag_models = session.scalars(
             select(cls).where(
                 cls.fileloc.is_not(None),
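Several hunks in this PR touch warnings rather than log calls; the stray space behaves the same way there. A small sketch (assumption: the built-in DeprecationWarning stands in for Airflow's RemovedInAirflow3Warning, which subclasses it). Because Python concatenates adjacent string literals, the two-part message is a single string and the trailing space ends up at its very end:

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warnings.warn(
        "Param `timetable` is deprecated and will be removed in a future release. "
        "Please use `schedule` instead. ",  # implicit concatenation keeps the stray trailing space
        DeprecationWarning,
        stacklevel=2,
    )

print(repr(str(caught[0].message)))
# -> 'Param `timetable` is deprecated ... Please use `schedule` instead. '  (trailing space intact)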
2 changes: 1 addition & 1 deletion airflow/models/serialized_dag.py
@@ -258,7 +258,7 @@ def remove_deleted_dags(
         alive_fileloc_hashes = [DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
 
         log.debug(
-            "Deleting Serialized DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__
+            "Deleting Serialized DAGs (for which DAG files are deleted) from %s table", cls.__tablename__
         )
 
         session.execute(
2 changes: 1 addition & 1 deletion airflow/providers/apache/flink/sensors/flink_kubernetes.py
@@ -94,7 +94,7 @@ def _log_driver(self, application_state: str, response: dict) -> None:
         for task_manager in all_pods.items:
             task_manager_pod_name = task_manager.metadata.name
 
-            self.log.info("Starting logging of task manager pod %s ", task_manager_pod_name)
+            self.log.info("Starting logging of task manager pod %s", task_manager_pod_name)
             try:
                 log = ""
                 for line in self.hook.get_pod_logs(task_manager_pod_name, namespace=namespace):
2 changes: 1 addition & 1 deletion airflow/providers/apache/kafka/operators/consume.py
@@ -113,7 +113,7 @@ def __init__(
 
         if self.max_messages and self.max_batch_size > self.max_messages:
             self.log.warning(
-                "max_batch_size (%s) > max_messages (%s). Setting max_messages to %s ",
+                "max_batch_size (%s) > max_messages (%s). Setting max_messages to %s",
                 self.max_batch_size,
                 self.max_messages,
                 self.max_batch_size,
2 changes: 1 addition & 1 deletion airflow/providers/apache/kylin/operators/kylin_cube.py
@@ -166,7 +166,7 @@ def execute(self, context: Context):
                 time.sleep(self.interval)
 
                 job_status = _hook.get_job_status(job_id)
-                self.log.info("Kylin job status is %s ", job_status)
+                self.log.info("Kylin job status is %s", job_status)
                 if job_status in self.jobs_error_status:
                     raise AirflowException(f"Kylin job {job_id} status {job_status} is error ")
 
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/pod_generator.py
@@ -153,7 +153,7 @@ def __init__(
 
     def gen_pod(self) -> k8s.V1Pod:
         """Generates pod."""
-        warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning)
+        warnings.warn("This function is deprecated.", RemovedInAirflow3Warning)
         result = self.ud_pod
 
         result.metadata.name = add_pod_suffix(pod_name=result.metadata.name)
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
@@ -476,7 +476,7 @@ def create_empty_dataset(
             dataset_reference["location"] = dataset_reference.get("location", location)
 
         dataset: Dataset = Dataset.from_api_repr(dataset_reference)
-        self.log.info("Creating dataset: %s in project: %s ", dataset.dataset_id, dataset.project)
+        self.log.info("Creating dataset: %s in project: %s", dataset.dataset_id, dataset.project)
         dataset_object = self.get_client(project_id=project_id, location=location).create_dataset(
             dataset=dataset, exists_ok=exists_ok
         )
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/mlengine.py
@@ -220,7 +220,7 @@ def cancel_job(
             return request.execute(num_retries=self.num_retries)
         except HttpError as e:
             if e.resp.status == 404:
-                self.log.error("Job with job_id %s does not exist. ", job_id)
+                self.log.error("Job with job_id %s does not exist.", job_id)
                 raise
             elif e.resp.status == 400:
                 self.log.info("Job with job_id %s is already complete, cancellation aborted.", job_id)
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/operators/bigquery.py
@@ -463,7 +463,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
@@ -609,7 +609,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
@@ -2883,7 +2883,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]):
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/cloud_build.py
@@ -261,7 +261,7 @@ def execute_complete(self, context: Context, event: dict):
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
-        self.log.info("Cloud Build completed with response %s ", event["message"])
+        self.log.info("Cloud Build completed with response %s", event["message"])
         project_id = self.project_id or hook.project_id
         if project_id:
             CloudBuildLink.persist(
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/datafusion.py
@@ -872,7 +872,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]):
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/mlengine.py
@@ -1305,7 +1305,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]):
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
(file path not shown in this extract)
@@ -272,7 +272,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]):
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
(file path not shown in this extract)
@@ -149,7 +149,7 @@ def execute(self, context: Context):
             )
             raise AirflowException(message)
         total_row_count = self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
-        self.log.info("Facebook Returned %s data points in total: ", total_row_count)
+        self.log.info("Facebook Returned %s data points in total.", total_row_count)
 
     def _generate_rows_with_action(self, type_check: bool):
         if type_check and self.upload_as_account:
@@ -173,7 +173,7 @@ def _prepare_rows_for_upload(
             )
         else:
             converted_rows_with_action[FlushAction.EXPORT_ONCE].extend(converted_rows)
-        self.log.info("Facebook Returned %s data points ", len(converted_rows))
+        self.log.info("Facebook Returned %s data points", len(converted_rows))
         return converted_rows_with_action
 
     def _decide_and_flush(self, converted_rows_with_action: dict[FlushAction, list]):
(file path not shown in this extract)
@@ -450,7 +450,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]):
         if event["status"] == "error":
             raise AirflowException(event["message"])
         self.log.info(
-            "%s completed with response %s ",
+            "%s completed with response %s",
             self.task_id,
             event["message"],
         )
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -522,16 +522,16 @@ def _copy_single_object(self, hook, source_object, destination_object):
         elif self.last_modified_time is not None:
             # Check to see if object was modified after last_modified_time
             if hook.is_updated_after(self.source_bucket, source_object, self.last_modified_time):
-                self.log.info("Object has been modified after %s ", self.last_modified_time)
+                self.log.info("Object has been modified after %s", self.last_modified_time)
             else:
-                self.log.debug("Object was not modified after %s ", self.last_modified_time)
+                self.log.debug("Object was not modified after %s", self.last_modified_time)
                 return
         elif self.maximum_modified_time is not None:
             # Check to see if object was modified before maximum_modified_time
             if hook.is_updated_before(self.source_bucket, source_object, self.maximum_modified_time):
-                self.log.info("Object has been modified before %s ", self.maximum_modified_time)
+                self.log.info("Object has been modified before %s", self.maximum_modified_time)
             else:
-                self.log.debug("Object was not modified before %s ", self.maximum_modified_time)
+                self.log.debug("Object was not modified before %s", self.maximum_modified_time)
                 return
 
         self.log.info(
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -340,7 +340,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
         """
         if event["status"] == "error":
             raise AirflowException(event["message"])
-        self.log.info("%s completed with response %s ", self.task_id, event["message"])
+        self.log.info("%s completed with response %s", self.task_id, event["message"])
 
     def get_transfer_hook(self):
         return CloudDataTransferServiceHook(
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/utils/field_sanitizer.py
@@ -134,7 +134,7 @@ def _sanitize(self, dictionary, remaining_field_spec, current_path):
         child = dictionary.get(field_name)
         if child is None:
             self.log.debug(
-                "The field %s is missing in %s at the path %s. ", field_name, dictionary, current_path
+                "The field %s is missing in %s at the path %s.", field_name, dictionary, current_path
             )
         elif isinstance(child, dict):
             self._sanitize(child, remaining_path, f"{current_path}.{field_name}")
(file path not shown in this extract)
@@ -135,7 +135,7 @@ def upload_data(
         )
 
         self.log.info(
-            "Uploading file to GA file for accountId: %s, webPropertyId:%s and customDataSourceId:%s ",
+            "Uploading file to GA file for accountId: %s, webPropertyId:%s and customDataSourceId:%s",
             account_id,
             web_property_id,
             custom_data_source_id,
@@ -165,7 +165,7 @@ def delete_upload_data(
         """
         self.log.info(
             "Deleting previous uploads to GA file for accountId:%s, "
-            "webPropertyId:%s and customDataSourceId:%s ",
+            "webPropertyId:%s and customDataSourceId:%s",
             account_id,
             web_property_id,
             custom_data_source_id,
@@ -187,7 +187,7 @@ def list_uploads(self, account_id, web_property_id, custom_data_source_id) -> li
         :param custom_data_source_id: Custom Data Source Id to which this data import belongs.
         """
         self.log.info(
-            "Getting list of uploads for accountId:%s, webPropertyId:%s and customDataSourceId:%s ",
+            "Getting list of uploads for accountId:%s, webPropertyId:%s and customDataSourceId:%s",
             account_id,
             web_property_id,
             custom_data_source_id,
(file path not shown in this extract)
@@ -478,7 +478,7 @@ def execute(self, context: Context) -> None:
         with NamedTemporaryFile("w+") as tmp_file:
             # Download file from GCS
             self.log.info(
-                "Downloading file from GCS: %s/%s ",
+                "Downloading file from GCS: %s/%s",
                 self.storage_bucket,
                 self.storage_name_object,
             )
@@ -498,7 +498,7 @@ def execute(self, context: Context) -> None:
 
             # Upload newly formatted file to cloud storage
             self.log.info(
-                "Uploading file to GCS: %s/%s ",
+                "Uploading file to GCS: %s/%s",
                 self.storage_bucket,
                 self.storage_name_object,
             )
2 changes: 1 addition & 1 deletion airflow/providers_manager.py
@@ -717,7 +717,7 @@ def _discover_hooks_from_connection_types(
         else:
             log.warning(
                 "The connection type '%s' is already registered in the"
-                " package '%s' with different class names: '%s' and '%s'. ",
+                " package '%s' with different class names: '%s' and '%s'.",
                 connection_type,
                 package_name,
                 already_registered.hook_class_name,
2 changes: 1 addition & 1 deletion airflow/secrets/local_filesystem.py
@@ -238,7 +238,7 @@ def load_variables(file_path: str) -> dict[str, str]:
     if invalid_keys:
         raise AirflowException(f'The "{file_path}" file contains multiple values for keys: {invalid_keys}')
     variables = {key: values[0] if isinstance(values, list) else values for key, values in secrets.items()}
-    log.debug("Loaded %d variables: ", len(variables))
+    log.debug("Loaded %d variables.", len(variables))
     return variables
 
 
6 changes: 3 additions & 3 deletions airflow/sensors/external_task.py
@@ -242,23 +242,23 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool:
 
         if self.external_task_ids:
             self.log.info(
-                "Poking for tasks %s in dag %s on %s ... ",
+                "Poking for tasks %s in dag %s on %s ...",
                 self.external_task_ids,
                 self.external_dag_id,
                 serialized_dttm_filter,
             )
 
         if self.external_task_group_id:
             self.log.info(
-                "Poking for task_group '%s' in dag '%s' on %s ... ",
+                "Poking for task_group '%s' in dag '%s' on %s ...",
                 self.external_task_group_id,
                 self.external_dag_id,
                 serialized_dttm_filter,
             )
 
         if self.external_dag_id and not self.external_task_group_id and not self.external_task_ids:
             self.log.info(
-                "Poking for DAG '%s' on %s ... ",
+                "Poking for DAG '%s' on %s ...",
                 self.external_dag_id,
                 serialized_dttm_filter,
             )
2 changes: 1 addition & 1 deletion airflow/utils/db_cleanup.py
@@ -358,7 +358,7 @@ def _suppress_with_logging(table, session):
     try:
         yield
     except (OperationalError, ProgrammingError):
-        logger.warning("Encountered error when attempting to clean table '%s'. ", table)
+        logger.warning("Encountered error when attempting to clean table '%s'.", table)
         logger.debug("Traceback for table '%s'", table, exc_info=True)
         if session.is_active:
             logger.debug("Rolling back transaction")
2 changes: 1 addition & 1 deletion airflow/utils/orm_event_handlers.py
@@ -85,7 +85,7 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema
         stack_info = ">".join([f"{f.filename.rpartition('/')[-1]}:{f.name}" for f in stack][-3:])
         conn.info.setdefault("query_start_time", []).append(time.monotonic())
         log.info(
-            "@SQLALCHEMY %s |$ %s |$ %s |$ %s ",
+            "@SQLALCHEMY %s |$ %s |$ %s |$ %s",
            total,
            file_name,
            stack_info,
(file path not shown in this extract)
@@ -1222,7 +1222,7 @@ def _logs(python: str, kubernetes_version: str):
 @kubernetes_group.command(
     name="logs",
     help=f"Dump k8s logs to ${{TMP_DIR}}{os.sep}kind_logs_<cluster_name> directory "
-    f"(optionally all clusters). ",
+    f"(optionally all clusters).",
 )
 @option_python
 @option_kubernetes_version
(file path not shown in this extract)
@@ -186,7 +186,7 @@ def push_tag_for_final_version(version, release_candidate):
     name="start-release",
     short_help="Start Airflow release process",
     help="Start the process of releasing an Airflow version. "
-    "This command will guide you through the release process. ",
+    "This command will guide you through the release process.",
 )
 @click.option("--release-candidate", required=True)
 @click.option("--previous-release", required=True)
(file path not shown in this extract)
@@ -1233,7 +1233,7 @@ class ProviderPRInfo(NamedTuple):
     repo = g.get_repo("apache/airflow")
     pull_requests: dict[int, PullRequest.PullRequest | Issue.Issue] = {}
     with Progress(console=get_console(), disable=disable_progress) as progress:
-        task = progress.add_task(f"Retrieving {len(all_prs)} PRs ", total=len(all_prs))
+        task = progress.add_task(f"Retrieving {len(all_prs)} PRs", total=len(all_prs))
         for pr_number in all_prs:
             progress.console.print(
                 f"Retrieving PR#{pr_number}: https://github.com/apache/airflow/pull/{pr_number}"
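Every change in this PR follows one mechanical pattern: drop a space sitting immediately before a closing quote in a message string. A small scanner along these lines could surface remaining occurrences (an illustrative sketch, not tooling referenced by this PR; the search root "airflow" and the pattern are assumptions):

import re
from pathlib import Path

# Flag a space immediately before a closing double quote followed by a comma,
# e.g.  help="... ",  or  self.log.info("... %s ", value)
PATTERN = re.compile(r' ",')

for path in Path("airflow").rglob("*.py"):
    for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        if PATTERN.search(line):
            print(f"{path}:{lineno}: {line.strip()}")

Hits still need manual review: single-space literals such as split(" ", 1) match the same pattern but are intentional.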