✨ Add Supermetrics source and Prefect tasks #1054

Merged: 11 commits, Sep 26, 2024. Changes from all commits.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -34,6 +34,7 @@ dependencies = [
"numpy>=1.23.4, <2.0",
"defusedxml>=0.7.1",
"aiohttp>=3.10.5",
"pytest-mock>=3.14.0",
]
requires-python = ">=3.10"
readme = "README.md"
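The new pytest-mock dependency presumably supports mocking in tests for the Supermetrics task added below. A minimal sketch of how the mocker fixture could be used to test supermetrics_to_df without hitting the real API; the patch target, config key, and query values are illustrative assumptions, not taken from this PR:

import pandas as pd

def test_supermetrics_to_df(mocker):
    mock_df = pd.DataFrame({"Date": ["2024-09-01"], "sessions": [42]})

    # Replace the Supermetrics source class inside the task module so that
    # no real API call is made.
    mock_source = mocker.patch(
        "viadot.orchestration.prefect.tasks.supermetrics.Supermetrics"
    )
    mock_source.return_value.to_df.return_value = mock_df

    from viadot.orchestration.prefect.tasks import supermetrics_to_df

    # `.fn` calls the task's underlying function, bypassing the Prefect engine.
    df = supermetrics_to_df.fn(
        query_params={"ds_id": "GA"},
        config_key="supermetrics",
    )

    assert df.equals(mock_df)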
6 changes: 4 additions & 2 deletions requirements-dev.lock
@@ -7,7 +7,6 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
aiohappyeyeballs==2.4.0
@@ -433,7 +432,10 @@ pyparsing==3.1.2
# via mike
pytest==8.3.2
# via pytest-asyncio
# via pytest-mock
pytest-asyncio==0.23.8
pytest-mock==3.14.0
# via viadot2
python-dateutil==2.9.0.post0
# via botocore
# via croniter
@@ -529,7 +531,7 @@ ruamel-yaml==0.18.6
# via prefect
ruamel-yaml-clib==0.2.8
# via ruamel-yaml
ruff==0.6.1
ruff==0.6.7
s3transfer==0.10.2
# via boto3
scipy==1.14.0
13 changes: 12 additions & 1 deletion requirements.lock
@@ -7,7 +7,6 @@
# all-features: false
# with-sources: false
# generate-hashes: false
# universal: false

-e file:.
aiohappyeyeballs==2.4.0
@@ -87,6 +86,7 @@ et-xmlfile==1.1.0
exceptiongroup==1.2.2
# via anyio
# via prefect
# via pytest
frozenlist==1.4.1
# via aiohttp
# via aiosignal
@@ -129,6 +129,8 @@ imagehash==4.3.1
# via viadot2
importlib-resources==6.1.3
# via prefect
iniconfig==2.0.0
# via pytest
itsdangerous==2.2.0
# via prefect
jinja2==3.1.4
@@ -186,6 +188,7 @@ orjson==3.10.7
# via prefect
packaging==24.1
# via prefect
# via pytest
pandas==2.2.2
# via viadot2
# via visions
@@ -195,6 +198,8 @@ pendulum==2.1.2
# via prefect
pillow==10.4.0
# via imagehash
pluggy==1.5.0
# via pytest
prefect==2.20.2
# via prefect-github
# via prefect-sqlalchemy
@@ -226,6 +231,10 @@ pygments==2.18.0
# via rich
pyodbc==5.1.0
# via viadot2
pytest==8.3.3
# via pytest-mock
pytest-mock==3.14.0
# via viadot2
python-dateutil==2.9.0.post0
# via croniter
# via dateparser
@@ -330,6 +339,8 @@ text-unidecode==1.3
# via python-slugify
toml==0.10.2
# via prefect
tomli==2.0.1
# via pytest
trino==0.328.0
# via viadot2
typer==0.12.4
4 changes: 2 additions & 2 deletions src/viadot/__init__.py
@@ -7,12 +7,12 @@
# but keep WARNING and higher ones in case something actually important happens.

azure_clutter_logger_1 = logging.getLogger(
"azure.core.pipeline.policies.http_logging_policy"
"azure.core.pipeline.policies.http_logging_policy",
)
azure_clutter_logger_1.setLevel(logging.WARNING)


azure_clutter_logger_2 = logging.getLogger(
"azure.identity.aio._internal.get_token_mixin"
"azure.identity.aio._internal.get_token_mixin",
)
azure_clutter_logger_2.setLevel(logging.WARNING)
4 changes: 3 additions & 1 deletion src/viadot/orchestration/prefect/flows/__init__.py
@@ -21,6 +21,7 @@
from .sharepoint_to_s3 import sharepoint_to_s3
from .sql_server_to_minio import sql_server_to_minio
from .sql_server_to_parquet import sql_server_to_parquet
from .supermetrics_to_adls import supermetrics_to_adls
from .transform import transform
from .transform_and_catalog import transform_and_catalog

@@ -32,9 +33,9 @@
"duckdb_to_sql_server",
"duckdb_transform",
"epicor_to_parquet",
"exchange_rates_api_to_redshift_spectrum",
"exchange_rates_to_adls",
"exchange_rates_to_databricks",
"exchange_rates_api_to_redshift_spectrum",
"genesys_to_adls",
"hubspot_to_adls",
"mindful_to_adls",
@@ -47,6 +48,7 @@
"sharepoint_to_s3",
"sql_server_to_minio",
"sql_server_to_parquet",
"supermetrics_to_adls",
"transform",
"transform_and_catalog",
]
92 changes: 92 additions & 0 deletions src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py
@@ -0,0 +1,92 @@
"""Flow for downloading the data from Superpetrics and uploading it to ADLS."""

from typing import Any

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

from viadot.orchestration.prefect.tasks import (
df_to_adls,
supermetrics_to_df,
)


@flow(
name="Supermetrics extraction to ADLS",
description="Extract data from Supermetrics and load it into ADLS.",
retries=1,
retry_delay_seconds=60,
task_runner=ConcurrentTaskRunner,
)
def supermetrics_to_adls(
# supermetrics
query_params: dict[str, Any] | None = None,
# ADLS
adls_path: str | None = None,
overwrite: bool = False,
# Auth
supermetrics_credentials_secret: str | None = None,
supermetrics_config_key: str | None = None,
adls_credentials_secret: str | None = None,
adls_config_key: str | None = None,
**kwargs: dict[str, Any] | None,
) -> None:
"""Flow to extract data from the Supermetrics API and save it to ADLS.

This function queries data from the Supermetrics API using the provided query
parameters and saves the resulting DataFrame to Azure Data Lake Storage (ADLS)
as a file.

Args:
----
query_params (dict[str, Any], optional):
A dictionary of query parameters for the Supermetrics API. These parameters
specify the data to retrieve from Supermetrics. If not provided, the default
parameters from the Supermetrics configuration will be used.
adls_path (str, optional):
The destination path in ADLS where the DataFrame will be saved. This should
include the file name and extension (e.g., 'myfolder/myfile.csv'). If not
provided, the function will use a default path from the configuration
or raise an error.
overwrite (bool, optional):
A flag indicating whether to overwrite the existing file in ADLS. If set
to False and the file exists, an error will be raised. Default is False.
supermetrics_credentials_secret (str, optional):
The name of the secret in the secret management system containing
the Supermetrics API credentials. If not provided, the function will use
credentials specified in the configuration.
supermetrics_config_key (str, optional):
The key in the viadot configuration holding relevant credentials.
Defaults to None.
adls_credentials_secret (str, optional):
The name of the secret in the secret management system containing
the ADLS credentials. If not provided, the function will use credentials
specified in the configuration.
adls_config_key (str, optional):
The key in the viadot configuration holding relevant credentials.
Defaults to None.
**kwargs (dict[str, Any], optional):
Additional keyword arguments to pass to the `supermetrics_to_df` function
for further customization of the Supermetrics query.

Raises:
------
ValueError:
If `adls_path` is not provided and cannot be determined from
the configuration.

"""
df = supermetrics_to_df(
query_params=query_params,
credentials_secret=supermetrics_credentials_secret,
config_key=supermetrics_config_key,
**kwargs,
)

return df_to_adls(
df=df,
path=adls_path,
credentials_secret=adls_credentials_secret,
config_key=adls_config_key,
overwrite=overwrite,
)
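A minimal sketch of invoking this flow. The query parameters and secret names below are illustrative assumptions, not values taken from this PR; real Supermetrics queries need a valid ds_id, ds_accounts, and field names for the chosen data source:

from viadot.orchestration.prefect.flows import supermetrics_to_adls

supermetrics_to_adls(
    query_params={
        "ds_id": "GA",                      # data source, e.g. Google Analytics
        "ds_accounts": ["1234567"],         # account ID(s) to query
        "date_range_type": "last_month",
        "fields": ["Date", "sessions"],
    },
    adls_path="raw/supermetrics/report.csv",
    overwrite=True,
    supermetrics_credentials_secret="supermetrics-creds",  # assumed secret name
    adls_credentials_secret="adls-creds",  # assumed secret name
)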
14 changes: 8 additions & 6 deletions src/viadot/orchestration/prefect/tasks/__init__.py
@@ -23,31 +23,33 @@
sharepoint_to_df,
)
from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df
from .supermetrics import supermetrics_to_df


__all__ = [
"adls_upload",
"df_to_adls",
"bcp",
"clone_repo",
"cloud_for_customers_to_df",
"df_to_databricks",
"create_sql_server_table",
"dbt_task",
"df_to_adls",
"df_to_databricks",
"df_to_minio",
"df_to_redshift_spectrum",
"duckdb_query",
"epicor_to_df",
"exchange_rates_to_df",
"genesys_to_df",
"clone_repo",
"hubspot_to_df",
"luma_ingest_task",
"mindful_to_df",
"df_to_minio",
"outlook_to_df",
"df_to_redshift_spectrum",
"s3_upload_file",
"sap_rfc_to_df",
"sharepoint_download_file",
"sharepoint_to_df",
"create_sql_server_table",
"sql_server_query",
"sql_server_to_df",
"supermetrics_to_df",
]
62 changes: 62 additions & 0 deletions src/viadot/orchestration/prefect/tasks/supermetrics.py
@@ -0,0 +1,62 @@
"""Task for connecting to Supermetrics API."""

import pandas as pd
from prefect import task

from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials
from viadot.sources import Supermetrics


@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60)
def supermetrics_to_df(
query_params: dict,
config_key: str | None = None,
credentials_secret: str | None = None,
) -> pd.DataFrame:
"""Task to retrive data from Supermetrics and returns it as a pandas DataFrame.

This function queries the Supermetrics API using the provided query parameters and
returns the data as a pandas DataFrame. The function supports both
configuration-based and secret-based credentials.

The function is decorated with a Prefect task, allowing it to handle retries,
logging, and timeout behavior.

Args:
----
query_params (dict):
A dictionary containing the parameters for querying the Supermetrics API.
These parameters define what data to retrieve and how the query should
be constructed.
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
credentials_secret (str, optional):
The name of the secret in your secret management system that contains
the Supermetrics API credentials. If `config_key` is not provided,
this secret is used to authenticate with the Supermetrics API.

Returns:
-------
pd.DataFrame:
A pandas DataFrame containing the data retrieved from Supermetrics based
on the provided query parameters.

Raises:
------
MissingSourceCredentialsError:
Raised if neither `credentials_secret` nor `config_key` is provided,
indicating that no valid credentials were supplied to access
the Supermetrics API.

"""
if not (credentials_secret or config_key):
raise MissingSourceCredentialsError

credentials = get_credentials(credentials_secret) if not config_key else None

supermetrics = Supermetrics(
credentials=credentials,
config_key=config_key,
)
return supermetrics.to_df(query_params=query_params)
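For composing custom pipelines, the task can be called inside any flow. A sketch under the assumption that a "supermetrics" key exists in the viadot config; the parameter values are illustrative only:

from prefect import flow

from viadot.orchestration.prefect.tasks import supermetrics_to_df

@flow
def supermetrics_preview():
    df = supermetrics_to_df(
        query_params={
            "ds_id": "GA",
            "ds_accounts": ["1234567"],
            "date_range_type": "last_week",
            "fields": ["Date", "sessions"],
        },
        config_key="supermetrics",  # or credentials_secret="supermetrics-creds"
    )
    print(df.head())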
15 changes: 5 additions & 10 deletions src/viadot/sources/__init__.py
@@ -13,6 +13,7 @@
from .outlook import Outlook
from .sharepoint import Sharepoint
from .sql_server import SQLServer
from .supermetrics import Supermetrics, SupermetricsCredentials
from .uk_carbon_intensity import UKCarbonIntensity


@@ -21,46 +22,40 @@
"Epicor",
"ExchangeRates",
"Genesys",
"Outlook",
"Hubspot",
"Mindful",
"Outlook",
"SQLServer",
"Sharepoint",
"Supermetrics",
"SupermetricsCredentials", # pragma: allowlist-secret
"Trino",
"SQLServer",
"UKCarbonIntensity",
]

if find_spec("adlfs"):
from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401

__all__.extend(["AzureDataLake"])

if find_spec("duckdb"):
from viadot.sources._duckdb import DuckDB # noqa: F401

__all__.extend(["DuckDB"])

if find_spec("redshift_connector"):
from viadot.sources.redshift_spectrum import RedshiftSpectrum # noqa: F401

__all__.extend(["RedshiftSpectrum"])

if find_spec("s3fs"):
from viadot.sources.s3 import S3 # noqa: F401

__all__.extend(["S3"])

if find_spec("s3fs"):
from viadot.sources.minio import MinIO # noqa: F401

__all__.extend(["MinIO"])


if find_spec("pyrfc"):
from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401

__all__.extend(["SAPRFC", "SAPRFCV2"])

if find_spec("pyspark"):
from viadot.sources.databricks import Databricks # noqa: F401
