Skip to content

Commit

Permalink
Merge pull request #65 from spetlr-org/feature/sqlMinDate
Browse files Browse the repository at this point in the history
Convert min date of timestamp columns in SimpleSqlServerTransformer
  • Loading branch information
LauJohansson authored Jun 27, 2023
2 parents 8be9266 + 0984dfa commit ee03b12
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
18 changes: 18 additions & 0 deletions src/spetlr/transformers/simple_sql_transformer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from datetime import datetime, timezone

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import TimestampType

from spetlr.etl import Transformer
from spetlr.sql.SqlServer import SqlServer
Expand All @@ -12,6 +15,7 @@ def __init__(self, *, table_id: str, server: SqlServer, ignoreCase=False):
self.server = server
self.table_id = table_id
self.ignoreCase = ignoreCase
self.min_time = datetime(1900, 1, 1, 0, 0, 1, tzinfo=timezone.utc)

def process(self, df: DataFrame) -> DataFrame:
# This transformation ensures that the selected columns
Expand All @@ -33,4 +37,18 @@ def process(self, df: DataFrame) -> DataFrame:
]
df = df.select(col_choose)

# The sql server min date is 1753-01-01. If a timestamp column of the
# dataframe has a date value lower than this min date, it will not be
# possible to load the data to the sql server. Here, we check all columns
# of the dataframe that are the timestamp type, and convert values of
# those columns to our defined min date, if values are smaller than this date.
for column in df.columns:
if df.schema[column].dataType == TimestampType():
df = df.withColumn(
column,
f.when(
f.col(column) < f.lit(self.min_time), f.lit(self.min_time)
).otherwise(f.col(column)),
)

return df
24 changes: 17 additions & 7 deletions tests/cluster/sql/test_simple_sql_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,26 @@ def test01_can_transform(self):
df = self.create_data()

# Test that timestamp before has miliseconds also
date_test = df.select("testcolumn4").collect()[0][0]
date_test_0 = df.select("testcolumn4").collect()[0][0] # row 1
date_test_1 = df.select("testcolumn4").collect()[1][0] # row 2
self.assertEqual(
dt_utc(2021, 1, 1, 14, 45, 22, 32).replace(tzinfo=None), date_test
dt_utc(2021, 1, 1, 14, 45, 22, 32).replace(tzinfo=None), date_test_0
)
self.assertEqual(dt_utc(1, 1, 2, 0, 0, 1, 1).replace(tzinfo=None), date_test_1)

df_out = SimpleSqlServerTransformer(
table_id=self.table_id, server=self.sql_server
).process(df)

# Check that timestamp are truncated down to only seconds
# The 32 miliseconds should be gone.
date_test = df_out.select("testcolumn4").collect()[0][0]
self.assertEqual(dt_utc(2021, 1, 1, 14, 45, 22).replace(tzinfo=None), date_test)
# The 32 and 1 miliseconds from row 1 and 2, respectively should be gone.
# The 0001-01-01 date in row 2 should be converted to 1900-01-01
date_test_0 = df_out.select("testcolumn4").collect()[0][0] # row 1
date_test_1 = df_out.select("testcolumn4").collect()[1][0] # row 2
self.assertEqual(
dt_utc(2021, 1, 1, 14, 45, 22).replace(tzinfo=None), date_test_0
)
self.assertEqual(dt_utc(1900, 1, 1, 0, 0, 1).replace(tzinfo=None), date_test_1)

schema_expected = StructType(
[
Expand Down Expand Up @@ -132,10 +139,13 @@ def create_data(self) -> DataFrame:
)
cols = ["testcolumn", "testcolumn2", "testcolumn3", "testcolumn4"]

insert_data = (123, 1001.322, "Hello", dt_utc(2021, 1, 1, 14, 45, 22, 32))
insert_data = [
(123, 1001.322, "Hello", dt_utc(2021, 1, 1, 14, 45, 22, 32)), # row 1
(456, 2002.123, "Hello_2", dt_utc(1, 1, 2, 0, 0, 1, 1)), # row 2
]

df_new = DataframeCreator.make_partial(
schema=schema, columns=cols, data=[insert_data]
schema=schema, columns=cols, data=insert_data
)

return df_new.orderBy("testcolumn")

0 comments on commit ee03b12

Please sign in to comment.