Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions custom-recipes/pi-system-interpolate/recipe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"meta": {
"label": "Interpolate",
"description": "Interpolate the transposed PI values",
"icon": "icon-dku-timeseries-resample icon-DKU_timeseries_resample"
},
"kind": "PYTHON",
"selectableFromDataset": "input_dataset",
"inputRoles": [
{
"name": "input_dataset",
"label": "Dataset containing transposed values",
"description": "",
"arity": "UNARY",
"required": true,
"acceptsDataset": true
}
],

"outputRoles": [
{
"name": "api_output",
"label": "Dataset receiving the interpolated values",
"description": "",
"arity": "UNARY",
"required": true,
"acceptsDataset": true
}
],
"params": [
{
"type": "SEPARATOR",
"name": "separator_input",
"label": "Input parameters"
},
{
"name": "datetime_column",
"label": "Time column",
"type": "COLUMN",
"columnRole": "input_dataset",
"allowedColumnTypes": [
"date"
],
"mandatory": true
},
{
"name": "show_advanced_parameters",
"label": "Show advanced parameters",
"description": "",
"type": "BOOLEAN",
"defaultValue": false
}
],
"resourceKeys": []
}
93 changes: 93 additions & 0 deletions custom-recipes/pi-system-interpolate/recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
import dataiku
from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role
import pandas
from safe_logger import SafeLogger
from osisoft_constants import OSIsoftConstants
from osisoft_plugin_common import reorder_dataframe, iso_to_epoch, get_datetime_from_row


# Set-up of the PI System "Interpolate" recipe: resolve the input/output
# datasets, read the recipe configuration and work out which attributes of the
# transposed dataset can be interpolated.
logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"])

logger.info("PIWebAPI Interpolate recipe v{}".format(
    OSIsoftConstants.PLUGIN_VERSION
))

input_dataset = get_input_names_for_role('input_dataset')
config = get_recipe_config()
dku_flow_variables = dataiku.get_flow_variables()

output_names_stats = get_output_names_for_role('api_output')
output_dataset = dataiku.Dataset(output_names_stats[0])

logger.info("Initialization with config={}".format(logger.filter_secrets(config)))

# Name of the column holding the reference timestamp of each row.
datetime_column = config.get("datetime_column")

input_parameters_dataset = dataiku.Dataset(input_dataset[0])
input_parameters_dataframe = input_parameters_dataset.get_dataframe()

# An attribute can be interpolated only when both its value column
# (<name><VALUE_COLUMN_SUFFIX>) and its sample timestamp column
# (<name><TIMESTAMP_COLUMN_SUFFIX>) are present in the input.
value_suffix = OSIsoftConstants.VALUE_COLUMN_SUFFIX
timestamp_suffix = OSIsoftConstants.TIMESTAMP_COLUMN_SUFFIX
available_columns = set(input_parameters_dataframe.columns)

columns_to_interpolate = []
for column in input_parameters_dataframe.columns:
    if not column.endswith(value_suffix):
        continue
    # Strip the suffix from the end only: splitting on the suffix would
    # truncate a name that also contains it somewhere in the middle.
    columns_name = column[:len(column) - len(value_suffix)]
    if "{}{}".format(columns_name, timestamp_suffix) not in available_columns:
        logger.info("Skipping column {}: no matching timestamp column".format(column))
        continue
    columns_to_interpolate.append(columns_name)

logger.info("Columns to interpolate: {}".format(columns_to_interpolate))

# State carried from one row to the next by the main loop.
previous_row = None      # row waiting for its successor before being written
first_dataframe = True   # the output schema is written with the first dataframe

