This repository was archived by the owner on Dec 7, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhistory_stats.py
320 lines (258 loc) · 10.2 KB
/
history_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""
Component to make instant statistics about your history.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.history_stats/
"""
import datetime
import logging
import math
import voluptuous as vol
import homeassistant.components.history as history
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_ENTITY_ID, CONF_STATE, CONF_TYPE,
EVENT_HOMEASSISTANT_START)
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import track_state_change
# Module-level logger shared by the sensor and its helper class.
_LOGGER = logging.getLogger(__name__)

# Platform name and the components it declares as dependencies.
DOMAIN = 'history_stats'
DEPENDENCIES = ['history']

# Configuration keys describing the measured period.
CONF_START = 'start'
CONF_END = 'end'
CONF_DURATION = 'duration'
CONF_PERIOD_KEYS = [CONF_START, CONF_END, CONF_DURATION]

# Accepted values for the sensor type option.
CONF_TYPE_TIME = 'time'
CONF_TYPE_RATIO = 'ratio'
CONF_TYPE_COUNT = 'count'
CONF_TYPE_KEYS = [CONF_TYPE_TIME, CONF_TYPE_RATIO, CONF_TYPE_COUNT]

# Name given to a sensor when the configuration does not provide one.
DEFAULT_NAME = 'unnamed statistics'

# Unit of measurement reported for each sensor type.
UNITS = {
    CONF_TYPE_TIME: 'h',
    CONF_TYPE_RATIO: '%',
    CONF_TYPE_COUNT: '',
}

# Icon returned by the sensor's ``icon`` property.
ICON = 'mdi:chart-line'

# Key of the extra state attribute holding the formatted duration.
ATTR_VALUE = 'value'
def exactly_two_period_keys(conf):
    """Validate that exactly two of start, end and duration are present.

    Returns ``conf`` unchanged when the check passes and raises
    ``vol.Invalid`` otherwise, so it can be chained as a schema validator.
    """
    provided = [key for key in CONF_PERIOD_KEYS if key in conf]
    if len(provided) == 2:
        return conf
    raise vol.Invalid('You must provide exactly 2 of the following:'
                      ' start, end, duration')
# Extend the base sensor platform schema with the options of this platform,
# then additionally require that exactly two of the three period keys
# (start, end, duration) are given.
PLATFORM_SCHEMA = vol.All(
    PLATFORM_SCHEMA.extend({
        vol.Required(CONF_ENTITY_ID): cv.entity_id,
        vol.Required(CONF_STATE): cv.string,
        vol.Optional(CONF_START): cv.template,
        vol.Optional(CONF_END): cv.template,
        vol.Optional(CONF_DURATION): cv.time_period,
        vol.Optional(CONF_TYPE, default=CONF_TYPE_TIME):
            vol.In(CONF_TYPE_KEYS),
        vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    }),
    exactly_two_period_keys,
)
# noinspection PyUnusedLocal
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up the History Stats sensor.

    Reads the platform options from ``config``, attaches ``hass`` to the
    start/end templates that were configured, creates one
    ``HistoryStatsSensor`` and hands it to ``add_devices``.
    Always returns True.
    """
    start = config.get(CONF_START)
    end = config.get(CONF_END)

    # Only the period bounds that were actually configured are templates.
    for template in (start, end):
        if template is not None:
            template.hass = hass

    sensor = HistoryStatsSensor(
        hass,
        config.get(CONF_ENTITY_ID),
        config.get(CONF_STATE),
        start,
        end,
        config.get(CONF_DURATION),
        config.get(CONF_TYPE),
        config.get(CONF_NAME),
    )
    add_devices([sensor])
    return True
