forked from disrupted/bvg-sensor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sensor.py
300 lines (272 loc) · 11.6 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# Version History:
# Version 0.1 - initial release
# Version 0.2 - added multiple destinations, optimized error logging
# Version 0.3 fixed encoding, simplified config for direction
# Version 0.3.1 fixed a bug when departure is null
# Version 0.3.2 bufix for TypeError
# Version 0.3.3 switched to timezone aware objects, cache_size added to config parameters, optimized logging
# Version 0.3.4 fixed encoding (issue #3), fixed typo in filepath
from urllib.request import urlopen
import json
import pytz
import os.path
from datetime import datetime, timedelta
from urllib.error import URLError
import logging
import voluptuous as vol
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
_LOGGER = logging.getLogger(__name__)
ATTR_STOP_ID = "stop_id"
ATTR_STOP_NAME = "stop_name"
ATTR_DUE_IN = "due_in"
ATTR_DELAY = "delay"
ATTR_REAL_TIME = "departure_time"
ATTR_DESTINATION = "direction"
ATTR_TRANS_TYPE = "type"
ATTR_TRIP_ID = "trip"
ATTR_LINE_NAME = "line_name"
ATTR_CONNECTION_STATE = "connection_status"
CONF_NAME = "name"
CONF_STOP_ID = "stop_id"
CONF_DESTINATION = "direction"
CONF_MIN_DUE_IN = "walking_distance"
CONF_CACHE_PATH = "file_path"
CONF_CACHE_SIZE = "cache_size"
CONNECTION_STATE = "connection_state"
CON_STATE_ONLINE = "online"
CON_STATE_OFFLINE = "offline"
ICONS = {
"suburban": "mdi:subway-variant",
"subway": "mdi:subway",
"tram": "mdi:tram",
"bus": "mdi:bus",
"regional": "mdi:train",
"ferry": "mdi:ferry",
"express": "mdi:train",
"n/a": "mdi:clock",
None: "mdi:clock",
}
SCAN_INTERVAL = timedelta(seconds=60)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_STOP_ID): cv.string,
vol.Required(CONF_DESTINATION): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_MIN_DUE_IN, default=10): cv.positive_int,
vol.Optional(CONF_CACHE_PATH, default="/"): cv.string,
vol.Optional(CONF_NAME, default="BVG"): cv.string,
vol.Optional(CONF_CACHE_SIZE, default=90): cv.positive_int,
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Setup the sensor platform."""
stop_id = config[CONF_STOP_ID]
direction = config.get(CONF_DESTINATION)
min_due_in = config.get(CONF_MIN_DUE_IN)
file_path = config.get(CONF_CACHE_PATH)
name = config.get(CONF_NAME)
cache_size = config.get(CONF_CACHE_SIZE)
add_entities(
[BvgSensor(name, stop_id, direction, min_due_in, file_path, hass, cache_size)]
)
class BvgSensor(Entity):
"""Representation of a Sensor."""
def __init__(
self, name, stop_id, direction, min_due_in, file_path, hass, cache_size
):
"""Initialize the sensor."""
self.hass_config = hass.config.as_dict()
self._cache_size = cache_size
self._cache_creation_date = None
self._isCacheValid = True
self._timezone = self.hass_config.get("time_zone")
self._name = name
self._state = None
self._stop_id = stop_id
self.direction = direction
self.min_due_in = min_due_in
self.url = "https://2.bvg.transport.rest/stations/{}/departures?duration={}".format(
self._stop_id, self._cache_size
)
self.data = None
self.singleConnection = None
self.file_path = self.hass_config.get("config_dir") + file_path
self.file_name = "bvg_{}.json".format(stop_id)
self._con_state = {CONNECTION_STATE: CON_STATE_ONLINE}
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def device_state_attributes(self):
"""Return the state attributes."""
if self.singleConnection is not None:
return {
ATTR_STOP_ID: self._stop_id,
ATTR_STOP_NAME: self.singleConnection.get(ATTR_STOP_NAME),
ATTR_DELAY: self.singleConnection.get(ATTR_DELAY),
ATTR_REAL_TIME: self.singleConnection.get(ATTR_REAL_TIME),
ATTR_DESTINATION: self.singleConnection.get(ATTR_DESTINATION),
ATTR_TRANS_TYPE: self.singleConnection.get(ATTR_TRANS_TYPE),
ATTR_LINE_NAME: self.singleConnection.get(ATTR_LINE_NAME),
}
else:
return {
ATTR_STOP_ID: "n/a",
ATTR_STOP_NAME: "n/a",
ATTR_DELAY: "n/a",
ATTR_REAL_TIME: "n/a",
ATTR_DESTINATION: "n/a",
ATTR_TRANS_TYPE: "n/a",
ATTR_LINE_NAME: "n/a",
}
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return "min"
@property
def icon(self):
"""Icon to use in the frontend, if any."""
if self.singleConnection is not None:
return ICONS.get(self.singleConnection.get(ATTR_TRANS_TYPE))
else:
return ICONS.get(None)
def update(self):
"""Fetch new state data for the sensor.
This is the only method that should fetch new data for Home Assistant.
"""
self.fetchDataFromURL
self.singleConnection = self.getSingleConnection(
self.direction, self.min_due_in, 0
)
if self.singleConnection is not None and len(self.singleConnection) > 0:
self._state = self.singleConnection.get(ATTR_DUE_IN)
else:
self._state = "n/a"
# only custom code beyond this line
@property
def fetchDataFromURL(self):
try:
with urlopen(self.url) as response:
source = response.read().decode("utf8")
self.data = json.loads(source)
if self._con_state.get(CONNECTION_STATE) is CON_STATE_OFFLINE:
_LOGGER.info("Connection to BVG API re-established")
self._con_state.update({CONNECTION_STATE: CON_STATE_ONLINE})
# write the response to a file for caching if connection is not available, which seems to happen from time to time
try:
with open("{}{}".format(self.file_path, self.file_name), "w") as fd:
# self.data = json.load(fd)
json.dump(self.data, fd, ensure_ascii=False)
# json.writes(response)
self._cache_creation_date = datetime.now(
pytz.timezone(self._timezone)
)
except IOError as e:
_LOGGER.error(
"Could not write file. Please check your configuration and read/write access for path:{}".format(
self.file_path
)
)
_LOGGER.error("I/O error({}): {}".format(e.errno, e.strerror))
except URLError as e:
if self._con_state.get(CONNECTION_STATE) is CON_STATE_ONLINE:
_LOGGER.debug(e)
_LOGGER.info("Connection to BVG API lost, using local cache instead")
self._con_state.update({CONNECTION_STATE: CON_STATE_OFFLINE})
self.fetchDataFromFile()
def fetchDataFromFile(self):
try:
with open("{}{}".format(self.file_path, self.file_name), "r") as fd:
self.data = json.load(fd)
except IOError as e:
_LOGGER.error(
"Could not read file. Please check your configuration and read/write access for path: {}".format(
self.file_path
)
)
_LOGGER.error("I/O error({}): {}".format(e.errno, e.strerror))
def getSingleConnection(self, direction, min_due_in, nmbr):
timetable_l = list()
date_now = datetime.now(pytz.timezone(self.hass_config.get("time_zone")))
for dest in direction:
_LOGGER.debug(f"Checking direction {dest}")
for pos in self.data:
# _LOGGER.warning("conf_direction: {} pos_direction {}".format(direction, pos['direction']))
# if pos['direction'] in direction:
if dest in pos["direction"]:
if pos["when"] is None:
continue
dep_time = datetime.strptime(pos["when"][:-6], "%Y-%m-%dT%H:%M:%S")
dep_time = pytz.timezone("Europe/Berlin").localize(dep_time)
delay = (pos["delay"] // 60) if pos["delay"] is not None else 0
departure_td = dep_time - date_now
# check if connection is not in the past
if departure_td > timedelta(days=0):
departure_td = departure_td.seconds // 60
if departure_td >= min_due_in:
timetable_l.append(
{
ATTR_DESTINATION: pos["direction"],
ATTR_REAL_TIME: dep_time,
ATTR_DUE_IN: departure_td,
ATTR_DELAY: delay,
ATTR_TRIP_ID: pos["tripId"],
ATTR_STOP_NAME: pos["stop"]["name"],
ATTR_TRANS_TYPE: pos["line"]["product"],
ATTR_LINE_NAME: pos["line"]["name"],
}
)
_LOGGER.debug("Connection found")
else:
_LOGGER.debug(
"Connection is due in under {} minutes".format(
min_due_in
)
)
else:
_LOGGER.debug("Connection lies in the past")
else:
_LOGGER.debug("Not a connection for specified direction")
try:
# Sort connections by when they are due
# This is needed when checking multiple directions and fixes possible sorting errors of the BVG API.
timetable_l.sort(key=lambda connection: connection[ATTR_DUE_IN])
_LOGGER.debug("Valid connections found")
_LOGGER.debug("Connections: {}".format(timetable_l))
return timetable_l[int(nmbr)]
except IndexError as e:
if self.isCacheValid():
_LOGGER.warning(
"No valid connection found for sensor named {}. Please check your configuration.".format(
self.name
)
)
self._isCacheValid = True
else:
if self._isCacheValid:
_LOGGER.warning("Cache is outdated.")
self._isCacheValid = False
# _LOGGER.error(e)
return None
def isCacheValid(self):
date_now = datetime.now(pytz.timezone(self.hass_config.get("time_zone")))
# If the component is starting without internet connection
if self._cache_creation_date is None:
self._cache_creation_date = datetime.fromtimestamp(
os.path.getmtime("{}{}".format(self.file_path, self.file_name)),
pytz.timezone(self._timezone),
)
td = self._cache_creation_date - date_now
td = td.seconds
_LOGGER.debug("td is: {}".format(td))
if td > (self._cache_size * 60):
_LOGGER.debug("Cache Age (not valid): {}".format(td // 60))
return False
else:
return True