Skip to content

Commit

Permalink
Merge pull request #3157 from riley206/releases/9.0rc0
Browse files Browse the repository at this point in the history
24 Hour Reporting and Default Config Implementation for Log Stat Agent
  • Loading branch information
schandrika authored Jul 31, 2024
2 parents 28e7f07 + 2f5820b commit 22d6bc8
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 58 deletions.
35 changes: 20 additions & 15 deletions services/ops/LogStatisticsAgent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,28 @@ which may be an indication of some sort of failure or breach.

### Configuration

The Log Statistics agent has 4 required configuration values:

- `file_path`: This should be the path to the "volttron.log" file
- `analysis_interval_secs`: The interval in seconds between publishing the size delta statistic to the message bus
- `publish_topic`: Can be used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices")
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices")

The following is an example configuration file:

The Log Statistics agent has 5 configuration parameters, all of which are optional (each falls back to a built-in default when not provided):

- `file_path`: The file path to the log file. If no config provided, defaults to `volttron.log`.
- `analysis_interval_sec`: The interval in seconds between publishes of the size delta statistic to the message bus. If no config provided, defaults to 60 seconds.
- `publish_topic`: Used to specify a topic to publish log statistics to which does not get captured by the
historian framework (topics not prefixed by any of: "datalogger", "record", "analysis", "devices"). If no config provided, defaults to `"platform/log_statistics"`.
- `historian_topic`: Can be used to specify a topic to publish log statistics to which gets captured by the
historian framework ("datalogger", "record", "analysis", "devices"). If no config provided, defaults to `analysis/log_statistics`.
- `unit`: Can be used to specify units. Defaults to `bytes`.
- "bytes"
- "kb"
- "mb"
- "gb"

Here is an example configuration file named `log_stat_config.json`.
```json
{
"file_path" : "~/volttron/volttron.log",
"analysis_interval_min" : 60,
"publish_topic" : "platform/log_statistics",
"historian_topic" : "record/log_statistics"
"analysis_interval_sec": 60,
"file_path": "path/to/.log/",
"historian_topic": "analysis/log_statistics",
"publish_topic": "platform/log_statistics",
"unit": "bytes"
}
```

Expand Down
7 changes: 7 additions & 0 deletions services/ops/LogStatisticsAgent/log_stat_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"analysis_interval_sec": 20,
"file_path": "/home/riley/DRIVERWORK/PORTS/rileysVOLTTRON/volttron.log",
"historian_topic": "analysis/log_statistics",
"publish_topic": "platform/log_statistics",
"unit": "mb"
}
6 changes: 0 additions & 6 deletions services/ops/LogStatisticsAgent/logstatisticsagent.config

This file was deleted.

132 changes: 95 additions & 37 deletions services/ops/LogStatisticsAgent/logstatisticsagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,6 @@
_log = logging.getLogger(__name__)
__version__ = '1.0'


def log_statistics(config_path, **kwargs):
    """
    Build a LogStatisticsAgent from a configuration file.

    :param config_path: Path to a configuration file.
    :type config_path: str
    :returns: an agent instance created from that configuration
    :rtype: LogStatisticsAgent
    """
    # Load the file and hand the resulting dict straight to the constructor.
    return LogStatisticsAgent(utils.load_config(config_path), **kwargs)


class LogStatisticsAgent(Agent):
"""
LogStatisticsAgent reads volttron.log file size every hour, compute the size delta from previous hour and publish
Expand All @@ -66,39 +52,67 @@ class LogStatisticsAgent(Agent):
}
"""

def __init__(self, config, **kwargs):
def __init__(self, config_path=None, **kwargs):
    """
    Initialize the agent and register with the VIP configuration store.

    :param config_path: Optional path to a configuration file whose values
        override the built-in defaults below.
    :type config_path: str or None
    """
    super(LogStatisticsAgent, self).__init__(**kwargs)
    # Set to True by configure_main once a config has been applied.
    self.configured = False
    # Timestamp of the last std-dev publish; used for the 24-hour window.
    self.last_std_dev_time = get_aware_utc_now()

    # Fallback values used when no config store entry / file overrides them.
    self.default_config = {
        "file_path": "volttron.log",
        "analysis_interval_sec": 60,
        "publish_topic": "platform/log_statistics",
        "historian_topic": "analysis/log_statistics",
        "unit": "bytes"
    }
    if config_path:
        # File-based config takes precedence over the built-in defaults.
        self.default_config.update(utils.load_config(config_path))
    self.vip.config.set_default("config", self.default_config)
    # configure_main fires on NEW/UPDATE of the "config" store entry.
    self.vip.config.subscribe(self.configure_main, actions=["NEW", "UPDATE"], pattern="config")

def configure_main(self, config_name, action, contents):
    """
    Config-store callback: merge *contents* over the defaults and apply them.

    :param config_name: Name of the config entry that changed.
    :param action: One of "NEW", "UPDATE" (the subscribed actions).
    :param contents: The configuration dict from the config store.
    """
    config = self.default_config.copy()
    config.update(contents)
    self.configured = True
    # BUG FIX: the original condition was `action == "NEW" or "UPDATE"`,
    # which is always truthy because the non-empty string "UPDATE" is true.
    # Compare the action against both expected values instead.
    if action in ("NEW", "UPDATE"):
        self.reset_parameters(config)
        _log.info("Starting " + self.__class__.__name__ + " agent")