class HistoryStatsSensor(Entity):
    """Representation of a HistoryStats sensor.

    Depending on the configured type, the state is the time (in hours) the
    tracked entity spent in the watched state during the period, that time
    as a percentage of the period length, or the number of times the
    entity switched into the watched state.
    """

    def __init__(
            self, hass, entity_id, entity_state, start, end, duration,
            sensor_type, name):
        """Initialize the HistoryStats sensor.

        ``start`` and ``end`` are templates or None, ``duration`` is a
        timedelta or None; the missing period bound is derived from the
        other two values in ``update_period``.
        """
        self._hass = hass

        self._entity_id = entity_id
        self._entity_state = entity_state
        self._duration = duration
        self._start = start
        self._end = end

        self._type = sensor_type
        self._name = name
        self._unit_of_measurement = UNITS[sensor_type]

        # Placeholder period until the templates are evaluated in update().
        self._period = (datetime.datetime.now(), datetime.datetime.now())
        self.value = None
        self.count = None

        def force_refresh(*args):
            """Force the component to refresh."""
            self.schedule_update_ha_state(True)

        # Update value when home assistant starts
        hass.bus.listen_once(EVENT_HOMEASSISTANT_START, force_refresh)

        # Update value when tracked entity changes its state
        track_state_change(hass, entity_id, force_refresh)

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def state(self):
        """Return the state of the sensor.

        None until the first successful update; afterwards the hours
        rounded to 2 decimals, the ratio in percent, or the count,
        depending on the sensor type.
        """
        if self.value is None or self.count is None:
            return None

        if self._type == CONF_TYPE_TIME:
            return round(self.value, 2)

        if self._type == CONF_TYPE_RATIO:
            return HistoryStatsHelper.pretty_ratio(self.value, self._period)

        if self._type == CONF_TYPE_COUNT:
            return self.count

        return None

    @property
    def unit_of_measurement(self):
        """Return the unit the value is expressed in."""
        return self._unit_of_measurement

    @property
    def should_poll(self):
        """Polling required."""
        return True

    @property
    def device_state_attributes(self):
        """Return the state attributes of the sensor."""
        if self.value is None:
            return {}

        return {
            ATTR_VALUE: HistoryStatsHelper.pretty_duration(self.value),
        }

    @property
    def icon(self):
        """Return the icon to use in the frontend, if any."""
        return ICON

    def update(self):
        """Get the latest data and update the stored value and count."""
        # Get previous values of start and end
        p_start, p_end = self._period

        # Parse templates
        self.update_period()
        start, end = self._period

        # Convert times to UTC
        start = dt_util.as_utc(start)
        end = dt_util.as_utc(end)
        p_start = dt_util.as_utc(p_start)
        p_end = dt_util.as_utc(p_end)
        now = datetime.datetime.now()

        # Compute integer timestamps
        start_timestamp = math.floor(dt_util.as_timestamp(start))
        end_timestamp = math.floor(dt_util.as_timestamp(end))
        p_start_timestamp = math.floor(dt_util.as_timestamp(p_start))
        p_end_timestamp = math.floor(dt_util.as_timestamp(p_end))
        now_timestamp = math.floor(dt_util.as_timestamp(now))

        # If period has not changed and current time after the period end...
        if start_timestamp == p_start_timestamp and \
                end_timestamp == p_end_timestamp and \
                end_timestamp <= now_timestamp:
            # Don't compute anything as the value cannot have changed
            return

        # Get history between start and end
        history_list = history.state_changes_during_period(
            self.hass, start, end, str(self._entity_id))

        if self._entity_id not in history_list:
            return

        # Get the first state
        # NOTE(review): this compares whatever history.get_state returns
        # with the configured state string; if that is a state object
        # rather than a string the comparison is always False. Left
        # unchanged because it affects the count result -- confirm intent.
        last_state = history.get_state(self.hass, start, self._entity_id)
        last_state = (last_state is not None and
                      last_state == self._entity_state)
        last_time = start_timestamp
        elapsed = 0
        count = 0

        # Make calculations
        for item in history_list[self._entity_id]:
            current_state = item.state == self._entity_state
            current_time = item.last_changed.timestamp()

            # Time since the previous change counts only if the entity was
            # in the watched state during that interval.
            if last_state:
                elapsed += current_time - last_time
            # A switch into the watched state increments the counter.
            if current_state and not last_state:
                count += 1

            last_state = current_state
            last_time = current_time

        # Count time elapsed between last history state and end of measure
        if last_state:
            measure_end = min(end_timestamp, now_timestamp)
            elapsed += measure_end - last_time

        # Save value in hours
        self.value = elapsed / 3600

        # Save counter
        self.count = count

    def update_period(self):
        """Parse the templates and store a datetime tuple in _period.

        If a configured template cannot be rendered or parsed, the error
        is logged and ``_period`` keeps its previous value.
        """
        start = None
        end = None

        # Parse start
        if self._start is not None:
            start = self._parse_bound(self._start, 'start')
            if start is None:
                return

        # Parse end
        if self._end is not None:
            end = self._parse_bound(self._end, 'end')
            if end is None:
                return

        # Calculate start or end using the duration
        if start is None:
            start = end - self._duration
        if end is None:
            end = start + self._duration

        self._period = start, end

    @staticmethod
    def _parse_bound(template, field):
        """Render one period template and turn the result into a datetime.

        The rendered text is first parsed as a datetime string; if that
        fails it is interpreted as a UNIX timestamp. ``field`` ('start' or
        'end') is only used in log messages. Returns None after logging
        when the template cannot be rendered or its output is unusable.
        """
        try:
            rendered = template.render()
        except (TemplateError, TypeError) as ex:
            HistoryStatsHelper.handle_template_exception(ex, field)
            return None

        parsed = dt_util.parse_datetime(rendered)
        if parsed is not None:
            return parsed

        try:
            return dt_util.as_local(dt_util.utc_from_timestamp(
                math.floor(float(rendered))))
        except (ValueError, OverflowError, OSError):
            # ValueError: not a number (or NaN); OverflowError/OSError:
            # infinite or out-of-range timestamp.
            _LOGGER.error('PARSING ERROR: %s must be a datetime'
                          ' or a timestamp.', field)
            return None
