36 changes: 17 additions & 19 deletions airflow/providers/amazon/aws/transfers/sql_to_s3.py
@@ -37,14 +37,15 @@

FILE_FORMAT = Enum(
"FILE_FORMAT",
"CSV, PARQUET",
"CSV, JSON, PARQUET",
)

FileOptions = namedtuple('FileOptions', ['mode', 'suffix'])
FileOptions = namedtuple('FileOptions', ['mode', 'suffix', 'function'])

FILE_OPTIONS_MAP = {
FILE_FORMAT.CSV: FileOptions('r+', '.csv'),
FILE_FORMAT.PARQUET: FileOptions('rb+', '.parquet'),
FILE_FORMAT.CSV: FileOptions('r+', '.csv', 'to_csv'),
FILE_FORMAT.JSON: FileOptions('r+', '.json', 'to_json'),
FILE_FORMAT.PARQUET: FileOptions('rb+', '.parquet', 'to_parquet'),
}


@@ -69,8 +70,8 @@ class SqlToS3Operator(BaseOperator):
- ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
You can specify this argument if you want to use a different
CA cert bundle than the one used by botocore.
:param file_format: the destination file format, only string 'csv' or 'parquet' is accepted.
:param pd_kwargs: arguments to include in ``DataFrame.to_parquet()`` or ``DataFrame.to_csv()``.
:param file_format: the destination file format, only string 'csv', 'json' or 'parquet' is accepted.
:param pd_kwargs: arguments to include in DataFrame ``.to_parquet()``, ``.to_json()`` or ``.to_csv()``.
"""

template_fields: Sequence[str] = (
@@ -81,7 +82,6 @@ class SqlToS3Operator(BaseOperator):
template_ext: Sequence[str] = ('.sql',)
template_fields_renderers = {
"query": "sql",
"pd_csv_kwargs": "json",
"pd_kwargs": "json",
}

@@ -96,7 +96,7 @@ def __init__(
replace: bool = False,
aws_conn_id: str = 'aws_default',
verify: Optional[Union[bool, str]] = None,
file_format: Literal['csv', 'parquet'] = 'csv',
file_format: Literal['csv', 'json', 'parquet'] = 'csv',
pd_kwargs: Optional[dict] = None,
**kwargs,
) -> None:
@@ -111,13 +111,12 @@ def __init__(
self.pd_kwargs = pd_kwargs or {}
self.parameters = parameters

if file_format == "csv":
self.file_format = FILE_FORMAT.CSV
if "path_or_buf" in self.pd_kwargs:
raise AirflowException('The argument path_or_buf is not allowed, please remove it')
elif file_format == "parquet":
self.file_format = FILE_FORMAT.PARQUET
else:
if "path_or_buf" in self.pd_kwargs:
raise AirflowException('The argument path_or_buf is not allowed, please remove it')

self.file_format = getattr(FILE_FORMAT, file_format.upper(), None)

if self.file_format is None:
raise AirflowException(f"The argument file_format doesn't support {file_format} value.")

@staticmethod
@@ -147,11 +146,10 @@ def execute(self, context: 'Context') -> None:

with NamedTemporaryFile(mode=file_options.mode, suffix=file_options.suffix) as tmp_file:

if self.file_format == FILE_FORMAT.CSV:
data_df.to_csv(tmp_file.name, **self.pd_kwargs)
else:
data_df.to_parquet(tmp_file.name, **self.pd_kwargs)
self.log.info("Writing data to temp file")
getattr(data_df, file_options.function)(tmp_file.name, **self.pd_kwargs)

self.log.info("Uploading data to S3")
s3_conn.load_file(
filename=tmp_file.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=self.replace
)
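For illustration only (not part of this diff), a minimal sketch of how the new 'json' file format might be selected from a DAG. The connection IDs, bucket, key, and SQL are placeholders mirroring the test case below, not real resources.

```python
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator

# Hypothetical task definition: "mysql_conn_id", "aws_conn_id", the bucket/key and
# the query are placeholders taken from the test case, not real connections or data.
sql_to_s3_json = SqlToS3Operator(
    task_id="sql_to_s3_json",
    query="SELECT a, b FROM some_table",
    s3_bucket="bucket",
    s3_key="key",
    sql_conn_id="mysql_conn_id",
    aws_conn_id="aws_conn_id",
    file_format="json",
    replace=True,
    # pd_kwargs are forwarded verbatim to DataFrame.to_json() for the JSON format.
    pd_kwargs={"date_format": "iso", "lines": True, "orient": "records"},
)
```

With file_format="json" the operator writes the query result to a temporary ".json" file via DataFrame.to_json() before uploading it to S3, the same flow the CSV and Parquet paths already used.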
54 changes: 54 additions & 0 deletions tests/providers/amazon/aws/transfers/test_sql_to_s3.py
@@ -21,7 +21,9 @@
from unittest import mock

import pandas as pd
import pytest

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator


@@ -102,6 +104,46 @@ def test_execute_parquet(self, mock_s3_hook, temp_mock):
filename=f.name, key=s3_key, bucket_name=s3_bucket, replace=False
)

@mock.patch("airflow.providers.amazon.aws.transfers.sql_to_s3.NamedTemporaryFile")
@mock.patch("airflow.providers.amazon.aws.transfers.sql_to_s3.S3Hook")
def test_execute_json(self, mock_s3_hook, temp_mock):
query = "query"
s3_bucket = "bucket"
s3_key = "key"

mock_dbapi_hook = mock.Mock()
test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
get_pandas_df_mock.return_value = test_df
with NamedTemporaryFile() as f:
temp_mock.return_value.__enter__.return_value.name = f.name

op = SqlToS3Operator(
query=query,
s3_bucket=s3_bucket,
s3_key=s3_key,
sql_conn_id="mysql_conn_id",
aws_conn_id="aws_conn_id",
task_id="task_id",
file_format="json",
replace=True,
pd_kwargs={'date_format': "iso", 'lines': True, 'orient': "records"},
dag=None,
)
op._get_hook = mock_dbapi_hook
op.execute(None)
mock_s3_hook.assert_called_once_with(aws_conn_id="aws_conn_id", verify=None)

get_pandas_df_mock.assert_called_once_with(sql=query, parameters=None)

temp_mock.assert_called_once_with(mode='r+', suffix=".json")
mock_s3_hook.return_value.load_file.assert_called_once_with(
filename=f.name,
key=s3_key,
bucket_name=s3_bucket,
replace=True,
)

def test_fix_int_dtypes(self):
op = SqlToS3Operator(
query="query",
@@ -113,3 +155,15 @@ def test_fix_int_dtypes(self):
dirty_df = pd.DataFrame({"strings": ["a", "b", "c"], "ints": [1, 2, None]})
op._fix_int_dtypes(df=dirty_df)
assert dirty_df["ints"].dtype.kind == "i"

def test_invalid_file_format(self):
with pytest.raises(AirflowException):
SqlToS3Operator(
query="query",
s3_bucket="bucket",
s3_key="key",
sql_conn_id="mysql_conn_id",
task_id="task_id",
file_format="invalid_format",
dag=None,
)