Skip to content

Commit 69cc6af

Browse files
bachyapvizeli
authored andcommitted
Add support for recording history to Apache Kafka (home-assistant#25085)
* Add support for Apache Kafka * Simplified * Revert "Simplified" This reverts commit fde4624. * Revert "Revert "Simplified"" This reverts commit 5ae57e6. * Completed * Updated requirements * Updated .coveragerc * Removed unused import * Updated codeowner
1 parent 8fdebf4 commit 69cc6af

File tree

5 files changed

+128
-0
lines changed

5 files changed

+128
-0
lines changed

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ omit =
3434
homeassistant/components/androidtv/*
3535
homeassistant/components/anel_pwrctrl/switch.py
3636
homeassistant/components/anthemav/media_player.py
37+
homeassistant/components/apache_kafka/*
3738
homeassistant/components/apcupsd/*
3839
homeassistant/components/apple_tv/*
3940
homeassistant/components/aqualogic/*

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ homeassistant/components/alpha_vantage/* @fabaff
2424
homeassistant/components/amazon_polly/* @robbiet480
2525
homeassistant/components/ambiclimate/* @danielhiversen
2626
homeassistant/components/ambient_station/* @bachya
27+
homeassistant/components/apache_kafka/* @bachya
2728
homeassistant/components/api/* @home-assistant/core
2829
homeassistant/components/aprs/* @PhilRW
2930
homeassistant/components/arcam_fmj/* @elupus
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Support for Apache Kafka."""
2+
from datetime import datetime
3+
import json
4+
import logging
5+
6+
from aiokafka import AIOKafkaProducer
7+
import voluptuous as vol
8+
9+
from homeassistant.const import (
10+
CONF_IP_ADDRESS, CONF_PORT, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
11+
STATE_UNAVAILABLE, STATE_UNKNOWN)
12+
import homeassistant.helpers.config_validation as cv
13+
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
14+
15+
_LOGGER = logging.getLogger(__name__)
16+
17+
DOMAIN = 'apache_kafka'
18+
19+
CONF_FILTER = 'filter'
20+
CONF_TOPIC = 'topic'
21+
22+
CONFIG_SCHEMA = vol.Schema({
23+
DOMAIN: vol.Schema({
24+
vol.Required(CONF_IP_ADDRESS): cv.string,
25+
vol.Required(CONF_PORT): cv.port,
26+
vol.Required(CONF_TOPIC): cv.string,
27+
vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
28+
}),
29+
}, extra=vol.ALLOW_EXTRA)
30+
31+
32+
async def async_setup(hass, config):
33+
"""Activate the Apache Kafka integration."""
34+
conf = config[DOMAIN]
35+
36+
kafka = hass.data[DOMAIN] = KafkaManager(
37+
hass,
38+
conf[CONF_IP_ADDRESS],
39+
conf[CONF_PORT],
40+
conf[CONF_TOPIC],
41+
conf[CONF_FILTER])
42+
43+
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown())
44+
45+
await kafka.start()
46+
47+
return True
48+
49+
50+
class DateTimeJSONEncoder(json.JSONEncoder):
51+
"""Encode python objects.
52+
53+
Additionally add encoding for datetime objects as isoformat.
54+
"""
55+
56+
def default(self, o): # pylint: disable=E0202
57+
"""Implement encoding logic."""
58+
if isinstance(o, datetime):
59+
return o.isoformat()
60+
return super().default(o)
61+
62+
63+
class KafkaManager:
64+
"""Define a manager to buffer events to Kafka."""
65+
66+
def __init__(
67+
self,
68+
hass,
69+
ip_address,
70+
port,
71+
topic,
72+
entities_filter):
73+
"""Initialize."""
74+
self._encoder = DateTimeJSONEncoder()
75+
self._entities_filter = entities_filter
76+
self._hass = hass
77+
self._producer = AIOKafkaProducer(
78+
loop=hass.loop,
79+
bootstrap_servers="{0}:{1}".format(ip_address, port),
80+
compression_type="gzip",
81+
)
82+
self._topic = topic
83+
84+
def _encode_event(self, event):
85+
"""Translate events into a binary JSON payload."""
86+
state = event.data.get('new_state')
87+
if (state is None
88+
or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE)
89+
or not self._entities_filter(state.entity_id)):
90+
return
91+
92+
return json.dumps(
93+
obj=state.as_dict(),
94+
default=self._encoder.encode
95+
).encode('utf-8')
96+
97+
async def start(self):
98+
"""Start the Kafka manager."""
99+
self._hass.bus.async_listen(EVENT_STATE_CHANGED, self.write)
100+
await self._producer.start()
101+
102+
async def shutdown(self):
103+
"""Shut the manager down."""
104+
await self._producer.stop()
105+
106+
async def write(self, event):
107+
"""Write a binary payload to Kafka."""
108+
payload = self._encode_event(event)
109+
110+
if payload:
111+
await self._producer.send_and_wait(self._topic, payload)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"domain": "apache_kafka",
3+
"name": "Apache Kafka",
4+
"documentation": "https://www.home-assistant.io/components/apache_kafka",
5+
"requirements": [
6+
"aiokafka==0.5.1"
7+
],
8+
"dependencies": [],
9+
"codeowners": [
10+
"@bachya"
11+
]
12+
}

requirements_all.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ aiohue==1.9.1
150150
# homeassistant.components.imap
151151
aioimaplib==0.7.15
152152

153+
# homeassistant.components.apache_kafka
154+
aiokafka==0.5.1
155+
153156
# homeassistant.components.lifx
154157
aiolifx==0.6.7
155158

0 commit comments

Comments
 (0)