# Main loop: rows are processed with a one-row lookahead. Each row is held back
# as `previous_row` until its successor (`this_row`) is known, then for every
# attribute an interpolated value at the row's reference time is added in a
# <name><INTERPOLATED_COLUMN_SUFFIX> column and the row is written out.
# NOTE(review): the last input row has no successor and is therefore never
# written (a single-row input produces no output) - confirm this is intended.
with output_dataset.get_writer() as writer:
    for index, input_parameters_row in input_parameters_dataframe.iterrows():
        this_row = input_parameters_row.to_dict()
        reference_time = iso_to_epoch(get_datetime_from_row(input_parameters_row, datetime_column))
        if previous_row is None:
            # First row: nothing to emit yet, just remember it.
            previous_row = this_row
            previous_reference_time = reference_time
            continue
        # At this stage previous_row is past, this_row is present
        for column_to_interpolate in columns_to_interpolate:
            timestamp_key = "{}{}".format(column_to_interpolate, OSIsoftConstants.TIMESTAMP_COLUMN_SUFFIX)
            value_key = "{}{}".format(column_to_interpolate, OSIsoftConstants.VALUE_COLUMN_SUFFIX)
            interpolated_key = "{}{}".format(column_to_interpolate, OSIsoftConstants.INTERPOLATED_COLUMN_SUFFIX)

            sample_time = iso_to_epoch(previous_row.get(timestamp_key))
            value = previous_row.get(value_key)
            if sample_time == previous_reference_time:
                # The sample was taken exactly at the reference time: use it as is.
                previous_row[interpolated_key] = value
            elif sample_time < previous_reference_time:
                # Sample is in the past, so next one is in the future
                future_value = this_row.get(value_key)
                future_time = iso_to_epoch(this_row.get(timestamp_key))
                time_span = future_time - sample_time
                if time_span == 0:
                    # The next row still carries the same sample, so the slope
                    # is undefined (division by zero). Fall back to holding
                    # the last known value.
                    previous_row[interpolated_key] = value
                else:
                    slope = (future_value - value) / time_span
                    previous_row[interpolated_key] = value + slope * (previous_reference_time - sample_time)
            elif sample_time > previous_reference_time:
                # Temporal paradox, that should never happen
                raise Exception("Time issue: On row {}, timestamp for {} is advance of the reference timestamp".format(index, column_to_interpolate))
        output_dataframe = pandas.DataFrame([previous_row])
        output_dataframe = reorder_dataframe(output_dataframe, [datetime_column])
        if first_dataframe:
            # The schema must be set once, before the first write.
            output_dataset.write_schema_from_dataframe(output_dataframe)
            first_dataframe = False
        writer.write_dataframe(output_dataframe)
        previous_row = this_row
        previous_reference_time = reference_time
34 changes: 3 additions & 31 deletions custom-recipes/pi-system-transpose/recipe.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# -*- coding: utf-8 -*-
import dataiku
from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role
import pandas as pd
import pandas
from safe_logger import SafeLogger
import os
from temp_utils import CustomTmpFile
from osisoft_constants import OSIsoftConstants
import dateutil.parser
from column_name import normalise_name
from osisoft_plugin_common import reorder_dataframe
from osisoft_plugin_common import reorder_dataframe, get_datetime_from_row


logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"])
Expand Down Expand Up @@ -36,33 +35,6 @@ def parse_timestamp_and_value(line):
return date, value


def get_datetime_from_string(datetime):
    """Parse an ISO 8601 string with ``dateutil.parser.isoparse``.

    :param datetime: raw string read from the time column
    :return: the parsed value, or None when the string cannot be parsed
    """
    try:
        return dateutil.parser.isoparse(datetime)
    except Exception:
        # Best effort: an unparsable value yields None. Only Exception is
        # caught so that KeyboardInterrupt / SystemExit still propagate.
        return None


def get_datetime_from_pandas(datetime):
    """Format a datetime-like object as a ``%Y-%m-%dT%H:%M:%SZ`` string.

    :param datetime: any object exposing ``strftime``
    :return: the formatted string, or None when the value cannot be formatted
    """
    try:
        return datetime.strftime('%Y-%m-%dT%H:%M:%SZ')
    except Exception:
        # Best effort: a value that cannot be formatted yields None. Only
        # Exception is caught so that KeyboardInterrupt / SystemExit still
        # propagate.
        return None


