"""
About

Use MLflow and CrateDB to track the metrics, parameters, and outcomes of an ML
experiment program using Merlion. It uses the `machine_temperature_system_failure.csv`
dataset from the Numenta Anomaly Benchmark data.

- https://github.com/crate/mlflow-cratedb
- https://mlflow.org/docs/latest/tracking.html

Usage

Before running the program, optionally define the `MLFLOW_TRACKING_URI` environment
variable, in order to record events and metrics either directly into the database,
or by submitting them to an MLflow Tracking Server.

    # Use the CrateDB database directly.
    export MLFLOW_TRACKING_URI="crate://crate@localhost/?schema=mlflow"

    # Use an MLflow Tracking Server.
    export MLFLOW_TRACKING_URI=http://127.0.0.1:5000

Resources

- https://mlflow.org/
- https://github.com/crate/crate
- https://github.com/salesforce/Merlion
- https://github.com/numenta/NAB
"""

import datetime as dt
import os
import mlflow
import numpy as np
import pandas as pd
from crate import client
from merlion.evaluate.anomaly import TSADMetric
from merlion.models.defaults import DefaultDetector, DefaultDetectorConfig
from merlion.utils import TimeSeries


def connect_database():
"""
Connect to CrateDB, and return database connection object.
"""
dburi = os.getenv("CRATEDB_HTTP_URL", "http://crate@localhost:4200")
return client.connect(dburi)


def table_exists(table_name: str) -> bool:
"""
Check if database table exists.
"""
conn = connect_database()
cursor = conn.cursor()
    sql = (
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_name = ? AND table_schema = CURRENT_SCHEMA"
    )
    cursor.execute(sql, (table_name,))
rowcount = cursor.rowcount
cursor.close()
conn.close()
return rowcount > 0


def import_data(data_table_name: str, anomalies_table_name: str):
"""
Download Numenta Anomaly Benchmark data, and load into database.
"""
data = pd.read_csv(
"https://raw.githubusercontent.com/numenta/NAB/master/data/realKnownCause/machine_temperature_system_failure.csv"
)
    # Split the data into chunks of roughly 1000 rows each for better insert performance.
    chunk_size = 1000
    chunks = np.array_split(data, max(1, len(data) // chunk_size))
# Insert data into CrateDB.
with connect_database() as conn:
cursor = conn.cursor()
# Create the table if it doesn't exist.
cursor.execute(f"CREATE TABLE IF NOT EXISTS {data_table_name} (timestamp TIMESTAMP, temperature DOUBLE)")
# Insert the data in chunks.
for chunk in chunks:
sql = f"INSERT INTO {data_table_name} (timestamp, temperature) VALUES (?, ?)" # noqa: S608
cursor.executemany(sql, list(chunk.itertuples(index=False, name=None)))
# Create the table to hold known and detected anomalies.
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {anomalies_table_name} (
"ts_start" TIMESTAMP WITHOUT TIME ZONE,
"ts_end" TIMESTAMP WITHOUT TIME ZONE,
"score" DOUBLE PRECISION,
"experiment_id" BIGINT,
"run_id" TEXT,
"description" TEXT
);
"""
)
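        # Known anomaly windows for this dataset, as published by the
        # Numenta Anomaly Benchmark labels.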
known_anomalies = [
("2013-12-15 17:50:00.000000", "2013-12-17 17:00:00.000000"),
("2014-01-27 14:20:00.000000", "2014-01-29 13:30:00.000000"),
("2014-02-07 14:55:00.000000", "2014-02-09 14:05:00.000000"),
]
cursor.executemany(
f"INSERT INTO {anomalies_table_name} (ts_start, ts_end) VALUES (?, ?)", known_anomalies # noqa: S608
)


def refresh_table(table_name: str):
"""
Flush/Synchronize CrateDB write operations.
Refresh the table, to make sure the data is up-to-date.
https://cratedb.com/docs/crate/reference/en/latest/sql/statements/refresh.html
"""
with connect_database() as conn:
cursor = conn.cursor()
cursor.execute(f"REFRESH TABLE {table_name}")
cursor.close()


def read_data(table_name: str) -> pd.DataFrame:
"""
Read data from database into pandas DataFrame.
"""
conn = connect_database()
with conn:
cursor = conn.cursor()
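        # Downsample the raw readings into 5-minute buckets, keeping the
        # maximum temperature per bucket.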
cursor.execute(
f"""SELECT DATE_BIN('5 min'::INTERVAL, "timestamp", 0),
MAX(temperature)
FROM {table_name}
GROUP BY 1
ORDER BY 1 ASC
""" # noqa: S608
)
data = cursor.fetchall()
    # Convert the database response to a pandas DataFrame, interpreting the
    # epoch milliseconds the same way as `read_anomalies` does.
    time_series = pd.DataFrame(
        [{"timestamp": pd.to_datetime(item[0], unit="ms"), "value": item[1]} for item in data]
    )
    # Set the timestamp as the index.
    return time_series.set_index("timestamp")


def read_anomalies():
"""
Read anomalies from database into pandas DataFrame.
"""
conn = connect_database()
with conn:
cursor = conn.cursor()
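        # Rows without an experiment id are the seeded, known anomalies.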
cursor.execute("SELECT ts_start, ts_end FROM anomalies WHERE experiment_id IS NULL")
data = cursor.fetchall()
return [[pd.to_datetime(start, unit="ms"), pd.to_datetime(end, unit="ms")] for start, end in data]


def save_anomalies(table_name: str, anomalies: pd.DataFrame, mttd: float):
"""
Save anomalies from a pandas DataFrame into the database.
"""
conn = connect_database()
with conn:
cursor = conn.cursor()
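        # Store each detected anomaly as a window which starts at the detection
        # timestamp, and extends by the mean time to detect (mttd).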
rows = []
for index, row in anomalies.iterrows():
rows.append(
(
index,
index + dt.timedelta(seconds=mttd),
row["anom_score"],
row["experiment_id"],
row["run_id"],
None,
)
)
cursor.executemany(
f"""
INSERT INTO {table_name} (ts_start, ts_end, score, experiment_id, run_id, description)
VALUES (?, ?, ?, ?, ?, ?)
""", # noqa: S608
rows,
)


def run_experiment(time_series: pd.DataFrame, anomalies_table_name: str):
"""
Run experiment on DataFrame, using Merlion. Track it using MLflow.
"""
mlflow.set_experiment("numenta-merlion-experiment")
with mlflow.start_run() as current_run:
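        # Use data before 2013-12-15 for training, and the remainder for evaluation.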
        train_frame = time_series[time_series.index < pd.to_datetime("2013-12-15")]
        train_data = TimeSeries.from_pd(train_frame)
        test_data = TimeSeries.from_pd(time_series[time_series.index >= pd.to_datetime("2013-12-15")])
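        # Train Merlion's default anomaly detector on the training split.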
model = DefaultDetector(DefaultDetectorConfig())
model.train(train_data=train_data)
test_pred = model.get_anomaly_label(time_series=test_data)
# Prepare the test labels
time_frames = read_anomalies()
time_series["test_labels"] = 0
for start, end in time_frames:
time_series.loc[(time_series.index >= start) & (time_series.index <= end), "test_labels"] = 1
test_labels = TimeSeries.from_pd(time_series["test_labels"])
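        # Score the predictions against the labelled anomaly windows.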
p = TSADMetric.Precision.value(ground_truth=test_labels, predict=test_pred)
r = TSADMetric.Recall.value(ground_truth=test_labels, predict=test_pred)
f1 = TSADMetric.F1.value(ground_truth=test_labels, predict=test_pred)
mttd = TSADMetric.MeanTimeToDetect.value(ground_truth=test_labels, predict=test_pred)
        print(f"Precision: {p:.4f}, Recall: {r:.4f}, F1: {f1:.4f}\nMean Time To Detect: {mttd}")  # noqa: T201
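        # Log the training input, evaluation metrics, and detector parameters to MLflow.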
        mlflow.log_input(mlflow.data.from_pandas(train_frame), context="training")
mlflow.log_metric("precision", p)
mlflow.log_metric("recall", r)
mlflow.log_metric("f1", f1)
mlflow.log_metric("mttd", mttd.total_seconds())
mlflow.log_param("anomaly_threshold", model.config.threshold.alm_threshold)
mlflow.log_param("min_alm_window", model.config.threshold.min_alm_in_window)
mlflow.log_param("alm_window_minutes", model.config.threshold.alm_window_minutes)
mlflow.log_param("alm_suppress_minutes", model.config.threshold.alm_suppress_minutes)
mlflow.log_param("ensemble_size", model.config.model.combiner.n_models)
# Save the model to MLflow.
model.save("model")
mlflow.log_artifact("model")
anomalies = test_pred.to_pd()
anomalies = anomalies[anomalies["anom_score"] > 0]
anomalies["experiment_id"] = current_run.info.experiment_id
anomalies["run_id"] = current_run.info.run_id
# Save detected anomalies to CrateDB
save_anomalies(anomalies_table_name, anomalies, mttd.total_seconds())


def main():
"""
Provision dataset, and run experiment.
"""
# Table name where the actual data is stored.
data_table = "machine_data"
    # Table name where detected anomalies are stored.
anomalies_table = "anomalies"
# Provision data to operate on, only once.
if not table_exists(data_table):
import_data(data_table, anomalies_table)
# Flush/Synchronize CrateDB write operations.
refresh_table(data_table)
# Read data into pandas DataFrame.
data = read_data(data_table)
# Run experiment on data.
run_experiment(data, anomalies_table)


if __name__ == "__main__":
main()