From f6d5298aeac6d3d3bc397837c8c4e1d13dde5fb8 Mon Sep 17 00:00:00 2001 From: Stefano Alberto Russo Date: Sat, 4 Dec 2021 19:16:39 +0100 Subject: [PATCH] Added time series indexes support in the CSVSFileStorage. Minor improvements. --- timeseria/datastructures.py | 13 +++++- timeseria/storages.py | 92 +++++++++++++++++++++++++++++-------- timeseria/utilities.py | 5 +- 3 files changed, 89 insertions(+), 21 deletions(-) diff --git a/timeseria/datastructures.py b/timeseria/datastructures.py index fbbcfa6..7ef4e42 100644 --- a/timeseria/datastructures.py +++ b/timeseria/datastructures.py @@ -56,13 +56,13 @@ def append(self, item): try: try: if not item.__succedes__(self[-1]): - raise ValueError('Not in succession ("{}" vs "{}")'.format(item,self[-1])) from None + raise ValueError('Not in succession ("{}" does not succeeds "{}")'.format(item,self[-1])) from None except IndexError: raise except AttributeError: try: if not item > self[-1]: - raise ValueError('Not in order ("{}" vs "{}")'.format(item,self[-1])) from None + raise ValueError('Not in order ("{}" does not follow "{}")'.format(item,self[-1])) from None except TypeError: raise TypeError('Object of class "{}" does not implement a "__gt__" or a "__succedes__" method, cannot append it to a Series (which is ordered)'.format(item.__class__.__name__)) from None except IndexError: @@ -469,6 +469,11 @@ def data_reconstructed(self): except AttributeError: return None + @data_reconstructed.setter + def data_reconstructed(self, value): + self._data_reconstructed = value + + class DataTimePoint(DataPoint, TimePoint): """A point that carries some data in the time dimension.""" @@ -926,6 +931,10 @@ def data_reconstructed(self): return self._data_reconstructed except AttributeError: return None + + @data_reconstructed.setter + def data_reconstructed(self, value): + self._data_reconstructed = value class DataTimeSlot(DataSlot, TimeSlot): diff --git a/timeseria/storages.py b/timeseria/storages.py index aa540a4..c4d82f0 100644 --- 
a/timeseria/storages.py +++ b/timeseria/storages.py @@ -115,6 +115,10 @@ def get(self, limit=None, tz=None, item_type=None, sort=False): # Do we have to stop? Note: empty lines still have the "\n" char. if not line: break + + # Is this line an empty line? + #if not line.strip(): + # continue # Is this line a comment? comment_chars = ['#', ';'] @@ -325,7 +329,7 @@ def get(self, limit=None, tz=None, item_type=None, sort=False): elif self.data_format == list: data = [to_float(line_items[index],NO_DATA_PLACEHOLDERS) for index in data_column_indexes] else: - data = {column_labels[index]: to_float(line_items[index],NO_DATA_PLACEHOLDERS) for index in data_column_indexes} + data = {column_labels[index]: to_float(line_items[index],NO_DATA_PLACEHOLDERS,column_labels[index]) for index in data_column_indexes} else: # Default here is to set a float if there is only one value. # TODO: are we sure we want this? @@ -340,14 +344,14 @@ def get(self, limit=None, tz=None, item_type=None, sort=False): logger.debug('Cannot convert value "%s" in line #%s to float, skipping the line.', e, line_number) continue else: - raise Exception('Cannot convert value "{}" in line #%{} to float, aborting. Set "skip_errors=True" to drop them instead.'.format(e, line_number)) from None + raise Exception('Cannot convert value "{}" in line #{} to float, aborting. Set "skip_errors=True" to drop them instead.'.format(e, line_number)) from None except IndexError as e: if self.skip_errors: logger.debug('Cannot parse in line #%s as some values are missing, skipping the line.', line_number) continue else: - raise Exception('Cannot parse line #%{} as some values are missing, aborting. Set "skip_errors=True" to drop them instead.'.format(line_number)) from None + raise Exception('Cannot parse line #{} as some values are missing, aborting. 
Set "skip_errors=True" to drop them instead.'.format(line_number)) from None logger.debug('Set data to "%s"', data) @@ -463,7 +467,33 @@ def get(self, limit=None, tz=None, item_type=None, sort=False): items = sorted(items, key=itemgetter(0)) for item in items: try: - timeseries.append(DataTimePoint(t=item[0], data=item[1], tz=tz)) + data_loss = None + indexes = None + if isinstance (item[1], dict): + # Handle indexes and special data loss case + indexes = {} + for key in item[1]: + if key.startswith('__'): + if key=='__data_loss': + data_loss = item[1][key] + else: + indexes[key] = item[1][key] + + # Remove indexes and data loss from item data + for index in indexes: + item[1].pop(index) + item[1].pop('__data_loss',None) + + # Create DataTimePoint, set data loss and set indexes + data_time_point = DataTimePoint(t=item[0], data=item[1], data_loss=data_loss, tz=tz) + if indexes: + for index in indexes: + # Set index. The [2:] removes the two leading underscores + setattr(data_time_point, index[2:], indexes[index]) + + # Append + timeseries.append(data_time_point) + except Exception as e: if self.skip_errors: logger.error(e) @@ -475,8 +505,31 @@ def get(self, limit=None, tz=None, item_type=None, sort=False): for i, item in enumerate(items): try: - # TODO: - timeseries.append(DataTimeSlot(t=item[0], unit=unit, data=item[1], data_loss=0, tz=tz)) + data_loss = 0 + if isinstance (item[1], dict): + # Handle indexes and special data loss case + indexes = {} + for key in item[1]: + if key.startswith('__'): + if key=='__data_loss': + data_loss = item[1][key] + else: + indexes[key] = item[1][key] + + # Remove indexes and data loss from item data + for index in indexes: + item[1].pop(index) + item[1].pop('__data_loss',None) + + # Create DataTimeSlot, set data loss and set indexes + data_time_slot = DataTimeSlot(t=item[0], unit=unit, data=item[1], data_loss=data_loss, tz=tz) + if indexes: + for index in indexes: + # Set index. 
The [2:] removes the two leading underscores + setattr(data_time_slot, index[2:], indexes[index]) + + timeseries.append(data_time_slot) + except ValueError as e: # The only ValueError that could (should) arise here is a "Not in succession" error. missing_timestamps = [] @@ -504,10 +557,9 @@ def put(self, timeseries, overwrite=False): if os.path.isfile(self.filename_with_path) and not overwrite: raise Exception('File already exists. use overwrite=True to overwrite.') - - # Detect encoding if not set - #if not self.encoding: - # self.encoding = detect_encoding(self.filename_with_path, streaming=False) + + # Set indexes here once and for all or there will be a slowdown afterwards + indexes = timeseries.indexes with open(self.filename_with_path, 'w') as csv_file: @@ -527,18 +579,22 @@ def put(self, timeseries, overwrite=False): # 1) Dump headers - if timeseries[0].data_loss is not None: - csv_file.write('epoch,{},data_loss\n'.format(','.join([str(key) for key in timeseries.data_keys()]))) + data_keys_part = ','.join([str(key) for key in timeseries.data_keys()]) + indexes_part = ','.join(['__'+index for index in indexes]) + if indexes_part: + csv_file.write('epoch,{},{}\n'.format(data_keys_part,indexes_part)) else: - csv_file.write('epoch,{}\n'.format(','.join([str(key) for key in timeseries.data_keys()]))) - + csv_file.write('epoch,{}\n'.format(data_keys_part)) - - # 2) Dump data + + # 2) Dump data (and indexes) for item in timeseries: - if timeseries[0].data_loss is not None: - csv_file.write('{},{},{}\n'.format(item.t, ','.join([str(item.data[key]) for key in timeseries.data_keys()]),item.data_loss)) + data_part = ','.join([str(item.data[key]) for key in timeseries.data_keys()]) + indexes_part = ','.join([str(getattr(item, index)) for index in indexes]) + if indexes_part: + csv_file.write('{},{},{}\n'.format(item.t, data_part, indexes_part)) else: - csv_file.write('{},{}\n'.format(item.t, ','.join([str(item.data[key]) for key in timeseries.data_keys()]))) + 
csv_file.write('{},{}\n'.format(item.t, data_part)) diff --git a/timeseria/utilities.py b/timeseria/utilities.py index 8b08d76..7b24fd1 100644 --- a/timeseria/utilities.py +++ b/timeseria/utilities.py @@ -596,13 +596,16 @@ def is_list_of_integers(list): else: return True -def to_float(string,no_data_placeholders=[]): +def to_float(string,no_data_placeholders=[],label=None): sanitized_string_string = sanitize_string(string,no_data_placeholders) if sanitized_string_string: sanitized_string_string = sanitized_string_string.replace(',','.') try: return float(sanitized_string_string) except (ValueError, TypeError): + # Do not raise if converting indexes as they are allowed to be "None" + if label and label.startswith('__'): + return None raise FloatConversionError(sanitized_string_string)