
Commit

added df validation to aselite
dominikjedlinski committed Oct 24, 2023
1 parent 1e2d708 commit 8f5348a
Showing 2 changed files with 76 additions and 2 deletions.
62 changes: 61 additions & 1 deletion tests/integration/flows/test_aselite_to_adls.py
@@ -8,7 +8,7 @@
from prefect.tasks.secrets import PrefectSecret

from viadot.flows.aselite_to_adls import ASELiteToADLS
from viadot.task_utils import df_converts_bytes_to_int, df_to_csv
from viadot.task_utils import df_converts_bytes_to_int, df_to_csv, validate_df
from viadot.tasks import AzureDataLakeUpload
from viadot.tasks.aselite import ASELiteToDF

@@ -62,3 +62,63 @@ def test_aselite_to_adls():
    assert MAIN_DF.shape == (10, 17)

    os.remove(TMP_FILE_NAME)


def test_aselite_to_adls_validate_df():
    credentials_secret = PrefectSecret("aselite").run()
    vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run()

    query_designer = """SELECT TOP 10 [ID]
        ,[SpracheText]
        ,[SpracheKat]
        ,[SpracheMM]
        ,[KatSprache]
        ,[KatBasisSprache]
        ,[CodePage]
        ,[Font]
        ,[Neu]
        ,[Upd]
        ,[UpdL]
        ,[LosKZ]
        ,[AstNr]
        ,[KomKz]
        ,[RKZ]
        ,[ParentLanguageNo]
        ,[UPD_FIELD]
    FROM [UCRMDEV].[dbo].[CRM_00]"""

    validate_df_dict = {
        "column_size": {"ParentLanguageNo": 1},
        "column_unique_values": ["ID"],
        "column_list_to_match": [
            "SpracheText",
            "SpracheMM",
            "KatSprache",
            "KatBasisSprache",
            "CodePage",
        ],
        "dataset_row_count": {"min": 10, "max": 10},
        "column_match_regex": {"SpracheText": r"TE_.*"},
    }

    flow = ASELiteToADLS(
        "Test flow",
        query=query_designer,
        sqldb_credentials_secret=credentials_secret,
        vault_name=vault_name,
        file_path=TMP_FILE_NAME,
        validate_df_dict=validate_df_dict,
        to_path="raw/supermetrics/mp/result_df_flow_at_des_m.csv",
        run_config=None,
    )

    result = flow.run()
    assert result.is_successful()

    MAIN_DF = pd.read_csv(TMP_FILE_NAME, delimiter="\t")

    assert isinstance(MAIN_DF, pd.DataFrame)

    assert MAIN_DF.shape == (10, 17)

    os.remove(TMP_FILE_NAME)
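
For reference, here is a minimal sketch of how a validate_df-style task might interpret the test keys used above. The key names come from the test itself, but the checking logic is an assumption for illustration, not viadot's actual implementation:

import pandas as pd


def run_df_tests(df: pd.DataFrame, tests: dict) -> None:
    # column_size: every value in the named column has the given string
    # length (assumed semantics).
    for col, size in tests.get("column_size", {}).items():
        assert df[col].astype(str).str.len().eq(size).all(), f"{col}: unexpected size"

    # column_unique_values: the named columns contain no duplicate values.
    for col in tests.get("column_unique_values", []):
        assert df[col].is_unique, f"{col}: values are not unique"

    # column_list_to_match: the listed columns are all present in the
    # DataFrame (assumed to be a subset check, since the test DataFrame
    # has 17 columns but lists only 5).
    missing = set(tests.get("column_list_to_match", [])) - set(df.columns)
    assert not missing, f"missing columns: {missing}"

    # dataset_row_count: the row count falls within the min/max bounds.
    bounds = tests.get("dataset_row_count")
    if bounds:
        assert bounds["min"] <= len(df) <= bounds["max"], "row count out of range"

    # column_match_regex: every value in the column matches the pattern.
    for col, pattern in tests.get("column_match_regex", {}).items():
        assert df[col].astype(str).str.match(pattern).all(), f"{col}: regex mismatch"
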
16 changes: 15 additions & 1 deletion viadot/flows/aselite_to_adls.py
@@ -2,7 +2,12 @@

from prefect import Flow

from viadot.task_utils import df_clean_column, df_converts_bytes_to_int, df_to_csv
from viadot.task_utils import (
    df_clean_column,
    df_converts_bytes_to_int,
    df_to_csv,
    validate_df,
)
from viadot.tasks import AzureDataLakeUpload
from viadot.tasks.aselite import ASELiteToDF

@@ -19,6 +24,7 @@ def __init__(
        to_path: str = None,
        if_exists: Literal["replace", "append", "delete"] = "replace",
        overwrite: bool = True,
        validate_df_dict: dict = None,
        convert_bytes: bool = False,
        sp_credentials_secret: str = None,
        remove_special_characters: str = None,
@@ -41,6 +47,7 @@
            to_path (str): The path to an ADLS file. Defaults to None.
            if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
            overwrite (bool, optional): Whether to overwrite the destination file. Defaults to True.
            validate_df_dict (dict, optional): A dictionary of tests to run against the output DataFrame. If provided, it triggers the `validate_df` task from `task_utils`. Defaults to None.
            sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
                ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
            remove_special_characters (str, optional): Call a function that removes special characters like escape symbols. Defaults to None.
@@ -53,6 +60,7 @@
        self.sqldb_credentials_secret = sqldb_credentials_secret
        self.vault_name = vault_name
        self.overwrite = overwrite
        self.validate_df_dict = validate_df_dict

        self.file_path = file_path
        self.sep = sep
@@ -83,6 +91,11 @@ def gen_flow(self) -> Flow:
        if self.remove_special_characters == True:
            df = df_clean_column(df, columns_to_clean=self.columns_to_clean, flow=self)

        if self.validate_df_dict:
            validation_task = validate_df.bind(
                df, tests=self.validate_df_dict, flow=self
            )

        create_csv = df_to_csv.bind(
            df,
            path=self.file_path,
@@ -100,5 +113,6 @@
            flow=self,
        )

        if self.validate_df_dict:
            create_csv.set_upstream(validation_task, flow=self)
        create_csv.set_upstream(df, flow=self)
        adls_upload.set_upstream(create_csv, flow=self)
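
With this change, callers opt into validation by passing `validate_df_dict` when constructing the flow, and gating `create_csv` on the validation task ensures no file is written or uploaded if any test fails. A usage sketch follows; the flow name, query, secret names, and paths are illustrative, mirroring the integration test:

from viadot.flows.aselite_to_adls import ASELiteToADLS

flow = ASELiteToADLS(
    "ASELite extract with validation",
    query="SELECT TOP 10 * FROM [UCRMDEV].[dbo].[CRM_00]",
    sqldb_credentials_secret="aselite",  # Key Vault secret name (illustrative)
    vault_name="AZURE_DEFAULT_KEYVAULT",
    file_path="test_aselite.csv",
    validate_df_dict={"dataset_row_count": {"min": 1, "max": 10}},
    to_path="raw/aselite/crm_00.csv",  # illustrative ADLS destination
)
state = flow.run()
assert state.is_successful()  # fails if any validate_df test fails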
