From 105979e4568673ceb7c22be4e4adb43bf7640cf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Rolland?= Date: Mon, 6 May 2024 18:12:23 +0200 Subject: [PATCH] Fix SFTPSensor.newer_than not working with jinja logical ds/ts expression (#39056) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixes https://github.com/apache/airflow/issues/36629 * Fixes PR failed test * Remove duplicate parametrized tests * Fix formatting * Fix formatting * Fixes https://github.com/apache/airflow/issues/36629 * Fixes PR failed test * Remove duplicate parametrized tests * update simple-salesforce type hints to support 1.12.6 (#39047) * Fix formatting * Add changelog for airflow python client 2.9.0 (#39060) * Upgrade to latest hatchling as build dependency (#39044) * Prepare docs 1st wave (RC3) + ad hoc April 2024 (#38995) (#39054) * Prepare docs 1st wave (RC3) + ad hoc April 2024 (#38995) * update databricks * [docs] update `DagBag` class docstring to include all params (#38814) * update docstring for DagBag class * break long line * fix space Signed-off-by: kalyanr --------- Signed-off-by: kalyanr * Data aware scheduling docs edits (#38687) * Moves airflow import in deprecated pod_generator to local (#39062) The import might be invoked when K8S executor starts with sentry on and it might lead to circular imports Related: #31442 * KPO xcom sidecar PodDefault usage (#38951) We should use the same, non deprecated, version of PodDefaults for the xcom sidecar when creating and reading xcom. 
* Fix formatting * Change date/time parsing method for newer_than parameter in SFTPSensor * Add examples in AWS auth manager documentation (#39040) * update document (#39068) * Update hatchling to version 1.24.0 (#39072) * Check that the dataset<>task exists before trying to render graph (#39069) * Change date/time parsing method for newer_than parameter in SFTPSensor * Fix utc timezone in unit tests * Fix utc timezone in unit tests --------- Signed-off-by: kalyanr Co-authored-by: Grégoire Rolland Co-authored-by: Hussein Awala Co-authored-by: Ephraim Anierobi Co-authored-by: Jarek Potiuk Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> Co-authored-by: Kalyan Co-authored-by: Laura Zdanski <25642903+lzdanski@users.noreply.github.com> Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com> Co-authored-by: humit Co-authored-by: Brent Bovenzi --- airflow/providers/sftp/sensors/sftp.py | 8 +++++--- tests/providers/sftp/sensors/test_sftp.py | 20 +++++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index 02055f31e3f81..de3870937d43b 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -30,7 +30,7 @@ from airflow.providers.sftp.hooks.sftp import SFTPHook from airflow.providers.sftp.triggers.sftp import SFTPTrigger from airflow.sensors.base import BaseSensorOperator, PokeReturnValue -from airflow.utils.timezone import convert_to_utc +from airflow.utils.timezone import convert_to_utc, parse if TYPE_CHECKING: from airflow.utils.context import Context @@ -57,7 +57,7 @@ def __init__( *, path: str, file_pattern: str = "", - newer_than: datetime | None = None, + newer_than: datetime | str | None = None, sftp_conn_id: str = "sftp_default", python_callable: Callable | None = None, op_args: list | None = None, @@ 
-70,7 +70,7 @@ def __init__( self.file_pattern = file_pattern self.hook: SFTPHook | None = None self.sftp_conn_id = sftp_conn_id - self.newer_than: datetime | None = newer_than + self.newer_than: datetime | str | None = newer_than self.python_callable: Callable | None = python_callable self.op_args = op_args or [] self.op_kwargs = op_kwargs or {} @@ -105,6 +105,8 @@ def poke(self, context: Context) -> PokeReturnValue | bool: continue if self.newer_than: + if isinstance(self.newer_than, str): + self.newer_than = parse(self.newer_than) _mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S")) _newer_than = convert_to_utc(self.newer_than) if _newer_than <= _mod_time: diff --git a/tests/providers/sftp/sensors/test_sftp.py b/tests/providers/sftp/sensors/test_sftp.py index 6a08b377ce80f..25add45e153fb 100644 --- a/tests/providers/sftp/sensors/test_sftp.py +++ b/tests/providers/sftp/sensors/test_sftp.py @@ -17,7 +17,7 @@ # under the License. from __future__ import annotations -from datetime import datetime +from datetime import datetime, timezone as stdlib_timezone from unittest import mock from unittest.mock import Mock, call, patch @@ -97,11 +97,25 @@ def test_file_not_new_enough(self, sftp_hook_mock): sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt") assert not output + @pytest.mark.parametrize( + "newer_than", + ( + datetime(2020, 1, 2), + datetime(2020, 1, 2, tzinfo=stdlib_timezone.utc), + "2020-01-02", + "2020-01-02 00:00:00+00:00", + "2020-01-02 00:00:00.001+00:00", + "2020-01-02T00:00:00+00:00", + "2020-01-02T00:00:00Z", + "2020-01-02T00:00:00+04:00", + "2020-01-02T00:00:00.000001+04:00", + ), + ) @patch("airflow.providers.sftp.sensors.sftp.SFTPHook") - def test_naive_datetime(self, sftp_hook_mock): + def test_multiple_datetime_format_in_newer_than(self, sftp_hook_mock, newer_than): sftp_hook_mock.return_value.get_mod_time.return_value = "19700101000000" sftp_sensor = SFTPSensor( - task_id="unit_test", 
path="/path/to/file/1970-01-01.txt", newer_than=datetime(2020, 1, 2) + task_id="unit_test", path="/path/to/file/1970-01-01.txt", newer_than=newer_than ) context = {"ds": "1970-01-00"} output = sftp_sensor.poke(context)