devel-common/src/tests_common/pytest_plugin.py (17 additions, 0 deletions)
@@ -2491,3 +2491,20 @@ def testing_dag_bundle():
         if session.query(DagBundleModel).filter(DagBundleModel.name == "testing").count() == 0:
             testing = DagBundleModel(name="testing")
             session.add(testing)
+
+
+@pytest.fixture
+def create_connection_without_db(monkeypatch):
+    """
+    Fixture to create connections for tests without using the database.
+
+    This fixture uses monkeypatch to set the appropriate AIRFLOW_CONN_{conn_id} environment variable.
+    """
+
+    def _create_conn(connection, session=None):
+        """Create connection using environment variable; ``session`` is unused (kept for db.merge_conn parity)."""
+
+        env_var_name = f"AIRFLOW_CONN_{connection.conn_id.upper()}"
+        monkeypatch.setenv(env_var_name, connection.as_json())
+
+    return _create_conn
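Note: the fixture returns a callable that serializes the given Connection to JSON via Connection.as_json() and exports it as AIRFLOW_CONN_<CONN_ID>, which Airflow's connection resolution checks before the metadata database. A minimal usage sketch follows; the test name and connection values are illustrative, not part of this PR:

    from airflow.models import Connection

    def test_my_hook(create_connection_without_db):
        # Exports AIRFLOW_CONN_MY_CONN for the duration of this test only;
        # monkeypatch removes the variable again at teardown.
        create_connection_without_db(
            Connection(conn_id="my_conn", conn_type="http", host="example.com")
        )
        # Hooks created here resolve "my_conn" from the environment, so no
        # database rows are written and no cleanup is needed.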
providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py (4 additions, 4 deletions)
@@ -26,7 +26,6 @@
 from airflow.exceptions import AirflowException
 from airflow.models import Connection
 from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
-from airflow.utils import db

 # those tests will not work with database isolation because they mock requests
 pytestmark = pytest.mark.db_test
@@ -55,16 +54,17 @@ class TestAirbyteHook:
     _mock_job_status_success_response_body = {"job": {"status": "succeeded"}}
     _mock_job_cancel_status = "cancelled"

-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id=self.airbyte_conn_id,
                 conn_type=self.conn_type,
                 host=self.host,
                 port=self.port,
             )
         )
-        db.merge_conn(
+        create_connection_without_db(
             Connection(
                 conn_id=self.airbyte_conn_id_with_proxy,
                 conn_type=self.conn_type,
@@ -24,7 +24,6 @@

 from airflow.models import Connection
 from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
-from airflow.utils import db


 @pytest.mark.db_test
@@ -41,9 +40,9 @@ class TestAirbyteTriggerSyncOp:

     @mock.patch("airbyte_api.jobs.Jobs.create_job")
     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job", return_value=None)
-    def test_execute(self, mock_wait_for_job, mock_submit_sync_connection):
+    def test_execute(self, mock_wait_for_job, mock_submit_sync_connection, create_connection_without_db):
         conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
-        db.merge_conn(conn)
+        create_connection_without_db(conn)
         mock_response = mock.Mock()
         mock_response.job_response = JobResponse(
             connection_id="connection-mock",
@@ -71,9 +70,9 @@ def test_execute(self, mock_wait_for_job, mock_submit_sync_connection):
         )

     @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
-    def test_on_kill(self, mock_cancel_job):
+    def test_on_kill(self, mock_cancel_job, create_connection_without_db):
         conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
-        db.merge_conn(conn)
+        create_connection_without_db(conn)

         op = AirbyteTriggerSyncOperator(
             task_id="test_Airbyte_op",
providers/airbyte/tests/unit/airbyte/sensors/test_airbyte.py (3 additions, 3 deletions)
@@ -25,7 +25,6 @@
 from airflow.exceptions import AirflowException
 from airflow.models import Connection
 from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
-from airflow.utils import db


 @pytest.mark.db_test
@@ -46,8 +45,9 @@ def get_job(self, status):
         )
         return response

-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="http://test-airbyte")
         )

providers/airbyte/tests/unit/airbyte/triggers/test_airbyte.py (5 additions, 3 deletions)
@@ -27,7 +27,6 @@
 from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
 from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
 from airflow.triggers.base import TriggerEvent
-from airflow.utils import db


 @pytest.mark.db_test
@@ -39,8 +38,11 @@ class TestAirbyteSyncTrigger:
     END_TIME = time.time() + 60 * 60 * 24 * 7
     POLL_INTERVAL = 3.0

-    def setup_method(self):
-        db.merge_conn(Connection(conn_id=self.CONN_ID, conn_type="airbyte", host="http://test-airbyte"))
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
+            Connection(conn_id=self.CONN_ID, conn_type="airbyte", host="http://test-airbyte")
+        )

     def test_serialization(self):
         """Assert TestAirbyteSyncTrigger correctly serializes its arguments and classpath."""
providers/amazon/tests/unit/amazon/aws/hooks/test_chime.py (4 additions, 4 deletions)
@@ -24,7 +24,6 @@
 from airflow.exceptions import AirflowException
 from airflow.models import Connection
 from airflow.providers.amazon.aws.hooks.chime import ChimeWebhookHook
-from airflow.utils import db

 pytestmark = pytest.mark.db_test

@@ -42,8 +41,9 @@ class TestChimeWebhookHook:

     expected_payload = json.dumps(expected_payload_dict)

-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id="default-chime-webhook",
                 conn_type="chime",
@@ -52,7 +52,7 @@ def setup_method(self):
                 schema="https",
             )
         )
-        db.merge_conn(
+        create_connection_without_db(
             Connection(
                 conn_id="chime-bad-url",
                 conn_type="chime",
@@ -25,15 +25,15 @@
 from airflow.providers.amazon.aws.hooks.chime import ChimeWebhookHook
 from airflow.providers.amazon.aws.notifications.chime import ChimeNotifier, send_chime_notification
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.utils import db

 pytestmark = pytest.mark.db_test


 class TestChimeNotifier:
     # Chime webhooks can't really have a default connection, so we need to create one for tests.
-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id="default-chime-webhook",
                 conn_type="chime",
@@ -24,15 +24,15 @@
 from airflow import models
 from airflow.models.xcom import MAX_XCOM_SIZE
 from airflow.providers.amazon.aws.transfers.google_api_to_s3 import GoogleApiToS3Operator
-from airflow.utils import db

 # This test mocks json.dumps so it won't work for database isolation mode
 pytestmark = pytest.mark.db_test


 class TestGoogleApiToS3:
-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             models.Connection(
                 conn_id="google_test",
                 host="google",
@@ -42,7 +42,7 @@ def setup_method(self):
                 password="client_secret",
             )
         )
-        db.merge_conn(
+        create_connection_without_db(
             models.Connection(
                 conn_id="s3_test",
                 conn_type="s3",
@@ -24,15 +24,15 @@
 from airflow import models
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
-from airflow.utils import db
 from airflow.utils.session import create_session

 pytestmark = pytest.mark.db_test


 class TestS3ToSqlTransfer:
-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             models.Connection(
                 conn_id="s3_test",
                 conn_type="aws",
@@ -41,7 +41,7 @@ def setup_method(self):
                 ' "aws_secret_access_key"}',
             )
         )
-        db.merge_conn(
+        create_connection_without_db(
             models.Connection(
                 conn_id="sql_test",
                 conn_type="postgres",
@@ -31,13 +31,13 @@

 from airflow.models import Connection
 from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook
-from airflow.utils import db


 @pytest.mark.integration("cassandra")
 class TestCassandraHook:
-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id="cassandra_test",
                 conn_type="cassandra",
@@ -47,7 +47,7 @@ def setup_method(self):
                 extra='{"load_balancing_policy":"TokenAwarePolicy","protocol_version":4}',
             )
         )
-        db.merge_conn(
+        create_connection_without_db(
             Connection(
                 conn_id="cassandra_default_with_schema",
                 conn_type="cassandra",
@@ -27,7 +27,7 @@
 from airflow import DAG
 from airflow.models import Connection
 from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator
-from airflow.utils import db, timezone
+from airflow.utils import timezone

 pytestmark = pytest.mark.db_test

@@ -188,11 +188,12 @@

 @patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_conn")
 class TestFlinkKubernetesOperator:
-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(conn_id="kubernetes_default_kube_config", conn_type="kubernetes", extra=json.dumps({}))
         )
-        db.merge_conn(
+        create_connection_without_db(
             Connection(
                 conn_id="kubernetes_with_namespace",
                 conn_type="kubernetes",
@@ -30,7 +30,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models import Connection
 from airflow.providers.apache.flink.sensors.flink_kubernetes import FlinkKubernetesSensor
-from airflow.utils import db, timezone
+from airflow.utils import timezone

 pytestmark = pytest.mark.db_test

@@ -866,16 +866,19 @@

 @patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_conn")
 class TestFlinkKubernetesSensor:
-    def setup_method(self):
-        db.merge_conn(Connection(conn_id="kubernetes_default", conn_type="kubernetes", extra=json.dumps({})))
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
+            Connection(conn_id="kubernetes_default", conn_type="kubernetes", extra=json.dumps({}))
+        )
+        create_connection_without_db(
             Connection(
                 conn_id="kubernetes_default",
                 conn_type="kubernetes",
                 extra=json.dumps({}),
             )
         )
-        db.merge_conn(
+        create_connection_without_db(
             Connection(
                 conn_id="kubernetes_with_namespace",
                 conn_type="kubernetes",
@@ -23,15 +23,15 @@

 from airflow.models import Connection
 from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
-from airflow.utils import db

 client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"}


 @pytest.mark.integration("kafka")
 class TestKafkaAdminClientHook:
-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id="kafka_d",
                 conn_type="kafka",
@@ -26,7 +26,6 @@
 # Import Hook
 from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
 from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
-from airflow.utils import db

 TOPIC = "consumer_hook_test_1"

@@ -44,8 +43,9 @@ class TestConsumerHook:
     Test consumer hook.
     """

-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id="kafka_d",
                 conn_type="kafka",
@@ -24,7 +24,6 @@
 from airflow.models import Connection
 from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
 from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
-from airflow.utils import db

 log = logging.getLogger(__name__)
 config = {"bootstrap.servers": "broker:29092", "group.id": "hook.producer.integration.test"}
@@ -36,8 +35,9 @@ class TestProducerHook:
     Test producer hook.
     """

-    def setup_method(self):
-        db.merge_conn(
+    @pytest.fixture(autouse=True)
+    def setup_connections(self, create_connection_without_db):
+        create_connection_without_db(
             Connection(
                 conn_id="kafka_default",
                 conn_type="kafka",