Merge pull request #172 from spetlr-org/feauture/add-too_many_rows-logic-to-CachedLoader

Add too_many_rows logic to the CachedLoader class.
RadekBuczkowski authored Jun 4, 2024
2 parents 63cd233 + 3c23eea commit ca225b4
Showing 4 changed files with 83 additions and 4 deletions.
12 changes: 12 additions & 0 deletions docs/etl/specializations/README.md
@@ -63,6 +63,18 @@ Therefore, this method needs to return the rows that were actually deleted;
the loader then updates the cache table with them to mark them as having been deleted.
Returning `None` here skips the rest of the deleting logic.

### `too_many_rows`

This function is called instead of the `write_operation` and `delete_operation`
functions if the data set contains too many new or modified rows.
To set the threshold for "too many rows", use the parameter
`do_nothing_if_more_rows_than`.
The function takes no parameters and returns nothing.
This logic is useful when full loads take longer than 24 hours and run
in parallel with daily incremental loads that must not exceed the threshold.
In that case, `too_many_rows` can, for example, send a warning that the
full-load job needs to be started.
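
As a rough illustration (not from the repository itself), here is a minimal sketch of a loader that uses this hook. The class name, table name, key columns, threshold value, and the `print`-based warning are all invented for the example, and the import paths are inferred from the file layout in this commit.

```python
from pyspark.sql import DataFrame

from spetlr.cache.CachedLoader import CachedLoader
from spetlr.cache.CachedLoaderParameters import CachedLoaderParameters


class MyIncrementalLoader(CachedLoader):
    """Hypothetical loader, shown only to illustrate the too_many_rows hook."""

    def write_operation(self, df: DataFrame) -> DataFrame:
        # write the new or modified rows to the target here,
        # then return the rows that were actually written
        return df

    def delete_operation(self, df: DataFrame) -> DataFrame:
        # delete the rows from the target here,
        # then return the rows that were actually deleted
        return df

    def too_many_rows(self) -> None:
        # called instead of write_operation/delete_operation when the
        # incoming data exceeds do_nothing_if_more_rows_than
        print("WARNING: incremental load too large - please start the full-load job")


params = CachedLoaderParameters(
    cache_table_name="my_db.my_cache_table",  # assumed table name
    key_cols=["a", "b"],                      # assumed key columns
    do_nothing_if_more_rows_than=1_000_000,   # assumed threshold
)
loader = MyIncrementalLoader(params)
```

With this setup, an incremental run that finds more than the assumed one million new or changed rows writes and deletes nothing; it only calls `too_many_rows`, which can then request the long-running full load.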

## Simple Extractor/Loader

Often the step of extracting from, e.g. a delta handle or an eventhub,
20 changes: 18 additions & 2 deletions src/spetlr/cache/CachedLoader.py
@@ -20,7 +20,9 @@ class CachedLoader(Loader):
The cache table needs to exist and have the correct schema matching
the configured parameters, see the CachedLoaderParameters class.
- Remember to override self.write_operation and/or self.delete_operation
+ Remember to override self.write_operation and/or self.delete_operation.
+ If the parameter "do_nothing_if_more_rows_than" was specified,
+ you also need to override self.too_many_rows.
Any variable rows need to be added in this function.
"""

@@ -53,8 +55,13 @@ def validate(self):
# validate overloading
write_not_overloaded = "write_operation" not in self.__class__.__dict__
delete_not_overloaded = "delete_operation" not in self.__class__.__dict__
too_many_rows_not_overloaded = "too_many_rows" not in self.__class__.__dict__
if write_not_overloaded or delete_not_overloaded:
raise AssertionError("write_operation and delete_operation required")
raise AssertionError(
"write_operation and delete_operation methods required"
)
if too_many_rows_not_overloaded and p.do_nothing_if_more_rows_than is not None:
raise AssertionError("too_many_rows method required")

if self.__class__ is CachedLoader:
raise AssertionError("You should inherit from this class")
@@ -67,6 +74,10 @@ def delete_operation(self, df: DataFrame) -> Optional[DataFrame]:
"""Abstract Method to be overridden in child."""
raise NotImplementedError()

def too_many_rows(self) -> None:
"""Abstract Method to be overridden in child."""
raise NotImplementedError()

def save(self, df: DataFrame) -> None:
in_cols = df.columns

@@ -82,6 +93,11 @@ def save(self, df: DataFrame) -> None:

result = self._discard_non_new_rows_against_cache(df, cache)

if self.params.do_nothing_if_more_rows_than is not None:
if result.to_be_written.count() > self.params.do_nothing_if_more_rows_than:
self.too_many_rows()
return

# write branch
df_written = self.write_operation(result.to_be_written)
if df_written:
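
For illustration of the new check in `validate`, here is a hedged sketch of a subclass that sets the threshold but does not override `too_many_rows`. The class and table names are invented, the import paths are inferred from the file paths in this commit, and `validate` is called explicitly because this diff does not show where it is normally invoked.

```python
from pyspark.sql import DataFrame

from spetlr.cache.CachedLoader import CachedLoader
from spetlr.cache.CachedLoaderParameters import CachedLoaderParameters


class ForgetfulLoader(CachedLoader):
    # too_many_rows is deliberately not overridden here
    def write_operation(self, df: DataFrame) -> DataFrame:
        return df

    def delete_operation(self, df: DataFrame) -> DataFrame:
        return df


params = CachedLoaderParameters(
    cache_table_name="my_db.my_cache_table",  # assumed table name
    key_cols=["a", "b"],
    do_nothing_if_more_rows_than=10,
)

loader = ForgetfulLoader(params)
# Validation is expected to reject this setup with:
# AssertionError: too_many_rows method required
loader.validate()
```
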
6 changes: 6 additions & 0 deletions src/spetlr/cache/CachedLoaderParameters.py
@@ -9,13 +9,18 @@ def __init__(
cache_table_name: str,
key_cols: List[str],
cache_id_cols: List[str] = None,
*,
do_nothing_if_more_rows_than: int = None,
):
"""
Args:
cache_table_name: The table that holds the cache
key_cols: the set of columns that form the primary key for a row
cache_id_cols: These columns, added by the write operation, will be saved
in the cache to identify e.g. the written batch.
do_nothing_if_more_rows_than: if the input data set contains more rows than
the specified number of rows, nothing will be written or deleted.
Instead, the method too_many_rows() will be called.
The table cache_table_name must exist and must have the following schema:
(
@@ -36,3 +41,4 @@ def __init__(
self.rowHash = "rowHash"
self.loadedTime = "loadedTime"
self.deletedTime = "deletedTime"
self.do_nothing_if_more_rows_than = do_nothing_if_more_rows_than
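
Note that `do_nothing_if_more_rows_than` sits after the bare `*` in the signature above, so it can only be passed as a keyword argument. A small, hypothetical construction example (table and column names invented; the import path is inferred from the file path):

```python
from spetlr.cache.CachedLoaderParameters import CachedLoaderParameters

params = CachedLoaderParameters(
    cache_table_name="my_db.my_cache_table",  # assumed table name
    key_cols=["id"],                          # assumed key columns
    cache_id_cols=["myId"],                   # optional, as in the tests below
    do_nothing_if_more_rows_than=100_000,     # keyword-only threshold
)

# Passing the threshold positionally instead would raise a TypeError:
# CachedLoaderParameters("my_db.my_cache_table", ["id"], ["myId"], 100_000)
```
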
49 changes: 47 additions & 2 deletions tests/cluster/cache/test_cache.py
@@ -12,9 +12,10 @@

class ChildCacher(CachedLoader):
to_be_written: DataFrame
- written: DataFrame
+ written: DataFrame = None
to_be_deleted: DataFrame
- deleted: DataFrame
+ deleted: DataFrame = None
too_many_rows_was_called: bool = False

def write_operation(self, df: DataFrame):
self.to_be_written = df
@@ -30,6 +31,10 @@ def delete_operation(self, df: DataFrame) -> DataFrame:
self.deleted = df.filter(df["b"] == 8)
return self.deleted

def too_many_rows(self) -> None:
self.too_many_rows_was_called = True
return


class CachedLoaderTests(unittest.TestCase):
params: CachedLoaderParameters
@@ -85,6 +90,20 @@ class CachedLoaderTests(unittest.TestCase):
("7", 7, "foo7"), # duplicate row will only be loaded once
]

too_much_data = [
("11", 1, "foo1"), # new
("12", 1, "foo2"), # new
("13", 1, "foo3"), # new
("14", 1, "foo4"), # new
("15", 1, "foo5"), # new
("16", 1, "foo6"), # new
("17", 1, "foo7"), # new
("18", 1, "foo8"), # new
("19", 1, "foo9"), # new
("20", 1, "foo10"), # new
("21", 1, "foo11"), # new
]

@classmethod
def setUpClass(cls) -> None:
tc = Configurator()
@@ -147,6 +166,7 @@ def setUpClass(cls) -> None:
cache_table_name=tc.table_name("CachedTest"),
key_cols=["a", "b"],
cache_id_cols=["myId"],
do_nothing_if_more_rows_than=10,
)

cls.sut = ChildCacher(cls.params)
@@ -203,3 +223,28 @@ def test_01_can_perform_cached_write(self):
del_cache = cache.filter(cache[self.sut.params.deletedTime].isNotNull())
(del_id,) = [row.a for row in del_cache.collect()]
self.assertEqual(del_id, "8")
self.assertFalse(self.sut.too_many_rows_was_called)

def test_02_checks_for_too_many_rows(self):
self.sut.written = None
self.sut.deleted = None

cache_dh = DeltaHandle.from_tc("CachedTest")
# prime the cache
df_old_cache = Spark.get().createDataFrame(
self.old_cache, schema=cache_dh.read().schema
)
cache_dh.overwrite(df_old_cache)

# prepare the new data with too many rows
target_dh = DeltaHandle.from_tc("CachedTestTarget")
df_new = Spark.get().createDataFrame(
self.too_much_data, schema=target_dh.read().schema
)

# execute the system under test
self.sut.save(df_new)

self.assertTrue(self.sut.too_many_rows_was_called)
self.assertIsNone(self.sut.written)
self.assertIsNone(self.sut.deleted)
