Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0413f3b
added deprecation of project_id parameter
KamranImaaz Dec 28, 2025
522e72e
updated all operators try/except in bigtable
KamranImaaz Jan 24, 2026
7e50759
removed bigquery deprecated project id comment
KamranImaaz Jan 24, 2026
15a653e
added exception library in hook
KamranImaaz Jan 24, 2026
ccca98a
re-corrected the try..except block for deletetable operator
KamranImaaz Jan 24, 2026
58f4dee
reverted back previous if..else condition in the operators
KamranImaaz Jan 25, 2026
7b98f0f
handled test files
KamranImaaz Jan 25, 2026
f137f47
handled tests part 2
KamranImaaz Jan 25, 2026
d0c7da1
handled tests part 3
KamranImaaz Jan 25, 2026
33052b4
handled tests part 4
KamranImaaz Jan 25, 2026
7b52610
removed instance_id from parameter in update cluster
KamranImaaz Jan 28, 2026
324e492
added unit test for update cluster which does not exists in hooks big…
KamranImaaz Jan 28, 2026
1272481
removed the parameter mock_project_id from test_delete_table as it is…
KamranImaaz Jan 28, 2026
8083d05
added unit test for delete table when not exists in hooks test_bigtab…
KamranImaaz Jan 28, 2026
53f1abb
added mock validation in test_bigtable hook
KamranImaaz Jan 29, 2026
e30d0aa
modified test_delete_table_when_no_table_exists for mock credentials
KamranImaaz Jan 29, 2026
5b74d5b
test_delete_table_when_no_table_exists modified it
KamranImaaz Jan 29, 2026
331b627
handled CI failures for table delete when no table exists hooks/test_…
KamranImaaz Jan 29, 2026
79fb50b
added the mock project id for test_delete_table_when_no_table_exists …
KamranImaaz Jan 29, 2026
e209a97
modified docstrings
KamranImaaz Jan 31, 2026
90c7d44
prek run
KamranImaaz Jan 31, 2026
599fefe
prek run part 2
KamranImaaz Jan 31, 2026
7701c29
fixed d205 docstring error
KamranImaaz Jan 31, 2026
33dee89
final fix D205 docs string error
KamranImaaz Jan 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

import google.api_core.exceptions
from google.cloud.bigtable import Client, enums
from google.cloud.bigtable.cluster import Cluster
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import ClusterState, Table

from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

Expand Down Expand Up @@ -87,8 +89,7 @@ def delete_instance(self, instance_id: str, project_id: str) -> None:
"""
Delete the specified Cloud Bigtable instance.

Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does
not exist.
If the instance does not exist, logs a warning message and exits.

:param project_id: Optional, Google Cloud project ID where the
BigTable exists. If set to None or missing,
Expand Down Expand Up @@ -203,7 +204,6 @@ def update_instance(
instance_type=instance_type,
labels=instance_labels,
)

operation = instance.update()
operation.result(timeout)

Expand Down Expand Up @@ -241,7 +241,8 @@ def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None
"""
Delete the specified table in Cloud Bigtable.

Raises google.api_core.exceptions.NotFound if the table does not exist.
If the instance does not exist, raises RuntimeError.
If the table does not exist, logs a warning message and returns.

:param instance_id: The ID of the Cloud Bigtable instance.
:param table_id: The ID of the table in Cloud Bigtable.
Expand All @@ -252,23 +253,32 @@ def delete_table(self, instance_id: str, table_id: str, project_id: str) -> None
instance = self.get_instance(instance_id=instance_id, project_id=project_id)
if instance is None:
raise RuntimeError(f"Instance {instance_id} did not exist; unable to delete table {table_id}")

table = instance.table(table_id=table_id)
table.delete()
try:
table.delete()
except google.api_core.exceptions.NotFound:
self.log.info("The table '%s' no longer exists. Consider it as deleted", table_id)

@staticmethod
def update_cluster(instance: Instance, cluster_id: str, nodes: int) -> None:
"""
Update number of nodes in the specified Cloud Bigtable cluster.

Raises google.api_core.exceptions.NotFound if the cluster does not exist.
If the cluster does not exist, raises AirflowException.

:param instance: The Cloud Bigtable instance that owns the cluster.
:param cluster_id: The ID of the cluster.
:param nodes: The desired number of nodes.
"""
cluster = Cluster(cluster_id, instance)
# "reload" is required to set location_id attribute on cluster.
cluster.reload()
try:
cluster.reload()
except google.api_core.exceptions.NotFound:
raise AirflowException(
f"Dependency: cluster '{cluster_id}' does not exist for instance '{instance.instance_id}'."
)
cluster.serve_nodes = nodes
cluster.update()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,21 @@ def execute(self, context: Context) -> None:
)
BigtableInstanceLink.persist(context=context)
return
try:
hook.create_instance(
project_id=self.project_id,
instance_id=self.instance_id,
main_cluster_id=self.main_cluster_id,
main_cluster_zone=self.main_cluster_zone,
replica_clusters=self.replica_clusters,
instance_display_name=self.instance_display_name,
instance_type=self.instance_type,
instance_labels=self.instance_labels,
cluster_nodes=self.cluster_nodes,
cluster_storage_type=self.cluster_storage_type,
timeout=self.timeout,
)
BigtableInstanceLink.persist(context=context)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error("An error occurred. Exiting.")
raise e