def reset_parameters(self, config=None):
    """
    Copy the supplied configuration onto the instance and clear all
    accumulated statistics state.

    :param config: dict with keys analysis_interval_sec, file_path,
        publish_topic, historian_topic and unit.
    """
    # Mirror each configured setting onto an attribute of the same name.
    for option in ("analysis_interval_sec", "file_path", "publish_topic",
                   "historian_topic", "unit"):
        setattr(self, option, config[option])
    # Reset the rolling statistics and scheduling state.
    self.size_delta_list = []
    self.file_start_size = None
    self.prev_file_size = None
    self._scheduled_event = None

@Core.receiver('onstart')
def starting(self, sender, **kwargs):
    """
    Platform onstart hook: kick off the first analysis publish, but only
    once a configuration has been applied.
    """
    _log.info("Starting " + self.__class__.__name__ + " agent")
    # Guard clause: nothing to publish until configure_main has run.
    if not self.configured:
        return
    self.publish_analysis()

def publish_analysis(self):
"""
Publishes file's size increment in previous time interval (60 minutes) with timestamp.
Also publishes standard deviation of file's hourly size differences every 24 hour.
"""
if not hasattr(self, '_scheduled_event'):
# The settings haven't been initialized, so skip the rest of the method
return

if self._scheduled_event is not None:
self._scheduled_event.cancel()

if self.prev_file_size is None:
self.prev_file_size = self.get_file_size()
_log.debug("init_file_size = {}".format(self.prev_file_size))
_log.debug(f"init_file_size = {self.convert_bytes(self.prev_file_size, self.unit)} {self.unit}")
else:
# read file size
curr_file_size = self.get_file_size()

# calculate size delta
size_delta = curr_file_size - self.prev_file_size
size_delta = self.convert_bytes(size_delta, self.unit)

self.prev_file_size = curr_file_size

self.size_delta_list.append(size_delta)
Expand All @@ -107,23 +121,56 @@ def publish_analysis(self):

publish_message = {'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
'log_size_delta': size_delta}
historian_message = [{"log_size_delta ": size_delta},
{"log_size_delta ": {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}}]

if len(self.size_delta_list) == 24:
standard_deviation = statistics.stdev(self.size_delta_list)
publish_message['log_std_dev'] = standard_deviation
historian_message[0]['log_std_dev'] = standard_deviation
historian_message[1]['log_std_dev'] = {'units': 'bytes', 'tz': 'UTC', 'type': 'float'}

_log.debug('publishing message {} with header {} on historian topic {}'
.format(historian_message, headers, self.historian_topic))
self.vip.pubsub.publish(peer="pubsub", topic=self.historian_topic, headers=headers,
message=historian_message)
historian_message = [{
"log_size_delta ": size_delta
}, {
"log_size_delta ": {
'units': f'{self.unit}',
'tz': 'UTC',
'type': 'float'
}
}]

now = get_aware_utc_now()
hours_since_last_std_dev = (now - self.last_std_dev_time).total_seconds() / 3600

if hours_since_last_std_dev >= 24:
if self.size_delta_list: # make sure it has something in it
if len(self.size_delta_list) >= 2: # make sure it has more than two items
mean = statistics.mean(self.size_delta_list)
standard_deviation = statistics.stdev(self.size_delta_list)

publish_message['log_mean'] = mean
print(f"Calculated mean: {mean}")
publish_message['log_std_dev'] = standard_deviation

historian_message[0]['log_mean'] = mean
historian_message[0]['log_std_dev'] = standard_deviation

historian_message[1]['log_mean'] = {'units': f'{self.unit}', 'tz': 'UTC', 'type': 'float'}
historian_message[1]['log_std_dev'] = {'units': f'{self.unit}', 'tz': 'UTC',
'type': 'float'}

else:
_log.info("Not enough data points to calculate standard deviation")

else:
_log.info("Not enough data points to calculate mean and standard deviation")

# Reset time
self.last_std_dev_time = now

self.size_delta_list = []

_log.debug('publishing message {} on topic {}'.format(publish_message, self.publish_topic))
_log.debug(f'publishing message {historian_message}'
f' with header {headers}'
f' on historian topic {self.historian_topic}')
self.vip.pubsub.publish(peer="pubsub",
topic=self.historian_topic,
headers=headers,
message=historian_message)

_log.debug(f'publishing message {publish_message} {self.unit} on topic {self.publish_topic}')
self.vip.pubsub.publish(peer="pubsub", topic=self.publish_topic, message=publish_message)

_log.debug('Scheduling next periodic call')
Expand All @@ -138,13 +185,24 @@ def get_file_size(self):
except OSError as e:
_log.error(e)

def convert_bytes(self, size, unit):
    """
    Convert *size* (a byte count) into the requested unit.

    :param size: size in bytes.
    :param unit: one of 'bytes', 'kb', 'mb', 'gb' (case-insensitive).
    :returns: the converted size; an unrecognized unit (including 'bytes')
        returns the byte count unchanged.
    """
    divisors = {'kb': 1024, 'mb': 1024 ** 2, 'gb': 1024 ** 3}
    factor = divisors.get(unit.lower())
    if factor is None:
        return size
    return size / factor

def main(argv=sys.argv):
    """
    Main method called by the platform.
    """
    # Merge artifact removed: the superseded call to the old `log_statistics`
    # factory is dropped; only the current class-based entry point remains.
    utils.vip_main(LogStatisticsAgent, identity='platform.log_statistics')

if __name__ == '__main__':
# Entry point for script
Expand Down

0 comments on commit 22d6bc8

Please sign in to comment.