Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add Eurostat source and Prefect tasks #1041

Merged
merged 49 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
052084c
🚀 Added `Eurostat` connector with tests
Sep 4, 2024
ec0268a
🚀 Adding entry in changelog
Sep 4, 2024
e46e775
azure dependency for prefect alligned
fdelgadodyvenia Sep 22, 2024
c58a02d
🐛 Add missing `filter_df_columns` function to utils
Sep 23, 2024
0d7989d
Merge branch 'eurostat_2.0' of https://github.com/dyvenia/viadot into…
Sep 23, 2024
aa96848
🎨 Refactor docstrings in Eurostat source class
Sep 23, 2024
f0faa60
🎨 Add Tsignal stributes in docstring and change typing
Sep 23, 2024
27d4267
🎨 Change typing in methods
Sep 23, 2024
5a7a8f4
🎨 Change docstrings in Eurostat task
Sep 23, 2024
b4f045c
🎨 Refactor Eurostat flow dosctrings
Sep 23, 2024
6e8c5aa
🎨 Improve typing
Sep 23, 2024
c374bae
🎨 Changed test function name
Sep 23, 2024
7824be9
♻️ Refactor unitests for Eurostat
Sep 23, 2024
78c4aad
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 1, 2024
57e25fb
🎨 Changed code structure
Rafalz13 Oct 1, 2024
b1da441
🎨 Changed imports structure
Rafalz13 Oct 1, 2024
2cdbf3f
🎨 Changed tests code structure
Rafalz13 Oct 1, 2024
07aec93
🎨 Changed the code and docstrings structure
Rafalz13 Oct 1, 2024
ad9e19f
🎨 Added `# pragma: allowlist secret`
Rafalz13 Oct 1, 2024
ad0addb
✅ Removed `Path` import and usage
Rafalz13 Oct 1, 2024
0f2996d
✅ Updated test
Rafalz13 Oct 1, 2024
fec8cef
✅ Modified assert
Rafalz13 Oct 1, 2024
1ecf06d
✅ Updated tests
Rafalz13 Oct 1, 2024
d9dc709
Update src/viadot/orchestration/prefect/tasks/eurostat.py
Rafalz13 Oct 2, 2024
5391534
Update src/viadot/orchestration/prefect/tasks/eurostat.py
Rafalz13 Oct 2, 2024
80426e0
Update src/viadot/orchestration/prefect/tasks/eurostat.py
Rafalz13 Oct 2, 2024
401cbf7
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
133a3a0
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
77daacd
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
3094254
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
558b0fd
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
a3ac40f
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 2, 2024
c662f04
🎨 Removed unused parts and updated docstrings
Rafalz13 Oct 2, 2024
c0f6ff2
🎨 Added extra space
Rafalz13 Oct 2, 2024
24914a1
✅ Modified tests
Rafalz13 Oct 2, 2024
a137e28
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 2, 2024
c4cc7d0
✅ Updated tests
Rafalz13 Oct 2, 2024
bd20719
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 2, 2024
c3b5ba3
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 3, 2024
bf84d34
Update src/viadot/sources/eurostat.py
Rafalz13 Oct 4, 2024
8b1cdce
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 4, 2024
c3589a1
Merge branch '2.0' into eurostat_2.0
Rafalz13 Oct 4, 2024
cab70b2
✅ Updated tests
Rafalz13 Oct 4, 2024
b35bd93
✅ Updated eurostat tests
Rafalz13 Oct 7, 2024
7fedee6
✅ Removed 1 test
Rafalz13 Oct 8, 2024
e3ce874
✅ Updated tests
Rafalz13 Oct 9, 2024
6cd4c6d
🎨 Added `noqa:`
Rafalz13 Oct 9, 2024
92027c0
♻️ Remove unnecessary code & standardize
trymzet Oct 9, 2024
c852e8f
Merge branch '2.0' into eurostat_2.0
trymzet Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Added new version of `Eurostat` connector and test files.
trymzet marked this conversation as resolved.
Show resolved Hide resolved
- Added new version of `Genesys` connector and test files.
- Added new version of `Outlook` connector and test files.
- Added new version of `Hubspot` connector and test files.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ azure = [
"adlfs==2024.4.1",
"azure-identity>=1.16.0",
"dbt-sqlserver>=1.3, <1.8",
"prefect-azure @ git+https://github.com/Trymzet/prefect-azure@add_keyvault_auth#egg=prefect-azure",
"prefect-azure-dyvenia[key_vault]==0.1.0", # Adds Key Vault support to Prefect Azure; from https://github.com/trymzet/prefect-azure/tree/add_keyvault_auth
trymzet marked this conversation as resolved.
Show resolved Hide resolved
"prefect_github",
]
aws = [
Expand Down
3 changes: 2 additions & 1 deletion src/viadot/orchestration/prefect/flows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .sql_server_to_minio import sql_server_to_minio
from .transform import transform
from .transform_and_catalog import transform_and_catalog

from .eurostat_to_adls import eurostat_to_adls

__all__ = [
"cloud_for_customers_to_adls",
Expand All @@ -43,4 +43,5 @@
"sql_server_to_minio",
"transform",
"transform_and_catalog",
"eurostat_to_adls",
]
112 changes: 112 additions & 0 deletions src/viadot/orchestration/prefect/flows/eurostat_to_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
'eurostat_to_adls.py'.

Prefect flow for the Eurostat Cloud API connector.

This module provides a prefect flow function to use the Eurostat connector:
- Call to the prefect task wrapper to get a final Data Frame from the connector.
- Upload that data to Azure Data Lake Storage.

Typical usage example:

eurostat_to_adls(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
adls_path: str = None,
adls_credentials_secret: str = None,
overwrite_adls: bool = False,
adls_config_key: str = None,
)

Functions:

eurostat_to_adls(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
adls_path: str = None,
adls_credentials_secret: str = None,
overwrite_adls: bool = False,
adls_config_key: str = None,
):
Flow to download data from Eurostat Cloud API and upload to ADLS.
trymzet marked this conversation as resolved.
Show resolved Hide resolved
"""

from prefect import flow

from viadot.orchestration.prefect.tasks import df_to_adls, eurostat_to_df


@flow(
name="extract--eurostat--adls",
description="""Flow for downloading data from the Eurostat platform via
HTTPS REST API (no credentials required) to a CSV or Parquet file.
Then upload it to Azure Data Lake.""",
retries=1,
retry_delay_seconds=60,
)
def eurostat_to_adls(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
adls_path: str = None,
adls_credentials_secret: str = None,
overwrite_adls: bool = False,
adls_config_key: str = None,
) -> None:
"""
Flow for downloading data from Eurostat to Azure Data Lake.

Args:
name (str): The name of the flow.
dataset_code (str): The code of the Eurostat dataset to be uploaded.
params (Dict[str], optional):
A dictionary with optional URL parameters. The key represents the
parameter ID, while the value is the code for a specific parameter,
for example 'params = {'unit': 'EUR'}' where "unit" is the parameter
to set and "EUR" is the specific parameter code. You can add more
than one parameter, but only one code per parameter! So you CANNOT
provide a list of codes, e.g., 'params = {'unit': ['EUR', 'USD',
'PLN']}'. This parameter is REQUIRED in most cases to pull a specific
dataset from the API. Both the parameter and code must be provided
as a string! Defaults to None.
trymzet marked this conversation as resolved.
Show resolved Hide resolved
columns (List[str], optional): List of needed columns from the DataFrame
- acts as a filter. The data downloaded from Eurostat has the same
structure every time. The filter is applied after the data is
fetched. Defaults to None.
tests:
trymzet marked this conversation as resolved.
Show resolved Hide resolved
- `column_size`: dict{column: size}
- `column_unique_values`: list[columns]
- `column_list_to_match`: list[columns]
- `dataset_row_count`: dict: {'min': number, 'max': number}
- `column_match_regex`: dict: {column: 'regex'}
- `column_sum`: dict: {column: {'min': number, 'max': number}}
adls_dir_path (str, optional): Azure Data Lake destination folder/path.
Defaults to None.
adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault
secret containing a dictionary with ACCOUNT_NAME and Service Principal
credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data
Lake. Defaults to None.
overwrite_adls (bool, optional): Whether to overwrite files in the lake.
Defaults to False.
adls_config_key (str, optional): The key in the viadot config holding
relevant credentials. Defaults to None.
"""
df = eurostat_to_df(
dataset_code=dataset_code,
params=params,
columns=columns,
tests=tests,
)
adls = df_to_adls(
df=df,
path=adls_path,
credentials_secret=adls_credentials_secret,
config_key=adls_config_key,
overwrite=overwrite_adls,
)
return adls
3 changes: 2 additions & 1 deletion src/viadot/orchestration/prefect/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
sharepoint_to_df,
)
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df

from .eurostat import eurostat_to_df

__all__ = [
"adls_upload",
Expand All @@ -48,4 +48,5 @@
"create_sql_server_table",
"sql_server_query",
"sql_server_to_df",
"eurostat_to_df",
]
82 changes: 82 additions & 0 deletions src/viadot/orchestration/prefect/tasks/eurostat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
'eurostat.py'.

Prefect task wrapper for the Eurostat Cloud API connector.

This module provides an intermediate wrapper between the prefect flow and the connector:
- Generate the Eurostat Cloud API connector.
- Create and return a pandas Data Frame with the response of the API.

Typical usage example:

data_frame = eurostat_to_df(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
)

Functions:

eurostat_to_df(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
):
Task to download data from Eurostat Cloud API.

trymzet marked this conversation as resolved.
Show resolved Hide resolved
"""

