Skip to content

Commit

Permalink
Added time series indexes support in the CSVSFileStorage. Minor impro…
Browse files Browse the repository at this point in the history
…vements.
  • Loading branch information
sarusso committed Dec 4, 2021
1 parent fdf9990 commit f6d5298
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 21 deletions.
13 changes: 11 additions & 2 deletions timeseria/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def append(self, item):
try:
try:
if not item.__succedes__(self[-1]):
raise ValueError('Not in succession ("{}" vs "{}")'.format(item,self[-1])) from None
raise ValueError('Not in succession ("{}" does not succeeds "{}")'.format(item,self[-1])) from None
except IndexError:
raise
except AttributeError:
try:
if not item > self[-1]:
raise ValueError('Not in order ("{}" vs "{}")'.format(item,self[-1])) from None
raise ValueError('Not in order ("{}" does not follow "{}")'.format(item,self[-1])) from None
except TypeError:
raise TypeError('Object of class "{}" does not implement a "__gt__" or a "__succedes__" method, cannot append it to a Series (which is ordered)'.format(item.__class__.__name__)) from None
except IndexError:
Expand Down Expand Up @@ -469,6 +469,11 @@ def data_reconstructed(self):
except AttributeError:
return None

@data_reconstructed.setter
def data_reconstructed(self, value):
self._data_reconstructed = value



class DataTimePoint(DataPoint, TimePoint):
"""A point that carries some data in the time dimension."""
Expand Down Expand Up @@ -926,6 +931,10 @@ def data_reconstructed(self):
return self._data_reconstructed
except AttributeError:
return None

@data_reconstructed.setter
def data_reconstructed(self, value):
self._data_reconstructed = value


class DataTimeSlot(DataSlot, TimeSlot):
Expand Down
92 changes: 74 additions & 18 deletions timeseria/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ def get(self, limit=None, tz=None, item_type=None, sort=False):
# Do we have to stop? Note: empty lines still have the "\n" char.
if not line:
break

# Is this line an empty line?
#if not line.strip():
# continue

# Is this line a comment?
comment_chars = ['#', ';']
Expand Down Expand Up @@ -325,7 +329,7 @@ def get(self, limit=None, tz=None, item_type=None, sort=False):
elif self.data_format == list:
data = [to_float(line_items[index],NO_DATA_PLACEHOLDERS) for index in data_column_indexes]
else:
data = {column_labels[index]: to_float(line_items[index],NO_DATA_PLACEHOLDERS) for index in data_column_indexes}
data = {column_labels[index]: to_float(line_items[index],NO_DATA_PLACEHOLDERS,column_labels[index]) for index in data_column_indexes}
else:
# Default here is to set a float if there is only one value.
# TODO: are we sure we want this?
Expand All @@ -340,14 +344,14 @@ def get(self, limit=None, tz=None, item_type=None, sort=False):
logger.debug('Cannot convert value "%s" in line #%s to float, skipping the line.', e, line_number)
continue
else:
raise Exception('Cannot convert value "{}" in line #%{} to float, aborting. Set "skip_errors=True" to drop them instead.'.format(e, line_number)) from None
raise Exception('Cannot convert value "{}" in line #{} to float, aborting. Set "skip_errors=True" to drop them instead.'.format(e, line_number)) from None

except IndexError as e:
if self.skip_errors:
logger.debug('Cannot parse in line #%s as some values are missing, skipping the line.', line_number)
continue
else:
raise Exception('Cannot parse line #%{} as some values are missing, aborting. Set "skip_errors=True" to drop them instead.'.format(line_number)) from None
raise Exception('Cannot parse line #{} as some values are missing, aborting. Set "skip_errors=True" to drop them instead.'.format(line_number)) from None

logger.debug('Set data to "%s"', data)

Expand Down Expand Up @@ -463,7 +467,33 @@ def get(self, limit=None, tz=None, item_type=None, sort=False):
items = sorted(items, key=itemgetter(0))
for item in items:
try:
timeseries.append(DataTimePoint(t=item[0], data=item[1], tz=tz))
data_loss = None
indexes = None
if isinstance (item[1], dict):
# Handle indexes and special data loss case
indexes = {}
for key in item[1]:
if key.startswith('__'):
if key=='__data_loss':
data_loss = item[1][key]
else:
indexes[key] = item[1][key]