hook.create_instance(
project_id=self.project_id,
instance_id=self.instance_id,
main_cluster_id=self.main_cluster_id,
main_cluster_zone=self.main_cluster_zone,
replica_clusters=self.replica_clusters,
instance_display_name=self.instance_display_name,
instance_type=self.instance_type,
instance_labels=self.instance_labels,
cluster_nodes=self.cluster_nodes,
cluster_storage_type=self.cluster_storage_type,
timeout=self.timeout,
)
BigtableInstanceLink.persist(context=context)


class BigtableUpdateInstanceOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
Expand Down Expand Up @@ -263,19 +260,15 @@ def execute(self, context: Context) -> None:
if not instance:
raise AirflowException(f"Dependency: instance '{self.instance_id}' does not exist.")

try:
hook.update_instance(
project_id=self.project_id,
instance_id=self.instance_id,
instance_display_name=self.instance_display_name,
instance_type=self.instance_type,
instance_labels=self.instance_labels,
timeout=self.timeout,
)
BigtableInstanceLink.persist(context=context)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error("An error occurred. Exiting.")
raise e
hook.update_instance(
project_id=self.project_id,
instance_id=self.instance_id,
instance_display_name=self.instance_display_name,
instance_type=self.instance_type,
instance_labels=self.instance_labels,
timeout=self.timeout,
)
BigtableInstanceLink.persist(context=context)


class BigtableDeleteInstanceOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
Expand Down Expand Up @@ -511,22 +504,11 @@ def execute(self, context: Context) -> None:
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
instance = hook.get_instance(project_id=self.project_id, instance_id=self.instance_id)
if not instance:
raise AirflowException(f"Dependency: instance '{self.instance_id}' does not exist.")

try:
hook.delete_table(
project_id=self.project_id,
instance_id=self.instance_id,
table_id=self.table_id,
)
except google.api_core.exceptions.NotFound:
# It's OK if table doesn't exists.
self.log.info("The table '%s' no longer exists. Consider it as deleted", self.table_id)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error("An error occurred. Exiting.")
raise e
hook.delete_table(
project_id=self.project_id,
instance_id=self.instance_id,
table_id=self.table_id,
)


class BigtableUpdateClusterOperator(GoogleCloudBaseOperator, BigtableValidationMixin):
Expand Down Expand Up @@ -603,13 +585,5 @@ def execute(self, context: Context) -> None:
if not instance:
raise AirflowException(f"Dependency: instance '{self.instance_id}' does not exist.")

try:
hook.update_cluster(instance=instance, cluster_id=self.cluster_id, nodes=self.nodes)
BigtableClusterLink.persist(context=context)
except google.api_core.exceptions.NotFound:
raise AirflowException(
f"Dependency: cluster '{self.cluster_id}' does not exist for instance '{self.instance_id}'."
)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error("An error occurred. Exiting.")
raise e
hook.update_cluster(instance=instance, cluster_id=self.cluster_id, nodes=self.nodes)
BigtableClusterLink.persist(context=context)
47 changes: 47 additions & 0 deletions providers/google/tests/unit/google/cloud/hooks/test_bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
from unittest.mock import PropertyMock

import google
import pytest
from google.cloud.bigtable import Client, enums
from google.cloud.bigtable.instance import Instance

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
from airflow.providers.google.common.consts import CLIENT_INFO

Expand Down Expand Up @@ -486,6 +488,27 @@ def test_delete_table_overridden_project_id(self, get_client):
instance_exists_method.assert_called_once_with()
table_delete_method.assert_called_once_with()

@mock.patch(
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_credentials_and_project_id"
)
@mock.patch("airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client")
def test_delete_table_when_no_table_exists(self, mock_get_client, mock_get_creds_and_project):
mock_get_creds_and_project.return_value = (mock.Mock(), "test-project")
mock_client = mock.Mock()
mock_get_client.return_value = mock_client
instance = mock_client.instance.return_value
instance.exists.return_value = True
table = instance.table.return_value
table.delete.side_effect = google.api_core.exceptions.NotFound("Table not found")
self.bigtable_hook_default_project_id.delete_table(
instance_id=CBT_INSTANCE,
table_id=CBT_TABLE,
)
mock_get_client.assert_called_once_with(project_id="test-project")
instance.exists.assert_called_once_with()
instance.table.assert_called_once_with(table_id=CBT_TABLE)
table.delete.assert_called_once_with()

