Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for large datasets #229

Merged
merged 10 commits into from
Sep 7, 2023
45 changes: 32 additions & 13 deletions hierarchicalforecast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,34 @@ def cov2corr(cov, return_std=False):
return corr

# %% ../nbs/utils.ipynb 9
def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):
    """Transforms the DataFrame `S_df` of hierarchies to a summing matrix S.

    **Parameters:**<br>
    `S_df`: pd.DataFrame, one column per hierarchy level; the column with the
    most unique values is taken as the bottom level.<br>
    `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>

    **Returns:**<br>
    `S, tags`: summing matrix as a (possibly sparse) pd.DataFrame indexed by all
    aggregation levels with bottom-level columns, and a dict mapping each column
    of `S_df` to its unique categories.
    """
    categories = [S_df[col].unique() for col in S_df.columns]
    cat_sizes = [len(cats) for cats in categories]
    # The level with the most categories is assumed to be the bottom level.
    idx_bottom = np.argmax(cat_sizes)
    cats_bottom = categories[idx_bottom]

    # sklearn >= 1.2 renamed `sparse` to `sparse_output`; fall back for older versions.
    try:
        encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
    except TypeError:  # sklearn < 1.2
        encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

    S = encoder.fit_transform(S_df).T

    # A scipy sparse matrix needs the dedicated sparse-DataFrame constructor.
    if sparse_s:
        df_constructor = pd.DataFrame.sparse.from_spmatrix
    else:
        df_constructor = pd.DataFrame
    S = df_constructor(S, index=chain(*categories), columns=cats_bottom)

    tags = dict(zip(S_df.columns, categories))
    return S, tags

# %% ../nbs/utils.ipynb 10
def aggregate_before(df: pd.DataFrame,
spec: List[List[str]],
agg_fn: Callable = np.sum):
agg_fn: Callable = np.sum,
sparse_s: bool = False):
"""Utils Aggregation Function.

Aggregates bottom level series contained in the pd.DataFrame `df` according
Expand All @@ -95,6 +107,8 @@ def aggregate_before(df: pd.DataFrame,
`df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>
`spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>
`agg_fn`: Function used to aggregate `'y'`.<br>
`sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>


**Returns:**<br>
`Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\mathbf{y}_{[a,b]}$),
Expand All @@ -121,7 +135,7 @@ def aggregate_before(df: pd.DataFrame,
Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')

# Aggregations constraints S definition
S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols])
S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)
return Y_df, S, tags

# %% ../nbs/utils.ipynb 11
Expand Down Expand Up @@ -176,9 +190,11 @@ def _to_summing_dataframe(
tags = dict(zip(S_df.columns, categories))
tags[bottom_col] = bottom_ids

encoder = OneHotEncoder(
categories=categories, sparse=sparse_s, dtype=np.float32
)
try:
encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)
except TypeError: # sklearn < 1.2
encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)

S = encoder.fit_transform(S_df).T

if sparse_s:
Expand Down Expand Up @@ -255,9 +271,12 @@ def aggregate(

#------------------------------- Aggregation -------------------------------#
n_agg = S_df.shape[0] - S_df.shape[1]
Agg = S_df.values[:n_agg, :]
y_bottom = balanced_df.y.values
    if sparse_s:
        Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]
    else:
        Agg = S_df.values[:n_agg, :]

y_bottom = balanced_df.y.values
y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))
y_bottom_mask = np.isnan(y_bottom)
y_agg = Agg @ np.nan_to_num(y_bottom)
Expand All @@ -274,7 +293,7 @@ def aggregate(
Y_df = Y_df.set_index('unique_id').dropna()
return Y_df, S_df, tags

# %% ../nbs/utils.ipynb 19
# %% ../nbs/utils.ipynb 20
class HierarchicalPlot:
""" Hierarchical Plot