from prefect import task

from viadot.sources import Eurostat


@task(retries=3, retry_delay_seconds=10, timeout_seconds=60 * 60)
def eurostat_to_df(
dataset_code: str,
params: dict = None,
columns: list = None,
tests: dict = None,
):
"""
Task for creating pandas data frame from Eurostat HTTPS REST API.

(no credentials required).

Args:
dataset_code (str): The code of eurostat dataset that we would like to upload.
params (Dict[str], optional):
A dictionary with optional URL parameters. The key represents the
parameter id, while the value is the code for a specific parameter,
for example 'params = {'unit': 'EUR'}' where "unit" is the parameter
that you would like to set and "EUR" is the code of the specific
parameter. You can add more than one parameter, but only one code per
parameter! So you CAN NOT provide list of codes as in example
'params = {'unit': ['EUR', 'USD', 'PLN']}' This parameter is REQUIRED
in most cases to pull a specific dataset from the API. Both parameter
and code has to provided as a string! Defaults to None.
base_url (str): The base URL used to access the Eurostat API. This parameter
specifies the root URL for all requests made to the API. It should not be
modified unless the API changes its URL scheme. Defaults to
"https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/"
columns (List[str], optional): list of needed names of columns. Names should
be given as str's into the list. Defaults to None.
tests:
- `column_size`: dict{column: size}
- `column_unique_values`: list[columns]
- `column_list_to_match`: list[columns]
- `dataset_row_count`: dict: {'min': number, 'max', number}
- `column_match_regex`: dict: {column: 'regex'}
- `column_sum`: dict: {column: {'min': number, 'max': number}}

Returns:
pd.DataFrame: Pandas DataFrame.
"""
data_frame = Eurostat(dataset_code=dataset_code,
params=params,
columns=columns,
tests=tests).to_df()

return data_frame
3 changes: 2 additions & 1 deletion src/viadot/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .sql_server import SQLServer
from .trino import Trino
from .uk_carbon_intensity import UKCarbonIntensity

from .eurostat import Eurostat

__all__ = [
"CloudForCustomers",
Expand All @@ -26,6 +26,7 @@
"Trino",
"SQLServer",
"UKCarbonIntensity",
"Eurostat",
]

if find_spec("adlfs"):
Expand Down
Loading
Loading