@mock.patch("google.cloud.bigtable.table.Table.create")
@mock.patch("airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client")
def test_create_table(self, get_client, create):
Expand Down Expand Up @@ -514,6 +537,30 @@ def test_update_cluster(self, get_client, reload, update):
reload.assert_called_once_with()
update.assert_called_once_with()

@mock.patch("google.cloud.bigtable.cluster.Cluster.update")
@mock.patch("google.cloud.bigtable.cluster.Cluster.reload")
@mock.patch("airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client")
def test_update_cluster_does_not_exist(self, get_client, reload, update):
instance_method = get_client.return_value.instance
instance_method = instance_method.return_value
instance_exists_method = instance_method.exists
instance_exists_method.return_value = True
reload.side_effect = google.api_core.exceptions.NotFound("Cluster not found")
client = mock.Mock(Client)
instance = google.cloud.bigtable.instance.Instance(instance_id=CBT_INSTANCE, client=client)
with pytest.raises(
AirflowException,
match=f"Dependency: cluster '{CBT_CLUSTER}' does not exist for instance '{CBT_INSTANCE}'.",
):
self.bigtable_hook_default_project_id.update_cluster(
instance=instance,
cluster_id=CBT_CLUSTER,
nodes=4,
)
get_client.assert_not_called()
reload.assert_called_once_with()
update.assert_not_called()

@mock.patch("google.cloud.bigtable.table.Table.list_column_families")
@mock.patch("airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client")
def test_list_column_families(self, get_client, list_column_families):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def test_create_instance_with_replicas_that_doesnt_exists(self, mock_hook):

class TestBigtableInstanceUpdate:
@mock.patch("airflow.providers.google.cloud.operators.bigtable.BigtableHook")
def test_delete_execute(self, mock_hook):
def test_update_execute(self, mock_hook):
op = BigtableUpdateInstanceOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
Expand Down Expand Up @@ -455,8 +455,8 @@ def test_updating_cluster_but_instance_does_not_exists_empty_project_id(self, mo
@mock.patch("airflow.providers.google.cloud.operators.bigtable.BigtableHook")
def test_updating_cluster_that_does_not_exists(self, mock_hook):
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.update_cluster.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Cluster not found.")
mock_hook.return_value.update_cluster.side_effect = AirflowException(
f"Dependency: cluster '{CLUSTER_ID}' does not exist for instance '{INSTANCE_ID}'."
)
op = BigtableUpdateClusterOperator(
project_id=PROJECT_ID,
Expand All @@ -483,8 +483,8 @@ def test_updating_cluster_that_does_not_exists(self, mock_hook):
@mock.patch("airflow.providers.google.cloud.operators.bigtable.BigtableHook")
def test_updating_cluster_that_does_not_exists_empty_project_id(self, mock_hook):
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.update_cluster.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Cluster not found.")
mock_hook.return_value.update_cluster.side_effect = AirflowException(
f"Dependency: cluster '{CLUSTER_ID}' does not exist for instance '{INSTANCE_ID}'."
)
op = BigtableUpdateClusterOperator(
instance_id=INSTANCE_ID,
Expand Down Expand Up @@ -655,9 +655,6 @@ def test_deleting_table_that_doesnt_exists(self, mock_hook):
impersonation_chain=IMPERSONATION_CHAIN,
)

mock_hook.return_value.delete_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Table not found.")
)
op.execute(None)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
Expand All @@ -677,9 +674,6 @@ def test_deleting_table_that_doesnt_exists_empty_project_id(self, mock_hook):
impersonation_chain=IMPERSONATION_CHAIN,
)

mock_hook.return_value.delete_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Table not found.")
)
op.execute(None)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
Expand All @@ -689,28 +683,6 @@ def test_deleting_table_that_doesnt_exists_empty_project_id(self, mock_hook):
project_id=None, instance_id=INSTANCE_ID, table_id=TABLE_ID
)

@mock.patch("airflow.providers.google.cloud.operators.bigtable.BigtableHook")
def test_deleting_table_when_instance_doesnt_exists(self, mock_hook):
op = BigtableDeleteTableOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id",
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)

mock_hook.return_value.get_instance.return_value = None
with pytest.raises(AirflowException) as ctx:
op.execute(None)
err = ctx.value
assert str(err) == f"Dependency: instance '{INSTANCE_ID}' does not exist."
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
mock_hook.return_value.delete_table.assert_not_called()

@mock.patch("airflow.providers.google.cloud.operators.bigtable.BigtableHook")
def test_different_error_reraised(self, mock_hook):
op = BigtableDeleteTableOperator(
Expand Down