Skip to content

Commit

Permalink
feat(refactor-core-api-and-add-systematic-data-generator): feat: refactor core api and add systematic data generator
Browse files Browse the repository at this point in the history

- implement new core api to support multiple backends (pandas, polars, modin)
- add synthetic_data_generator for systematic testing across backends
- refactor core modules: core_utils, exceptions, temporal_data_loader, temporal_target_shifter
- add new temporal_core_processing module
- restructure and update test files to align with new api design
- enhance functionality to support both single-step and multi-step operations
- update pyproject.toml to reflect new structure and dependencies
- fix pre-commit issues with MyPy and Ruff
- merged changes from main branch to integrate latest updates and resolve conflicts
  • Loading branch information
philip-ndikum committed Oct 6, 2024
1 parent 803024a commit de3b268
Show file tree
Hide file tree
Showing 16 changed files with 198 additions and 334 deletions.
76 changes: 19 additions & 57 deletions src/temporalscope/core/core_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@
"""

import os
from typing import Dict, Optional, Union, cast, Callable, Type
from datetime import datetime, timedelta, date
import warnings
from typing import Callable, Dict, Optional, Type, Union, cast

import modin.pandas as mpd
import pandas as pd
import polars as pl
from dotenv import load_dotenv
from temporalscope.core.exceptions import UnsupportedBackendError, MixedFrequencyWarning

from temporalscope.core.exceptions import MixedFrequencyWarning, UnsupportedBackendError

# Load environment variables from the .env file
load_dotenv()
Expand Down Expand Up @@ -191,10 +191,7 @@ def validate_mode(backend: str, mode: str) -> None:


def validate_and_convert_input(
df: SupportedBackendDataFrame,
backend: str,
time_col: Optional[str] = None,
mode: str = MODE_SINGLE_STEP
df: SupportedBackendDataFrame, backend: str, time_col: Optional[str] = None, mode: str = MODE_SINGLE_STEP
) -> SupportedBackendDataFrame:
"""Validates and converts the input DataFrame to the specified backend type, with optional time column casting.
Expand All @@ -203,35 +200,9 @@ def validate_and_convert_input(
:param time_col: Optional; the name of the time column for casting.
:param mode: The processing mode ('single_step' or 'multi_step').
:raises TypeError: If input DataFrame type doesn't match the specified backend or conversion fails.
:raises NotImplementedError: If multi-step mode is requested for unsupported backends or unsupported conversion to Polars.
:raises NotImplementedError: If multi-step mode is requested for unsupported backends.
:return: The DataFrame converted to the specified backend type.
Example
-------
Here's how you would use this function to convert a Pandas DataFrame to Polars:
.. code-block:: python
import pandas as pd
import polars as pl
data = {'col1': [1, 2], 'col2': [3, 4], 'time': pd.date_range(start='1/1/2023', periods=2)}
df = pd.DataFrame(data)
# Convert the DataFrame from Pandas to Polars, with an optional time column for casting
converted_df = validate_and_convert_input(df, 'pl', time_col='time')
print(type(converted_df)) # Output: <class 'polars.DataFrame'>
# If you don't need to cast the time column, just omit the time_col argument
converted_df = validate_and_convert_input(df, 'pl')
print(type(converted_df)) # Output: <class 'polars.DataFrame'>
.. note::
- This function first converts the input DataFrame into the appropriate backend.
- If `time_col` is specified and the backend is Polars, it casts the time column to `pl.Datetime`.
- Pandas to Polars conversion is currently unsupported and raises a `NotImplementedError`. This needs to be implemented later.
"""
# Validate the backend and mode combination
validate_backend(backend)
validate_mode(backend, mode)

Expand All @@ -240,12 +211,11 @@ def validate_and_convert_input(
str, Dict[Type[SupportedBackendDataFrame], Callable[[SupportedBackendDataFrame], SupportedBackendDataFrame]]
] = {
BACKEND_POLARS: {
# Polars to Polars
pl.DataFrame: lambda x: x,
# Pandas to Polars - currently not supported
pd.DataFrame: lambda x: (_ for _ in ()).throw(NotImplementedError("Pandas to Polars conversion is not currently supported.")),
# Modin to Polars
mpd.DataFrame: lambda x: pl.from_pandas(x._to_pandas()),
pd.DataFrame: lambda x: pl.from_pandas(x), # Use polars.from_pandas for conversion
mpd.DataFrame: lambda x: pl.from_pandas(
x._to_pandas() if hasattr(x, "_to_pandas") else x
), # Safely handle the Modin conversion
},
BACKEND_PANDAS: {
pd.DataFrame: lambda x: x, # Pandas to Pandas
Expand All @@ -260,27 +230,20 @@ def validate_and_convert_input(
}

# Step 1: Convert the DataFrame to the desired backend
converted_df = None
for dataframe_type, conversion_func in backend_conversion_map[backend].items():
if isinstance(df, dataframe_type):
converted_df = conversion_func(df)
break

if converted_df is None:
else:
raise TypeError(f"Input DataFrame type {type(df)} does not match the specified backend '{backend}'")

# Step 2: Explicitly cast the time column to pl.Datetime if backend is Polars and the column exists
if backend == BACKEND_POLARS and time_col and time_col in converted_df.columns:
# Force cast time_col to pl.Datetime
converted_df = converted_df.with_columns(pl.col(time_col).cast(pl.Datetime))

# Check the type of the column and assert it is correct
assert isinstance(converted_df[time_col][0], pl.Datetime), f"Expected a timestamp-like time column, but got {type(converted_df[time_col][0])}"

return converted_df



def get_api_keys() -> Dict[str, Optional[str]]:
"""Retrieve API keys from environment variables.
Expand Down Expand Up @@ -332,16 +295,15 @@ def check_nulls(df: SupportedBackendDataFrame, backend: str) -> bool:
elif backend == BACKEND_MODIN:
return bool(cast(mpd.DataFrame, df).isnull().values.any())

# Suppress the warning since this path is unreachable due to `validate_backend`
# mypy: ignore
raise UnsupportedBackendError(f"Unsupported backend: {backend}")


def check_nans(df: SupportedBackendDataFrame, backend: str) -> bool:
"""Check for NaN values in the DataFrame using the specified backend.
:param df: The DataFrame to check for NaN values.
:type df: SupportedBackendDataFrame
:param backend: The backend used for the DataFrame ('pl' for Polars, 'pd' for Pandas, 'mpd' for Modin').
:param backend: The backend used for the DataFrame ('pl', 'pd', 'mpd').
:type backend: str
:return: True if there are NaN values, False otherwise.
:rtype: bool
Expand All @@ -357,8 +319,7 @@ def check_nans(df: SupportedBackendDataFrame, backend: str) -> bool:
elif backend == BACKEND_MODIN:
return bool(cast(mpd.DataFrame, df).isna().values.any())

# Suppress the warning since this path is unreachable due to `validate_backend`
# mypy: ignore
raise UnsupportedBackendError(f"Unsupported backend: {backend}")


def is_timestamp_like(df: SupportedBackendDataFrame, time_col: str) -> bool:
Expand Down Expand Up @@ -393,6 +354,8 @@ def is_timestamp_like(df: SupportedBackendDataFrame, time_col: str) -> bool:
elif isinstance(df, pl.DataFrame):
return time_column.dtype == pl.Datetime

raise UnsupportedBackendError(f"Unsupported DataFrame type: {type(df)}")


def is_numeric(df: SupportedBackendDataFrame, time_col: str) -> bool:
"""Check if the specified column in the DataFrame is numeric.
Expand All @@ -412,15 +375,12 @@ def is_numeric(df: SupportedBackendDataFrame, time_col: str) -> bool:

# Handle empty columns for different backends
if isinstance(df, pl.DataFrame):
# Polars: Check if the DataFrame has zero rows or if the column is empty
if df.height == 0 or time_column.is_empty():
return False
elif isinstance(df, mpd.DataFrame):
# Modin: Check if the column is empty by using length
if len(time_column) == 0:
return False
elif isinstance(df, pd.DataFrame):
# Pandas: Check if the column is empty
if isinstance(time_column, pd.Series) and time_column.empty:
return False

Expand All @@ -430,6 +390,8 @@ def is_numeric(df: SupportedBackendDataFrame, time_col: str) -> bool:
elif isinstance(df, pl.DataFrame):
return time_column.dtype in [pl.Int32, pl.Int64, pl.Float32, pl.Float64]

raise UnsupportedBackendError(f"Unsupported DataFrame type: {type(df)}")


def has_mixed_frequencies(df: SupportedBackendDataFrame, time_col: str, min_non_null_values: int = 3) -> bool:
"""Check if the given time column in the DataFrame contains mixed frequencies.
Expand Down Expand Up @@ -501,10 +463,8 @@ def sort_dataframe(
:raises TypeError: If the DataFrame type does not match the backend.
:raises UnsupportedBackendError: If the backend is unsupported or validation fails.
"""
# Validate backend
validate_backend(backend)

# Select backend-specific sorting logic
if backend == BACKEND_POLARS:
if not isinstance(df, pl.DataFrame):
raise TypeError(f"Expected Polars DataFrame but got {type(df)}")
Expand All @@ -522,6 +482,8 @@ def sort_dataframe(
df.sort_values(by=time_col, ascending=ascending, inplace=True)
return df

raise UnsupportedBackendError(f"Unsupported backend: {backend}")


def check_empty_columns(df: SupportedBackendDataFrame, backend: str) -> bool:
"""Check for empty columns in the DataFrame using the specified backend.
Expand Down
20 changes: 13 additions & 7 deletions src/temporalscope/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@
--------------
.. code-block:: python
from temporalscope.core.exceptions import (
TimeColumnError, MixedTypesWarning, MixedTimezonesWarning
)
from temporalscope.core.exceptions import TimeColumnError, MixedTypesWarning, MixedTimezonesWarning
def validate_time_column(df):
if df['time'].dtype == object:
if df["time"].dtype == object:
raise TimeColumnError("Invalid time column data type.")
elif contains_mixed_types(df['time']):
elif contains_mixed_types(df["time"]):
warnings.warn("Mixed numeric and timestamp types.", MixedTypesWarning)
"""
Expand All @@ -64,7 +63,7 @@ class TimeFrameError(Exception):


class TimeColumnError(TimeFrameError):
""" Exception raised for errors related to the `time_col`.
"""Exception raised for errors related to the `time_col`.
This error is raised when the `time_col` in the TimeFrame is either
missing, contains unsupported types (non-numeric or non-timestamp),
Expand All @@ -80,6 +79,7 @@ class TimeColumnError(TimeFrameError):
if not pd.api.types.is_numeric_dtype(df[time_col]) and \
not pd.api.types.is_datetime64_any_dtype(df[time_col]):
raise TimeColumnError("`time_col` must be numeric or timestamp-like.")
"""

pass
Expand Down Expand Up @@ -149,9 +149,15 @@ class UnsupportedBackendError(Exception):
Attributes:
backend (str): The invalid backend that caused the error.
message (str): Explanation of the error.
"""

def __init__(self, backend: str, message: str = "Unsupported backend"):
def __init__(self, backend, message="Unsupported backend"):
"""Initialize the UnsupportedBackendError.
:param backend: The invalid backend (e.g., 'pl', 'pd', 'mpd') that caused the error.
:param message: Optional; a custom error message. Defaults to "Unsupported backend".
"""
self.backend = backend
self.message = f"{message}: {backend}. Supported backends are 'pd', 'mpd', 'pl'."
super().__init__(self.message)
20 changes: 6 additions & 14 deletions src/temporalscope/core/temporal_core_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@
from temporalscope.core.temporal_core_processing import convert_to_tensorflow, convert_to_pandas
# Example DataFrame
df = pd.DataFrame({
'time': pd.date_range(start='2023-01-01', periods=100, freq='D'),
'feature_1': range(100),
'target': range(100)
})
df = pd.DataFrame(
{"time": pd.date_range(start="2023-01-01", periods=100, freq="D"), "feature_1": range(100), "target": range(100)}
)
# Convert DataFrame to TensorFlow Dataset
tf_dataset = convert_to_tensorflow(df)
Expand All @@ -55,18 +53,14 @@
df_back = convert_to_pandas(tf_dataset)
"""

from typing import Union
import pandas as pd
import polars as pl
import modin.pandas as mpd
import tensorflow as tf

from temporalscope.core.core_utils import SupportedBackendDataFrame


def convert_to_tensorflow(df: SupportedBackendDataFrame) -> tf.data.Dataset:
"""
Stub: Convert a DataFrame to a TensorFlow Dataset.
"""Stub: Convert a DataFrame to a TensorFlow Dataset.
This function will convert Pandas, Modin, or Polars DataFrames into a TensorFlow Dataset
to enable compatibility with deep learning frameworks like TensorFlow.
Expand All @@ -78,8 +72,7 @@ def convert_to_tensorflow(df: SupportedBackendDataFrame) -> tf.data.Dataset:


def convert_to_pandas(df: SupportedBackendDataFrame) -> pd.DataFrame:
"""
Stub: Convert a DataFrame or TensorFlow Dataset to a Pandas DataFrame.
"""Stub: Convert a DataFrame or TensorFlow Dataset to a Pandas DataFrame.
This function will handle converting Modin, Polars, or TensorFlow Datasets back to Pandas
DataFrames to ensure interoperability across backends and downstream tasks.
Expand All @@ -91,8 +84,7 @@ def convert_to_pandas(df: SupportedBackendDataFrame) -> pd.DataFrame:


def handle_multi_step_conversion(df: pd.DataFrame, sequence_length: int) -> pd.DataFrame:
"""
Stub: Prepare DataFrame for multi-step forecasting.
"""Stub: Prepare DataFrame for multi-step forecasting.
This function will handle the preparation of multi-step targets by expanding the target
column into sequences of the specified length, suitable for sequential models.
Expand Down
Loading

0 comments on commit de3b268

Please sign in to comment.