class HistoryStatsHelper:
    """Static methods to make the HistoryStatsSensor code lighter."""

    @staticmethod
    def pretty_duration(hours):
        """Format a duration given in hours as days, hours and minutes.

        The result is '<d>d <h>h <m>m', '<h>h <m>m' or '<m>m' depending on
        the largest non-zero unit; leftover seconds are dropped.
        """
        seconds = int(3600 * hours)
        days, seconds = divmod(seconds, 86400)
        hours, seconds = divmod(seconds, 3600)
        minutes, seconds = divmod(seconds, 60)
        if days > 0:
            return '%dd %dh %dm' % (days, hours, minutes)
        if hours > 0:
            return '%dh %dm' % (hours, minutes)
        return '%dm' % minutes

    @staticmethod
    def pretty_ratio(value, period):
        """Format the ratio of value / period duration.

        ``value`` is a duration in hours and ``period`` a (start, end)
        pair of datetimes. Returns the percentage rounded to one decimal,
        or 0.0 when the period is malformed or has zero length.
        """
        if len(period) != 2 or period[0] == period[1]:
            return 0.0

        ratio = 100 * 3600 * value / (period[1] - period[0]).total_seconds()
        return round(ratio, 1)

    @staticmethod
    def handle_template_exception(ex, field):
        """Log an error nicely if the template cannot be interpreted.

        ``ex`` is the exception raised while rendering and ``field`` names
        the option ('start' or 'end') whose template failed.
        """
        message = ex.args[0] if ex.args else None
        # Exceptions may carry non-string arguments; only inspect text so
        # the error handler itself cannot raise.
        if isinstance(message, str) and message.startswith(
                "UndefinedError: 'None' has no attribute"):
            # Common during HA startup - so just a warning
            _LOGGER.warning(ex)
            return
        _LOGGER.error('Error parsing template for [%s]', field)
        _LOGGER.error(ex)