Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
procamora committed Mar 18, 2020
1 parent 446af59 commit 218d60f
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@

.idea/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "sqlite"]
path = sqlite
url = git@github.com:procamora/python3-sqlite.git
154 changes: 154 additions & 0 deletions ScanNmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

import ipaddress
import subprocess
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Union, Dict, Text

import netifaces
import nmap
from mac_vendor_lookup import MacLookup

from host import Host
from implement_sqlite import select_all_hosts, insert_host, update_host, update_host_offline
from sqlite.logger import get_logger, logging

logger: logging = get_logger(False, 'sqlite')


@dataclass
class ScanNmap:
local_interfaces: Dict[Text, Text] = field(default_factory=dict)
subnets: List[Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface]] = field(default_factory=list)
hosts_db: Dict[Text, Host] = field(default_factory=dict)
vendor = MacLookup()

def __post_init__(self):
self.vendor.load_vendors()
self.set_local_interfaces()
self.set_ip_interfaces()
self.hosts_db = select_all_hosts()

def set_local_interfaces(self: ScanNmap):
"""
Metodo para obtener todas las interfaces del sistema que tienen asignada una direccion IP y enlazarla con su MAC
en un diccionario
:return:
"""
for i in netifaces.interfaces():
addrs = netifaces.ifaddresses(i)
if netifaces.AF_INET in addrs and netifaces.AF_LINK in addrs:
self.local_interfaces[addrs[netifaces.AF_INET][0]['addr']] = addrs[netifaces.AF_LINK][0]['addr']

def set_ip_interfaces(self: ScanNmap):
"""
Metodo para obtener las IP con las subred, uso este metodo en vez de utilizar el anterior por comodida, ya que
de esta forma se obtiene la mascara en formato valido para pasarselo a nmap
:return:
"""
# Obtengo todas las interfaces, filtrando aquillas que no son globales (scope global)
command: Text = 'ip addr | grep "inet" | grep -vE "scope host|scope link"'
stdout, stderr, ex = ScanNmap.execute_command(command)
if len(stderr) > 0:
logger.error(f'{stderr}')

for i in stdout.split('\n'):
if len(i) > 0:
interface = i.strip().split(' ')[1]
subnet = ipaddress.ip_interface(interface)
self.subnets.append(subnet)

def get_mac_aux(self: ScanNmap, ip: Text) -> Text:
"""
Metodo para obtener del sistema la MAC asociada a una IP
:param ip:
:return:
"""
# Si es una IP del propio sistema la devulvo directamente, ya que no esta en la tabla ARP
if ip in self.local_interfaces.keys():
return self.local_interfaces[ip]

command: Text = f'ip neigh show | grep "{ip} dev"'
stdout, stderr, ex = self.execute_command(command)
if len(stderr) > 0:
logger.error(f'{ip} -> {stderr}')
return stdout.strip().split(' ')[4]

def ping_scan(self: ScanNmap, subnet: Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface]) -> List[Host]:
hosts: List[Host] = list()
nm: nmap.nmap.PortScanner = nmap.PortScanner()
scan: Dict = nm.scan(hosts=str(subnet), arguments='-n -sP', sudo=False)
print(nm.command_line())

ip: Text
for ip in nm.all_hosts():
t: nmap.nmap.PortScannerHostDict = nm[ip]
ip = t["addresses"]["ipv4"]

if 'mac' not in t["addresses"]:
mac = self.get_mac_aux(ip)
else:
mac = t["addresses"]["mac"]

if len(t["vendor"]) == 0:
# desc = MacLookup().lookup(mac)
vend = self.vendor.lookup(mac)
else:
vend = t["vendor"]

host: Host = Host(ip, mac, True, vend, scan["nmap"]["scanstats"]["timestr"])
print(host)
hosts.append(host)

return hosts

def update_or_insert_host(self: ScanNmap, hosts: List[Host]):
host: Host
for host in hosts:
if host.ip in self.hosts_db.keys():
print(f'update {host}')
update_host(host)
else:
insert_host(host)
logger.info(f'new host: {host}')

