Skip to content

Commit

Permalink
sftp_sensor: fixing resource management with sensor (#40022)
Browse files Browse the repository at this point in the history
closes: #39922

Summary
When a user runs the SFTPSensor operator with deferrable=True together with path/newer_than, it opens an SFTP connection that is never closed. The cause is the get_mod_time method, which opens an SFTP connection but does not close it afterward.

As part of this change, we are closing the connection.
  • Loading branch information
pateash authored Jun 7, 2024
1 parent 0c51bd6 commit f5c8059
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
10 changes: 7 additions & 3 deletions airflow/providers/sftp/hooks/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ async def get_files_and_attrs_by_pattern(
matched_files = [file for file in files_list if fnmatch(str(file.filename), fnmatch_pattern)]
return matched_files

async def get_mod_time(self, path: str) -> str:
async def get_mod_time(self, path: str) -> str: # type: ignore[return]
"""
Make SFTP async connection.
Expand All @@ -558,13 +558,17 @@ async def get_mod_time(self, path: str) -> str:
:param path: full path to the remote file
"""
ssh_conn = await self._get_conn()
sftp_client = await ssh_conn.start_sftp_client()
ssh_conn = None
try:
ssh_conn = await self._get_conn()
sftp_client = await ssh_conn.start_sftp_client()
ftp_mdtm = await sftp_client.stat(path)
modified_time = ftp_mdtm.mtime
mod_time = datetime.datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S") # type: ignore[arg-type]
self.log.info("Found File %s last modified: %s", str(path), str(mod_time))
return mod_time
except asyncssh.SFTPNoSuchFile:
raise AirflowException("No files matching")
finally:
if ssh_conn:
ssh_conn.close()
13 changes: 13 additions & 0 deletions airflow/providers/sftp/sensors/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ def poke(self, context: Context) -> PokeReturnValue | bool:
_newer_than = convert_to_utc(self.newer_than)
if _newer_than <= _mod_time:
files_found.append(actual_file_to_check)
self.log.info(
"File %s has modification time: '%s', which is newer than: '%s'",
actual_file_to_check,
str(_mod_time),
str(_newer_than),
)
else:
self.log.info(
"File %s has modification time: '%s', which is older than: '%s'",
actual_file_to_check,
str(_mod_time),
str(_newer_than),
)
else:
files_found.append(actual_file_to_check)

Expand Down

0 comments on commit f5c8059

Please sign in to comment.