Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filebeats added. #410

Merged
merged 2 commits into from
Aug 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 298 additions & 4 deletions simulation-system/libs/csle-cli/src/csle_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ def stop_shell_complete(ctx, param, incomplete) -> List[str]:
"emulation-name | statsmanager | emulation_executions | pgadmin | all | nginx | postgresql "
"| docker | clustermanager | hostmanagers | hostmanager | clientmanager | snortmanagers "
"| snortmanager | elkmanager | trafficmanagers | trafficmanager | kafkamanager "
"| ossecmanagers | ossecmanager | ryumanager")
"| ossecmanagers | ossecmanager | ryumanager | filebeats | filebeat | metricbeat "
"| metricbeats")
def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = "") -> None:
"""
Stops an entity
Expand Down Expand Up @@ -754,6 +755,14 @@ def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str =
stop_ossec_ids_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id)
elif entity == "ryumanager":
stop_ryu_manager(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "filebeats":
stop_filebeats(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "filebeat":
stop_filebeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id)
elif entity == "metricbeats":
stop_metricbeats(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "metricbeat":
stop_metricbeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id)
else:
container_stopped = False
for node in config.cluster_config.cluster_nodes:
Expand Down Expand Up @@ -1224,6 +1233,110 @@ def stop_traffic_manager(ip: str, container_ip: str, emulation: str, ip_first_oc
bold=False)


def stop_filebeats(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for stopping the filebeats

:param ip: the ip of the node to stop the filebeats
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
stopped = ClusterController.stop_filebeats(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
if stopped.outcome:
click.secho(f"Stopping filebeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Filebeats are not stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def stop_filebeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for stopping the filebeat

:param ip: the ip of the node to stop the filebeat
:param container_ip: the ip of the host that traffic is running on
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
stopped = ClusterController.stop_filebeat(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet, container_ip=container_ip)
if stopped.outcome:
click.secho(
f"Stopping filebeat with ip {container_ip} on port:"
f"{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Filebeat with ip {container_ip} is not "
f"stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def stop_metricbeats(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for stopping the metricbeats

:param ip: the ip of the node to stop the metricbeats
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
stopped = ClusterController.stop_metricbeats(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
if stopped.outcome:
click.secho(f"Stopping metricbeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Metricbeats are not stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def stop_metricbeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for stopping the metricbeat

:param ip: the ip of the node to stop the metricbeat
:param container_ip: the ip of the host that traffic is running on
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
stopped = ClusterController.stop_metricbeat(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet, container_ip=container_ip)
if stopped.outcome:
click.secho(
f"Stopping metricbeat with ip {container_ip} on port:"
f"{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Metricbeat with ip {container_ip} is not "
f"stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


@click.argument('max_workers', default=10, type=int)
@click.argument('log_file', default="docker_statsmanager.log", type=str)
@click.argument('log_dir', default="/var/log/csle", type=str)
Expand Down Expand Up @@ -1405,6 +1518,7 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]:

@click.option('--ip', default="", type=str)
@click.option('--container_ip', default="", type=str)
@click.option('--initial_start', default=False, type=bool)
@click.option('--id', default=None, type=int)
@click.option('--no_clients', is_flag=True, help='skip starting the client population')
@click.option('--no_traffic', is_flag=True, help='skip starting the traffic generators')
Expand All @@ -1417,9 +1531,9 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]:
"| system_id_job | nginx | postgresql | docker | clustermanager | hostmanagers "
"| hostmanager | clientmanager | snortmanagers | snortmanager | elkmanager "
"| trafficmanagers | trafficmanager | kafkamanager | ossecmanagers | ossecmanager "
"| ryumanager")
"| ryumanager | filebeats | filebeat | metricbeats | metricbeat")
def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, no_network: bool, ip: str,
container_ip: str, no_beats: bool) -> None:
container_ip: str, no_beats: bool, initial_start: bool) -> None:
"""
Starts an entity, e.g., a container or the management system

Expand Down Expand Up @@ -1496,6 +1610,16 @@ def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, n
start_ossec_ids_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id)
elif entity == "ryumanager":
start_ryu_manager(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "filebeats":
start_filebeats(ip=ip, emulation=name, ip_first_octet=id, initial_start=initial_start)
elif entity == "filebeat":
start_filebeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id,
initial_start=initial_start)
elif entity == "metricbeats":
start_metricbeats(ip=ip, emulation=name, ip_first_octet=id, initial_start=initial_start)
elif entity == "metricbeat":
start_metricbeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id,
initial_start=initial_start)
else:
container_started = False
for node in config.cluster_config.cluster_nodes:
Expand Down Expand Up @@ -1718,6 +1842,108 @@ def start_ryu_manager(ip: str, emulation: str, ip_first_octet: int):
bold=False)


