Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,15 @@ def execute(self, context: Context):

# If no custom report_name provided, use DV360 name
file_url = resource["metadata"]["googleCloudStoragePath"]
if urllib.parse.urlparse(file_url).scheme == "file":
raise AirflowException("Accessing local file is not allowed in this operator")
report_name = self.report_name or urlsplit(file_url).path.split("/")[-1]
report_name = self._resolve_file_name(report_name)

# Download the report
self.log.info("Starting downloading report %s", self.report_id)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
with urllib.request.urlopen(file_url) as response:
with urllib.request.urlopen(file_url) as response: # nosec
shutil.copyfileobj(response, temp_file, length=self.chunk_size)

temp_file.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import pytest

from airflow.exceptions import AirflowException
from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateQueryOperator,
Expand Down Expand Up @@ -78,6 +79,9 @@ def teardown_method(self):
with create_session() as session:
session.query(TI).delete()

@pytest.mark.parametrize(
"file_path, should_except", [("https://host/path", False), ("file:/path/to/file", True)]
)
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
Expand All @@ -97,12 +101,14 @@ def test_execute(
mock_temp,
mock_request,
mock_shutil,
file_path,
should_except,
):
mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = FILENAME
mock_hook.return_value.get_report.return_value = {
"metadata": {
"status": {"state": "DONE"},
"googleCloudStoragePath": "TEST",
"googleCloudStoragePath": file_path,
}
}
op = GoogleDisplayVideo360DownloadReportV2Operator(
Expand All @@ -112,6 +118,10 @@ def test_execute(
report_name=REPORT_NAME,
task_id="test_task",
)
if should_except:
with pytest.raises(AirflowException):
op.execute(context=None)
return
op.execute(context=None)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
Expand Down