Skip to content

Commit

Permalink
Merge pull request #86 from zqzten/algorithm
Browse files Browse the repository at this point in the history
Algo: time series forecasting tweaks
  • Loading branch information
dayko2019 authored Jan 9, 2024
2 parents 660a4b3 + b9a0be7 commit 7a7e46a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 35 deletions.
13 changes: 5 additions & 8 deletions algorithm/kapacity/portrait/horizontal/predictive/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,19 @@ def parse_args():
parser.add_argument('--tsf-model-path',
help='dir path containing related files of the time series forecasting model',
required=True, default='/opt/kapacity/timeseries/forcasting/model')
parser.add_argument('--tsf-freq', help='frequency (precision) of the time series forecasting model,'
'should be the same as set for training', required=True)
parser.add_argument('--tsf-dataloader-num-workers', help='number of worker subprocesses to use for data loading'
'of the time series forecasting model',
required=False, default=0)
parser.add_argument('--re-history-len', help='history length of training data for replicas estimation',
parser.add_argument('--re-history-len', help='history length of training data for replicas estimation,'
'should be longer than the context duration of the time series forecasting model',
required=True)
parser.add_argument('--re-time-delta-hours', help='time zone offset for replicas estimation model',
required=True)
parser.add_argument('--re-test-dataset-size-in-seconds',
help='size of test dataset in seconds for replicas estimation model',
required=False, default=86400)
parser.add_argument('--scaling-freq', help='frequency of scaling, the duration should be larger than tsf-freq',
parser.add_argument('--scaling-freq', help='frequency of scaling, the duration should be larger than the frequency'
'of the time series forecasting model',
required=True)
args = parser.parse_args()
return args
Expand All @@ -105,11 +105,8 @@ def predict_traffics(args, metric_ctx):
df = None
for key in metric_ctx.traffics_history_dict:
traffic_history = resample_by_freq(metric_ctx.traffics_history_dict[key], freq, {'value': 'mean'})
traffic_history = traffic_history[len(traffic_history) - context_length:len(traffic_history)]
traffic_history = traffic_history[len(traffic_history)-context_length:len(traffic_history)]
df_traffic = model.predict(df=traffic_history,
freq=args.tsf_freq,
target_col='value',
time_col='timestamp',
series_cols_dict={'workload': metric_ctx.workload_identifier, 'metric': key})
df_traffic.rename(columns={'value': key}, inplace=True)
if df is None:
Expand Down
47 changes: 20 additions & 27 deletions algorithm/kapacity/timeseries/forecasting/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ def __init__(self,
self.seq_len = self.config['context_length'] + self.config['prediction_length']
self.config = config
self.features_map = {
'min': ['minute', 'hour', 'dayofweek'],
'1min': ['minute', 'hour', 'dayofweek'],
'10min': ['minute', 'hour', 'dayofweek', 'day'],
'H': ['hour', 'dayofweek', 'day'],
'1H': ['hour', 'dayofweek', 'day'],
'min': ['minute', 'hour', 'dayofweek', 'day'],
'1min': ['minute', 'hour', 'dayofweek', 'day'],
'10min': ['minute', 'hour', 'dayofweek', 'day'],
'D': ['dayofweek', 'day'],
'1D': ['dayofweek', 'day']
}
Expand Down Expand Up @@ -319,15 +319,15 @@ def __init__(self,
"""
super(Estimator, self).__init__()
self.config = config
assert self.config['freq'] in ['H', '1H', 'min', '1min', '10min', 'D',
'1D'], "freq must be in ['H','1H','min','1min','10min','D','1D']"
assert self.config['freq'] in ['min', '1min', '10min', 'H', '1H', 'D', '1D'], \
"freq must be in ['min','1min','10min','H','1H','D','1D']"

self.feat_cardinality_map = {
'min': [60, 24, 8],
'1min': [60, 24, 8],
'10min': [60, 24, 8, 32],
'H': [24, 8, 32],
'1H': [24, 8, 32],
'min': [60, 24, 8, 32],
'1min': [60, 24, 8, 32],
'10min': [60, 24, 8, 32],
'D': [8, 32],
'1D': [8, 32]
}
Expand Down Expand Up @@ -546,33 +546,25 @@ def test(self, test_loader):

def predict(self,
            df,
            series_cols_dict):
    """Forecast future values for the given history.

    The diff view interleaved the pre-commit signature (which took
    ``freq``/``target_col``/``time_col`` explicitly) with the post-commit
    one; this is the coherent post-commit version, where those values are
    read from ``self.config`` inside the helper methods.

    Args:
        df: history DataFrame containing the time column and target column
            named in ``self.config`` (see ``pre_processing_query``).
        series_cols_dict: mapping of series-identifying column names to
            their values (e.g. workload / metric identifiers).

    Returns:
        DataFrame produced by ``post_processing_result`` holding the
        predictions that follow the end of the supplied context window.
    """
    # Normalize/validate the query and remember where the context ends,
    # so the forecast horizon can be anchored right after it.
    query_df, ctx_end_date = self.pre_processing_query(query=df,
                                                       series_cols_dict=series_cols_dict)
    # Wrap the prepared frame in the model's test-mode dataset/loader.
    test_data, test_loader = self.get_data(
        flag='test',
        df=query_df,
        df_path=None)
    # Run inference, then reshape raw predictions into the output frame.
    pred = self.test(test_loader=test_loader)
    result = self.post_processing_result(pred=pred,
                                         ctx_end_date=ctx_end_date)
    return result

def pre_processing_query(self,
query,
time_col,
target_col,
series_cols_dict,
freq):
series_cols_dict):
time_col = self.config['time_col']
target_col = self.config['target_col']
freq = self.config['freq']

test_df = query.sort_values([time_col]).reset_index(drop=True)
ctx_end_date = test_df[time_col].iat[-1]

Expand All @@ -598,10 +590,11 @@ def pre_processing_query(self,

def post_processing_result(self,
pred,
ctx_end_date,
time_col,
target_col,
freq):
ctx_end_date):
time_col = self.config['time_col']
target_col = self.config['target_col']
freq = self.config['freq']

result_df = pd.DataFrame()
pred_len = self.config['prediction_length']
for i in range(1, pred_len + 1):
Expand Down

0 comments on commit 7a7e46a

Please sign in to comment.