@@ -30,9 +30,6 @@
 from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
 from airflow.providers.google.cloud.hooks.os_login import OSLoginHook
 
-pytestmark = pytest.mark.db_test
-
-
 TEST_PROJECT_ID = "test-project-id"
 
 TEST_INSTANCE_NAME = "test-instance"
@@ -56,6 +53,7 @@ def test_os_login_hook(self, mocker):
         assert ComputeEngineSSHHook(gcp_conn_id="gcpssh")._oslogin_hook
         mock_os_login_hook.assert_called_with(gcp_conn_id="gcpssh")
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh._GCloudAuthorizedSSHClient")
@@ -112,6 +110,7 @@ def test_get_conn_default_configuration(self, mock_ssh_client, mock_paramiko, mo
             ]
         )
 
+    @pytest.mark.db_test
     @pytest.mark.parametrize(
         "exception_type, error_message",
         [(SSHException, r"Error occurred when establishing SSH connection using Paramiko")],
@@ -156,6 +155,7 @@ def test_get_conn_default_configuration_test_exceptions(
         assert error_message in caplog.text
         assert "Failed establish SSH connection" in caplog.text
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
@@ -214,6 +214,7 @@ def test_get_conn_authorize_using_instance_metadata(
 
         mock_os_login_hook.return_value.import_ssh_public_key.assert_not_called()
 
+    @pytest.mark.db_test
     @pytest.mark.parametrize(
         "exception_type, error_message",
         [
@@ -257,6 +258,7 @@ def test_get_conn_authorize_using_instance_metadata_test_exception(
         assert error_message in caplog.text
         assert "Failed establish SSH connection" in caplog.text
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
@@ -287,6 +289,7 @@ def test_get_conn_authorize_using_instance_metadata_append_ssh_keys(
             zone=TEST_ZONE,
         )
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
@@ -315,6 +318,7 @@ def test_get_conn_private_ip(self, mock_ssh_client, mock_paramiko, mock_os_login
             hostname=INTERNAL_IP, look_for_keys=mock.ANY, pkey=mock.ANY, sock=mock.ANY, username=mock.ANY
         )
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
@@ -343,6 +347,7 @@ def test_get_conn_custom_hostname(
             username=mock.ANY,
         )
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
@@ -372,6 +377,7 @@ def test_get_conn_iap_tunnel(self, mock_ssh_client, mock_paramiko, mock_os_login
             f"--zone={TEST_ZONE} --verbosity=warning"
         )
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
@@ -407,6 +413,7 @@ def test_get_conn_iap_tunnel_with_impersonation_chain(
             f"--zone={TEST_ZONE} --verbosity=warning --impersonate-service-account={IMPERSONATION_CHAIN}"
         )
 
+    @pytest.mark.db_test
     @pytest.mark.parametrize(
         "exception_type, error_message",
         [(SSHException, r"Error occurred when establishing SSH connection using Paramiko")],
@@ -442,6 +449,7 @@ def test_get_conn_iap_tunnel_test_exception(
         assert error_message in caplog.text
         assert "Failed establish SSH connection" in caplog.text
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.OSLoginHook")
     @mock.patch("airflow.providers.google.cloud.hooks.compute_ssh.paramiko")
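A note on the pattern: every hunk in the file above makes the same change, replacing a module-level `pytestmark = pytest.mark.db_test` with per-test `@pytest.mark.db_test` decorators, so the database requirement is declared only where a test actually touches the Airflow metadata database. A minimal sketch of the before/after shape, with a hypothetical class and test names:

import pytest

# Before (what this PR removes): one module-level mark flagged every test in
# the file as database-bound, even tests that only exercise mocked clients.
# pytestmark = pytest.mark.db_test

# After: the mark sits on individual tests. TestExampleHook and its tests are
# hypothetical, for illustration only.
class TestExampleHook:
    def test_pure_logic(self):
        # No metadata-DB access: still runs when db_test tests are excluded.
        assert 1 + 1 == 2

    @pytest.mark.db_test
    def test_needs_metadata_db(self):
        # Collected only when database-backed tests are enabled.
        assert True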
@@ -1695,7 +1695,6 @@ def callback(job_id):
         )
         assert found_job_id is None
 
-    @pytest.mark.db_test
     @mock.patch("subprocess.Popen")
     @mock.patch("select.select")
     def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, caplog):
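The payoff is finer-grained selection: `pytest -m "not db_test"` now skips only the genuinely database-bound tests instead of deselecting entire modules, and `pytest -m db_test` collects exactly the database-backed subset. Custom markers like this are normally registered so pytest will not warn under --strict-markers; Airflow does that in its own test configuration, so the snippet below is only an illustrative sketch of the mechanism:

# conftest.py (illustrative sketch, not Airflow's actual configuration)
def pytest_configure(config):
    # Register the custom marker so --strict-markers accepts it.
    config.addinivalue_line(
        "markers", "db_test: test requires the Airflow metadata database"
    )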
@@ -28,9 +28,6 @@
 
 from unit.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
 
-pytestmark = pytest.mark.db_test
-
-
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
 DATAFORM_STRING = "airflow.providers.google.cloud.hooks.dataform.{}"
 
@@ -596,7 +596,6 @@ def test_delete_bucket(self, mock_service):
         mock_service.return_value.bucket.assert_called_once_with(test_bucket, user_project=None)
         mock_service.return_value.bucket.return_value.delete.assert_called_once()
 
-    @pytest.mark.db_test
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_delete_nonexisting_bucket(self, mock_service, caplog):
         mock_service.return_value.bucket.return_value.delete.side_effect = exceptions.NotFound(
@@ -82,9 +82,6 @@
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.timezone import datetime
 
-pytestmark = pytest.mark.db_test
-
-
 TASK_ID = "test-bq-generic-operator"
 TEST_DATASET = "test-dataset"
 TEST_DATASET_LOCATION = "EU"
@@ -1820,6 +1817,7 @@ def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instanc
             "Trigger is not a BigQueryInsertJobTrigger"
         )
 
+    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_insert_job_operator_async_inherits_hook_project_id_when_non_given(
         self, mock_hook, create_task_instance_of_operator
@@ -2202,6 +2200,7 @@ def test_task_label_too_big(self):
         op._add_job_labels()
         assert "labels" not in configuration
 
+    @pytest.mark.db_test
     def test_dag_label_too_big(self, dag_maker):
         configuration = {
             "query": {
@@ -2219,6 +2218,7 @@ def test_dag_label_too_big(self, dag_maker):
         op._add_job_labels()
         assert "labels" not in configuration
 
+    @pytest.mark.db_test
     def test_labels_lowercase(self, dag_maker):
         configuration = {
             "query": {
@@ -2237,6 +2237,7 @@ def test_labels_lowercase(self, dag_maker):
         assert configuration["labels"]["airflow-dag"] == "yelling_dag_name"
         assert configuration["labels"]["airflow-task"] == "yelling_task_id"
 
+    @pytest.mark.db_test
     def test_labels_starting_with_numbers(self, dag_maker):
         configuration = {
             "query": {
@@ -2255,6 +2256,7 @@ def test_labels_starting_with_numbers(self, dag_maker):
         assert configuration["labels"]["airflow-dag"] == "123_dag"
         assert configuration["labels"]["airflow-task"] == "123_task"
 
+    @pytest.mark.db_test
     def test_labels_starting_with_underscore(self, dag_maker):
         configuration = {
             "query": {
@@ -2274,6 +2276,7 @@ def test_labels_starting_with_underscore(self, dag_maker):
         assert configuration["labels"]["airflow-dag"] == "_dag_starting_with_underscore"
         assert configuration["labels"]["airflow-task"] == "_task_starting_with_underscore"
 
+    @pytest.mark.db_test
     def test_labels_starting_with_hyphen(self, dag_maker):
         configuration = {
             "query": {
@@ -2309,6 +2312,7 @@ def test_labels_invalid_names(self, dag_maker):
         op._add_job_labels()
         assert "labels" not in configuration
 
+    @pytest.mark.db_test
     def test_labels_replace_dots_with_hyphens(self, dag_maker):
         configuration = {
             "query": {
@@ -2341,6 +2345,7 @@ def test_labels_replace_dots_with_hyphens(self, dag_maker):
         assert configuration["labels"]["airflow-dag"] == "dag_with_taskgroup"
         assert configuration["labels"]["airflow-task"] == "task_group-task_name"
 
+    @pytest.mark.db_test
     def test_labels_with_task_group_prefix_group_id(self, dag_maker):
         configuration = {
             "query": {
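In the BigQuery operator hunks above, the marker lands precisely on the tests that take the `dag_maker` fixture, which is backed by the metadata database, while tests like `test_task_label_too_big` that build the operator directly stay unmarked. A condensed sketch of that split; the class name and bodies are illustrative:

import pytest

class TestLabelHandling:
    def test_task_label_too_big(self):
        # Operator configuration handled in memory: no DAG run, no DB session.
        configuration = {"query": {"query": "SELECT 1", "useLegacySql": False}}
        assert "labels" not in configuration

    @pytest.mark.db_test
    def test_dag_label_too_big(self, dag_maker):
        # dag_maker is a database-backed fixture, so this test keeps the mark.
        with dag_maker("dag_" + "x" * 64):
            ...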
@@ -575,7 +575,6 @@ def test_response_invalid(self):
         with pytest.raises(AirflowException):
             DataflowCreatePipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
-    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook")
     def test_response_409(self, mock_hook, create_operator):
         """
@@ -595,7 +594,6 @@ def test_response_409(self, mock_hook, create_operator):
         )
 
 
-@pytest.mark.db_test
 class TestDataflowRunPipelineOperator:
     @pytest.fixture
     def run_operator(self):
@@ -627,6 +625,7 @@ def test_execute(self, data_pipeline_hook_mock, run_operator):
             location=TEST_LOCATION,
         )
 
+    @pytest.mark.db_test
     def test_invalid_data_pipeline_name(self):
         """
         Test that AirflowException is raised if Run Operator is not given a data pipeline name.
@@ -641,6 +640,7 @@ def test_invalid_data_pipeline_name(self):
         with pytest.raises(AirflowException):
             DataflowRunPipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
+    @pytest.mark.db_test
     def test_invalid_project_id(self):
         """
         Test that AirflowException is raised if Run Operator is not given a project ID.
@@ -655,6 +655,7 @@ def test_invalid_project_id(self):
         with pytest.raises(AirflowException):
             DataflowRunPipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
+    @pytest.mark.db_test
     def test_invalid_location(self):
         """
         Test that AirflowException is raised if Run Operator is not given a location.
@@ -685,7 +686,6 @@ def test_invalid_response(self):
             "error": {"message": "example error"}
         }
 
-    @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook")
     def test_response_404(self, mock_hook, run_operator):
         """
@@ -702,7 +702,6 @@ def test_response_404(self, mock_hook, run_operator):
         )
 
 
-@pytest.mark.db_test
 class TestDataflowDeletePipelineOperator:
     @pytest.fixture
     def run_operator(self):
@@ -736,6 +735,7 @@ def test_execute(self, data_pipeline_hook_mock, run_operator):
             location=TEST_LOCATION,
         )
 
+    @pytest.mark.db_test
     def test_invalid_data_pipeline_name(self):
         """
         Test that AirflowException is raised if Delete Operator is not given a data pipeline name.
@@ -750,6 +750,7 @@ def test_invalid_data_pipeline_name(self):
         with pytest.raises(AirflowException):
             DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
+    @pytest.mark.db_test
     def test_invalid_project_id(self):
         """
         Test that AirflowException is raised if Delete Operator is not given a project ID.
@@ -764,6 +765,7 @@ def test_invalid_project_id(self):
         with pytest.raises(AirflowException):
             DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
+    @pytest.mark.db_test
     def test_invalid_location(self):
         """
         Test that AirflowException is raised if Delete Operator is not given a location.
@@ -778,6 +780,7 @@ def test_invalid_location(self):
         with pytest.raises(AirflowException):
             DataflowDeletePipelineOperator(**init_kwargs).execute(mock.MagicMock())
 
+    @pytest.mark.db_test
     def test_invalid_response(self):
         """
         Test that AirflowException is raised if Delete Operator fails execution and returns error.
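The Dataflow pipeline-operator hunks apply the same idea at class granularity: the class-level mark on `TestDataflowRunPipelineOperator` and `TestDataflowDeletePipelineOperator` is dropped, and only the methods that still need the database are re-marked, while `test_execute`, whose hook is fully mocked, runs unmarked. A shape-only sketch (bodies elided; the patch target is taken from the diff above):

import pytest
from unittest import mock

class TestDataflowRunPipelineOperator:
    @mock.patch("airflow.providers.google.cloud.operators.dataflow.DataflowHook")
    def test_execute(self, mock_hook):
        # Hook mocked out: no metadata-database access required.
        ...

    @pytest.mark.db_test
    def test_invalid_data_pipeline_name(self):
        # Validation path that this PR keeps database-marked.
        ...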
@@ -84,7 +84,6 @@ def test_execute(self, mock_hook, create_operator):
         )
 
 
-@pytest.mark.db_test
 class TestDataflowRunPipelineOperator:
     @pytest.fixture
     def run_operator(self):
@@ -342,7 +342,6 @@ def test_invalid_source_code_union_field__execute(self, source_code, message):
         with pytest.raises(AirflowException, match=message):
             op.execute(None)
 
-    @pytest.mark.db_test
     @pytest.mark.parametrize(
         "source_code, message",
         [
@@ -28,8 +28,6 @@
 from airflow.models import Connection
 from airflow.providers.google.cloud.secrets.secret_manager import CloudSecretManagerBackend
 
-pytestmark = pytest.mark.db_test
-
 CREDENTIALS = "test-creds"
 KEY_FILE = "test-file.json"
 PROJECT_ID = "test-project-id"
@@ -1620,13 +1620,13 @@ def create_task_instance(create_task_instance_of_operator, session):
     )
 
 
-@pytest.mark.db_test
 class TestAsyncGCSToBigQueryOperator:
     def _set_execute_complete(self, session, ti, **next_kwargs):
         ti.next_method = "execute_complete"
         ti.next_kwargs = next_kwargs
         session.flush()
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_execute_without_external_table_async_should_execute_successfully(
         self, hook, create_task_instance, session
@@ -1658,6 +1658,7 @@ def test_execute_without_external_table_async_should_execute_successfully(
         trigger_cls = session.scalar(select(Trigger.classpath).where(Trigger.id == ti.trigger_id))
         assert trigger_cls == "airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger"
 
+    @pytest.mark.db_test
     def test_execute_without_external_table_async_should_throw_ex_when_event_status_error(
         self, create_task_instance, session
     ):
@@ -1710,6 +1711,7 @@ def test_execute_logging_without_external_table_async_should_execute_successfull
             "%s completed with response %s ", "test-gcs-to-bq-operator", "Job completed"
         )
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_execute_without_external_table_generate_job_id_async_should_execute_successfully(
         self, hook, create_task_instance, session
@@ -1749,6 +1751,7 @@ def test_execute_without_external_table_generate_job_id_async_should_execute_suc
             force_rerun=True,
         )
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_execute_without_external_table_reattach_async_should_execute_successfully(
         self, hook, create_task_instance, session
@@ -1788,6 +1791,7 @@ def test_execute_without_external_table_reattach_async_should_execute_successful
             project_id=JOB_PROJECT_ID,
         )
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_execute_without_external_table_force_rerun_async_should_execute_successfully(
         self, hook, create_task_instance
@@ -1836,6 +1840,7 @@ def test_execute_without_external_table_force_rerun_async_should_execute_success
             project_id=JOB_PROJECT_ID,
         )
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_schema_fields_without_external_table_async_should_execute_successfully(
@@ -1897,6 +1902,7 @@ def test_schema_fields_without_external_table_async_should_execute_successfully(
 
         bq_hook.return_value.insert_job.assert_has_calls(calls)
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("GCSHook"))
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_schema_fields_int_without_external_table_async_should_execute_successfully(
@@ -1963,6 +1969,7 @@ def test_schema_fields_int_without_external_table_async_should_execute_successfu
 
         bq_hook.return_value.insert_job.assert_has_calls(calls)
 
+    @pytest.mark.db_test
     @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
     def test_execute_complete_reassigns_job_id(self, bq_hook, create_task_instance, session):
         """Assert that we use job_id from event after deferral."""
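A quick way to sanity-check a migration like this is a collect-only dry run that lists what the marker selects without executing anything. A sketch using pytest's public API; the target path here is hypothetical:

import pytest

# --collect-only lists the selected tests without running them.
# Exit code 0 means tests were collected; 5 means nothing matched.
exit_code = pytest.main(["--collect-only", "-q", "-m", "db_test", "providers/google/tests"])
print(f"collect-only exit code: {exit_code}")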
@@ -31,8 +31,6 @@
 )
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 
-pytestmark = pytest.mark.db_test
-
 
 class TestFileToGcsOperator:
     _config = {