SFTPToGCSOperator streaming: unexpected BlobWriter #49801

@ignaciojcano

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google ==15.1.0
apache-airflow-providers-sftp ==5.2.1

Apache Airflow version

2.10.2

Operating System

Google Cloud Composer

Deployment

Google Cloud Composer

Deployment details

Composer Version 2.11.5
Airflow Version 2.10.2

What happened

We want to use SFTPToGCSOperator to stream files from an SFTP server to a GCS bucket. Streaming for a single file was added in #48107, so we tried to use it.

The task was configured as shown below, but execution failed with this error: Failed to execute job **** for task ***** (expected str, bytes or os.PathLike object, not BlobWriter; 33609)

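from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

fetch_task = SFTPToGCSOperator(
    task_id="task_id",
    sftp_conn_id="sftp_conn",
    gcp_conn_id="gcp_conn",
    source_path="/file.txt",
    destination_bucket="test_bucket",
    destination_path="path/to/file.txt",
    use_stream=True,  # request streaming instead of a local temporary file
)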

Looking at the operator code that sets up the stream and at the SFTP hook, the hook expects write_stream to be an instance of BytesIO. It is instead a BlobWriter, so the hook ends up calling sftp.get instead of sftp.getfo.

SFTP Hook retrieve_file: https://github.com/apache/airflow/blob/providers-sftp/5.1.2/providers/sftp/src/airflow/providers/sftp/hooks/sftp.py#L277

Storage "Blob": https://github.com/googleapis/python-storage/blob/main/google/cloud/storage/blob.py#L4098
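
The mismatch described above can be reproduced without Airflow, a bucket, or an SFTP server. The following is a minimal sketch, not the provider's actual code: `FakeBlobWriter`, `pick_by_type`, and `pick_by_duck_typing` are hypothetical names introduced for illustration, and the stand-in writer is assumed to be a file-like object that does not inherit from BytesIO. It shows why a strict `isinstance(..., BytesIO)` check falls through to the path-based branch, and how a check on a `write` method would treat any writable stream the same way.

```python
import io
from io import BytesIO


class FakeBlobWriter(io.BufferedIOBase):
    """Hypothetical stand-in for a GCS blob writer: file-like, but not a BytesIO."""

    def __init__(self):
        self.chunks = []

    def writable(self):
        return True

    def write(self, data):
        self.chunks.append(bytes(data))
        return len(data)


def pick_by_type(target):
    """Dispatch as described in the issue: only a BytesIO counts as a stream."""
    return "getfo" if isinstance(target, BytesIO) else "get"


def pick_by_duck_typing(target):
    """Alternative check: anything with a callable write() counts as a stream."""
    return "getfo" if callable(getattr(target, "write", None)) else "get"


writer = FakeBlobWriter()
print(pick_by_type(BytesIO()))               # getfo
print(pick_by_type(writer))                  # get
print(pick_by_duck_typing(writer))           # getfo
print(pick_by_duck_typing("/tmp/file.txt"))  # get

# Treating the writer as a local path fails with a TypeError, which is
# presumably what happens once the path-based branch tries to open it.
try:
    open(writer, "wb")
except TypeError as exc:
    print(type(exc).__name__, exc)
```

Whether the right fix belongs in the hook (a looser check) or in the operator (passing a different object) is a design decision for the maintainers; the sketch only illustrates the type mismatch.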

What you think should happen instead

The operator should pass the SFTP hook a BytesIO instance that represents the bucket object, so that the hook takes the sftp.getfo path.

How to reproduce

This is a pretty plain configuration of the operator. I cannot provide a bucket or an SFTP server to test with, though.

fetch_task = SFTPToGCSOperator(
    task_id="task_id",
    sftp_conn_id="sftp_conn",
    gcp_conn_id="gcp_conn",
    source_path="/file.txt",
    destination_bucket="test_bucket",
    destination_path="path/to/file.txt",
    use_stream=True,
)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
