forked from home-assistant/core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_config_entries.py
314 lines (236 loc) · 10.4 KB
/
test_config_entries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
"""Test the config manager."""
import asyncio
from datetime import timedelta
from unittest.mock import MagicMock, patch
import pytest
from homeassistant import config_entries, loader, data_entry_flow
from homeassistant.setup import async_setup_component
from homeassistant.util import dt
from tests.common import (
MockModule, mock_coro, MockConfigEntry, async_fire_time_changed)
@pytest.fixture
def manager(hass):
"""Fixture of a loaded config manager."""
manager = config_entries.ConfigEntries(hass, {})
manager._entries = []
manager._store._async_ensure_stop_listener = lambda: None
hass.config_entries = manager
return manager
@asyncio.coroutine
def test_call_setup_entry(hass):
"""Test we call <component>.setup_entry."""
MockConfigEntry(domain='comp').add_to_hass(hass)
mock_setup_entry = MagicMock(return_value=mock_coro(True))
loader.set_component(
hass, 'comp',
MockModule('comp', async_setup_entry=mock_setup_entry))
result = yield from async_setup_component(hass, 'comp', {})
assert result
assert len(mock_setup_entry.mock_calls) == 1
@asyncio.coroutine
def test_remove_entry(hass, manager):
"""Test that we can remove an entry."""
mock_unload_entry = MagicMock(return_value=mock_coro(True))
loader.set_component(
hass, 'test',
MockModule('comp', async_unload_entry=mock_unload_entry))
MockConfigEntry(domain='test', entry_id='test1').add_to_manager(manager)
MockConfigEntry(domain='test', entry_id='test2').add_to_manager(manager)
MockConfigEntry(domain='test', entry_id='test3').add_to_manager(manager)
assert [item.entry_id for item in manager.async_entries()] == \
['test1', 'test2', 'test3']
result = yield from manager.async_remove('test2')
assert result == {
'require_restart': False
}
assert [item.entry_id for item in manager.async_entries()] == \
['test1', 'test3']
assert len(mock_unload_entry.mock_calls) == 1
@asyncio.coroutine
def test_remove_entry_raises(hass, manager):
"""Test if a component raises while removing entry."""
@asyncio.coroutine
def mock_unload_entry(hass, entry):
"""Mock unload entry function."""
raise Exception("BROKEN")
loader.set_component(
hass, 'test',
MockModule('comp', async_unload_entry=mock_unload_entry))
MockConfigEntry(domain='test', entry_id='test1').add_to_manager(manager)
MockConfigEntry(domain='test', entry_id='test2').add_to_manager(manager)
MockConfigEntry(domain='test', entry_id='test3').add_to_manager(manager)
assert [item.entry_id for item in manager.async_entries()] == \
['test1', 'test2', 'test3']
result = yield from manager.async_remove('test2')
assert result == {
'require_restart': True
}
assert [item.entry_id for item in manager.async_entries()] == \
['test1', 'test3']
@asyncio.coroutine
def test_add_entry_calls_setup_entry(hass, manager):
"""Test we call setup_config_entry."""
mock_setup_entry = MagicMock(return_value=mock_coro(True))
loader.set_component(
hass, 'comp',
MockModule('comp', async_setup_entry=mock_setup_entry))
class TestFlow(data_entry_flow.FlowHandler):
VERSION = 1
@asyncio.coroutine
def async_step_user(self, user_input=None):
return self.async_create_entry(
title='title',
data={
'token': 'supersecret'
})
with patch.dict(config_entries.HANDLERS, {'comp': TestFlow, 'beer': 5}):
yield from manager.flow.async_init(
'comp', context={'source': config_entries.SOURCE_USER})
yield from hass.async_block_till_done()
assert len(mock_setup_entry.mock_calls) == 1
p_hass, p_entry = mock_setup_entry.mock_calls[0][1]
assert p_hass is hass
assert p_entry.data == {
'token': 'supersecret'
}
@asyncio.coroutine
def test_entries_gets_entries(manager):
"""Test entries are filtered by domain."""
MockConfigEntry(domain='test').add_to_manager(manager)
entry1 = MockConfigEntry(domain='test2')
entry1.add_to_manager(manager)
entry2 = MockConfigEntry(domain='test2')
entry2.add_to_manager(manager)
assert manager.async_entries('test2') == [entry1, entry2]
@asyncio.coroutine
def test_domains_gets_uniques(manager):
"""Test we only return each domain once."""
MockConfigEntry(domain='test').add_to_manager(manager)
MockConfigEntry(domain='test2').add_to_manager(manager)
MockConfigEntry(domain='test2').add_to_manager(manager)
MockConfigEntry(domain='test').add_to_manager(manager)
MockConfigEntry(domain='test3').add_to_manager(manager)
assert manager.async_domains() == ['test', 'test2', 'test3']
async def test_saving_and_loading(hass):
"""Test that we're saving and loading correctly."""
loader.set_component(
hass, 'test',
MockModule('test', async_setup_entry=lambda *args: mock_coro(True)))
class TestFlow(data_entry_flow.FlowHandler):
VERSION = 5
@asyncio.coroutine
def async_step_user(self, user_input=None):
return self.async_create_entry(
title='Test Title',
data={
'token': 'abcd'
}
)
with patch.dict(config_entries.HANDLERS, {'test': TestFlow}):
await hass.config_entries.flow.async_init(
'test', context={'source': config_entries.SOURCE_USER})
class Test2Flow(data_entry_flow.FlowHandler):
VERSION = 3
@asyncio.coroutine
def async_step_user(self, user_input=None):
return self.async_create_entry(
title='Test 2 Title',
data={
'username': 'bla'
}
)
with patch('homeassistant.config_entries.HANDLERS.get',
return_value=Test2Flow):
await hass.config_entries.flow.async_init(
'test', context={'source': config_entries.SOURCE_USER})
# To trigger the call_later
async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=1))
# To execute the save
await hass.async_block_till_done()
# Now load written data in new config manager
manager = config_entries.ConfigEntries(hass, {})
await manager.async_load()
# Ensure same order
for orig, loaded in zip(hass.config_entries.async_entries(),
manager.async_entries()):
assert orig.version == loaded.version
assert orig.domain == loaded.domain
assert orig.title == loaded.title
assert orig.data == loaded.data
assert orig.source == loaded.source
async def test_forward_entry_sets_up_component(hass):
"""Test we setup the component entry is forwarded to."""
entry = MockConfigEntry(domain='original')
mock_original_setup_entry = MagicMock(return_value=mock_coro(True))
loader.set_component(
hass, 'original',
MockModule('original', async_setup_entry=mock_original_setup_entry))
mock_forwarded_setup_entry = MagicMock(return_value=mock_coro(True))
loader.set_component(
hass, 'forwarded',
MockModule('forwarded', async_setup_entry=mock_forwarded_setup_entry))
await hass.config_entries.async_forward_entry_setup(entry, 'forwarded')
assert len(mock_original_setup_entry.mock_calls) == 0
assert len(mock_forwarded_setup_entry.mock_calls) == 1
async def test_forward_entry_does_not_setup_entry_if_setup_fails(hass):
"""Test we do not set up entry if component setup fails."""
entry = MockConfigEntry(domain='original')
mock_setup = MagicMock(return_value=mock_coro(False))
mock_setup_entry = MagicMock()
hass, loader.set_component(hass, 'forwarded', MockModule(
'forwarded',
async_setup=mock_setup,
async_setup_entry=mock_setup_entry,
))
await hass.config_entries.async_forward_entry_setup(entry, 'forwarded')
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 0
async def test_discovery_notification(hass):
"""Test that we create/dismiss a notification when source is discovery."""
loader.set_component(hass, 'test', MockModule('test'))
await async_setup_component(hass, 'persistent_notification', {})
class TestFlow(data_entry_flow.FlowHandler):
VERSION = 5
async def async_step_discovery(self, user_input=None):
if user_input is not None:
return self.async_create_entry(
title='Test Title',
data={
'token': 'abcd'
}
)
return self.async_show_form(
step_id='discovery',
)
with patch.dict(config_entries.HANDLERS, {'test': TestFlow}):
result = await hass.config_entries.flow.async_init(
'test', context={'source': config_entries.SOURCE_DISCOVERY})
await hass.async_block_till_done()
state = hass.states.get('persistent_notification.config_entry_discovery')
assert state is not None
result = await hass.config_entries.flow.async_configure(
result['flow_id'], {})
assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
await hass.async_block_till_done()
state = hass.states.get('persistent_notification.config_entry_discovery')
assert state is None
async def test_discovery_notification_not_created(hass):
"""Test that we not create a notification when discovery is aborted."""
loader.set_component(hass, 'test', MockModule('test'))
await async_setup_component(hass, 'persistent_notification', {})
class TestFlow(data_entry_flow.FlowHandler):
VERSION = 5
async def async_step_discovery(self, user_input=None):
return self.async_abort(reason='test')
with patch.dict(config_entries.HANDLERS, {'test': TestFlow}):
await hass.config_entries.flow.async_init(
'test', context={'source': config_entries.SOURCE_DISCOVERY})
await hass.async_block_till_done()
state = hass.states.get('persistent_notification.config_entry_discovery')
assert state is None
async def test_loading_default_config(hass):
"""Test loading the default config."""
manager = config_entries.ConfigEntries(hass, {})
with patch('homeassistant.util.json.open', side_effect=FileNotFoundError):
await manager.async_load()
assert len(manager.async_entries()) == 0