Expand Down Expand Up @@ -468,7 +487,7 @@ def plot_hierarchical_predictions_gap(self,
plt.grid()
plt.show()

# %% ../nbs/utils.ipynb 34
# %% ../nbs/utils.ipynb 35
# convert levels to output quantile names
def level_to_outputs(level:Iterable[int]):
""" Converts list of levels into output names matching StatsForecast and NeuralForecast methods.
Expand Down Expand Up @@ -512,7 +531,7 @@ def quantiles_to_outputs(quantiles:Iterable[float]):
output_names.append('-median')
return quantiles, output_names

# %% ../nbs/utils.ipynb 35
# %% ../nbs/utils.ipynb 36
# given input array of sample forecasts and inptut quantiles/levels,
# output a Pandas Dataframe with columns of quantile predictions
def samples_to_quantiles_df(samples:np.ndarray,
Expand Down
78 changes: 68 additions & 10 deletions nbs/utils.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,26 @@
"outputs": [],
"source": [
"#| exporti\n",
"def _to_summing_matrix(S_df: pd.DataFrame):\n",
"def _to_summing_matrix(S_df: pd.DataFrame, sparse_s: bool = False):\n",
" \"\"\"Transforms the DataFrame `df` of hierarchies to a summing matrix S.\"\"\"\n",
" categories = [S_df[col].unique() for col in S_df.columns]\n",
" cat_sizes = [len(cats) for cats in categories]\n",
" idx_bottom = np.argmax(cat_sizes)\n",
" cats_bottom = categories[idx_bottom]\n",
" encoder = OneHotEncoder(categories=categories, sparse=False, dtype=np.float32)\n",
"\n",
" try:\n",
" encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n",
" except TypeError: # sklearn < 1.2\n",
" encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n",
"\n",
" S = encoder.fit_transform(S_df).T\n",
" S = pd.DataFrame(S, index=chain(*categories), columns=cats_bottom)\n",
"\n",
" if sparse_s:\n",
" df_constructor = pd.DataFrame.sparse.from_spmatrix\n",
" else:\n",
" df_constructor = pd.DataFrame\n",
" S = df_constructor(S, index=chain(*categories), columns=cats_bottom)\n",
"\n",
" tags = dict(zip(S_df.columns, categories))\n",
" return S, tags"
]
Expand All @@ -182,7 +193,8 @@
"#| exporti\n",
"def aggregate_before(df: pd.DataFrame,\n",
" spec: List[List[str]],\n",
" agg_fn: Callable = np.sum):\n",
" agg_fn: Callable = np.sum,\n",
" sparse_s: bool = False):\n",
" \"\"\"Utils Aggregation Function.\n",
"\n",
" Aggregates bottom level series contained in the pd.DataFrame `df` according \n",
Expand All @@ -192,6 +204,8 @@
" `df`: pd.DataFrame with columns `['ds', 'y']` and columns to aggregate.<br>\n",
" `spec`: List of levels. Each element of the list contains a list of columns of `df` to aggregate.<br>\n",
" `agg_fn`: Function used to aggregate `'y'`.<br>\n",
" `sparse_s`: bool=False, whether the returned S should be a sparse DataFrame.<br>\n",
"\n",
"\n",
" **Returns:**<br>\n",
" `Y_df, S, tags`: tuple with hierarchically structured series `Y_df` ($\\mathbf{y}_{[a,b]}$),\n",
Expand All @@ -218,7 +232,7 @@
" Y_df = df_hiers[['unique_id', 'ds', 'y']].set_index('unique_id')\n",
" \n",
" # Aggregations constraints S definition\n",
" S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols])\n",
" S, tags = _to_summing_matrix(S_df.loc[bottom_hier, hiers_cols], sparse_s)\n",
" return Y_df, S, tags"
]
},
Expand Down Expand Up @@ -281,9 +295,11 @@
" tags = dict(zip(S_df.columns, categories))\n",
" tags[bottom_col] = bottom_ids\n",
"\n",
" encoder = OneHotEncoder(\n",
" categories=categories, sparse=sparse_s, dtype=np.float32\n",
" )\n",
" try:\n",
" encoder = OneHotEncoder(categories=categories, sparse_output=sparse_s, dtype=np.float32)\n",
" except TypeError: # sklearn < 1.2\n",
" encoder = OneHotEncoder(categories=categories, sparse=sparse_s, dtype=np.float32)\n",
"\n",
" S = encoder.fit_transform(S_df).T\n",
"\n",
" if sparse_s:\n",
Expand Down Expand Up @@ -367,9 +383,12 @@
"\n",
" #------------------------------- Aggregation -------------------------------#\n",
" n_agg = S_df.shape[0] - S_df.shape[1]\n",
" Agg = S_df.values[:n_agg, :]\n",
" y_bottom = balanced_df.y.values\n",
" if sparse_s:\n",
" Agg = S_df.sparse.to_coo().tocsr()[:n_agg, :]\n",
" else:\n",
" Agg = S_df.values[:n_agg, :]\n",
"\n",
" y_bottom = balanced_df.y.values\n",
" y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))\n",
" y_bottom_mask = np.isnan(y_bottom)\n",
" y_agg = Agg @ np.nan_to_num(y_bottom)\n",
Expand Down Expand Up @@ -558,6 +577,45 @@
"test_eq(Y_df.index, before_Y_df.index)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# Test equality of sparse and non-sparse aggregation\n",
"with CodeTimer('strict non-sparse aggregate'):\n",
" Y_df, S_df, tags = aggregate(df=df, sparse_s=False, spec=hiers_strictly)\n",
"\n",
"with CodeTimer('strict sparse aggregate'):\n",
" Y_df_sparse, S_df_sparse, tags_sparse = aggregate(df=df, sparse_s=True, spec=hiers_strictly)\n",
"\n",
"test_close(Y_df.y.values, Y_df_sparse.y.values)\n",
"test_eq(S_df.values, S_df_sparse.values)\n",
"\n",
"test_eq(S_df.columns, S_df_sparse.columns)\n",
"test_eq(S_df.index, S_df_sparse.index)\n",
"\n",
"test_eq(Y_df.columns, Y_df_sparse.columns)\n",
"test_eq(Y_df.index, Y_df_sparse.index)\n",
"\n",
"with CodeTimer('grouped non-sparse aggregate'):\n",
" Y_df, S_df, tags = aggregate(df=df, sparse_s=False, spec=hiers_grouped)\n",
"\n",
"with CodeTimer('grouped sparse aggregate'):\n",
" Y_df_sparse, S_df_sparse, tags_sparse = aggregate(df=df, sparse_s=True, spec=hiers_grouped)\n",
"\n",
"test_close(Y_df.y.values, Y_df_sparse.y.values)\n",
"test_eq(S_df.values, S_df_sparse.values)\n",
"\n",
"test_eq(S_df.columns, S_df_sparse.columns)\n",
"test_eq(S_df.index, S_df_sparse.index)\n",
"\n",
"test_eq(Y_df.columns, Y_df_sparse.columns)\n",
"test_eq(Y_df.index, Y_df_sparse.index)"
]
},
{
"cell_type": "markdown",
"id": "22febc26-1901-4bef-a181-09ae2f52453b",
Expand Down
Loading