def run(self: ScanNmap):
for subnet in self.subnets:
logger.info(f'Scanning: {subnet}')
hosts: List[Host] = self.ping_scan(subnet)
self.update_or_insert_host(hosts)
# FIXME plantear otro diseño, su hay varias interfaces pondria el del resto como inactivos
update_host_offline(hosts[0].date)

#import sys
#sys.exit(0)

@staticmethod
def format_text(param_text: bytes) -> Optional[Text]:
if param_text is not None:
text = param_text.decode('utf-8')
return str(text)
return param_text

@staticmethod
def execute_command(command: Text) -> Tuple[Text, Text, subprocess.Popen]:
# FIXME CAMBIAR Popen por run
execute = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = execute.communicate()
return ScanNmap.format_text(stdout), ScanNmap.format_text(stderr), execute


def main():
a = ScanNmap()
a.run()


if __name__ == '__main__':
main()

# https://github.com/maaaaz/nmaptocsv

# ENVIAR UN MSG CON LAS IPs DE LAS INTERFACES
Binary file added database.db
Binary file not shown.
18 changes: 18 additions & 0 deletions host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from __future__ import annotations

from dataclasses import dataclass
from typing import Text


@dataclass
class Host():
ip: Text
mac: Text
active: bool # Es in integer en la BD 0 (false) and 1 (true).
vendor: Text
date: Text
description: Text = str()
id: int = int()
59 changes: 59 additions & 0 deletions implement_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from pathlib import Path # nueva forma de trabajar con rutas
from typing import Dict, Any, List, Text, NoReturn

from host import Host
from sqlite.interface_sqlite import *
from sqlite.logger import get_logger, logging

logger: logging = get_logger(False, 'sqlite')
DB: Path = Path("database.db")


def select_all_hosts() -> Dict[Text, Host]:
query: Text = "SELECT * FROM Hosts"
response_query: List[Dict[Text, Any]] = conection_sqlite(DB, query, is_dict=True)
response: Dict[Text, Host] = dict()
for i in response_query:
active: bool = bool(int(i['active'])) # La conversion a bool se hace con un int no con str
host: Host = Host(i['ip'], i['mac'], active, i['vendor'], i['date'], i['description'], i['id'])
response[host.ip] = host
return response


def insert_host(host: Host) -> NoReturn:
# active es in integer en la BD 0 (false) and 1 (true). Si se inserta es porque esta activo
# >> > int(True == True)
# 1
# >> > int(False == True)
# 0

active: int = int(host.active == True)

query: Text = f"INSERT INTO Hosts(ip, mac, active, vendor, description, date) VALUES ('{host.ip}','{host.mac}', " \
f"{active}, '{host.vendor}','{host.description}','{host.date}');"
logger.debug(query)
conection_sqlite(DB, query)


def update_host(host: Host) -> NoReturn:
active: int = int(host.active == True)
query: Text = f"UPDATE Hosts SET ip='{host.ip}', mac='{host.mac}', vendor='{host.vendor}', date='{host.date}', " \
f"active={active} WHERE ip LIKE '{host.ip}';"
logger.debug(query)
conection_sqlite(DB, query)


def update_host_offline(date: Text):
"""
Metodo que pone inactivo todos aquellos host que no se hayan actualizado con el ultimo escaneo
:param date:
:return:
"""
query: Text = f"UPDATE Hosts SET active=0 WHERE date <> '{date}';"
logger.debug(query)
print(query)
conection_sqlite(DB, query)
12 changes: 12 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Obtener interfaces locales y sus MACs
netifaces

# Obtener el fabricante de una MAC
mac-vendor-lookup

# nmap
python-nmap

# Logging con color
colorlog
Werkzeug
1 change: 1 addition & 0 deletions sqlite
Submodule sqlite added at 18bd75

0 comments on commit 218d60f

Please sign in to comment.