# Remove indexes and data loss from item data
for index in indexes:
item[1].pop(index)
item[1].pop('__data_loss',None)

# Create DataTimePoint, set data loss and set indexes
data_time_point = DataTimePoint(t=item[0], data=item[1], data_loss=data_loss, tz=tz)
if indexes:
for index in indexes:
# Set index. The [2:] removes the two trailing underscores
setattr(data_time_point, index[2:], indexes[index])

# Append
timeseries.append(data_time_point)

except Exception as e:
if self.skip_errors:
logger.error(e)
Expand All @@ -475,8 +505,31 @@ def get(self, limit=None, tz=None, item_type=None, sort=False):

for i, item in enumerate(items):
try:
# TODO:
timeseries.append(DataTimeSlot(t=item[0], unit=unit, data=item[1], data_loss=0, tz=tz))
data_loss = 0
if isinstance (item[1], dict):
# Handle indexes and special data loss case
indexes = {}
for key in item[1]:
if key.startswith('__'):
if key=='__data_loss':
data_loss = item[1][key]
else:
indexes[key] = item[1][key]

# Remove indexes and data loss from item data
for index in indexes:
item[1].pop(index)
item[1].pop('__data_loss',None)

# Create DataTimeSlot, set data loss and set indexes
data_time_slot = DataTimeSlot(t=item[0], unit=unit, data=item[1], data_loss=data_loss, tz=tz)
if indexes:
for index in indexes:
# Set index. The [2:] removes the two trailing underscores
setattr(data_time_slot, index[2:], indexes[index])

timeseries.append(data_time_slot)

except ValueError as e:
# The only ValueError that could (should) arise here is a "Not in succession" error.
missing_timestamps = []
Expand Down Expand Up @@ -504,10 +557,9 @@ def put(self, timeseries, overwrite=False):

if os.path.isfile(self.filename_with_path) and not overwrite:
raise Exception('File already exists. use overwrite=True to overwrite.')

# Detect encoding if not set
#if not self.encoding:
# self.encoding = detect_encoding(self.filename_with_path, streaming=False)

# Set indexes here once for all or thre will be a slowndowd afterwards
indexes = timeseries.indexes

with open(self.filename_with_path, 'w') as csv_file:

Expand All @@ -527,18 +579,22 @@ def put(self, timeseries, overwrite=False):


# 1) Dump headers
if timeseries[0].data_loss is not None:
csv_file.write('epoch,{},data_loss\n'.format(','.join([str(key) for key in timeseries.data_keys()])))
data_keys_part = ','.join([str(key) for key in timeseries.data_keys()])
indexes_part = ','.join(['__'+index for index in indexes])
if indexes_part:
csv_file.write('epoch,{},{}\n'.format(data_keys_part,indexes_part))
else:
csv_file.write('epoch,{}\n'.format(','.join([str(key) for key in timeseries.data_keys()])))

csv_file.write('epoch,{}\n'.format(data_keys_part))

# 2) Dump data

# 2) Dump data (and indexes)
for item in timeseries:
if timeseries[0].data_loss is not None:
csv_file.write('{},{},{}\n'.format(item.t, ','.join([str(item.data[key]) for key in timeseries.data_keys()]),item.data_loss))
data_part = ','.join([str(item.data[key]) for key in timeseries.data_keys()])
indexes_part = ','.join([str(getattr(item, index)) for index in indexes])
if indexes_part:
csv_file.write('{},{},{}\n'.format(item.t, data_part, indexes_part))
else:
csv_file.write('{},{}\n'.format(item.t, ','.join([str(item.data[key]) for key in timeseries.data_keys()])))
csv_file.write('{},{}\n'.format(item.t, data_part))



Expand Down
5 changes: 4 additions & 1 deletion timeseria/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,13 +596,16 @@ def is_list_of_integers(list):
else:
return True

def to_float(string,no_data_placeholders=[]):
def to_float(string,no_data_placeholders=[],label=None):
sanitized_string_string = sanitize_string(string,no_data_placeholders)
if sanitized_string_string:
sanitized_string_string = sanitized_string_string.replace(',','.')
try:
return float(sanitized_string_string)
except (ValueError, TypeError):
# Do not raise inf converting indexes as they are allowed to be "None"
if label and label.startswith('__'):
return None
raise FloatConversionError(sanitized_string_string)


Expand Down

0 comments on commit f6d5298

Please sign in to comment.