
HttpToS3Operator throws exception if s3_bucket parameter is not passed #43379

Reported by @kostiantyn-lab

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.1

What happened?

When using the HttpToS3Operator without the s3_bucket parameter, I get this error:

[2024-10-25, 15:05:43 EEST] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-10-25, 15:05:43 EEST] {http_to_s3.py:165} INFO - Calling HTTP method
[2024-10-25, 15:05:43 EEST] {base.py:84} INFO - Retrieving connection 'http_conn'
[2024-10-25, 15:05:44 EEST] {base.py:84} INFO - Retrieving connection 'aws_conn'
[2024-10-25, 15:05:44 EEST] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/transfers/http_to_s3.py", line 168, in execute
    self.s3_hook.load_bytes(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 158, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 132, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 1205, in load_bytes
    self._upload_file_obj(f, key, bucket_name, replace, encrypt, acl_policy)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 1255, in _upload_file_obj
    client.upload_fileobj(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/boto3/s3/inject.py", line 635, in upload_fileobj
    future = manager.upload(
             ^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3transfer/manager.py", line 323, in upload
    self._validate_if_bucket_supported(bucket)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3transfer/manager.py", line 492, in _validate_if_bucket_supported
    match = pattern.match(bucket)
            ^^^^^^^^^^^^^^^^^^^^^
TypeError: expected string or bytes-like object, got 'NoneType'
[2024-10-25, 15:05:44 EEST] {taskinstance.py:1225} INFO - Marking task as UP_FOR_RETRY. dag_id=test, task_id=download, run_id=manual__2024-10-25T12:05:38.785000+00:00, execution_date=20241025T120538, start_date=20241025T120543, end_date=20241025T120544
[2024-10-25, 15:05:44 EEST] {taskinstance.py:340} ▶ Post task execution logs
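Judging by the two wrapper frames at s3.py:132 and s3.py:158 in the traceback, load_bytes is wrapped by the provider's bucket-name decorators, and my guess is that the fallback never fires because HttpToS3Operator passes bucket_name=self.s3_bucket explicitly, so the argument is bound (to None) rather than absent. A minimal sketch of that suspected mechanism, with simplified names and a hard-coded fallback value (this is not the provider's actual code):

import inspect
from functools import wraps


def provide_bucket_name(func):
    # Simplified stand-in for the provider's decorator: it only supplies a
    # default when the caller did not bind "bucket_name" at all.
    sig = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        if 'bucket_name' not in bound.arguments:
            bound.arguments['bucket_name'] = 'bucket-from-service-config'
        return func(*bound.args, **bound.kwargs)

    return wrapper


@provide_bucket_name
def load_bytes(data, key, bucket_name=None):
    return bucket_name


print(load_bytes(b'x', 'key'))                    # bucket-from-service-config
print(load_bytes(b'x', 'key', bucket_name=None))  # None, like the operator's call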

What you think should happen instead?

The operator should work without errors, since S3Hook can get the S3 bucket name from the service_config block in the connection's extra field.
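
For reference, a minimal sketch of how the aws_conn connection carries the default bucket in its extras, following the service_config layout from the Amazon provider docs (the bucket name is a placeholder):

import json

from airflow.models.connection import Connection

# Hypothetical definition of the 'aws_conn' connection used by the DAG below;
# 'my-default-bucket' is a placeholder.
aws_conn = Connection(
    conn_id='aws_conn',
    conn_type='aws',
    extra=json.dumps(
        {
            'service_config': {
                's3': {'bucket_name': 'my-default-bucket'},
            },
        },
    ),
)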

How to reproduce

Create and run this simple DAG (note that s3_bucket is intentionally omitted):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(year=2019, month=1, day=1),
    'email': ['noreply@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


with DAG(
        dag_id='http-to-s3-test',
        default_args=default_args,
        description='http-to-s3-test',
        catchup=False,
        schedule_interval=None) as dag:

    download = HttpToS3Operator(
        task_id='download',
        aws_conn_id='aws_conn',
        http_conn_id='http_conn',
        method='GET',
        extra_options={'check_response': True},
        endpoint='/test.txt',
        s3_key='test.txt',
        replace=True,
    )
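
As a workaround, passing the bucket explicitly avoids the error; a sketch of the same task with a placeholder bucket name:

    download = HttpToS3Operator(
        task_id='download',
        aws_conn_id='aws_conn',
        http_conn_id='http_conn',
        method='GET',
        extra_options={'check_response': True},
        endpoint='/test.txt',
        s3_bucket='my-default-bucket',  # passing the bucket explicitly avoids the TypeError
        s3_key='test.txt',
        replace=True,
    )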

Operating System

Amazon Linux 2023.5.20240916

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.28.0
apache-airflow-providers-apache-spark==4.10.0
apache-airflow-providers-atlassian-jira==2.7.0
apache-airflow-providers-celery==3.8.1
apache-airflow-providers-common-compat==1.2.0
apache-airflow-providers-common-io==1.4.0
apache-airflow-providers-common-sql==1.16.0
apache-airflow-providers-fab==1.3.0
apache-airflow-providers-ftp==3.11.0
apache-airflow-providers-http==4.13.0
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-mssql==3.9.0
apache-airflow-providers-postgres==5.12.0
apache-airflow-providers-sftp==4.11.0
apache-airflow-providers-slack==8.9.0
apache-airflow-providers-smtp==1.8.0
apache-airflow-providers-sqlite==3.9.0
apache-airflow-providers-ssh==3.13.1

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct