Skip to content

Commit

Permalink
Merge pull request #85 from zqzten/algorithm
Browse files Browse the repository at this point in the history
Algo: Introduce support for pods metric query as workload external query & other tweaks
  • Loading branch information
dayko2019 authored Dec 15, 2023
2 parents 414696c + ad417e9 commit 660a4b3
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 52 deletions.
53 changes: 25 additions & 28 deletions algorithm/kapacity/metric/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ def fetch_metrics(addr, namespace, metric, scale_target, start, end):
start=start,
end=end)
elif metric_type == 'Pods':
# TODO: support pods metric type
raise RuntimeError('UnsupportedMetricType')
return fetch_pod_metric_history(addr=addr,
namespace=namespace,
metric=metric,
scale_target=scale_target,
start=start,
end=end)
elif metric_type == 'Object':
return fetch_object_metric_history(addr=addr,
namespace=namespace,
Expand Down Expand Up @@ -71,19 +75,6 @@ def compute_history_range(history_len):
return start, end


def fetch_replicas_metric_history(addr, namespace, metric, scale_target, start, end):
    """Fetch history of an External metric scoped to the scale target's workload.

    Builds a WORKLOAD_EXTERNAL query from the metric's ``external`` spec plus the
    target object's name and group-kind, then delegates to ``query_metrics``.
    """
    external_spec = metric['external']
    target_name, target_group_kind = get_obj_name_and_group_kind(scale_target)
    workload_query = metric_pb.WorkloadExternalQuery(
        group_kind=target_group_kind,
        namespace=namespace,
        name=target_name,
        metric=build_metric_identifier(external_spec['metric']))
    return query_metrics(addr=addr,
                         query=metric_pb.Query(type=metric_pb.WORKLOAD_EXTERNAL,
                                               workload_external=workload_query),
                         start=start,
                         end=end)


def fetch_resource_metric_history(addr, namespace, metric, scale_target, start, end):
resource_name = metric['resource']['name']
name, group_kind = get_obj_name_and_group_kind(scale_target)
Expand Down Expand Up @@ -113,6 +104,19 @@ def fetch_container_resource_metric_history(addr, namespace, metric, scale_targe
return query_metrics(addr=addr, query=query, start=start, end=end)


def fetch_pod_metric_history(addr, namespace, metric, scale_target, start, end):
    """Fetch history of a Pods metric, expressed as a workload-external query.

    Builds a WORKLOAD_EXTERNAL query from the metric's ``pods`` spec plus the
    target object's name and group-kind, then delegates to ``query_metrics``.
    """
    pods_spec = metric['pods']
    target_name, target_group_kind = get_obj_name_and_group_kind(scale_target)
    workload_query = metric_pb.WorkloadExternalQuery(
        group_kind=target_group_kind,
        namespace=namespace,
        name=target_name,
        metric=build_metric_identifier(pods_spec['metric']))
    return query_metrics(addr=addr,
                         query=metric_pb.Query(type=metric_pb.WORKLOAD_EXTERNAL,
                                               workload_external=workload_query),
                         start=start,
                         end=end)


def fetch_object_metric_history(addr, namespace, metric, start, end):
obj = metric['object']
metric_identifier = build_metric_identifier(obj['metric'])
Expand Down Expand Up @@ -167,19 +171,12 @@ def query_metrics(addr, query, start, end):


def convert_metric_series_to_dataframe(series):
    """Convert a metric series list into a two-column pandas DataFrame.

    Only the first series in the list is used; each of its points becomes one
    row. Point timestamps arrive in milliseconds and are normalized to integer
    epoch seconds.

    :param series: non-empty list of series objects, each carrying a ``points``
                   sequence whose items expose ``timestamp`` and ``value``
    :return: DataFrame with float ``value`` and int64 ``timestamp`` (seconds)
    """
    # TODO: consider if it's possible to have multiple series
    rows = [[point.timestamp, point.value] for point in series[0].points]
    df = pd.DataFrame(rows, columns=['timestamp', 'value'], dtype=float)
    # Millisecond timestamps -> integer epoch seconds.
    df['timestamp'] = df['timestamp'].map(lambda x: x / 1000).astype('int64')
    return df


def time_period_to_minutes(time_period):
Expand Down
23 changes: 9 additions & 14 deletions algorithm/kapacity/portrait/horizontal/predictive/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class EnvInfo:

class MetricsContext:
workload_identifier = None
resource_name = None
resource_target = 0
resource_history = None
replicas_history = None
Expand Down Expand Up @@ -130,7 +129,7 @@ def predict_replicas(args, metric_ctx, pred_traffics):
pred = estimator.estimate(history,
pred_traffics,
'timestamp',
metric_ctx.resource_name,
'resource',
'replicas',
traffic_col,
metric_ctx.resource_target,
Expand All @@ -155,12 +154,10 @@ def merge_history_dict(history_dict):


def resample_by_freq(old_df, freq, agg_funcs):
    """Resample a dataframe keyed by an epoch-second 'timestamp' column.

    :param old_df: input DataFrame with an integer ``timestamp`` column in
                   epoch seconds; it is not modified
    :param freq: pandas offset alias for the target frequency, e.g. ``'1min'``
    :param agg_funcs: column -> aggregation mapping passed to ``agg``
    :return: new DataFrame with the aggregated columns and an int64
             ``timestamp`` column back in epoch seconds
    """
    # sort_values returns a new frame, so the caller's dataframe stays untouched.
    df = old_df.sort_values(by='timestamp', ascending=True)
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
    df = df.resample(rule=freq, on='timestamp').agg(agg_funcs).reset_index()
    # Floor-divide by the integer 10 ** 9 (ns -> s). Dividing by the float 1e9
    # would silently coerce the timestamp column to float64.
    df['timestamp'] = df['timestamp'].astype('int64') // 10 ** 9
    return df


Expand All @@ -185,18 +182,16 @@ def fetch_metrics_history(args, env, hp_cr):
resource = metric['containerResource']
else:
raise RuntimeError('MetricTypeError')
resource_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end)
metric_ctx.resource_name = resource['name']
metric_ctx.resource_target = compute_resource_target(env.namespace, resource, scale_target)
metric_ctx.resource_history = resource_history.rename(columns={'value': resource['name']})
resource_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end)
metric_ctx.resource_history = resource_history.rename(columns={'value': 'resource'})
elif i == 1:
if metric_type != 'External':
if metric_type != 'Pods':
raise RuntimeError('MetricTypeError')
replica_history = query.fetch_replicas_metric_history(env.metrics_server_addr, env.namespace, metric,
scale_target, start, end)
replica_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end)
metric_ctx.replicas_history = replica_history.rename(columns={'value': 'replicas'})
else:
if metric_type != 'Object' and metric_type != 'External':
if metric_type != 'Pods' and metric_type != 'Object' and metric_type != 'External':
raise RuntimeError('MetricTypeError')
metric_name = metric['name']
traffic_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,15 +631,15 @@ class EstimationException(Exception):
pass


def estimate(data,
data_pred,
time_col,
resource_col,
replicas_col,
traffic_cols,
resource_target,
time_delta_hours,
test_dataset_size_in_seconds=86400):
def estimate(data: pd.DataFrame,
data_pred: pd.DataFrame,
time_col: str,
resource_col: str,
replicas_col: str,
traffic_cols: list[str],
resource_target: float,
time_delta_hours: int,
test_dataset_size_in_seconds: int = 86400) -> pd.DataFrame:
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger()
Expand Down
2 changes: 1 addition & 1 deletion algorithm/kapacity/timeseries/forecasting/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ def fit(freq: str,
context_length: int,
learning_rate: float = 1e-3,
epochs: int = 100,
batch_size: int = 1024,
batch_size: int = 32,
num_workers: int = 0,
model_path: str = './',
df: pd.DataFrame = None,
Expand Down

0 comments on commit 660a4b3

Please sign in to comment.