def start_filebeats(ip: str, emulation: str, ip_first_octet: int, initial_start: bool):
"""
Utility function for starting filebeats

:param ip: the ip of the node to start filebeats
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
operation_outcome = ClusterController.start_filebeats(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet, initial_start=initial_start)
if operation_outcome.outcome:
click.secho(f"Starting filebeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Filebeats are not started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def start_filebeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int, initial_start: bool):
"""
Utility function for starting filebeat

:param ip: the ip of the node to start filebeat
:param container_ip: the ip of the host to start
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
operation_outcome = ClusterController.start_filebeat(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet, container_ip=container_ip, initial_start=initial_start)
if operation_outcome.outcome:
click.secho(f"Started filebeat with ip {container_ip} on "
f"port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Filebeat with ip {container_ip} is not "
f"started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def start_metricbeats(ip: str, emulation: str, ip_first_octet: int, initial_start: bool):
"""
Utility function for starting metricbeats

:param ip: the ip of the node to start metricbeats
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
operation_outcome = ClusterController.start_metricbeats(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet, initial_start=initial_start)
if operation_outcome.outcome:
click.secho(f"Starting metricbeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Metricbeats are not started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def start_metricbeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int, initial_start: bool):
"""
Utility function for starting metricbeat

:param ip: the ip of the node to start metricbeat
:param container_ip: the ip of the host to start
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
operation_outcome = ClusterController.start_metricbeat(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet, container_ip=container_ip, initial_start=initial_start)
if operation_outcome.outcome:
click.secho(f"Started metricbeat with ip {container_ip} on "
f"port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Metricbeat with ip {container_ip} is not "
f"started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def start_host_manager(ip: str, container_ip: str, emulation: str, ip_first_octet: int):
"""
Utility function for starting host manager
Expand Down Expand Up @@ -2278,7 +2504,7 @@ def ls_shell_complete(ctx, param, incomplete) -> List[str]:
"| node_exporter | cadvisor | pgadmin | statsmanager | flask | "
"simulations | emulation_executions | cluster | nginx | postgresql | docker | hostmanagers | "
"clientmanager | snortmanagers | elkmanager | trafficmanagers | kafkamanager | "
"ossecmanagers | ryumanager")
"ossecmanagers | ryumanager | filebeats | metricbeats")
@click.argument('entity', default='all', type=str, shell_complete=ls_shell_complete)
@click.option('--all', is_flag=True, help='list all')
@click.option('--running', is_flag=True, help='list running only (default)')
Expand Down Expand Up @@ -2359,6 +2585,10 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str,
list_ossec_ids_managers(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "ryumanager":
list_ryu_manager(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "filebeats":
list_filebeats(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "metricbeats":
list_metricbeats(ip=ip, emulation=name, ip_first_octet=id)
else:
container = get_running_container(name=entity)
if container is not None:
Expand Down Expand Up @@ -2393,6 +2623,70 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str,
click.secho(f"entity: {entity} is not recognized", fg="red", bold=True)


def list_filebeats(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for listing filebeats

:param ip: the ip of the node to list filebeats
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution

:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
filebeats_info = ClusterController.get_host_managers_info(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
click.secho('+' + '-' * 60 + '+', fg='white')
click.secho(f'|{"Host IP":^30}|{"Filebeats running Status":^29}|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')
for i in range(len(filebeats_info.hostManagersStatuses)):
status = "Running" if filebeats_info.hostManagersStatuses[i].filebeat_running else "Stopped"
status_color = 'green' if filebeats_info.hostManagersStatuses[i].filebeat_running else 'red'
click.secho('|', nl=False, fg='white')
click.secho(f'{filebeats_info.ips[i]:<30}', nl=False, fg='white')
click.secho('|', nl=False, fg='white')
click.secho(f'{status:^29}', nl=False, fg=status_color)
click.secho('|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')


def list_metricbeats(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for listing filebeats

:param ip: the ip of the node to list filebeats
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution

:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
metricbeats_info = ClusterController.get_host_managers_info(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
click.secho('+' + '-' * 60 + '+', fg='white')
click.secho(f'|{"Host IP":^30}|{"Metricbeats running Status":^29}|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')
for i in range(len(metricbeats_info.hostManagersStatuses)):
status = "Running" if metricbeats_info.hostManagersStatuses[i].metricbeat_running else "Stopped"
status_color = 'green' if metricbeats_info.hostManagersStatuses[i].metricbeat_running else 'red'
click.secho('|', nl=False, fg='white')
click.secho(f'{metricbeats_info.ips[i]:<30}', nl=False, fg='white')
click.secho('|', nl=False, fg='white')
click.secho(f'{status:^29}', nl=False, fg=status_color)
click.secho('|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')


def list_ryu_manager(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for listing ryu manager
Expand Down