-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
79 lines (59 loc) · 2.31 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from celery import Celery
# from pottery import Redlock
from redis import Redis
#from pottery import RedisList
# from joblib import load
# import numpy as np
# # COMENTAR EN COLIBRI
# try:
# print('Attempting colibri import')
# import tflite_runtime.interpreter as tflite
# COLIBRI=True
# print('Succeded, running on colibri platform')
# except Exception as e:
# import tensorflow as tf
# COLIBRI=False
# print('Failed, running on full platform')
# # Load the TFLite model and allocate tensors.
# if COLIBRI is True:
# interpreter = tflite.Interpreter(model_path="model.tflite")
# else:
# interpreter = tf.lite.Interpreter(model_path="model.tflite")
# interpreter.allocate_tensors()
# # Get input and output tensors.
# input_details = interpreter.get_input_details()
# output_details = interpreter.get_output_details()
MEASUREMENTS_TO_GROUP = 3
app = Celery('tasks', broker='redis://localhost')
redis = Redis.from_url('redis://localhost:6379/1')
@app.task
def process_new_measurement(measurements):
global redis, MEASUREMENTS_TO_GROUP
# lock = Redlock(key='process_new_measurement', masters={redis})
# with lock:
measurements_list = [] #RedisList(redis=redis, key='measurements_list')
measurements_list += [measurements]
valid = True
if len(measurements_list) > MEASUREMENTS_TO_GROUP:
# TODO: Check if measurement must be added to queue and if measurement is valid
measurements_list = measurements_list[len(measurements_list)-MEASUREMENTS_TO_GROUP:]
if len(measurements_list) == MEASUREMENTS_TO_GROUP and valid:
process_measurment_list.delay(list(measurements_list))
@app.task
def process_measurment_list(measurements_list):
# Extract values from protobuf report and take average
print(measurements_list)
# @app.task
# def pre_process_measurements(raw_measurements, measurements):
# colibri_scaler = load('scaler.sklearn')
# colibri_scaler.transform(measurements)
# inference_measurements.delay(raw_measurements, measurements)
# @app.task
# def inference_measurements(raw_measurements, measurements):
# global interpreter
# interpreter.set_tensor(input_details[0]['index'], input_data)
# interpreter.invoke()
# output_data = interpreter.get_tensor(output_details[0]['index'])
# # Calculo de mae
# mae = np.mean(np.abs(output_data-input_data[-1]))
# anomaly = mae > anomaly_threshold