Commit
[FIX] Aggregate unbalanced datasets (#190)
* fix: aggregate unbalanced datasets

* fix: recover sparse argument

* fix: install dev deps

* fix: change cells order (prevent errors)

* fix: install dev deps circleci

* fix: reconcile with series of different size
AzulGarza authored Jun 6, 2023
1 parent bc630cb commit c107217
Showing 6 changed files with 196 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -21,7 +21,7 @@ jobs:
conda init bash
source ~/.bashrc
conda activate hierarchicalforecast
- pip install ./
+ pip install ".[dev]"
nbdev_test --do_print --timing --n_workers 1
test-model-performance:
resource_class: large
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -51,7 +51,7 @@ jobs:
cache-env: true

- name: Install pip requirements
- run: pip install ./
+ run: pip install ".[dev]"

- name: Run tests
run: nbdev_test
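
Both CI configs now install the package together with its development extras, so `nbdev_test` and the model-performance job have their tooling available. A minimal sketch of how a `dev` extra like this is commonly declared, assuming a plain setuptools layout (the dependency lists below are illustrative, not taken from the repo):

from setuptools import find_packages, setup

setup(
    name="hierarchicalforecast",
    packages=find_packages(),
    # Core runtime dependencies (illustrative).
    install_requires=["numpy", "pandas", "scikit-learn"],
    # Extras pulled in by `pip install ".[dev]"` (illustrative).
    extras_require={"dev": ["nbdev", "statsforecast"]},
)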
21 changes: 15 additions & 6 deletions hierarchicalforecast/core.py
@@ -153,8 +153,8 @@ def _prepare_fit(self,

# TODO: Complete y_hat_insample protection
if intervals_method in ['bootstrap', 'permbu']:
- if not (set(model_names) <= set(Y_df.columns)):
-     raise Exception('Check `Y_hat_df`s models are included in `Y_df` columns')
+ if not (set(model_names) <= set(Y_df.columns)):
+     raise Exception('Check `Y_hat_df`s models are included in `Y_df` columns')

uids = Y_hat_df.index.unique()

@@ -168,7 +168,7 @@
# Check Y_hat_df\Y_df series difference
Y_diff = len(Y_df.index.difference(uids))
Y_hat_diff = len(Y_hat_df.index.difference(Y_df.index.unique()))
- if Y_diff > 0 or Y_hat_diff > 0:
+ if Y_diff > 0 or Y_hat_diff > 0:
raise Exception(f'Check `Y_hat_df`, `Y_df` series difference, Y_hat\Y={Y_hat_diff}, Y\Y_hat={Y_diff}')

# Same Y_hat_df/S_df/Y_df's unique_id order to prevent errors
@@ -185,7 +185,9 @@ def reconcile(self,
intervals_method: str = 'normality',
num_samples: int = -1,
seed: int = 0,
- sort_df: bool = True):
+ sort_df: bool = True,
+ is_balanced: bool = False,
+ ):
"""Hierarchical Reconciliation Method.
The `reconcile` method is analogous to SKLearn `fit_predict` method, it
@@ -212,6 +214,7 @@
`num_samples`: int=-1, if positive return that many probabilistic coherent samples.
`seed`: int=0, random seed for numpy generator's replicability.<br>
`sort_df` : bool (default=True), if True, sort `df` by [`unique_id`,`ds`].<br>
+ `is_balanced`: bool=False, whether `Y_df` is balanced; set it to True to speed things up if `Y_df` is balanced.<br>
**Returns:**<br>
`Y_tilde_df`: pd.DataFrame, with reconciled predictions.
@@ -233,7 +236,10 @@
tags={key: S_df.index.get_indexer(val) for key, val in tags.items()}
)
if Y_df is not None:
- y_insample = Y_df['y'].values.reshape(len(S_df), -1).astype(np.float32)
+ if is_balanced:
+     y_insample = Y_df['y'].values.reshape(len(S_df), -1).astype(np.float32)
+ else:
+     y_insample = Y_df.pivot(columns='ds', values='y').loc[S_df.index].values.astype(np.float32)
reconciler_args['y_insample'] = y_insample

Y_tilde_df= Y_hat_df.copy()
@@ -252,7 +258,10 @@
reconciler_args['y_hat'] = y_hat

if (self.insample and has_fitted) or intervals_method in ['bootstrap', 'permbu']:
- y_hat_insample = Y_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)
+ if is_balanced:
+     y_hat_insample = Y_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)
+ else:
+     y_hat_insample = Y_df.pivot(columns='ds', values=model_name).loc[S_df.index].values.astype(np.float32)
reconciler_args['y_hat_insample'] = y_hat_insample

if has_level and (level is not None):
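
The change above is the heart of the fix: `reshape(len(S_df), -1)` assumes every series in `Y_df` has the same number of in-sample periods, while `pivot(columns='ds', ...)` followed by `.loc[S_df.index]` builds one row per series and one column per timestamp, pads shorter series with NaN, and keeps rows in `S_df`'s order. The new `is_balanced` flag preserves the cheaper reshape for callers who know the panel is complete. A toy sketch of the difference (the data and the `series_order` name are invented for illustration):

import numpy as np
import pandas as pd

# Series "b" starts one period later than "a", so the panel is unbalanced.
Y_df = pd.DataFrame({
    "unique_id": ["a", "a", "b"],
    "ds": [1, 2, 2],
    "y": [1.0, 2.0, 20.0],
}).set_index("unique_id")
series_order = pd.Index(["a", "b"])  # stands in for S_df.index

# Balanced fast path (is_balanced=True): only valid for equal-length series;
# here it would raise, since 3 values cannot fill a (2, h) matrix.
# Y_df["y"].values.reshape(len(series_order), -1)

# Unbalanced path: align on `ds`, pad the short series with NaN.
y_insample = (Y_df.pivot(columns="ds", values="y")
                  .loc[series_order]
                  .values.astype(np.float32))
print(y_insample)  # [[ 1.  2.]
                   #  [nan 20.]]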
15 changes: 9 additions & 6 deletions hierarchicalforecast/utils.py
@@ -251,6 +251,7 @@ def _to_summing_dataframe(df: pd.DataFrame,
max_len_idx = np.argmax([len(hier) for hier in spec])
bottom_comb = spec[max_len_idx]
hiers_cols = []
+ df = df.copy()
for hier in spec:
if hier == bottom_comb:
hier_col = 'unique_id'
@@ -275,15 +276,15 @@
encoder = OneHotEncoder(categories=categories,
sparse=False, dtype=np.float32)
S = encoder.fit_transform(S_df).T
- S = np.concatenate([S, np.eye(len(bottom_ids))], axis=0)
+ S = np.concatenate([S, np.eye(len(bottom_ids), dtype=np.float32)], axis=0)
S_df = pd.DataFrame(S, columns=bottom_ids,
index=list(chain(*categories))+bottom_ids)

# Match index ordering of S_df and collapse df to Y_bottom_df
Y_bottom_df = df.copy()
+ Y_bottom_df = Y_bottom_df.groupby(['unique_id', 'ds'])['y'].sum().reset_index()
Y_bottom_df.unique_id = Y_bottom_df.unique_id.astype('category')
Y_bottom_df.unique_id = Y_bottom_df.unique_id.cat.set_categories(S_df.columns)
- Y_bottom_df = Y_bottom_df.groupby(['unique_id', 'ds'])['y'].sum().reset_index()
return Y_bottom_df, S_df, tags

def aggregate(df: pd.DataFrame,
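
On my reading, the line move above (doing the groupby before the categorical cast) is what makes `_to_summing_dataframe` safe for unbalanced data: grouping on a categorical key makes pandas materialize every category, so (unique_id, ds) combinations with no observations would come back as fabricated zero sums, whereas grouping on plain strings keeps only observed rows; the cast afterwards then only aligns ordering with `S_df.columns`. A toy illustration under that assumption (pandas' default `observed=False` groupby behavior):

import pandas as pd

# Toy frame: id "b" is a valid bottom series with no rows in this window.
df = pd.DataFrame({"unique_id": ["a", "a"], "ds": [1, 2], "y": [1.0, 2.0]})
cats = ["a", "b"]

# Cast first, then group: the unobserved category "b" gets zero-filled rows.
tmp = df.copy()
tmp["unique_id"] = tmp["unique_id"].astype("category").cat.set_categories(cats)
print(tmp.groupby(["unique_id", "ds"])["y"].sum().reset_index())
#   unique_id  ds    y
# 0         a   1  1.0
# 1         a   2  2.0
# 2         b   1  0.0
# 3         b   2  0.0

# Group first on plain strings, then cast: only observed rows survive.
out = df.groupby(["unique_id", "ds"])["y"].sum().reset_index()
out["unique_id"] = out["unique_id"].astype("category").cat.set_categories(cats)
print(out)
#   unique_id  ds    y
# 0         a   1  1.0
# 1         a   2  2.0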
@@ -322,7 +323,6 @@ def aggregate(df: pd.DataFrame,
balanced_df = balanced_df.merge(Y_bottom_df[['y']],
how='left', left_on=['unique_id', 'ds'],
right_index=True).reset_index()
- balanced_df['y'].fillna(0, inplace=True)
Y_bottom_df.reset_index(inplace=True)
else:
dates = Y_bottom_df['ds'].unique()
@@ -334,19 +334,22 @@
y_bottom = balanced_df.y.values

y_bottom = y_bottom.reshape(len(S_df.columns), len(dates))
- y_agg = Agg @ y_bottom
+ y_bottom_mask = np.isnan(y_bottom)
+ y_agg = Agg @ np.nan_to_num(y_bottom)
+ y_agg_mask = Agg @ y_bottom_mask

# Create long format hierarchical dataframe
y_agg = y_agg.flatten()
+ y_agg[y_agg_mask.flatten() > 1] = np.nan
y_bottom = y_bottom.flatten()
Y_df = pd.DataFrame(dict(
unique_id = np.repeat(S_df.index, len(dates)),
ds = np.tile(dates, len(S_df.index)),
y = np.concatenate([y_agg, y_bottom], axis=0)))
- Y_df = Y_df.set_index('unique_id')
+ Y_df = Y_df.set_index('unique_id').dropna()
return Y_df, S_df, tags

- # %% ../nbs/utils.ipynb 21
+ # %% ../nbs/utils.ipynb 22
class HierarchicalPlot:
""" Hierarchical Plot
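
The masking logic added to `aggregate` is what lets unbalanced panels aggregate correctly: missing bottom-level values are zero-filled so the summing matrix product still goes through, a parallel product over the NaN mask counts how many missing series fed each aggregate, offending aggregates are invalidated with NaN, and the final `dropna()` trims them from the long-format output. A small numeric sketch (shapes and values invented):

import numpy as np

# One aggregate ("total") over three bottom series; toy values.
Agg = np.array([[1., 1., 1.]])
y_bottom = np.array([[1., 2.],
                     [3., np.nan],   # series 2 missing at t=1
                     [5., np.nan]])  # series 3 missing at t=1

y_bottom_mask = np.isnan(y_bottom)
y_agg = Agg @ np.nan_to_num(y_bottom)  # NaNs contribute 0 to the sums
y_agg_mask = Agg @ y_bottom_mask       # counts missing contributors per entry

y_agg = y_agg.flatten()
y_agg[y_agg_mask.flatten() > 1] = np.nan  # same `> 1` threshold as above
print(y_agg)  # [ 9. nan]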
117 changes: 111 additions & 6 deletions nbs/core.ipynb
@@ -253,8 +253,8 @@
" \n",
" # TODO: Complete y_hat_insample protection\n",
" if intervals_method in ['bootstrap', 'permbu']:\n",
" if not (set(model_names) <= set(Y_df.columns)):\n",
" raise Exception('Check `Y_hat_df`s models are included in `Y_df` columns')\n",
" if not (set(model_names) <= set(Y_df.columns)):\n",
" raise Exception('Check `Y_hat_df`s models are included in `Y_df` columns')\n",
"\n",
" uids = Y_hat_df.index.unique()\n",
"\n",
@@ -268,7 +268,7 @@
" # Check Y_hat_df\\Y_df series difference\n",
" Y_diff = len(Y_df.index.difference(uids))\n",
" Y_hat_diff = len(Y_hat_df.index.difference(Y_df.index.unique()))\n",
" if Y_diff > 0 or Y_hat_diff > 0:\n",
" if Y_diff > 0 or Y_hat_diff > 0:\n",
" raise Exception(f'Check `Y_hat_df`, `Y_df` series difference, Y_hat\\Y={Y_hat_diff}, Y\\Y_hat={Y_diff}')\n",
"\n",
" # Same Y_hat_df/S_df/Y_df's unique_id order to prevent errors\n",
@@ -285,7 +285,9 @@
" intervals_method: str = 'normality',\n",
" num_samples: int = -1,\n",
" seed: int = 0,\n",
" sort_df: bool = True):\n",
" sort_df: bool = True,\n",
" is_balanced: bool = False,\n",
" ):\n",
" \"\"\"Hierarchical Reconciliation Method.\n",
"\n",
" The `reconcile` method is analogous to SKLearn `fit_predict` method, it \n",
@@ -312,6 +314,7 @@
" `num_samples`: int=-1, if positive return that many probabilistic coherent samples.\n",
" `seed`: int=0, random seed for numpy generator's replicability.<br>\n",
" `sort_df` : bool (default=True), if True, sort `df` by [`unique_id`,`ds`].<br>\n",
" `is_balanced`: bool=False, wether `Y_df` is balanced, set it to True to speed things up if `Y_df` is balanced.<br>\n",
"\n",
" **Returns:**<br>\n",
" `Y_tilde_df`: pd.DataFrame, with reconciled predictions.\n",
@@ -333,7 +336,10 @@
" tags={key: S_df.index.get_indexer(val) for key, val in tags.items()}\n",
" )\n",
" if Y_df is not None:\n",
" y_insample = Y_df['y'].values.reshape(len(S_df), -1).astype(np.float32)\n",
" if is_balanced:\n",
" y_insample = Y_df['y'].values.reshape(len(S_df), -1).astype(np.float32)\n",
" else:\n",
" y_insample = Y_df.pivot(columns='ds', values='y').loc[S_df.index].values.astype(np.float32)\n",
" reconciler_args['y_insample'] = y_insample\n",
"\n",
" Y_tilde_df= Y_hat_df.copy()\n",
@@ -352,7 +358,10 @@
" reconciler_args['y_hat'] = y_hat\n",
"\n",
" if (self.insample and has_fitted) or intervals_method in ['bootstrap', 'permbu']:\n",
" y_hat_insample = Y_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)\n",
" if is_balanced:\n",
" y_hat_insample = Y_df[model_name].values.reshape(len(S_df), -1).astype(np.float32)\n",
" else:\n",
" y_hat_insample = Y_df.pivot(columns='ds', values=model_name).loc[S_df.index].values.astype(np.float32)\n",
" reconciler_args['y_hat_insample'] = y_hat_insample\n",
"\n",
" if has_level and (level is not None):\n",
@@ -812,6 +821,102 @@
" test_close(reconciled['y'], reconciled[model], eps)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# test is_balanced behaviour\n",
"reconciled_balanced = hrec.reconcile(\n",
" Y_hat_df=hier_strict_df_h, \n",
" Y_df=hier_strict_df, \n",
" S=S_strict, \n",
" tags=tags_strict,\n",
" is_balanced=True,\n",
")\n",
"test_eq(reconciled, reconciled_balanced)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"from statsforecast import StatsForecast\n",
"from statsforecast.utils import generate_series\n",
"from statsforecast.models import RandomWalkWithDrift"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"# test unbalanced dataset\n",
"max_tenure = 24\n",
"dates = pd.date_range(start='2019-01-31', freq='M', periods=max_tenure)\n",
"cohort_tenure = [24, 23, 22, 21]\n",
"\n",
"ts_list = []\n",
"\n",
"# Create ts for each cohort\n",
"for i in range(len(cohort_tenure)):\n",
" ts_list.append(\n",
" generate_series(n_series=1, freq='M', min_length=cohort_tenure[i], max_length=cohort_tenure[i]).reset_index() \\\n",
" .assign(ult=i) \\\n",
" .assign(ds=dates[-cohort_tenure[i]:]) \\\n",
" .drop(columns=['unique_id'])\n",
" )\n",
"df = pd.concat(ts_list, ignore_index=True)\n",
"\n",
"# Create categories\n",
"df.loc[df['ult'] < 2, 'pen'] = 'a'\n",
"df.loc[df['ult'] >= 2, 'pen'] = 'b'\n",
"# Note that unique id requires strings\n",
"df['ult'] = df['ult'].astype(str)\n",
"\n",
"hier_levels = [\n",
" ['pen'],\n",
" ['pen', 'ult'],\n",
"]\n",
"hier_df, S_df, tags = aggregate(df=df, spec=hier_levels)\n",
"\n",
"train_df = hier_df.query(\"ds <= @pd.to_datetime('2019-12-31')\")\n",
"test_df = hier_df.query(\"ds > @pd.to_datetime('2019-12-31')\")\n",
"\n",
"fcst = StatsForecast(\n",
" models=[\n",
" RandomWalkWithDrift(),\n",
" ],\n",
" freq='M',\n",
" n_jobs=1,\n",
")\n",
"\n",
"hrec = HierarchicalReconciliation(\n",
" reconcilers=[\n",
" BottomUp(),\n",
" MinTrace(method='mint_shrink'),\n",
" ]\n",
")\n",
"\n",
"fcst.fit(df=train_df)\n",
"fcst_df = fcst.forecast(h=12, fitted=True)\n",
"fitted_df = fcst.forecast_fitted_values()\n",
"\n",
"fcst_df = hrec.reconcile(\n",
" Y_hat_df=fcst_df,\n",
" Y_df=fitted_df,\n",
" S=S_df,\n",
" tags=tags,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,