Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions modules/test/dns/python/src/dns_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,15 @@ def extract_dns_data(self):

def _has_dns_traffic(self, tcpdump_filter):
dns_server_queries = self._exec_tcpdump(tcpdump_filter,
DNS_SERVER_CAPTURE_FILE)
self.dns_server_capture_file)
LOGGER.info('DNS Server queries found: ' + str(len(dns_server_queries)))

dns_startup_queries = self._exec_tcpdump(tcpdump_filter,
STARTUP_CAPTURE_FILE)
self.startup_capture_file)
LOGGER.info('Startup DNS queries found: ' + str(len(dns_startup_queries)))

dns_monitor_queries = self._exec_tcpdump(tcpdump_filter,
MONITOR_CAPTURE_FILE)
self.monitor_capture_file)
LOGGER.info('Monitor DNS queries found: ' + str(len(dns_monitor_queries)))

num_query_dns = len(dns_server_queries) + len(dns_startup_queries) + len(
Expand All @@ -243,6 +243,10 @@ def _has_dns_traffic(self, tcpdump_filter):

return num_query_dns > 0

# Added to access the method for dns unittests
def dns_network_from_dhcp(self):
return self._dns_network_from_dhcp()

def _dns_network_from_dhcp(self):
LOGGER.info('Running dns.network.from_dhcp')
LOGGER.info('Checking DNS traffic for configured DHCP DNS server: ' +
Expand All @@ -255,10 +259,9 @@ def _dns_network_from_dhcp(self):
dns_packets_local = self._has_dns_traffic(tcpdump_filter=tcpdump_filter)

# Check if the device sends any DNS traffic to non-DHCP provided server
tcpdump_filter = (f'dst port 53 and dst not host {self._dns_server} ' +
'ether src {self._device_mac}')
tcpdump_filter = (f'dst port 53 and not dst host {self._dns_server} ' +
f'and ether src {self._device_mac}')
dns_packets_not_local = self._has_dns_traffic(tcpdump_filter=tcpdump_filter)

if dns_packets_local or dns_packets_not_local:
if dns_packets_not_local:
description = 'DNS traffic detected to non-DHCP provided server'
Expand Down
Binary file added testing/unit/dns/captures/dns_no_dns/dns.pcap
Binary file not shown.
Binary file not shown.
Binary file added testing/unit/dns/captures/dns_no_dns/startup.pcap
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
157 changes: 111 additions & 46 deletions testing/unit/dns/dns_module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""Module run all the DNS related unit tests"""
from dns_module import DNSModule
import unittest
from scapy.all import rdpcap, DNS, wrpcap
import os
import sys

Expand All @@ -25,14 +24,29 @@
OUTPUT_DIR = os.path.join(TEST_FILES_DIR, 'output/')
REPORTS_DIR = os.path.join(TEST_FILES_DIR, 'reports/')
CAPTURES_DIR = os.path.join(TEST_FILES_DIR, 'captures/')
DNS_NON_DHCP_SERVER_DIR = os.path.join(CAPTURES_DIR, 'dns_non_dhcp_server')
DNS_DHCP_SERVER_DIR = os.path.join(CAPTURES_DIR, 'dns_dhcp_server')
DNS_NO_DNS_DIR = os.path.join(CAPTURES_DIR, 'dns_no_dns')

LOCAL_REPORT = os.path.join(REPORTS_DIR, 'dns_report_local.html')
LOCAL_REPORT_NO_DNS = os.path.join(REPORTS_DIR, 'dns_report_local_no_dns.html')

# Define the capture files to be used for the test
DNS_SERVER_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'dns.pcap')
STARTUP_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'startup.pcap')
MONITOR_CAPTURE_FILE = os.path.join(CAPTURES_DIR, 'monitor.pcap')
# The capture files with dns traffic to the provided dhcp server
DNS_DHCP_SERVER_CAPTURE = os.path.join(DNS_DHCP_SERVER_DIR, 'dns.pcap')
STARTUP_DHCP_SERVER_CAPTURE = os.path.join(DNS_DHCP_SERVER_DIR, 'startup.pcap')
MONITOR_DHCP_SERVER_CAPTURE = os.path.join(DNS_DHCP_SERVER_DIR, 'monitor.pcap')

# The capture files with dns traffic to non-dhcp server
DNS_NON_DHCP_SERVER_CAPTURE = os.path.join(DNS_NON_DHCP_SERVER_DIR, 'dns.pcap')
STARTUP_NON_DHCP_SERVER_CAPTURE = os.path.join(DNS_NON_DHCP_SERVER_DIR,
'startup.pcap')
MONITOR_NON_DHCP_SERVER_CAPTURE = os.path.join(DNS_NON_DHCP_SERVER_DIR,
'monitor.pcap')

# The capture files with no dns traffic
DNS_NO_DNS_CAPTURE = os.path.join(DNS_NO_DNS_DIR, 'dns.pcap')
STARTUP_NO_DNS_CAPTURE = os.path.join(DNS_NO_DNS_DIR, 'startup.pcap')
MONITOR_NO_DNS_CAPTURE = os.path.join(DNS_NO_DNS_DIR, 'monitor.pcap')

class DNSModuleTest(unittest.TestCase):
"""Contains and runs all the unit tests concerning DNS behaviors"""
Expand All @@ -44,14 +58,18 @@ def setUpClass(cls):

# Set the MAC address for device in capture files
os.environ['DEVICE_MAC'] = '38:d1:35:01:17:fe'

# Test the module report generation
def dns_module_report_test(self):

# Create a DNSModule instance
dns_module = DNSModule(module=MODULE,
results_dir=OUTPUT_DIR,
dns_server_capture_file=DNS_SERVER_CAPTURE_FILE,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE)
dns_server_capture_file=DNS_DHCP_SERVER_CAPTURE,
startup_capture_file=STARTUP_DHCP_SERVER_CAPTURE,
monitor_capture_file=MONITOR_DHCP_SERVER_CAPTURE)

# Generate the report
report_out_path = dns_module.generate_module_report()

# Read the generated report
Expand All @@ -62,76 +80,123 @@ def dns_module_report_test(self):
with open(LOCAL_REPORT, 'r', encoding='utf-8') as file:
report_local = file.read()

# Assert that the generated report is equal to the local report
self.assertEqual(report_out, report_local)

# Test the module report generation if no DNS traffic
# is available
# Test the module report generation if no DNS traffic found
def dns_module_report_no_dns_test(self):
# Read the pcap files
packets_dns_server = rdpcap(DNS_SERVER_CAPTURE_FILE)
packets_startup = rdpcap(STARTUP_CAPTURE_FILE)
packets_monitor = rdpcap(MONITOR_CAPTURE_FILE)

# Filter out packets containing DNS
packets_dns_server = [
packets_dns_server for packets_dns_server in packets_dns_server
if not packets_dns_server.haslayer(DNS)
]
packets_startup = [
packets_startup for packets_startup in packets_startup
if not packets_startup.haslayer(DNS)
]
packets_monitor = [
packets_monitor for packets_monitor in packets_monitor
if not packets_monitor.haslayer(DNS)
]

# Write the filtered packets to a new .pcap file
dns_server_cap_file = os.path.join(OUTPUT_DIR, 'dns_no_dns.pcap')
startup_cap_file = os.path.join(OUTPUT_DIR, 'startup_no_dns.pcap')
monitor_cap_file = os.path.join(OUTPUT_DIR, 'monitor_no_dns.pcap')
wrpcap(dns_server_cap_file, packets_dns_server)
wrpcap(startup_cap_file, packets_startup)
wrpcap(monitor_cap_file, packets_monitor)

dns_module = DNSModule(module='dns',

# Create a DNSModule instance
dns_module = DNSModule(module=MODULE,
results_dir=OUTPUT_DIR,
dns_server_capture_file=dns_server_cap_file,
startup_capture_file=startup_cap_file,
monitor_capture_file=monitor_cap_file)
dns_server_capture_file=DNS_NO_DNS_CAPTURE,
startup_capture_file=STARTUP_NO_DNS_CAPTURE,
monitor_capture_file=MONITOR_NO_DNS_CAPTURE)

# Create the report
report_out_path = dns_module.generate_module_report()

# Read the generated report
# Generate the generated report
with open(report_out_path, 'r', encoding='utf-8') as file:
report_out = file.read()

# Read the local good report
with open(LOCAL_REPORT_NO_DNS, 'r', encoding='utf-8') as file:
report_local = file.read()

# Assert that the generated report is equal to the local report
self.assertEqual(report_out, report_local)

# Test the extraction of DNS data
def extract_dns_data_test(self):

# Create a DNSModule instance
dns_module = DNSModule(module=MODULE,
results_dir=OUTPUT_DIR,
dns_server_capture_file=DNS_SERVER_CAPTURE_FILE,
startup_capture_file=STARTUP_CAPTURE_FILE,
monitor_capture_file=MONITOR_CAPTURE_FILE)
dns_server_capture_file=DNS_DHCP_SERVER_CAPTURE,
startup_capture_file=STARTUP_DHCP_SERVER_CAPTURE,
monitor_capture_file=MONITOR_DHCP_SERVER_CAPTURE)

# Extract the DNS data
dns_data = dns_module.extract_dns_data()

self.assertTrue(len(dns_data) > 0)

# Test dns.network.from_dhcp for traffic detected to DHCP server
def dns_traffic_to_dhcp_provided_server_test(self):

# Create a DNSModule instance
dns_module = DNSModule(module=MODULE,
results_dir=OUTPUT_DIR,
dns_server_capture_file=DNS_DHCP_SERVER_CAPTURE,
startup_capture_file=STARTUP_DHCP_SERVER_CAPTURE,
monitor_capture_file=MONITOR_DHCP_SERVER_CAPTURE)

# Get the result from dns.network.from_dhcp test
result = dns_module.dns_network_from_dhcp()

# Assign the expected test result
description = 'DNS traffic detected only to DHCP provided server'
expected_result = ('Informational', description)

# Assert that the actual result matches the expected result
self.assertEqual(expected_result, result)

# Test dns.network.from_dhcp for traffic detected to non-DHCP servers
def dns_traffic_to_non_dhcp_server_test(self):

# Set the MAC address for device in capture files
os.environ['DEVICE_MAC'] = '00:30:64:8a:c8:cc'

# Create a DNSModule instance
dns_module = DNSModule(module=MODULE,
results_dir=OUTPUT_DIR,
dns_server_capture_file=DNS_NON_DHCP_SERVER_CAPTURE,
startup_capture_file=STARTUP_NON_DHCP_SERVER_CAPTURE,
monitor_capture_file=MONITOR_NON_DHCP_SERVER_CAPTURE)

# Get the result from dns.network.from_dhcp test
result = dns_module.dns_network_from_dhcp()

# Assign the expected test result
description = 'DNS traffic detected to non-DHCP provided server'
expected_result = ('Informational', description)

# Assert that the actual result matches the expected result
self.assertEqual(expected_result, result)

# Test dns.network.from_dhcp when no traffic is detected
def dns_no_dns_traffic_test(self):

# Create a DNSModule instance
dns_module = DNSModule(module=MODULE,
results_dir=OUTPUT_DIR,
dns_server_capture_file=DNS_NO_DNS_CAPTURE,
startup_capture_file=STARTUP_NO_DNS_CAPTURE,
monitor_capture_file=MONITOR_NO_DNS_CAPTURE)

# Get the result from dns.network.from_dhcp test
result = dns_module.dns_network_from_dhcp()

# Assign the expected test result
description = 'No DNS traffic detected from the device'
expected_result = ('Informational', description)

# Assert that the actual result matches the expected result.
self.assertEqual(expected_result, result)

if __name__ == '__main__':
suite = unittest.TestSuite()

# Module report test
suite.addTest(DNSModuleTest('dns_module_report_test'))
suite.addTest(DNSModuleTest('dns_module_report_no_dns_test'))
suite.addTest(DNSModuleTest('extract_dns_data_test'))
suite.addTest(DNSModuleTest('dns_traffic_to_dhcp_provided_server_test'))
suite.addTest(DNSModuleTest('dns_traffic_to_non_dhcp_server_test'))
suite.addTest(DNSModuleTest('dns_no_dns_traffic_test'))

# Run the tests

runner = unittest.TextTestRunner()
test_result = runner.run(suite)

Expand Down
Loading