✨ Add Eurostat source and Prefect tasks #1041

Merged
merged 49 commits into from
Oct 9, 2024
052084c
🚀 Added `Eurostat` connector with tests
Sep 4, 2024
ec0268a
🚀 Adding entry in changelog
Sep 4, 2024
e46e775
azure dependency for prefect aligned
fdelgadodyvenia Sep 22, 2024
c58a02d
🐛 Add missing `filter_df_columns` function to utils
Sep 23, 2024
0d7989d
Merge branch 'eurostat_2.0' of https://github.com/dyvenia/viadot into…
Sep 23, 2024
aa96848
🎨 Refactor docstrings in Eurostat source class
Sep 23, 2024
f0faa60
🎨 Add Tsignal attributes in docstring and change typing
Sep 23, 2024
27d4267
🎨 Change typing in methods
Sep 23, 2024
5a7a8f4
🎨 Change docstrings in Eurostat task
Sep 23, 2024
b4f045c
🎨 Refactor Eurostat flow docstrings
Sep 23, 2024
6e8c5aa
🎨 Improve typing
Sep 23, 2024
c374bae
🎨 Changed test function name
Sep 23, 2024
7824be9
♻️ Refactor unit tests for Eurostat
Sep 23, 2024
78c4aad
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 1, 2024
57e25fb
🎨 Changed code structure
Rafalz13 Oct 1, 2024
b1da441
🎨 Changed imports structure
Rafalz13 Oct 1, 2024
2cdbf3f
🎨 Changed tests code structure
Rafalz13 Oct 1, 2024
07aec93
🎨 Changed the code and docstrings structure
Rafalz13 Oct 1, 2024
ad9e19f
🎨 Added `# pragma: allowlist secret`
Rafalz13 Oct 1, 2024
ad0addb
✅ Removed `Path` import and usage
Rafalz13 Oct 1, 2024
0f2996d
✅ Updated test
Rafalz13 Oct 1, 2024
fec8cef
✅ Modified assert
Rafalz13 Oct 1, 2024
1ecf06d
✅ Updated tests
Rafalz13 Oct 1, 2024
d9dc709
Update src/viadot/orchestration/prefect/tasks/eurostat.py
Rafalz13 Oct 2, 2024
5391534
Update src/viadot/orchestration/prefect/tasks/eurostat.py
Rafalz13 Oct 2, 2024
80426e0
Update src/viadot/orchestration/prefect/tasks/eurostat.py
Rafalz13 Oct 2, 2024
401cbf7
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
133a3a0
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
77daacd
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
3094254
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
558b0fd
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
a3ac40f
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
c662f04
🎨 Removed unused parts and updated docstrings
Rafalz13 Oct 2, 2024
c0f6ff2
🎨 Added extra space
Rafalz13 Oct 2, 2024
24914a1
✅ Modified tests
Rafalz13 Oct 2, 2024
a137e28
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 2, 2024
c4cc7d0
✅ Updated tests
Rafalz13 Oct 2, 2024
bd20719
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 2, 2024
c3b5ba3
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 3, 2024
bf84d34
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 4, 2024
8b1cdce
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 4, 2024
c3589a1
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 4, 2024
cab70b2
✅ Updated tests
Rafalz13 Oct 4, 2024
b35bd93
✅ Updated eurostat tests
Rafalz13 Oct 7, 2024
7fedee6
✅ Removed 1 test
Rafalz13 Oct 8, 2024
e3ce874
✅ Updated tests
Rafalz13 Oct 9, 2024
6cd4c6d
🎨 Added `noqa:`
Rafalz13 Oct 9, 2024
92027c0
♻️ Remove unnecessary code & standardize
trymzet Oct 9, 2024
c852e8f
Merge branch '2.0' into eurostat_2.0
trymzet Oct 9, 2024
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/flows/__init__.py
@@ -10,6 +10,7 @@
from .duckdb_to_sql_server import duckdb_to_sql_server
from .duckdb_transform import duckdb_transform
from .epicor_to_parquet import epicor_to_parquet
from .eurostat_to_adls import eurostat_to_adls
from .exchange_rates_to_adls import exchange_rates_to_adls
from .exchange_rates_to_databricks import exchange_rates_to_databricks
from .exchange_rates_to_redshift_spectrum import exchange_rates_api_to_redshift_spectrum
@@ -68,5 +69,6 @@
    "supermetrics_to_adls",
    "transform",
    "transform_and_catalog",
    "eurostat_to_adls",
    "vid_club_to_adls",
]
80 changes: 80 additions & 0 deletions src/viadot/orchestration/prefect/flows/eurostat_to_adls.py
@@ -0,0 +1,80 @@
"""Download data from Eurostat and upload it to Azure Data Lake Storage."""

from prefect import flow

from viadot.orchestration.prefect.tasks import df_to_adls, eurostat_to_df


@flow(
    name="extract--eurostat--adls",
    description="""Flow for downloading data from the Eurostat platform via
    HTTPS REST API (no credentials required) to a CSV or Parquet file.
    Then upload it to Azure Data Lake.""",
    retries=1,
    retry_delay_seconds=60,
)
def eurostat_to_adls(
    dataset_code: str,
    adls_path: str,
    params: dict[str, str] | None = None,
    columns: list[str] | None = None,
    tests: dict | None = None,
    adls_credentials_secret: str | None = None,
    overwrite_adls: bool = False,
    adls_config_key: str | None = None,
) -> None:
    """Flow for downloading data from Eurostat to Azure Data Lake.

    This module provides a Prefect flow function to use the Eurostat connector:
    - Call the Prefect task wrapper to retrieve a DataFrame from the connector.
    - Upload the retrieved data to Azure Data Lake Storage.

    Args:
        dataset_code (str):
            The code of the Eurostat dataset to be downloaded and uploaded.
        adls_path (str):
            The destination folder or path in Azure Data Lake.
        params (dict[str, str] | None, optional):
            A dictionary with optional URL parameters. Each key is a parameter ID,
            and the value is a specific parameter code, e.g.,
            `params = {'unit': 'EUR'}` where "unit" is the parameter, and "EUR"
            is the code. Only one code per parameter is allowed. Defaults to None.
        columns (list[str] | None, optional):
            A list of column names to keep in the DataFrame after downloading.
            All other columns are dropped. Defaults to None.
        tests (dict | None, optional):
            A dictionary containing test cases for the data, such as:
            - `column_size`: dict{column: size}
            - `column_unique_values`: list[columns]
            - `column_list_to_match`: list[columns]
            - `dataset_row_count`: dict{'min': number, 'max': number}
            - `column_match_regex`: dict{column: 'regex'}
            - `column_sum`: dict{column: {'min': number, 'max': number}}.
            Defaults to None.
        adls_credentials_secret (str | None, optional):
            The Azure Key Vault secret containing Service Principal credentials
            (TENANT_ID, CLIENT_ID, CLIENT_SECRET) and ACCOUNT_NAME for Azure Data
            Lake access. Defaults to None.
        overwrite_adls (bool, optional):
            Whether to overwrite files in the lake if they exist. Defaults to False.
        adls_config_key (str | None, optional):
            The key in the viadot config that holds the credentials for Azure
            Data Lake. Defaults to None.

    Returns:
        None
    """
    # Extract: download the dataset from Eurostat into a DataFrame.
    df = eurostat_to_df(
        dataset_code=dataset_code,
        params=params,
        columns=columns,
        tests=tests,
    )
    # Load: upload the DataFrame to Azure Data Lake Storage.
    df_to_adls(
        df=df,
        path=adls_path,
        credentials_secret=adls_credentials_secret,
        config_key=adls_config_key,
        overwrite=overwrite_adls,
    )
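To make the shape of the `tests` argument more concrete, here is a minimal, standard-library-only sketch of how three of the documented keys could be checked. It is not viadot's validation code: `check_rows` is a hypothetical helper, the rows are made-up sample data, and the meaning given to each key (row count within bounds, no duplicates in a column, every value fully matching a regex) is an assumption based on the key names in the docstring.

```python
from __future__ import annotations

import re


def check_rows(rows: list[dict], tests: dict) -> list[str]:
    """Return human-readable failures; an empty list means every check passed.

    Hypothetical helper for illustration only, not part of viadot.
    """
    failures = []

    # dataset_row_count: dict{'min': number, 'max': number}
    bounds = tests.get("dataset_row_count")
    if bounds is not None:
        n = len(rows)
        if not bounds["min"] <= n <= bounds["max"]:
            failures.append(
                f"row count {n} outside [{bounds['min']}, {bounds['max']}]"
            )

    # column_unique_values: list[columns] (assumed: no duplicates allowed)
    for column in tests.get("column_unique_values", []):
        values = [row[column] for row in rows]
        if len(values) != len(set(values)):
            failures.append(f"column '{column}' has duplicate values")

    # column_match_regex: dict{column: 'regex'} (assumed: full match required)
    for column, pattern in tests.get("column_match_regex", {}).items():
        if not all(re.fullmatch(pattern, str(row[column])) for row in rows):
            failures.append(f"column '{column}' has values not matching {pattern!r}")

    return failures


# Made-up sample rows standing in for a downloaded dataset.
rows = [
    {"geo": "PL", "time": "2023", "value": 1.5},
    {"geo": "DE", "time": "2023", "value": 2.0},
]
tests = {
    "dataset_row_count": {"min": 1, "max": 10},
    "column_unique_values": ["geo"],
    "column_match_regex": {"time": r"\d{4}"},
}
failures = check_rows(rows, tests)
```

Returning a list of failures rather than raising on the first one is a choice made for this sketch; the connector itself may behave differently.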
2 changes: 2 additions & 0 deletions src/viadot/orchestration/prefect/tasks/__init__.py
@@ -11,6 +11,7 @@
from .dbt import dbt_task
from .duckdb import duckdb_query
from .epicor import epicor_to_df
from .eurostat import eurostat_to_df
from .exchange_rates import exchange_rates_to_df
from .genesys import genesys_to_df
from .git import clone_repo
@@ -66,6 +67,7 @@
    "sharepoint_to_df",
    "sql_server_query",
    "sql_server_to_df",
    "eurostat_to_df",
    "vid_club_to_df",
    "supermetrics_to_df",
]
51 changes: 51 additions & 0 deletions src/viadot/orchestration/prefect/tasks/eurostat.py
@@ -0,0 +1,51 @@
"""Tasks for interacting with Eurostat."""

import pandas as pd
from prefect import task

from viadot.sources import Eurostat


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
def eurostat_to_df(
    dataset_code: str,
    params: dict[str, str] | None = None,
    columns: list[str] | None = None,
    tests: dict | None = None,
) -> pd.DataFrame:
    """Task for creating a pandas DataFrame from the Eurostat HTTPS REST API.

    This function serves as an intermediate wrapper between the Prefect flow
    and the Eurostat connector:
    - Instantiates a Eurostat API connector.
    - Creates and returns a pandas DataFrame with the response from the API.

    Args:
        dataset_code (str):
            The code of the Eurostat dataset that you would like to download.
        params (dict[str, str] | None, optional):
            A dictionary with optional URL parameters. Each key is a parameter ID,
            and the value is a specific parameter code, e.g.,
            `params = {'unit': 'EUR'}` where "unit" is the parameter, and "EUR" is
            the code. Only one code per parameter is allowed. Defaults to None.
        columns (list[str] | None, optional):
            A list of column names (as strings) that are required from the dataset.
            Filters the DataFrame to only include the specified columns.
            Defaults to None.
        tests (dict | None, optional):
            A dictionary containing test cases for the data, including:
            - `column_size`: dict{column: size}
            - `column_unique_values`: list[columns]
            - `column_list_to_match`: list[columns]
            - `dataset_row_count`: dict{'min': number, 'max': number}
            - `column_match_regex`: dict{column: 'regex'}
            - `column_sum`: dict{column: {'min': number, 'max': number}}.
            Defaults to None.

    Returns:
        pd.DataFrame:
            A pandas DataFrame containing the data retrieved from the Eurostat API.
    """
    return Eurostat(
        dataset_code=dataset_code, params=params, columns=columns, tests=tests
    ).to_df()
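The docstring describes `params` as URL parameters with exactly one code per parameter ID. As a rough illustration of that rule, the sketch below turns such a dictionary into a query string with the standard library. `BASE_URL`, `build_url`, and the dataset code are hypothetical placeholders; the real request URL is built inside the `Eurostat` source class, which is not shown in this diff.

```python
from __future__ import annotations

from urllib.parse import urlencode

# Hypothetical placeholder, not the real Eurostat endpoint.
BASE_URL = "https://example.invalid/eurostat/data"


def build_url(dataset_code: str, params: dict[str, str] | None = None) -> str:
    """Build a request URL from a dataset code and one-code-per-parameter filters.

    Hypothetical helper for illustration only, not part of viadot.
    """
    url = f"{BASE_URL}/{dataset_code}"
    if not params:
        return url

    # Enforce the documented rule: each parameter ID maps to a single code.
    for key, code in params.items():
        if not isinstance(code, str):
            raise TypeError(
                f"parameter '{key}' must map to a single code string, "
                f"got {type(code).__name__}"
            )

    return f"{url}?{urlencode(params)}"


url = build_url("DATASET_CODE", {"unit": "EUR", "geo": "PL"})
```

Passing a list such as `{"unit": ["EUR", "USD"]}` raises `TypeError` in this sketch, which mirrors the one-code-per-parameter restriction stated in the docstring.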
2 changes: 2 additions & 0 deletions src/viadot/sources/__init__.py
@@ -10,6 +10,7 @@
from .cloud_for_customers import CloudForCustomers
from .customer_gauge import CustomerGauge
from .epicor import Epicor
from .eurostat import Eurostat
from .exchange_rates import ExchangeRates
from .genesys import Genesys
from .hubspot import Hubspot
@@ -48,6 +49,7 @@
    "Supermetrics",
    "Trino",
    "UKCarbonIntensity",
    "Eurostat",
    "VidClub",
]
