
SFTP Sensor fails to locate file #28121

@RishuGuru

Description


Apache Airflow version

2.5.0

What happened

While creating an SFTP sensor, I tried to find a file under a directory, but the sensor kept failing with a timeout because the file was never found.
After debugging the code, I found that the issue is in the poke function.

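After a file matching file_pattern is found, poke looks up its last modified time with self.hook.get_mod_time. That function expects the full path (path + filename), but poke passes only the file name. The lookup therefore does not find the file, poke returns False, and the sensor keeps poking until it times out.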
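A minimal sketch of the failure mode, assuming the hook's pattern lookup works like shell-style matching (fnmatch) over the bare entries of a directory listing and returns only the first matching name. The directory listing and the helper first_match are hypothetical, not part of SFTPHook.

```python
import fnmatch
import posixpath


def first_match(entries: list[str], pattern: str) -> str:
    """Hypothetical helper: return the first entry matching a shell-style pattern, or ""."""
    for name in sorted(entries):
        if fnmatch.fnmatchcase(name, pattern):
            return name
    return ""


# Hypothetical listing of the remote directory "Weekly/11/": names only, no directory prefix.
listing = ["notes.txt", "report.pdf"]

matched = first_match(listing, "*pdf*")
print(matched)                                # bare name: report.pdf
print(posixpath.join("Weekly/11/", matched))  # full path: Weekly/11/report.pdf
```

Only the second value points at the file on the server, which is why the modification-time lookup needs the directory prepended.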

What you think should happen instead

I fixed the issue by joining path with the matched file name before calling self.hook.get_mod_time.

Here is the modified code:


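import posixpath
from datetime import datetime

from paramiko.sftp import SFTP_NO_SUCH_FILE

from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.utils.context import Context
from airflow.utils.timezone import convert_to_utc


# Method of SFTPSensor (airflow/providers/sftp/sensors/sftp.py)
def poke(self, context: Context) -> bool:
    self.hook = SFTPHook(self.sftp_conn_id)
    self.log.info("Poking for %s, with pattern %s", self.path, self.file_pattern)

    if self.file_pattern:
        # get_file_by_pattern returns only the matching file name, not its full path
        file_from_pattern = self.hook.get_file_by_pattern(self.path, self.file_pattern)
        if file_from_pattern:
            # Fix: prepend the directory so get_mod_time receives the full remote path.
            # Remote SFTP paths use "/" separators; posixpath.join also handles a
            # self.path without a trailing slash.
            actual_file_to_check = posixpath.join(self.path, file_from_pattern)
        else:
            return False
    else:
        actual_file_to_check = self.path

    try:
        mod_time = self.hook.get_mod_time(actual_file_to_check)
        self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time)
    except OSError as e:
        if e.errno != SFTP_NO_SUCH_FILE:
            raise
        return False
    self.hook.close_conn()
    if self.newer_than:
        _mod_time = convert_to_utc(datetime.strptime(mod_time, "%Y%m%d%H%M%S"))
        _newer_than = convert_to_utc(self.newer_than)
        return _newer_than <= _mod_time
    else:
        return True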
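To check the fix without an SFTP server, the sketch below replays the poke logic against a hypothetical in-memory stand-in for SFTPHook (FakeSFTPHook and poke_sketch are illustrative names, not Airflow APIs). It assumes a missing file surfaces as an OSError whose errno is 2, the value paramiko uses for SFTP_NO_SUCH_FILE; the local NO_SUCH_FILE constant stands in for that import.

```python
import fnmatch
import posixpath

NO_SUCH_FILE = 2  # stand-in for paramiko.sftp.SFTP_NO_SUCH_FILE (assumed to be 2)


class FakeSFTPHook:
    """Hypothetical in-memory stand-in modelling only the two hook calls poke uses."""

    def __init__(self, files: dict[str, str]):
        self.files = files  # full remote path -> mod time string ("%Y%m%d%H%M%S")
        self.requested: list[str] = []  # every path passed to get_mod_time

    def get_file_by_pattern(self, path: str, pattern: str) -> str:
        # Like the real lookup (assumed): match bare names in `path`, return the first one
        names = sorted(
            posixpath.basename(p) for p in self.files
            if posixpath.dirname(p) == path.rstrip("/")
        )
        return next((n for n in names if fnmatch.fnmatchcase(n, pattern)), "")

    def get_mod_time(self, path: str) -> str:
        self.requested.append(path)
        if path not in self.files:
            raise OSError(NO_SUCH_FILE, "No such file")
        return self.files[path]


def poke_sketch(hook: FakeSFTPHook, path: str, pattern: str, join_path: bool) -> bool:
    """Simplified poke: join_path=False reproduces the bug, True applies the fix."""
    name = hook.get_file_by_pattern(path, pattern)
    if not name:
        return False
    target = posixpath.join(path, name) if join_path else name
    try:
        hook.get_mod_time(target)
    except OSError as e:
        if e.errno != NO_SUCH_FILE:
            raise
        return False
    return True


hook = FakeSFTPHook({"Weekly/11/report.pdf": "20221205120000"})
print(poke_sketch(hook, "Weekly/11/", "*pdf*", join_path=False))  # False: bare name not found
print(poke_sketch(hook, "Weekly/11/", "*pdf*", join_path=True))   # True: full path found
```

The requested list records which path each lookup received, which makes the difference between the two variants easy to inspect.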

How to reproduce

You can reproduce the issue with a DAG like this:

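from datetime import datetime

from airflow import DAG
from airflow.providers.sftp.sensors.sftp import SFTPSensor

# Hypothetical placeholder: the report does not show the original default_args
default_args = {"start_date": datetime(2022, 12, 1)}

with DAG(
    dag_id='sftp_sensor_dag',
    max_active_runs=1,
    default_args=default_args,
) as dag:
    file_sensing_task = SFTPSensor(
        task_id='sensor_for_file',
        path='Weekly/11/',
        file_pattern='*pdf*',
        sftp_conn_id='sftp_hook_conn',
        poke_interval=30,
    )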
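The reproduction passes path="Weekly/11/" with a trailing slash. If the directory and the matched name are joined by plain string concatenation, the same sensor configured with path="Weekly/11" would look for "Weekly/11report.pdf" instead. A quick comparison of concatenation and posixpath.join (the file name report.pdf is hypothetical):

```python
import posixpath


def concat(path: str, name: str) -> str:
    # Plain concatenation: only correct when path already ends with "/"
    return path + name


def joined(path: str, name: str) -> str:
    # posixpath.join inserts "/" only when it is missing
    return posixpath.join(path, name)


for path in ("Weekly/11/", "Weekly/11"):
    print(repr(concat(path, "report.pdf")), repr(joined(path, "report.pdf")))
```

posixpath is used rather than os.path because remote SFTP paths are "/"-separated regardless of the worker's OS; on Windows, os.path.join would insert a backslash.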

Operating System

Microsoft Windows [Version 10.0.19044.2251]

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
