Description
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
apache-airflow-providers-google ==15.1.0
apache-airflow-providers-sftp ==5.2.1
Apache Airflow version
2.10.2
Operating System
Google Cloud Composer
Deployment
Google Cloud Composer
Deployment details
Composer Version 2.11.5
Airflow Version 2.10.2
What happened
We want to use the SFTPToGCSOperator to stream files from an SFTP server to a GCS bucket. We saw that streaming for single files was added in #48107, so we tried it.
The task was configured as shown below, but execution failed with the error `Failed to execute job **** for task ***** (expected str, bytes or os.PathLike object, not BlobWriter; 33609)`.
```python
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

fetch_task = SFTPToGCSOperator(
    task_id="task_id",
    sftp_conn_id="sftp_conn",
    gcp_conn_id="gcp_conn",
    source_path="/file.txt",
    destination_bucket="test_bucket",
    destination_path="path/to/file.txt",
    use_stream=True,
)
```
Looking at the code that sets up the stream and at the SFTP hook, `retrieve_file` only uses `sftp.getfo` when the target is an instance of `BytesIO`. Here `write_stream` is a `BlobWriter` instead, so the hook falls back to `sftp.get`, which treats its argument as a local file path and fails with the error above.
SFTP Hook retrieve_file: https://github.com/apache/airflow/blob/providers-sftp/5.1.2/providers/sftp/src/airflow/providers/sftp/hooks/sftp.py#L277
Storage "Blob": https://github.com/googleapis/python-storage/blob/main/google/cloud/storage/blob.py#L4098
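To make the failure mode concrete without an SFTP server or a bucket, here is a minimal, self-contained sketch that mimics the type check described above. It is not the provider's code: `retrieve_file`, `fake_get`, `fake_getfo` and `FakeBlobWriter` are hypothetical stand-ins for the hook method, paramiko's `get`/`getfo` and `BlobWriter`. Like the real `get`, the path-based branch calls `open()` on its target, so any stream object that is not a `BytesIO` produces the same kind of `TypeError` as in this report.

```python
import io


class FakeBlobWriter(io.BufferedIOBase):
    """Hypothetical stand-in for BlobWriter: a writable binary stream
    that is not a BytesIO instance."""

    def __init__(self):
        super().__init__()
        self.chunks = []

    def writable(self):
        return True

    def write(self, data):
        self.chunks.append(bytes(data))
        return len(data)


def fake_get(remote_path, local_path):
    # Path-based download: opens the local target by path, like paramiko's get().
    with open(local_path, "wb") as fh:
        fh.write(b"payload from " + remote_path.encode())


def fake_getfo(remote_path, fileobj):
    # Stream-based download: writes straight into the file-like object.
    fileobj.write(b"payload from " + remote_path.encode())


def retrieve_file(remote_full_path, local_full_path):
    # Simplified version of the check described in this issue:
    # only BytesIO is treated as a stream, anything else as a path.
    if isinstance(local_full_path, io.BytesIO):
        fake_getfo(remote_full_path, local_full_path)
    else:
        fake_get(remote_full_path, local_full_path)


if __name__ == "__main__":
    buffer = io.BytesIO()
    retrieve_file("/file.txt", buffer)
    print(buffer.getvalue())  # b'payload from /file.txt'

    try:
        retrieve_file("/file.txt", FakeBlobWriter())
    except TypeError as exc:
        print(exc)  # expected str, bytes or os.PathLike object, not FakeBlobWriter
```

Passing a `BytesIO` routes the download through the stream branch, while any other writer, including the stand-in for `BlobWriter`, reaches the path-based branch and fails inside `open()`.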
What you think should happen instead
The operator should pass a `BytesIO` instance representing the bucket object, so that the hook streams the file with `sftp.getfo` instead of `sftp.get`.
How to reproduce
This is a fairly plain configuration of the operator. I cannot provide a bucket or an SFTP server to test against, though.
```python
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator

fetch_task = SFTPToGCSOperator(
    task_id="task_id",
    sftp_conn_id="sftp_conn",
    gcp_conn_id="gcp_conn",
    source_path="/file.txt",
    destination_bucket="test_bucket",
    destination_path="path/to/file.txt",
    use_stream=True,
)
```
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct