Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import datetime as dt
import json
from unittest import mock
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -624,13 +625,15 @@ def test_inject_openlineage_properties_into_dataproc_job_parent_info_only(mock_i
assert result == {"sparkJob": {"properties": expected_properties}}


@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_job_transport_info_only(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {
Expand All @@ -642,13 +645,15 @@ def test_inject_openlineage_properties_into_dataproc_job_transport_info_only(
assert result == {"sparkJob": {"properties": expected_properties}}


@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_job_all_injections(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {
Expand Down Expand Up @@ -899,13 +904,15 @@ def test_inject_openlineage_properties_into_dataproc_batch_parent_info_only(mock
assert result == expected_batch


@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_batch_transport_info_only(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {"existingProperty": "value", **OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_SPARK_PROPERTIES}
Expand All @@ -917,13 +924,15 @@ def test_inject_openlineage_properties_into_dataproc_batch_transport_info_only(
assert result == expected_batch


@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_batch_all_injections(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
expected_properties = {
Expand Down Expand Up @@ -1072,13 +1081,15 @@ def test_inject_openlineage_properties_into_dataproc_workflow_template_parent_in
assert result == expected_template


@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_workflow_template_transport_info_only(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
template = {
Expand Down Expand Up @@ -1154,13 +1165,15 @@ def test_inject_openlineage_properties_into_dataproc_workflow_template_transport
assert result == expected_template


@patch("airflow.providers.openlineage.plugins.listener._openlineage_listener")
@patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
def test_inject_openlineage_properties_into_dataproc_workflow_template_all_injections(
mock_is_ol_accessible, mock_ol_listener
):
mock_is_ol_accessible.return_value = True
mock_ol_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
HttpConfig.from_dict(OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG)
)
template = {
Expand Down
Loading