def get_datetime_from_row(row, datetime_column):
    """Read the time column of a row and normalise it.

    Strings go through get_datetime_from_string, anything else through
    get_datetime_from_pandas.

    :param row: mapping-like row indexable by column name
    :param datetime_column: name of the column holding the timestamp
    :return: whatever the selected helper returns (None on failure)
    """
    raw_datetime = row[datetime_column]
    # isinstance also accepts str subclasses, which an exact type check
    # would wrongly send down the non-string path.
    if isinstance(raw_datetime, str):
        return get_datetime_from_string(raw_datetime)
    return get_datetime_from_pandas(raw_datetime)


def get_latest_values_at_timestamp(file_handles, seek_timestamp):
attribute_index = 0
values = {}
Expand Down Expand Up @@ -220,7 +192,7 @@ def clean_cache(groupby_list):
synchronize_on_identifier: value
})
unnested_items_rows.append(dictionary)
unnested_items_rows = pd.DataFrame(unnested_items_rows)
unnested_items_rows = pandas.DataFrame(unnested_items_rows)
unnested_items_rows = reorder_dataframe(unnested_items_rows, [OSIsoftConstants.TIMESTAMP_COLUMN_NAME, synchronize_on_identifier])
if first_dataframe:
output_dataset.write_schema_from_dataframe(unnested_items_rows)
Expand Down
2 changes: 2 additions & 0 deletions python-lib/osisoft_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class OSIsoftConstants(object):
DEFAULT_SCHEME = "https"
DEFAULT_WAIT_BEFORE_RETRY = 60
DKU_ERROR_KEY = "Errors"
INTERPOLATED_COLUMN_SUFFIX = "_ip"
LINKS = "Links"
MAXIMUM_RETRIES_ON_THROTTLING = 5
POSSIBLE_WEB_ID_STARTS = ["F1", "I1", "P1", "L1", "D1"]
Expand Down Expand Up @@ -376,6 +377,7 @@ class OSIsoftConstants(object):
SEARCH_PATH = "search"
STREAM_PATH = "streams"
STREAMSETS_PATH = "streamsets"
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
TIMESTAMP_COLUMN_NAME = "Timestamp"
TIMESTAMP_COLUMN_SUFFIX = "_ts"
PIWEBAPI_AF_ENDPOINTS = {
Expand Down
27 changes: 27 additions & 0 deletions python-lib/osisoft_plugin_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,33 @@ def iso_to_epoch(iso_timestamp):
return epoch_timestamp


def get_datetime_from_row(row, datetime_column):
    """Read the time column of a row and normalise it.

    Strings go through get_datetime_from_string, anything else through
    get_datetime_from_pandas.

    :param row: mapping-like row indexable by column name
    :param datetime_column: name of the column holding the timestamp
    :return: whatever the selected helper returns (None on failure)
    """
    raw_datetime = row[datetime_column]
    # isinstance also accepts str subclasses, which an exact type check
    # would wrongly send down the non-string path.
    if isinstance(raw_datetime, str):
        return get_datetime_from_string(raw_datetime)
    return get_datetime_from_pandas(raw_datetime)


def get_datetime_from_string(datetime):
    """Check that a string parses with ``date_parser.isoparse``.

    The parsed value is discarded: the input string itself is handed back.

    :param datetime: raw string read from the time column
    :return: the unchanged input when it parses, None otherwise
    """
    try:
        date_parser.isoparse(datetime)
    except Exception:
        return None
    return datetime


def get_datetime_from_pandas(datetime):
    """Format a datetime-like object with ``OSIsoftConstants.TIME_FORMAT``.

    :param datetime: any object exposing ``strftime``
    :return: the formatted string, or None when formatting fails
    """
    try:
        formatted = datetime.strftime(OSIsoftConstants.TIME_FORMAT)
    except Exception:
        formatted = None
    return formatted


def reorder_dataframe(unnested_items_rows, first_elements):
columns = unnested_items_rows.columns.tolist()
for first_element in reversed(first_elements):
Expand Down