Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
de18729
fix: Handling scenarios when cluster is non-existent for DataprocDele…
chirodip98 Dec 27, 2025
21893f9
fix: variable renaming fof operation
chirodip98 Dec 27, 2025
f572beb
fix: added type
chirodip98 Dec 27, 2025
39d577d
fix:mypy errors fixed for airflow.utils.timezone
chirodip98 Dec 27, 2025
078dd9e
fix: Handling scenarios when cluster is non-existent for DataprocDele…
chirodip98 Dec 27, 2025
a1c938c
fix: variable renaming fof operation
chirodip98 Dec 27, 2025
f2a4981
fix: added type
chirodip98 Dec 27, 2025
2f24ab0
fix:mypy errors fixed for airflow.utils.timezone
chirodip98 Dec 27, 2025
0426aaf
Merge branch 'main' into fix/delete-cluster-dataproc-bug
chirodip98 Dec 28, 2025
a36c53e
fix: failing test cases addressed
chirodip98 Dec 28, 2025
b9258b6
Merge branch 'fix/delete-cluster-dataproc-bug' of https://github.com/…
chirodip98 Dec 28, 2025
e71f067
fix: failing test cases addressed
chirodip98 Dec 28, 2025
99d1a2c
fix: Handling scenarios when cluster is non-existent for DataprocDele…
chirodip98 Dec 27, 2025
8f62631
fix: variable renaming fof operation
chirodip98 Dec 27, 2025
f9f3dee
fix: added type
chirodip98 Dec 27, 2025
fe66a26
fix:mypy errors fixed for airflow.utils.timezone
chirodip98 Dec 27, 2025
90b5eff
fix: failing test cases addressed
chirodip98 Dec 28, 2025
a3a41e3
Remove unused assignment dag_version_id from expand_mapped_task (#59834)
gopidesupavan Dec 27, 2025
eb0efaf
Exclude FastAPI 0.128.0 (#59856)
jscheffl Dec 27, 2025
72bebea
DagBag: use `Path.relative_to` for consistent cross-platform behavior…
Dev-iL Dec 27, 2025
a80d18d
Remove top-level SDK reference in Core (#59817)
uranusjr Dec 28, 2025
678781a
fix: Handling scenarios when cluster is non-existent for DataprocDele…
chirodip98 Dec 27, 2025
7a883c0
fix: variable renaming fof operation
chirodip98 Dec 27, 2025
e156b53
fix: added type
chirodip98 Dec 27, 2025
a1db2d3
Sqlalchemy 2.0 changes for test_scheduler_job.py (#59823)
kunaljubce Dec 28, 2025
8cf2c36
fix: failing test cases addressed
chirodip98 Dec 28, 2025
e5e7d49
chore: conflicts resolved
chirodip98 Dec 28, 2025
a51e6e1
Merge branch 'main' of https://github.com/chirodip98/airflow-contrib …
chirodip98 Jan 2, 2026
cb24dc9
Merge branch 'main' into fix/delete-cluster-dataproc-bug
chirodip98 Jan 2, 2026
84552bb
Merge branch 'main' into fix/delete-cluster-dataproc-bug
chirodip98 Jan 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import warnings
from collections.abc import MutableSequence, Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any
Expand All @@ -36,7 +36,7 @@
from google.cloud.dataproc_v1 import Batch, Cluster, ClusterStatus, JobStatus

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.compat.sdk import AirflowException, conf
from airflow.providers.common.compat.sdk import AirflowException, conf, AirflowSkipException
from airflow.providers.google.cloud.hooks.dataproc import (
DataprocHook,
DataProcJobBuilder,
Expand All @@ -63,7 +63,6 @@
)
from airflow.providers.google.cloud.utils.dataproc import DataprocOperationType
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
from airflow.utils import timezone

if TYPE_CHECKING:
from google.api_core import operation
Expand Down Expand Up @@ -391,7 +390,7 @@ def _build_lifecycle_config(self, cluster_data):
cluster_data[lifecycle_config]["idle_delete_ttl"] = {"seconds": self.idle_delete_ttl}

if self.auto_delete_time:
utc_auto_delete_time = timezone.convert_to_utc(self.auto_delete_time)
utc_auto_delete_time = self.auto_delete_time.astimezone(timezone.utc)
cluster_data[lifecycle_config]["auto_delete_time"] = utc_auto_delete_time.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)
Expand Down Expand Up @@ -995,35 +994,31 @@ def __init__(

def execute(self, context: Context) -> None:
    """
    Delete the Dataproc cluster, tolerating a cluster that no longer exists.

    If the delete call (or the synchronous wait) reports ``NotFound``, the
    desired end state -- no cluster -- is already satisfied, so the task is
    skipped via ``AirflowSkipException`` instead of failing.

    :param context: Airflow task execution context.
    :raises AirflowSkipException: when the cluster was not found.
    :raises AirflowException: on any other error during deletion.
    """
    hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
    try:
        op: operation.Operation = self._delete_cluster(hook)
        if not self.deferrable:
            hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=op)
            self.log.info("Cluster deleted.")
            return
    except NotFound:
        # Cluster is already gone: treat deletion as a no-op and skip the task.
        self.log.info(
            "Cluster %s not found in region %s. Skipping deletion.", self.cluster_name, self.region
        )
        raise AirflowSkipException(
            f"Cluster {self.cluster_name} in region {self.region} was not found - it may have already been deleted"
        )
    except Exception as e:
        # Preserve the original traceback as the cause for easier debugging.
        raise AirflowException(str(e)) from e

    # NOTE(review): self.defer() exits by raising TaskDeferred so the trigger
    # takes over polling. It must stay OUTSIDE the try block above -- the
    # blanket `except Exception` there would otherwise convert TaskDeferred
    # into AirflowException and silently break deferrable execution.
    end_time: float = time.time() + self.timeout
    self.defer(
        trigger=DataprocDeleteClusterTrigger(
            gcp_conn_id=self.gcp_conn_id,
            project_id=self.project_id,
            region=self.region,
            cluster_name=self.cluster_name,
            end_time=end_time,
            metadata=self.metadata,
            impersonation_chain=self.impersonation_chain,
            polling_interval_seconds=self.polling_interval_seconds,
        ),
        method_name="execute_complete",
    )

def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> Any:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
from airflow import __version__ as AIRFLOW_VERSION
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import DAG, DagBag
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, TaskDeferred
from airflow.providers.common.compat.sdk import (
AirflowException,
AirflowSkipException,
AirflowTaskTimeout,
TaskDeferred,
)
from airflow.providers.google.cloud.links.dataproc import (
DATAPROC_BATCH_LINK,
DATAPROC_CLUSTER_LINK_DEPRECATED,
Expand Down Expand Up @@ -1269,7 +1274,36 @@ def test_create_execute_call_finished_before_defer(self, mock_trigger_hook, mock
)

mock_hook.return_value.wait_for_operation.assert_not_called()
assert not mock_defer.called
assert mock_defer.called
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_cluster_not_found(self, mock_hook):
    """Deleting a non-existent cluster raises AirflowSkipException (task skipped, not failed)."""
    # Only delete_cluster is exercised here; no create_cluster stub is needed.
    mock_hook.return_value.delete_cluster.side_effect = NotFound("test")
    delete_cluster_op = DataprocDeleteClusterOperator(
        task_id="test_task",
        region=GCP_REGION,
        cluster_name=CLUSTER_NAME,
        project_id=GCP_PROJECT,
        cluster_uuid=None,
        request_id=REQUEST_ID,
        retry=RETRY,
        timeout=TIMEOUT,
        metadata=METADATA,
    )
    # The operator's skip message reads "... was not found - it may have
    # already been deleted"; the previous pattern "Cluster Already Deleted"
    # never matched it, so pytest.raises failed on pattern mismatch even
    # when the skip was raised correctly.
    with pytest.raises(AirflowSkipException, match="was not found"):
        delete_cluster_op.execute(context=mock.MagicMock())

    mock_hook.return_value.delete_cluster.assert_called_once_with(
        project_id=GCP_PROJECT,
        region=GCP_REGION,
        cluster_name=CLUSTER_NAME,
        cluster_uuid=None,
        request_id=REQUEST_ID,
        retry=RETRY,
        timeout=TIMEOUT,
        metadata=METADATA,
    )


class TestDataprocSubmitJobOperator(DataprocJobTestBase):
Expand Down
Loading