diff --git a/airflow/providers/sftp/hooks/sftp.py b/airflow/providers/sftp/hooks/sftp.py index 0907fba2eb49bc..7a2e34a21561b0 100644 --- a/airflow/providers/sftp/hooks/sftp.py +++ b/airflow/providers/sftp/hooks/sftp.py @@ -549,7 +549,7 @@ async def get_files_and_attrs_by_pattern( matched_files = [file for file in files_list if fnmatch(str(file.filename), fnmatch_pattern)] return matched_files - async def get_mod_time(self, path: str) -> str: + async def get_mod_time(self, path: str) -> str: # type: ignore[return] """ Make SFTP async connection. @@ -558,9 +558,10 @@ async def get_mod_time(self, path: str) -> str: :param path: full path to the remote file """ - ssh_conn = await self._get_conn() - sftp_client = await ssh_conn.start_sftp_client() + ssh_conn = None try: + ssh_conn = await self._get_conn() + sftp_client = await ssh_conn.start_sftp_client() ftp_mdtm = await sftp_client.stat(path) modified_time = ftp_mdtm.mtime mod_time = datetime.datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S") # type: ignore[arg-type] @@ -568,3 +569,6 @@ async def get_mod_time(self, path: str) -> str: return mod_time except asyncssh.SFTPNoSuchFile: raise AirflowException("No files matching") + finally: + if ssh_conn: + ssh_conn.close() diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index de3870937d43b2..f56ad9341001de 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -111,6 +111,19 @@ def poke(self, context: Context) -> PokeReturnValue | bool: _newer_than = convert_to_utc(self.newer_than) if _newer_than <= _mod_time: files_found.append(actual_file_to_check) + self.log.info( + "File %s has modification time: '%s', which is newer than: '%s'", + actual_file_to_check, + str(_mod_time), + str(_newer_than), + ) + else: + self.log.info( + "File %s has modification time: '%s', which is older than: '%s'", + actual_file_to_check, + str(_mod_time), + str(_newer_than), + ) else: files_found.append(actual_file_to_check)