
Fixes for large datasets #229

Merged: 10 commits, Sep 7, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -39,7 +39,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
- python-version: [3.7, 3.8, 3.9, '3.10']
+ python-version: [3.8, 3.9, '3.10']
steps:
- name: Clone repo
uses: actions/checkout@v2
43 changes: 33 additions & 10 deletions hierarchicalforecast/utils.py
@@ -70,22 +70,29 @@ def cov2corr(cov, return_std=False):
return corr

# %% ../nbs/utils.ipynb 9
- def _to_summing_matrix(S_df: pd.DataFrame):
+ def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):
"""Transforms the DataFrame `df` of hierarchies to a summing matrix S."""
categories = [S_df[col].unique() for col in S_df.columns]
cat_sizes = [len(cats) for cats in categories]
idx_bottom = np.argmax(cat_sizes)
cats_bottom = categories[idx_bottom]
- encoder = OneHotEncoder(categories=categories, sparse=False, dtype=np.float32)

+ try:
+     encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
+ except TypeError:  # sklearn < 1.2
+     encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

S = encoder.fit_transform(S_df).T
+ # TODO: This likely still instantiates a dense matrix:
S = pd.DataFrame(S, index=chain(*categories), columns=cats_bottom)
tags = dict(zip(S_df.columns, categories))
return S, tags
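For context: scikit-learn 1.2 renamed `OneHotEncoder`'s `sparse` argument to `sparse_output`; on older releases the new keyword raises a `TypeError`, which is what the fallback above catches. A minimal, self-contained sketch of the same compatibility pattern (the `make_encoder` helper is illustrative, not part of the PR):

```python
import numpy as np
from sklearn.preprocessing import OneHotEncoder

def make_encoder(categories, sparse_s: bool = False) -> OneHotEncoder:
    """Construct a OneHotEncoder across the scikit-learn 1.2 keyword rename."""
    try:
        # scikit-learn >= 1.2 spells the flag `sparse_output`.
        return OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
    except TypeError:
        # scikit-learn < 1.2 rejects `sparse_output`; there the flag is `sparse`.
        return OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)
```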

# %% ../nbs/utils.ipynb 10
def aggregate_before(df: pd.DataFrame,
spec: List[List[str]],
- agg_fn: Callable = np.sum):
+ agg_fn: Callable = np.sum,
+ sparse_s: bool = False):
"""Utils Aggregation Function.

Aggregates bottom level series contained in the pd.DataFrame `df` according
@@ -95,6 +102,8 @@ def aggregate_before(df: pd.DataFrame,
`df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
`spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
`agg_fn`: Function used to aggregate `'y'`.<br>
+ `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>

**Returns:**<br>
`Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$),
@@ -121,7 +130,7 @@ def aggregate_before(df: pd.DataFrame,
Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')

# Aggregations constraints S definition
- S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols])
+ S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)
return Y_df, S, tags
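A hypothetical usage sketch for `aggregate_before`; the toy frame, columns, and spec below are invented for illustration, and per the TODO above, even with `sparse_s=True` wrapping `S` in a plain `pd.DataFrame` may still densify it:

```python
import pandas as pd
from hierarchicalforecast.utils import aggregate_before  # assumed import path

# Two bottom series ('A'/'x', 'A'/'y') plus their country-level aggregate.
df = pd.DataFrame({
    'country': ['A', 'A', 'A', 'A'],
    'region': ['x', 'x', 'y', 'y'],
    'ds': pd.to_datetime(['2023-01-01', '2023-01-02'] * 2),
    'y': [1.0, 2.0, 3.0, 4.0],
})
spec = [['country'], ['country', 'region']]

# S has one row per series and one column per bottom-level series;
# tags maps each level in spec to the ids of its series.
Y_df, S, tags = aggregate_before(df, spec)
```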

# %% ../nbs/utils.ipynb 11
@@ -176,9 +185,11 @@ def _to_summing_dataframe(
tags = dict(zip(S_df.columns, categories))
tags[bottom_col] = bottom_ids

- encoder = OneHotEncoder(
-     categories=categories, sparse=sparse_s, dtype=np.float32
- )
+ try:
+     encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
+ except TypeError:  # sklearn < 1.2
+     encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

S = encoder.fit_transform(S_df).T

if sparse_s:
@@ -255,9 +266,12 @@ def aggregate(

#------------------------------- Aggregation -------------------------------#
n_agg = S_df.shape[0] - S_df.shape[1]
- Agg = S_df.values[:n_agg, :]
- y_bottom = balanced_df.y.values
+ if sparse_s:
+     Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]
+ else:
+     Agg = S_df.values[:n_agg, :]

+ y_bottom = balanced_df.y.values
y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))
y_bottom_mask = np.isnan(y_bottom)
y_agg = Agg @ np.nan_to_num(y_bottom)
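The sparse branch converts the pandas sparse accessor's COO matrix to CSR before slicing: CSR supports efficient row slicing and fast `@` products, which a sparse-backed DataFrame does not offer directly. A toy sketch of the same steps (shapes and values invented):

```python
import numpy as np
import pandas as pd
from scipy import sparse

# 3 series (1 aggregate + 2 bottom) over a 2-series bottom level.
S_dense = np.array([[1, 1],   # total = b1 + b2
                    [1, 0],   # b1
                    [0, 1]], dtype=np.float32)
S_df = pd.DataFrame.sparse.from_spmatrix(sparse.csr_matrix(S_dense))

n_agg = S_df.shape[0] - S_df.shape[1]          # rows above the bottom level
Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]  # COO -> CSR, then row-slice
y_bottom = np.array([[1.0, 2.0], [np.nan, 4.0]])
y_agg = Agg @ np.nan_to_num(y_bottom)          # NaNs contribute 0 to the sums
print(y_agg)                                   # [[1. 6.]]
```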
@@ -271,7 +285,16 @@ def aggregate(
unique_id = np.repeat(S_df.index, len(dates)),
ds = np.tile(dates, len(S_df.index)),
y = np.concatenate([y_agg, y_bottom], axis=0)))
- Y_df = Y_df.set_index('unique_id').dropna()
+ Y_df = Y_df.set_index("unique_id").dropna()

+ if S_df.index.nunique() != Y_df.index.nunique():
+     print(
+         "Warning: Some time series were dropped from Y_df. "
+         "Are you sure your time series don't have missing entries?"
+     )

return Y_df, S_df, tags
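A toy illustration of the condition the new warning guards (ids and values invented): a series whose observations are all missing disappears from `Y_df` after `dropna`, so `Y_df` ends up with fewer distinct ids than `S_df` has rows:

```python
import numpy as np
import pandas as pd

# 'b2' has only NaN observations, so dropna removes it entirely; the
# aggregate 'total' keeps its rows because NaNs are summed as zeros.
Y_df = pd.DataFrame({
    'unique_id': ['total', 'total', 'b1', 'b1', 'b2', 'b2'],
    'ds': pd.to_datetime(['2023-01-01', '2023-01-02'] * 3),
    'y': [1.0, 2.0, 1.0, 2.0, np.nan, np.nan],
}).set_index('unique_id').dropna()

n_series = 3  # stand-in for S_df.index.nunique(): total, b1, b2
if n_series != Y_df.index.nunique():  # 3 != 2, so the warning fires
    print('Warning: Some time series were dropped from Y_df. '
          "Are you sure your time series don't have missing entries?")
```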

# %% ../nbs/utils.ipynb 19
43 changes: 33 additions & 10 deletions nbs/utils.ipynb
@@ -159,14 +159,20 @@
"outputs": [],
"source": [
"#| exporti\n",
"def _to_summing_matrix(S_df: pd.DataFrame):\n",
"def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):\n",
" \"\"\"Transforms the DataFrame `df` of hierarchies to a summing matrix S.\"\"\"\n",
" categories = [S_df[col].unique() for col in S_df.columns]\n",
" cat_sizes = [len(cats) for cats in categories]\n",
" idx_bottom = np.argmax(cat_sizes)\n",
" cats_bottom = categories[idx_bottom]\n",
" encoder = OneHotEncoder(categories=categories, sparse=False, dtype=np.float32)\n",
"\n",
" try:\n",
" encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n",
" except TypeError: # sklearn < 1.2\n",
" encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n",
"\n",
" S = encoder.fit_transform(S_df).T\n",
" # TODO: This likely still instantiates a dense matrix:\n",
" S = pd.DataFrame(S, index=chain(*categories), columns=cats_bottom)\n",
" tags = dict(zip(S_df.columns, categories))\n",
" return S, tags"
@@ -182,7 +188,8 @@
"#| exporti\n",
"def aggregate_before(df: pd.DataFrame,\n",
" spec: List[List[str]],\n",
" agg_fn: Callable = np.sum):\n",
" agg_fn: Callable = np.sum,\n",
" sparse_s: bool = False):\n",
" \"\"\"Utils Aggregation Function.\n",
"\n",
" Aggregates bottom level series contained in the pd.DataFrame `df` according \n",
@@ -192,6 +199,8 @@
" `df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>\n",
" `spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>\n",
" `agg_fn`: Function used to aggregate `'y'`.<br>\n",
" `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>\n",
"\n",
"\n",
" **Returns:**<br>\n",
" `Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\\mathbf{y}_{[a,b]}$),\n",
@@ -218,7 +227,7 @@
" Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')\n",
" \n",
" # Aggregations constraints S definition\n",
" S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols])\n",
" S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)\n",
" return Y_df, S, tags"
]
},
@@ -281,9 +290,11 @@
" tags = dict(zip(S_df.columns, categories))\n",
" tags[bottom_col] = bottom_ids\n",
"\n",
" encoder = OneHotEncoder(\n",
" categories=categories, sparse=sparse_s, dtype=np.float32\n",
" )\n",
" try:\n",
" encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n",
" except TypeError: # sklearn < 1.2\n",
" encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n",
"\n",
" S = encoder.fit_transform(S_df).T\n",
"\n",
" if sparse_s:\n",
@@ -367,9 +378,12 @@
"\n",
" #------------------------------- Aggregation -------------------------------#\n",
" n_agg = S_df.shape[0] - S_df.shape[1]\n",
" Agg = S_df.values[:n_agg, :]\n",
" y_bottom = balanced_df.y.values\n",
" if sparse_s:\n",
" Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]\n",
" else:\n",
" Agg = S_df.values[:n_agg, :]\n",
"\n",
" y_bottom = balanced_df.y.values\n",
" y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))\n",
" y_bottom_mask = np.isnan(y_bottom)\n",
" y_agg = Agg @ np.nan_to_num(y_bottom)\n",
@@ -383,7 +397,16 @@
" unique_id = np.repeat(S_df.index, len(dates)),\n",
" ds = np.tile(dates, len(S_df.index)),\n",
" y = np.concatenate([y_agg, y_bottom], axis=0)))\n",
" Y_df = Y_df.set_index('unique_id').dropna()\n",
"\n",
" Y_df = Y_df.set_index(\"unique_id\").dropna()\n",
"\n",
" if S_df.index.nunique() != Y_df.index.nunique():\n",
" print(\n",
" \"Warning: Some time series were dropped from Y_df. \"\n",
" \"Are you sure your time series don't have missing entries?\"\n",
" )\n",
"\n",
"\n",
" return Y_df, S_df, tags"
]
},
2 changes: 1 addition & 1 deletion settings.ini
@@ -9,7 +9,7 @@ author_email = business@nixtla.io
copyright = Nixtla Inc.
branch = main
version = 0.3.0
- min_python = 3.7
+ min_python = 3.8
audience = Developers
language = English
custom_sidebar = True