Skip to content

Commit

Permalink
Merge pull request #82 from spetlr-org/feature/remove-transformernc
Browse files Browse the repository at this point in the history
Feature/remove transformernc
  • Loading branch information
LauJohansson authored Aug 28, 2023
2 parents c84c437 + a56ea93 commit 2b72880
Show file tree
Hide file tree
Showing 32 changed files with 392 additions and 538 deletions.
19 changes: 9 additions & 10 deletions docs/transformations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Transformations in spetlr:
- [SelectAndCastColumnsTransformer](#selectandcastcolumnstransformer)
- [ValidFromToTransformer](#validfromtotransformer)
- [DataFrameFilterTransformer](#dataframefiltertransformer)
- [CountryToAlphaCodeTransformerNC](#countrytoalphacodetransformernc)
- [CountryToAlphaCodeTransformer](#countrytoalphacodetransformer)
- [GenerateMd5ColumnTransformer](#generatemd5columntransformer)
## Concatenate data frames

Expand Down Expand Up @@ -432,17 +432,18 @@ transformed_df.display()

```

## CountryToAlphaCodeTransformerNC
## CountryToAlphaCodeTransformer

This is a simple transformer for translating country names to their alpha-2 code equivalent.

Usage example

```python
from spetlr.transformers import CountryToAlphaCodeTransformerNC
from spetlr.transformers import CountryToAlphaCodeTransformer
import pyspark.sql.types as T

from spetlr.spark import Spark

input_schema = T.StructType(
[
T.StructField("countryCol", T.StringType(), True),
Expand All @@ -456,15 +457,13 @@ input_data = [

input_df = Spark.get().createDataFrame(data=input_data, schema=input_schema)

transformed_df = CountryToAlphaCodeTransformerNC(
transformed_df = CountryToAlphaCodeTransformer(
col_name="countryCol",
    output_col_name="alphaCodeCol"
).process(input_df)


transformed_df.display()

+----------+------------+
|countryCol|alphaCodeCol|
+----------+------------+
| Denmark| DK|
Expand All @@ -480,10 +479,11 @@ This transformer generates a unique column with md5 encoding based on other colu
Usage example

```python
from spetlr.transformers import GenerateMd5ColumnTransformerNC
from spetlr.transformers import GenerateMd5ColumnTransformer
import pyspark.sql.types as T

from spetlr.spark import Spark

input_schema = T.StructType(
[
T.StructField("id", T.IntegerType(), True),
Expand All @@ -498,12 +498,11 @@ input_data = [

input_df = Spark.get().createDataFrame(data=input_data, schema=input_schema)

transformed_df = GenerateMd5ColumnTransformerNC(
transformed_df = GenerateMd5ColumnTransformer(
col_name="md5_col",
col_list=["id", "text"],
).process(input_df)


transformed_df.display()

+-----+-------+----------------------------------+
Expand All @@ -512,4 +511,4 @@ transformed_df.display()
| 1| text1| e86667d75db79395e172c5c343ec2df1|
| 2| Null| c81e728d9d4c2f636f067f89cc14862c|
+-----+-------+----------------------------------+
```
```
2 changes: 1 addition & 1 deletion src/VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.0
3.0.0
2 changes: 0 additions & 2 deletions src/spetlr/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from .loader import Loader
from .orchestrator import Orchestrator
from .transformer import Transformer
from .transformer_nc import TransformerNC
from .types import EtlBase, dataset_group

__all__ = [
Expand All @@ -11,6 +10,5 @@
"Transformer",
"Orchestrator",
"EtlBase",
"TransformerNC",
"dataset_group",
]
46 changes: 0 additions & 46 deletions src/spetlr/etl/transformer_nc.py

This file was deleted.

24 changes: 9 additions & 15 deletions src/spetlr/transformers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
from .country_to_alphacode_transformer_nc import ( # noqa: F401
CountryToAlphaCodeTransformerNC,
from .country_to_alphacode_transformer import ( # noqa: F401
CountryToAlphaCodeTransformer,
)
from .drop_oldest_duplicate_transformer import ( # noqa: F401
DropOldestDuplicatesTransformer,
)
from .dropColumnsTransformer_nc import DropColumnsTransformerNC # noqa: F401
from .generate_md5_column_transformer_nc import ( # noqa: F401
GenerateMd5ColumnTransformerNC,
from .dropColumnsTransformer import DropColumnsTransformer # noqa: F401
from .generate_md5_column_transformer import GenerateMd5ColumnTransformer # noqa: F401
from .join_dataframes_transformer import JoinDataframesTransformer # noqa: F401
from .select_and_cast_columns_transformer import ( # noqa: F401
SelectAndCastColumnsTransformer,
)
from .join_dataframes_transformer import JoinDataframesTransformerNC # noqa: F401
from .select_and_cast_columns_transformer_nc import ( # noqa: F401
SelectAndCastColumnsTransformerNC,
)
from .selectColumnsTransformer_nc import SelectColumnsTransformerNC # noqa: F401
from .selectColumnsTransformer import SelectColumnsTransformer # noqa: F401
from .simple_dataframe_filter_transformer import ( # noqa: F401
DataFrameFilterTransformer,
)
from .simple_dataframe_filter_transformer_nc import ( # noqa: F401
DataFrameFilterTransformerNC,
)
from .simple_sql_transformer import SimpleSqlServerTransformer # noqa: F401
from .timezone_transformer_nc import TimeZoneTransformerNC # noqa: F401
from .timezone_transformer import TimeZoneTransformer # noqa: F401
from .union_transformer import UnionTransformer # noqa: F401
from .union_transformer_nc import UnionTransformerNC # noqa: F401
from .validfromto_transformer import ValidFromToTransformer # noqa: F401
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import List, Union
from typing import List

import pycountry
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame

from spetlr.etl import TransformerNC
from spetlr.etl import Transformer


def translate_country_to_alpha2(country_name: str) -> str:
Expand All @@ -26,13 +26,14 @@ def translate_country_to_alpha2(country_name: str) -> str:
translateUDF = F.udf(lambda z: translate_country_to_alpha2(z), T.StringType())


class CountryToAlphaCodeTransformerNC(TransformerNC):
class CountryToAlphaCodeTransformer(Transformer):
def __init__(
self,
col_name: str,
output_col_name: str = None,
dataset_input_keys: Union[str, List[str]] = None,
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True,
) -> None:
"""
A simple transformer to translate country names to alpha-2 codes
Expand All @@ -45,6 +46,7 @@ def __init__(
super().__init__(
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.col_name = col_name
self.output_col_name = output_col_name or self.col_name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from typing import List, Union
from typing import List

from pyspark.sql import DataFrame

from spetlr.etl import TransformerNC
from spetlr.etl import Transformer


class DropColumnsTransformerNC(TransformerNC):
class DropColumnsTransformer(Transformer):
def __init__(
self,
*,
columnList: List[str],
dataset_input_keys: Union[str, List[str]] = None,
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True
):
super().__init__(
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.columnList = columnList

Expand Down
16 changes: 14 additions & 2 deletions src/spetlr/transformers/drop_oldest_duplicate_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,20 @@


class DropOldestDuplicatesTransformer(Transformer):
def __init__(self, *, cols: List[str], orderByColumn: str):
super().__init__()
def __init__(
self,
*,
cols: List[str],
orderByColumn: str,
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True,
):
super().__init__(
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.cols = cols
self.orderByColumn = orderByColumn

Expand Down
15 changes: 13 additions & 2 deletions src/spetlr/transformers/fuzzy_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,19 @@ class FuzzySelectTransformer(Transformer):
the method find_best_mapping and inspect the returned mapping.
"""

def __init__(self, columns: Iterable[str], match_cutoff=0.6):
super().__init__()
def __init__(
self,
columns: Iterable[str],
match_cutoff=0.6,
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True,
):
super().__init__(
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.columns = list(columns)
self.match_cutoff = match_cutoff

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import uuid
from typing import List, Union
from typing import List

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame

from spetlr.etl import TransformerNC
from spetlr.etl import Transformer


class GenerateMd5ColumnTransformerNC(TransformerNC):
class GenerateMd5ColumnTransformer(Transformer):
"""
This transformer generates a unique column with md5 encoding based on other columns.
The transformer also handles if a value is NULL, by replacing it with empty string.
Expand All @@ -30,12 +30,14 @@ def __init__(
*,
col_name: str,
col_list: List[str],
dataset_input_keys: Union[str, List[str]] = None,
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True
):
super().__init__(
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.col_name = col_name
self.col_list = col_list
Expand Down
9 changes: 6 additions & 3 deletions src/spetlr/transformers/join_dataframes_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

from pyspark.sql import DataFrame

from spetlr.etl import TransformerNC
from spetlr.etl import Transformer
from spetlr.etl.types import dataset_group
from spetlr.exceptions import (
ColumnDoesNotExistException,
MoreThanTwoDataFramesException,
)


class JoinDataframesTransformerNC(TransformerNC):
class JoinDataframesTransformer(Transformer):
"""
This transformer joins two DataFrames together.
Expand Down Expand Up @@ -40,6 +40,7 @@ def __init__(
join_type: str = "inner",
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True,
):
if len(dataset_input_keys) > 2:
raise MoreThanTwoDataFramesException(
Expand All @@ -48,7 +49,9 @@ def __init__(
)

super().__init__(
dataset_input_keys=dataset_input_keys, dataset_output_key=dataset_output_key
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.first_dataframe_join_key = first_dataframe_join_key
self.second_dataframe_join_key = second_dataframe_join_key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
from typing import List, Union
from typing import List

from pyspark.sql import DataFrame

from spetlr.etl import TransformerNC
from spetlr.etl import Transformer


class SelectColumnsTransformerNC(TransformerNC):
class SelectColumnsTransformer(Transformer):
def __init__(
self,
*,
columnList: List[str],
dataset_input_keys: Union[str, List[str]] = None,
dataset_input_keys: List[str] = None,
dataset_output_key: str = None,
consume_inputs: bool = True
):
super().__init__(
dataset_input_keys=dataset_input_keys,
dataset_output_key=dataset_output_key,
consume_inputs=consume_inputs,
)
self.columnList = columnList

Expand Down
Loading

0 comments on commit 2b72880

Please sign in to comment.