forked from home-assistant/core
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Fortigate integration (home-assistant#24908)
* Add Fortigate integration * added feedback changes * removed the only case * fixed a description * removed the CONFIG_PLATFORM * deleted README * added return from setup * added return from setup * fixed reviews * Link updated * Rename var and a couple of other minor changes * Typos
- Loading branch information
Showing
6 changed files
with
181 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
"""Fortigate integration.""" | ||
import logging | ||
|
||
import voluptuous as vol | ||
|
||
from homeassistant.const import ( | ||
CONF_DEVICES, CONF_HOST, CONF_API_KEY, CONF_USERNAME, | ||
EVENT_HOMEASSISTANT_STOP) | ||
from homeassistant.helpers import config_validation as cv | ||
from homeassistant.helpers.discovery import async_load_platform | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DOMAIN = 'fortigate' | ||
|
||
DATA_FGT = DOMAIN | ||
|
||
CONFIG_SCHEMA = vol.Schema({ | ||
DOMAIN: vol.Schema({ | ||
vol.Required(CONF_HOST): cv.string, | ||
vol.Required(CONF_USERNAME): cv.string, | ||
vol.Required(CONF_API_KEY): cv.string, | ||
vol.Optional(CONF_DEVICES, default=[]): | ||
vol.All(cv.ensure_list, [cv.string]), | ||
}) | ||
}, extra=vol.ALLOW_EXTRA) | ||
|
||
|
||
async def async_setup(hass, config): | ||
"""Start the Fortigate component.""" | ||
conf = config[DOMAIN] | ||
|
||
host = conf[CONF_HOST] | ||
user = conf[CONF_USERNAME] | ||
api_key = conf[CONF_API_KEY] | ||
devices = conf[CONF_DEVICES] | ||
|
||
is_success = await async_setup_fortigate( | ||
hass, config, host, user, api_key, devices | ||
) | ||
|
||
return is_success | ||
|
||
|
||
async def async_setup_fortigate(hass, config, host, user, api_key, devices): | ||
"""Start up the Fortigate component platforms.""" | ||
from pyFGT.fortigate import FGTConnectionError, FortiGate | ||
|
||
fgt = FortiGate(host, user, apikey=api_key, disable_request_warnings=True) | ||
|
||
try: | ||
fgt.login() | ||
except FGTConnectionError: | ||
_LOGGER.error("Failed to connect to Fortigate") | ||
return False | ||
|
||
hass.data[DATA_FGT] = { | ||
'fgt': fgt, | ||
'devices': devices | ||
} | ||
|
||
hass.async_create_task(async_load_platform( | ||
hass, 'device_tracker', DOMAIN, {}, config)) | ||
|
||
async def close_fgt(event): | ||
"""Close Fortigate connection on HA Stop.""" | ||
fgt.logout() | ||
|
||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, close_fgt) | ||
|
||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
"""Device tracker for Fortigate firewalls.""" | ||
from collections import namedtuple | ||
import logging | ||
|
||
from homeassistant.components.device_tracker import DeviceScanner | ||
|
||
from . import DATA_FGT | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DETECTED_DEVICES = "/monitor/user/detected-device" | ||
|
||
|
||
async def async_get_scanner(hass, config): | ||
"""Validate the configuration and return a Fortigate scanner.""" | ||
scanner = FortigateDeviceScanner(hass.data[DATA_FGT]) | ||
await scanner.async_connect() | ||
return scanner if scanner.success_init else None | ||
|
||
|
||
Device = namedtuple('Device', ['hostname', 'mac']) | ||
|
||
|
||
def _build_device(device_dict): | ||
"""Return a Device from data.""" | ||
return Device( | ||
device_dict['hostname'], | ||
device_dict['mac']) | ||
|
||
|
||
class FortigateDeviceScanner(DeviceScanner): | ||
"""Query the Fortigate firewall.""" | ||
|
||
def __init__(self, hass_data): | ||
"""Initialize the scanner.""" | ||
self.last_results = {} | ||
self.success_init = False | ||
self.connection = hass_data['fgt'] | ||
self.devices = hass_data['devices'] | ||
|
||
def get_results(self): | ||
"""Get the results from the Fortigate.""" | ||
results = self.connection.get( | ||
DETECTED_DEVICES, "vdom=root")[1]['results'] | ||
|
||
ret = [] | ||
for result in results: | ||
if 'hostname' not in result: | ||
continue | ||
|
||
ret.append(result) | ||
|
||
return ret | ||
|
||
async def async_connect(self): | ||
"""Initialize connection to the router.""" | ||
# Test if the firewall is accessible | ||
data = self.get_results() | ||
self.success_init = data is not None | ||
|
||
async def async_scan_devices(self): | ||
"""Scan for new devices and return a list with found device MACs.""" | ||
await self.async_update_info() | ||
return [device.mac for device in self.last_results] | ||
|
||
async def get_device_name(self, device): | ||
"""Return the name of the given device or None if we don't know.""" | ||
name = next(( | ||
result.hostname for result in self.last_results | ||
if result.mac == device), None) | ||
return name | ||
|
||
async def async_update_info(self): | ||
"""Ensure the information from the Fortigate firewall is up to date.""" | ||
_LOGGER.debug("Checking devices") | ||
|
||
hosts = self.get_results() | ||
|
||
all_results = [_build_device(device) for device in hosts | ||
if device['is_online']] | ||
|
||
# If the 'devices' configuration field is filled | ||
if self.devices is not None: | ||
last_results = [ | ||
device for device in all_results | ||
if device.hostname in self.devices | ||
] | ||
_LOGGER.debug(last_results) | ||
# If the 'devices' configuration field is not filled | ||
else: | ||
last_results = all_results | ||
|
||
self.last_results = last_results |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"domain": "fortigate", | ||
"name": "Fortigate", | ||
"documentation": "https://www.home-assistant.io/components/fortigate", | ||
"dependencies": [], | ||
"codeowners": [ | ||
"@kifeo" | ||
], | ||
"requirements": [ | ||
"pyfgt==0.5.1" | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters