Commit

Merge pull request dyvenia#849 from malgorzatagwinner/promote_task_change

🎨 Delete promote_to task from the flow
Rafalz13 committed Feb 21, 2024
2 parents 624bfd0 + 7c76e7a commit d94078f
Showing 2 changed files with 0 additions and 81 deletions.
45 changes: 0 additions & 45 deletions tests/integration/flows/test_adls_to_azure_sql.py
@@ -9,51 +9,6 @@
from viadot.flows.adls_to_azure_sql import check_dtypes_sort, df_to_csv_task


def test_get_promoted_adls_path_csv_file():
adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
promoted_path = flow.get_promoted_path(env="conformed")
assert (
promoted_path
== "conformed/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
)


def test_get_promoted_adls_path_parquet_file():
adls_path_file = "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.parquet"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_file)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_file_starts_with_slash():
adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir_slash():
adls_path_dir_slash = "raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir():
adls_path_dir = "raw/supermetrics/adls_ga_load_times_fr_test"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_get_promoted_adls_path_dir_starts_with_slash():
adls_path_dir_starts_with_slash = "/raw/supermetrics/adls_ga_load_times_fr_test/"
flow = ADLSToAzureSQL(name="test", adls_path=adls_path_dir_starts_with_slash)
promoted_path = flow.get_promoted_path(env="conformed")
assert promoted_path == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"


def test_df_to_csv_task():
d = {"col1": ["rat", "\tdog"], "col2": ["cat", 4]}
df = pd.DataFrame(data=d)
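The six deleted tests above all exercised ADLSToAzureSQL.get_promoted_path, which this commit also removes from the flow (second file below). For reference, a minimal standalone sketch of the removed logic, based only on the code visible in this diff; the free function and example values are illustrative, not part of the codebase. The deleted directory-path tests appear to have relied on path normalization done elsewhere in the flow's __init__ (not shown in this diff), so only the two file cases are asserted here.

import os

def get_promoted_path(adls_path: str, env: str) -> str:
    # Mirror of the removed method: map a "raw" ADLS path onto the target
    # promotion layer. Parquet files are renamed to "<parent_dir>.csv" and the
    # parent directory is collapsed out of the promoted path.
    adls_path_clean = adls_path.strip("/")
    extension = adls_path_clean.split(".")[-1].strip()
    if extension == "parquet":
        file_name = adls_path_clean.split("/")[-2] + ".csv"
        common_path = "/".join(adls_path_clean.split("/")[1:-2])
    else:
        file_name = adls_path_clean.split("/")[-1]
        common_path = "/".join(adls_path_clean.split("/")[1:-1])
    return os.path.join(env, common_path, file_name)

# Expected mappings, taken verbatim from the deleted tests:
assert get_promoted_path(
    "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv",
    env="conformed",
) == "conformed/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.csv"
assert get_promoted_path(
    "raw/supermetrics/adls_ga_load_times_fr_test/2021-07-14T13%3A09%3A02.997357%2B00%3A00.parquet",
    env="conformed",
) == "conformed/supermetrics/adls_ga_load_times_fr_test.csv"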
36 changes: 0 additions & 36 deletions viadot/flows/adls_to_azure_sql.py
@@ -221,8 +221,6 @@ def __init__(
self.overwrite_adls = overwrite_adls
self.if_empty = if_empty
self.adls_sp_credentials_secret = adls_sp_credentials_secret
self.adls_path_conformed = self.get_promoted_path(env="conformed")
self.adls_path_operations = self.get_promoted_path(env="operations")

# AzureSQLCreateTable
self.table = table
@@ -257,20 +255,6 @@ def _map_if_exists(if_exists: str) -> str:
def slugify(name):
return name.replace(" ", "_").lower()

def get_promoted_path(self, env: str) -> str:
adls_path_clean = self.adls_path.strip("/")
extension = adls_path_clean.split(".")[-1].strip()
if extension == "parquet":
file_name = adls_path_clean.split("/")[-2] + ".csv"
common_path = "/".join(adls_path_clean.split("/")[1:-2])
else:
file_name = adls_path_clean.split("/")[-1]
common_path = "/".join(adls_path_clean.split("/")[1:-1])

promoted_path = os.path.join(env, common_path, file_name)

return promoted_path

def gen_flow(self) -> Flow:
lake_to_df_task = AzureDataLakeToDF(timeout=self.timeout)
df = lake_to_df_task.bind(
@@ -327,22 +311,6 @@ def gen_flow(self) -> Flow:
flow=self,
)

promote_to_conformed_task = AzureDataLakeCopy(timeout=self.timeout)
promote_to_conformed_task.bind(
from_path=self.adls_path,
to_path=self.adls_path_conformed,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
flow=self,
)
promote_to_operations_task = AzureDataLakeCopy(timeout=self.timeout)
promote_to_operations_task.bind(
from_path=self.adls_path_conformed,
to_path=self.adls_path_operations,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
flow=self,
)
create_table_task = AzureSQLCreateTable(timeout=self.timeout)
create_table_task.bind(
schema=self.schema,
@@ -372,9 +340,5 @@ def gen_flow(self) -> Flow:

df_reorder.set_upstream(lake_to_df_task, flow=self)
df_to_csv.set_upstream(df_reorder, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
create_table_task.set_upstream(df_to_csv, flow=self)
promote_to_operations_task.set_upstream(
promote_to_conformed_task, flow=self
)
bulk_insert_task.set_upstream(create_table_task, flow=self)

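For context, the removed promotion step copied the freshly loaded file from the raw layer to conformed and then on to operations. A minimal sketch of that wiring, assuming viadot's Prefect 1.x AzureDataLakeCopy task with the bind signature shown in the diff; the flow name and paths are illustrative:

from prefect import Flow
from viadot.tasks import AzureDataLakeCopy

flow = Flow("promotion_sketch")

# Illustrative paths; the real flow derived these via get_promoted_path().
raw_path = "raw/supermetrics/report.csv"
conformed_path = "conformed/supermetrics/report.csv"
operations_path = "operations/supermetrics/report.csv"

promote_to_conformed = AzureDataLakeCopy()
promote_to_conformed.bind(from_path=raw_path, to_path=conformed_path, flow=flow)

promote_to_operations = AzureDataLakeCopy()
promote_to_operations.bind(
    from_path=conformed_path, to_path=operations_path, flow=flow
)

# The operations copy ran only after the conformed copy finished:
promote_to_operations.set_upstream(promote_to_conformed, flow=flow)

With these tasks gone, the flow's remaining dependency chain is simply df_reorder -> df_to_csv -> create_table_task -> bulk_insert_task.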