diff --git a/setup.cfg b/setup.cfg index 9354d32..67e2eaf 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,6 +7,8 @@ description = Detection Rules Optimisation Integration Deployment long_description = file: README.md long_description_content_type = text/markdown url = https://github.com/certeu/droid +repository = "https://github.com/certeu/droid" +license = "License :: OSI Approved :: European Union Public Licence 1.2 (EUPL 1.2)" [options] packages=find: diff --git a/src/droid/__main__.py b/src/droid/__main__.py index b5843d3..b2e93a3 100644 --- a/src/droid/__main__.py +++ b/src/droid/__main__.py @@ -21,16 +21,16 @@ def init_argparse() -> argparse.ArgumentParser: """Initialise the argument parsers - Creates an instance of `argparse.ArgumentParser` and configures it + Creates an instance of "argparse.ArgumentParser" and configures it with the DROID arguments. Returns: - An instance of `argparse.ArgumentParser` configured with the specified + An instance of "argparse.ArgumentParser" configured with the specified command line arguments. """ parser = argparse.ArgumentParser( - prog='droid', - description='Detection Rules Optimization Integration Deployment', + prog="droid", + description="Detection Rules Optimization Integration Deployment", ) parser.add_argument("-v", "--validate", help="Validate the rules", action="store_true") parser.add_argument("-r", "--rules", help="Rules path", required=True) @@ -39,10 +39,10 @@ def init_argparse() -> argparse.ArgumentParser: parser.add_argument("-cf", "--config-file", help="DROID configuration file path") parser.add_argument("-d", "--debug", help="Enable debugging", action="store_true") parser.add_argument("-e", "--export", help="Export the rules", action="store_true") - parser.add_argument("-p", "--platform", help="Platform target", choices=['splunk', 'azure', 'microsoft_defender', 'esql', 'eql']) - parser.add_argument("-sm", "--sentinel-mde", help="Use Sentinel as backend for MDE", action="store_true") - parser.add_argument("-u", "--update", help="Update from source", choices=['sigmahq-core']) - parser.add_argument("-l", "--list", help="List items from rules", choices=['unique_fields', 'pipelines']) + parser.add_argument("-p", "--platform", help="Platform target", choices=["splunk", "microsoft_sentinel", "microsoft_xdr", "esql", "eql"]) + parser.add_argument("-sx", "--sentinel-xdr", help="Use Microsoft Sentinel as a search head for Microsoft XDR", action="store_true") + parser.add_argument("-u", "--update", help="Update from source", choices=["sigmahq-core"]) + parser.add_argument("-l", "--list", help="List items from rules", choices=["unique_fields", "pipelines"]) parser.add_argument("-m", "--mssp", help="Enable MSSP mode", action="store_true") parser.add_argument("-mo", "--module", help="Module mode to return converted rules as a list", action="store_true") parser.add_argument("-j", "--json", help="Drop a JSON log file", action="store_true") @@ -85,19 +85,19 @@ def is_raw_rule(args, base_config): else: return False if ( - (args.platform in ['splunk', 'azure']) and + (args.platform in ["splunk", "microsoft_sentinel"]) and (raw_rule_folder_name in args.rules and args.platform in args.rules) ): return True - elif args.platform in ['esql', 'eql'] and raw_rule_folder_name in args.rules: + elif args.platform in ["esql", "eql"] and raw_rule_folder_name in args.rules: return True - elif args.platform in ['esql', 'eql']: + elif args.platform in ["esql", "eql"]: return False - elif args.platform == 'microsoft_defender' and raw_rule_folder_name in args.rules: + elif args.platform == "microsoft_xdr" and raw_rule_folder_name in args.rules: return True elif ( - args.platform in ['splunk', 'azure'] or - (args.platform == 'microsoft_defender') + args.platform in ["splunk", "microsoft_sentinel"] or + (args.platform == "microsoft_xdr") ): return False @@ -128,7 +128,7 @@ def droid_platform_config(args, config_path): if (args.convert or args.export) and not args.platform: exit("Please select one target platform. Use --help") - if args.platform == 'splunk': + if args.platform == "splunk": try: with open(config_path) as file_obj: content = file_obj.read() @@ -138,42 +138,43 @@ def droid_platform_config(args, config_path): raise Exception(f"Something unexpected happened: {e}") if args.export or args.search or args.integrity: - if environ.get('DROID_SPLUNK_USER'): - splunk_user = environ.get('DROID_SPLUNK_USER') + if environ.get("DROID_SPLUNK_USER"): + splunk_user = environ.get("DROID_SPLUNK_USER") config_splunk["user"] = splunk_user else: raise Exception("Please use: export DROID_SPLUNK_USER=") - if environ.get('DROID_SPLUNK_PASSWORD'): - splunk_password = environ.get('DROID_SPLUNK_PASSWORD') + if environ.get("DROID_SPLUNK_PASSWORD"): + splunk_password = environ.get("DROID_SPLUNK_PASSWORD") config_splunk["password"] = splunk_password else: raise Exception("Please use: export DROID_SPLUNK_PASSWORD=") # Replace Splunk url if env available - if environ.get('DROID_SPLUNK_URL'): - config_splunk['url'] = environ.get('DROID_SPLUNK_URL') + if environ.get("DROID_SPLUNK_URL"): + config_splunk["url"] = environ.get("DROID_SPLUNK_URL") # Replace Splunk webhook url if env available - if environ.get('DROID_SPLUNK_WEBHOOK_URL'): - config_splunk['action']['action.webhook.param.url'] = environ.get('DROID_SPLUNK_WEBHOOK_URL') + if environ.get("DROID_SPLUNK_WEBHOOK_URL"): + config_splunk["action"]["action.webhook.param.url"] = environ.get("DROID_SPLUNK_WEBHOOK_URL") return config_splunk - if args.platform == 'azure' or args.platform == 'microsoft_defender': + if args.platform == "microsoft_sentinel" or args.platform == "microsoft_xdr": try: with open(config_path) as file_obj: content = file_obj.read() config_data = tomllib.loads(content) - if args.platform == 'microsoft_defender' and args.sentinel_mde: - # With Azure Sentinel as backend, loads config from azure but keep MDE pipelines - config = config_data["platforms"]["azure"] - config["pipelines"] = config_data["platforms"]["microsoft_defender"]["pipelines"] + if args.platform == "microsoft_xdr" and args.sentinel_xdr: + + # With Microsoft Sentinel as backend, loads config from microsoft_sentinel but keep Microsoft XDR pipelines + config = config_data["platforms"]["microsoft_sentinel"] + config["pipelines"] = config_data["platforms"]["microsoft_xdr"]["pipelines"] else: config = config_data["platforms"][args.platform] # Replace workspace id and workspace name if env available - if environ.get('DROID_AZURE_WORKSPACE_ID'): - config['workspace_id'] = environ.get('DROID_AZURE_WORKSPACE_ID') - if environ.get('DROID_AZURE_WORKSPACE_NAME'): - config['workspace_name'] = environ.get('DROID_AZURE_WORKSPACE_NAME') + if environ.get("DROID_AZURE_WORKSPACE_ID"): + config["workspace_id"] = environ.get("DROID_AZURE_WORKSPACE_ID") + if environ.get("DROID_AZURE_WORKSPACE_NAME"): + config["workspace_name"] = environ.get("DROID_AZURE_WORKSPACE_NAME") except Exception: raise Exception("Something unexpected happened...") @@ -181,47 +182,47 @@ def droid_platform_config(args, config_path): if config["search_auth"] == "app" and not "credential_file" in config: - if environ.get('DROID_AZURE_TENANT_ID'): - tenant_id = environ.get('DROID_AZURE_TENANT_ID') + if environ.get("DROID_AZURE_TENANT_ID"): + tenant_id = environ.get("DROID_AZURE_TENANT_ID") config["tenant_id"] = tenant_id else: raise Exception("Please use: export DROID_AZURE_TENANT_ID=") - if environ.get('DROID_AZURE_CLIENT_ID'): - client_id = environ.get('DROID_AZURE_CLIENT_ID') + if environ.get("DROID_AZURE_CLIENT_ID"): + client_id = environ.get("DROID_AZURE_CLIENT_ID") config["client_id"] = client_id else: raise Exception("Please use: export DROID_AZURE_CLIENT_ID=") - if environ.get('DROID_AZURE_CLIENT_SECRET'): - client_secret = environ.get('DROID_AZURE_CLIENT_SECRET') + if environ.get("DROID_AZURE_CLIENT_SECRET"): + client_secret = environ.get("DROID_AZURE_CLIENT_SECRET") config["client_secret"] = client_secret else: raise Exception("Please use: export DROID_AZURE_CLIENT_SECRET=") elif config["export_auth"] == "app" and args.export and not "credential_file" in config: - if environ.get('DROID_AZURE_TENANT_ID'): - tenant_id = environ.get('DROID_AZURE_TENANT_ID') + if environ.get("DROID_AZURE_TENANT_ID"): + tenant_id = environ.get("DROID_AZURE_TENANT_ID") config["tenant_id"] = tenant_id else: raise Exception("Please use: export DROID_AZURE_TENANT_ID=") - if environ.get('DROID_AZURE_CLIENT_ID'): - client_id = environ.get('DROID_AZURE_CLIENT_ID') + if environ.get("DROID_AZURE_CLIENT_ID"): + client_id = environ.get("DROID_AZURE_CLIENT_ID") config["client_id"] = client_id else: raise Exception("Please use: export DROID_AZURE_CLIENT_ID=") - if environ.get('DROID_AZURE_CLIENT_SECRET'): - client_secret = environ.get('DROID_AZURE_CLIENT_SECRET') + if environ.get("DROID_AZURE_CLIENT_SECRET"): + client_secret = environ.get("DROID_AZURE_CLIENT_SECRET") config["client_secret"] = client_secret else: raise Exception("Please use: export DROID_AZURE_CLIENT_SECRET=") return config - if args.platform in ['esql', 'eql']: + if args.platform in ["esql", "eql"]: try: with open(config_path) as file_obj: @@ -233,13 +234,13 @@ def droid_platform_config(args, config_path): if config_elastic["auth_method"] == "basic": if args.export or args.search or args.integrity: - if environ.get('DROID_ELASTIC_USERNAME'): - username = environ.get('DROID_ELASTIC_USERNAME') + if environ.get("DROID_ELASTIC_USERNAME"): + username = environ.get("DROID_ELASTIC_USERNAME") config_elastic["username"] = username else: raise Exception("Please use: export DROID_ELASTIC_USERNAME=") - if environ.get('DROID_ELASTIC_PASSWORD'): - password = environ.get('DROID_ELASTIC_PASSWORD') + if environ.get("DROID_ELASTIC_PASSWORD"): + password = environ.get("DROID_ELASTIC_PASSWORD") config_elastic["password"] = password else: raise Exception("Please use: export DROID_ELASTIC_PASSWORD=") @@ -356,36 +357,36 @@ def main(argv=None) -> None: base_config = droid_base_config(args, config_path) - if args.platform == 'splunk': + if args.platform == "splunk": if is_raw_rule(args, base_config): logger.info("Splunk raw rule selected") export_error = export_rule_raw(parameters, droid_platform_config(args, config_path), logger_param) else: export_error = convert_rules(parameters, droid_platform_config(args, config_path), base_config, logger_param) - elif args.platform == 'azure': + elif args.platform == "microsoft_sentinel": if is_raw_rule(args, base_config): - logger.info("Azure Sentinel raw rule selected") + logger.info("Microsoft Sentinel raw rule selected") export_error = export_rule_raw(parameters, droid_platform_config(args, config_path), logger_param) else: export_error = convert_rules(parameters, droid_platform_config(args, config_path), base_config, logger_param) - elif args.platform == 'microsoft_defender' and args.sentinel_mde: + elif args.platform == "microsoft_xdr" and args.sentinel_xdr: if is_raw_rule(args, base_config): - logger.info("Microsoft Defender for Endpoint raw rule selected") + logger.info("Microsoft XDR raw rule selected") export_error = export_rule_raw(parameters, droid_platform_config(args, config_path), logger_param) else: export_error = convert_rules(parameters, droid_platform_config(args, config_path), base_config, logger_param) - elif args.platform == "microsoft_defender": + elif args.platform == "microsoft_xdr": if is_raw_rule(args, base_config): logger.info("Microsoft XDR raw rule selected") export_error = export_rule_raw(parameters, droid_platform_config(args, config_path), logger_param) else: export_error = convert_rules(parameters, droid_platform_config(args, config_path), base_config, logger_param) - elif args.platform == 'esql' or args.platform == 'eql': - args.platform == 'elastic' + elif args.platform == "esql" or args.platform == "eql": + args.platform == "elastic" if is_raw_rule(args, base_config): logger.info("Elastic Security raw rule selected") export_error = export_rule_raw(parameters, droid_platform_config(args, config_path), logger_param) @@ -405,7 +406,7 @@ def main(argv=None) -> None: logger.info(f"Update mode was selected for source {args.update} - source selected: {args.rules}") - if parameters.update == 'sigmahq-core': + if parameters.update == "sigmahq-core": update_sigmahq_core(parameters) elif args.list: diff --git a/src/droid/color.py b/src/droid/color.py index 6ee68f3..815a608 100644 --- a/src/droid/color.py +++ b/src/droid/color.py @@ -65,7 +65,7 @@ def setup_handlers(self): json_file_handler = logging.FileHandler(self.log_file) json_file_handler.setFormatter(json_formatter) self.addHandler(json_file_handler) - + if self.json_stdout: stdout_formater = jsonlogger.JsonFormatter(format_str) else: diff --git a/src/droid/convert.py b/src/droid/convert.py index 96681c7..5dcced2 100644 --- a/src/droid/convert.py +++ b/src/droid/convert.py @@ -26,7 +26,7 @@ class Conversion: def __init__(self, parameters: dict, base_config, platform_name, logger_param) -> None: self.logger = ColorLogger(__name__, **logger_param) self._parameters = parameters["pipelines"] - self._filters_directory = base_config.get('sigma_filters_directory', None) + self._filters_directory = base_config.get("sigma_filters_directory", None) self._platform_name = platform_name def get_pipeline_config_group(self, rule_content): @@ -36,10 +36,10 @@ def get_pipeline_config_group(self, rule_content): Return: a str with the pipeline config group """ - sigma_logsource_fields = ['category', 'product', 'service'] + sigma_logsource_fields = ["category", "product", "service"] rule_logsource = {} - for key, value in rule_content['logsource'].items(): + for key, value in rule_content["logsource"].items(): if key in sigma_logsource_fields: rule_logsource[key] = value @@ -54,6 +54,14 @@ def get_pipeline_config_group(self, rule_content): return group_match + def ms_cloud_kusto(self) -> str | None: + """Function to select the right Kusto backend + + Return: + "kusto" if one of the mscloud platforms or None + """ + return "kusto" if self._platform_name in ["microsoft_sentinel", "microsoft_xdr"] else None + def init_sigma_filters(self, rule_file) -> None: """Function to load Sigma filters Args: @@ -89,25 +97,26 @@ def convert_rule(self, rule_content, rule_file, platform): pipeline_resolver = plugins.get_pipeline_resolver() pipeline_config_group = self.get_pipeline_config_group(rule_content) - if not self._platform_name in backends: - self.logger.error(f"{self._platform_name} backend not installed.") + backend_name = self.ms_cloud_kusto() or self._platform_name + if backend_name not in backends: + self.logger.error(f"{backend_name} backend not installed.") exit(1) # Pipeline config if pipeline_config_group: rule_supported = True - pipeline_config = self._parameters[pipeline_config_group]['pipelines'] + pipeline_config = self._parameters[pipeline_config_group]["pipelines"] # Format - if 'format' in self._parameters[pipeline_config_group]: - self._format = self._parameters[pipeline_config_group]['format'] + if "format" in self._parameters[pipeline_config_group]: + self._format = self._parameters[pipeline_config_group]["format"] else: self._format = "default" else: rule_supported = False if rule_supported: - backend_class = backends[self._platform_name] + backend_class = backends[self.ms_cloud_kusto() or self._platform_name] if pipeline_config: pipeline = pipeline_resolver.resolve(pipeline_config) else: @@ -125,11 +134,11 @@ def convert_rule(self, rule_content, rule_file, platform): def load_rule(rule_file): - with open(rule_file, 'r', encoding="utf-8") as stream: + with open(rule_file, "r", encoding="utf-8") as stream: try: object = list(yaml.safe_load_all(stream))[0] - if 'fields' in object: - object.pop('fields') + if "fields" in object: + object.pop("fields") # Here we remove the fields to avoid Sigma to arbitrary # convert the rule to {{ query }} | table field1,field2 # https://github.com/SigmaHQ/pySigma-backend-splunk/issues/27 @@ -145,7 +154,7 @@ def convert_sigma_rule(rule_file, parameters, logger, sigma_objects, target, pla logger.debug("processing rule {0}".format(rule_file)) rule_content = load_rule(rule_file) - sigma_objects[rule_content['title']] = rule_content + sigma_objects[rule_content["title"]] = rule_content error, search_warning = convert_sigma(parameters, logger, rule_content, rule_file, target, platform, error, search_warning, rules, logger_param) return error, search_warning @@ -170,15 +179,17 @@ def convert_rules(parameters, droid_config, base_config, logger_param): if parameters.platform and (parameters.search or parameters.export or parameters.integrity): platform_name = parameters.platform target = Conversion(droid_config, base_config, platform_name, logger_param) - if platform_name == 'splunk': + if platform_name == "splunk": platform = SplunkPlatform(droid_config, logger_param) - elif 'esql' in platform_name: + elif "esql" in platform_name: platform = ElasticPlatform(droid_config, logger_param, "esql", raw=False) - elif 'eql' in platform_name: + elif "eql" in platform_name: platform = ElasticPlatform(droid_config, logger_param, "eql", raw=False) - elif 'azure' in platform_name: + elif "microsoft_sentinel" in platform_name: + platform = SentinelPlatform(droid_config, logger_param) + elif "microsoft_xdr" in platform_name and parameters.sentinel_xdr: platform = SentinelPlatform(droid_config, logger_param) - elif parameters.platform == 'microsoft_defender': + elif "microsoft_xdr" in platform_name: platform = MicrosoftXDRPlatform(droid_config, logger_param) if path.is_dir(): diff --git a/src/droid/export.py b/src/droid/export.py index 0b795e4..fe0f5c4 100644 --- a/src/droid/export.py +++ b/src/droid/export.py @@ -17,18 +17,18 @@ def post_rule_content(rule_content): Return rule_content dict with post-processing """ - if environ.get('DROID_ENV_DEV') == 'True': - rule_content['title'] = "[DEV]" + " " + rule_content['title'] + if environ.get("DROID_ENV_DEV") == "True": + rule_content["title"] = "[DEV]" + " " + rule_content["title"] return rule_content def load_rule(rule_file): - with open(rule_file, 'r') as stream: + with open(rule_file, "r") as stream: try: object = list(yaml.safe_load_all(stream))[0] - if 'fields' in object: - object.pop('fields') + if "fields" in object: + object.pop("fields") # Here we remove the fields to avoid Sigma to arbitrary # convert the rule to {{ query }} | table field1,field2 return object @@ -48,7 +48,7 @@ def export_rule( rule_content = post_rule_content(rule_content) try: - if rule_content.get('custom', {}).get('removed', False): # If rule is set as removed + if rule_content.get("custom", {}).get("removed", False): # If rule is set as removed platform.remove_rule(rule_content, rule_converted, rule_file) else: platform.create_rule(rule_content, rule_converted, rule_file) @@ -68,13 +68,13 @@ def export_rule_raw(parameters: dict, export_config: dict, logger_param: dict): error = False - if parameters.platform == 'splunk': + if parameters.platform == "splunk": platform = SplunkPlatform(export_config, logger_param) - elif parameters.platform == 'azure': + elif parameters.platform == "microsoft_sentinel": platform = SentinelPlatform(export_config, logger_param) - elif parameters.platform == 'microsoft_defender': + elif parameters.platform == "microsoft_xdr": platform = MicrosoftXDRPlatform(export_config, logger_param) - elif parameters.platform == 'esql' or parameters.platform == 'eql': + elif parameters.platform == "esql" or parameters.platform == "eql": platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) if path.is_dir(): @@ -82,8 +82,8 @@ def export_rule_raw(parameters: dict, export_config: dict, logger_param: dict): for rule_file in path.rglob("*.y*ml"): rule_content = load_rule(rule_file) rule_content = post_rule_content(rule_content) - rule_converted = rule_content['detection'] - if rule_content.get('custom', {}).get('removed', False): # If rule is set as removed + rule_converted = rule_content["detection"] + if rule_content.get("custom", {}).get("removed", False): # If rule is set as removed try: platform.remove_rule(rule_content, rule_converted, rule_file) except: @@ -103,8 +103,8 @@ def export_rule_raw(parameters: dict, export_config: dict, logger_param: dict): rule_file = path rule_content = load_rule(rule_file) rule_content = post_rule_content(rule_content) - rule_converted = rule_content['detection'] - if rule_content.get('custom', {}).get('removed', False): # If rule is set as removed + rule_converted = rule_content["detection"] + if rule_content.get("custom", {}).get("removed", False): # If rule is set as removed try: platform.remove_rule(rule_content, rule_converted, rule_file) except: diff --git a/src/droid/integrity.py b/src/droid/integrity.py index 7e5ce25..edd40e4 100644 --- a/src/droid/integrity.py +++ b/src/droid/integrity.py @@ -13,7 +13,7 @@ def load_rule(rule_file): - with open(rule_file, 'r') as stream: + with open(rule_file, "r") as stream: try: object = list(yaml.safe_load_all(stream))[0] return object @@ -44,8 +44,8 @@ def integrity_rule_splunk(rule_converted, rule_content, platform: SplunkPlatform return error result = { - "description": saved_search['description'], - "search": saved_search['search'] + "description": saved_search["description"], + "search": saved_search["search"] } rule_content["detection"] = rule_converted @@ -62,9 +62,9 @@ def integrity_rule_splunk(rule_converted, rule_content, platform: SplunkPlatform error = True # Check if disabled - is_disabled = rule_content.get('custom', {}).get('disabled') + is_disabled = rule_content.get("custom", {}).get("disabled") - if saved_search['disabled'] == "0": + if saved_search["disabled"] == "0": is_enabled = True else: is_enabled = False @@ -105,9 +105,9 @@ def integrity_rule_sentinel(rule_converted, rule_content, platform: SentinelPlat return error result = { - "name": saved_search.name, - "description": saved_search.description, - "query": saved_search.query + "name": saved_search["name"], + "description": saved_search["description"], + "query": saved_search["query"] } rule_content["detection"] = rule_converted @@ -124,7 +124,7 @@ def integrity_rule_sentinel(rule_converted, rule_content, platform: SentinelPlat error = True # Check if disabled - is_disabled = rule_content.get('custom', {}).get('disabled') + is_disabled = rule_content.get("custom", {}).get("disabled") if is_disabled and not saved_search.enabled: logger.info("The rule is disabled as expected") @@ -157,7 +157,7 @@ def integrity_rule_ms_xdr(rule_converted, rule_content, platform: MicrosoftXDRPl return error result = { - "description": saved_search['detectionAction']['alertTemplate']['description'], + "description": saved_search["detectionAction"]["alertTemplate"]["description"], "query": saved_search["queryCondition"]["queryText"] } @@ -180,7 +180,7 @@ def integrity_rule_ms_xdr(rule_converted, rule_content, platform: MicrosoftXDRPl # Check if disabled - is_disabled = rule_content.get('custom', {}).get('disabled') + is_disabled = rule_content.get("custom", {}).get("disabled") if is_disabled and not saved_search["isEnabled"]: logger.info("The rule is disabled as expected") @@ -212,7 +212,7 @@ def integrity_rule_elastic(rule_converted, rule_content, platform: ElasticPlatfo return error if "metadata _id, _index, _version" not in rule_converted.lower() and "metadata _id, _index, _version" in saved_search["query"].lower(): - saved_search["query"] = saved_search["query"].replace(' METADATA _id, _index, _version', '') + saved_search["query"] = saved_search["query"].replace(" METADATA _id, _index, _version", "") result = { "name": saved_search["name"], @@ -266,16 +266,16 @@ def integrity_rule(parameters, rule_converted, rule_content, platform, rule_file rule_content = post_rule_content(rule_content) - if parameters.platform == 'splunk': + if parameters.platform == "splunk": error = integrity_rule_splunk(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error elif parameters.platform in ["esql", "eql"]: error = integrity_rule_elastic(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error - elif parameters.platform == 'microsoft_defender': # TODO: Add Integrity check for Microsoft 365 Defender + elif parameters.platform == "microsoft_xdr": error = integrity_rule_ms_xdr(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error - elif 'azure' in parameters.platform: + elif "microsoft_sentinel" in parameters.platform: error = integrity_rule_sentinel(rule_converted, rule_content, platform, rule_file, parameters, logger, error) return error @@ -285,21 +285,21 @@ def integrity_rule_raw(parameters: dict, export_config: dict, logger_param: dict logger = ColorLogger(__name__, **logger_param) path = Path(parameters.rules) - if parameters.platform == 'splunk': + if parameters.platform == "splunk": platform = SplunkPlatform(export_config, logger_param) - elif parameters.platform == 'azure': + elif parameters.platform == "microsoft_sentinel": platform = SentinelPlatform(export_config, logger_param) - elif parameters.platform == 'microsoft_defender': + elif parameters.platform == "microsoft_xdr": platform = MicrosoftXDRPlatform(export_config, logger_param) - elif parameters.platform == 'esql' or parameters.platform == 'eql': + elif parameters.platform == "esql" or parameters.platform == "eql": platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) if path.is_dir(): error_i = False for rule_file in path.rglob("*.y*ml"): rule_content = load_rule(rule_file) - rule_converted = rule_content['detection'] - error = integrity_rule(parameters, rule_converted, rule_content, platform, rule_file, error) + rule_converted = rule_content["detection"] + error = integrity_rule(parameters, rule_converted, rule_content, platform, rule_file, error, logger_param) if error: error_i = True if error_i: @@ -309,7 +309,7 @@ def integrity_rule_raw(parameters: dict, export_config: dict, logger_param: dict elif path.is_file(): rule_file = path rule_content = load_rule(rule_file) - rule_converted = rule_content['detection'] + rule_converted = rule_content["detection"] error = integrity_rule(parameters, rule_converted, rule_content, platform, rule_file, error) else: print(f"The path {path} is neither a directory nor a file.") diff --git a/src/droid/platforms/ms_xdr.py b/src/droid/platforms/ms_xdr.py index 5a2a56c..7cec198 100644 --- a/src/droid/platforms/ms_xdr.py +++ b/src/droid/platforms/ms_xdr.py @@ -13,7 +13,6 @@ from msal import ConfidentialClientApplication from azure.identity import DefaultAzureCredential - class MicrosoftXDRPlatform(AbstractPlatform): def __init__(self, parameters: dict, logger_param: dict) -> None: @@ -56,9 +55,14 @@ def __init__(self, parameters: dict, logger_param: dict) -> None: self._tenant_id = self._parameters["tenant_id"] self._client_id = self._parameters["client_id"] self._client_secret = self._parameters["client_secret"] + elif "default" in ( + self._parameters["search_auth"] or self._parameters["export_auth"] + ): + pass else: - # Default auth - self._tenant_id = self._parameters["tenant_id"] + raise Exception( + 'MicrosoftXDRPlatform: "search_auth" and "export_auth" parameters must be one of "default" or "app" or "credential_file".' + ) self._api_base_url = "https://graph.microsoft.com/beta" self._token = self.acquire_token() @@ -170,7 +174,6 @@ def remove_rule(self, rule_content, rule_converted, rule_file): def acquire_token(self): # MSAL configuration - authority = f"https://login.microsoftonline.com/{self._tenant_id}" scope = ["https://graph.microsoft.com/.default"] if self._parameters["search_auth"] == "default": @@ -182,6 +185,7 @@ def acquire_token(self): return token else: + authority = f"https://login.microsoftonline.com/{self._tenant_id}" # Create a confidential client application app = ConfidentialClientApplication( self._client_id, diff --git a/src/droid/platforms/sentinel.py b/src/droid/platforms/sentinel.py index be2c2cd..991de1b 100644 --- a/src/droid/platforms/sentinel.py +++ b/src/droid/platforms/sentinel.py @@ -31,40 +31,29 @@ def __init__(self, parameters: dict, logger_param: dict) -> None: self.logger = ColorLogger(__name__, **logger_param) - if 'threshold_operator' not in self._parameters: - raise Exception('SentinelPlatform: "threshold_operator" parameter is required.') - if 'threshold_value' not in self._parameters: - raise Exception('SentinelPlatform: "threshold_value" parameter is required.') - if 'suppress_status' not in self._parameters: - raise Exception('SentinelPlatform: "suppress_status" parameter is required.') - if 'incident_status' not in self._parameters: - raise Exception('SentinelPlatform: "incident_status" parameter is required.') - if 'grouping_reopen' not in self._parameters: - raise Exception('SentinelPlatform: "grouping_reopen" parameter is required.') - if 'grouping_status' not in self._parameters: - raise Exception('SentinelPlatform: "grouping_status" parameter is required.') - if 'grouping_period' not in self._parameters: - raise Exception('SentinelPlatform: "grouping_period" parameter is required.') - if 'grouping_method' not in self._parameters: - raise Exception('SentinelPlatform: "grouping_method" parameter is required.') - if 'suppress_period' not in self._parameters: - raise Exception('SentinelPlatform: "suppress_period" parameter is required.') - if 'query_frequency' not in self._parameters: - raise Exception('SentinelPlatform: "query_frequency" parameter is required.') - if 'query_period' not in self._parameters: - raise Exception('SentinelPlatform: "query_period" parameter is required.') - if 'subscription_id' not in self._parameters: - raise Exception('SentinelPlatform: "subscription_id" parameter is required.') - if 'resource_group' not in self._parameters: - raise Exception('SentinelPlatform: "resource_group" parameter is required.') - if 'workspace_id' not in self._parameters: - raise Exception('SentinelPlatform: "workspace_id" parameter is required.') - if 'workspace_name' not in self._parameters: - raise Exception('SentinelPlatform: "workspace_name" parameter is required.') - if 'days_ago' not in self._parameters: - raise Exception('SentinelPlatform: "days_ago" parameter is required.') - if 'timeout' not in self._parameters: - raise Exception('SentinelPlatform: "timeout" parameter is required.') + required_parameters = [ + "threshold_operator", + "threshold_value", + "suppress_status", + "incident_status", + "grouping_reopen", + "grouping_status", + "grouping_period", + "grouping_method", + "suppress_period", + "query_frequency", + "query_period", + "subscription_id", + "resource_group", + "workspace_id", + "workspace_name", + "days_ago", + "timeout" + ] + + for param in required_parameters: + if param not in self._parameters: + raise Exception(f'SentinelPlatform: "{param}" parameter is required.') self._workspace_id = self._parameters["workspace_id"] self._workspace_name = self._parameters["workspace_name"] @@ -346,13 +335,12 @@ def create_rule(self, rule_content, rule_converted, rule_file): alert_rule = ScheduledAlertRule( - #query='SecurityEvent | where EventID == "4688" and ((CommandLine contains " --adcs " and CommandLine contains " --port "))', query=rule_converted, description=rule_content['description'], display_name=display_name, severity=severity, - query_frequency=timedelta(hours=2), - query_period=timedelta(hours=2), + query_frequency=timedelta(hours=self._query_frequency), + query_period=timedelta(hours=self._query_period), trigger_operator=TriggerOperator(self._threshold_operator), trigger_threshold=self._threshold_value, enabled=enabled, diff --git a/src/droid/search.py b/src/droid/search.py index 06abb70..287a818 100644 --- a/src/droid/search.py +++ b/src/droid/search.py @@ -12,7 +12,7 @@ def load_rule(rule_file): - with open(rule_file, 'r') as stream: + with open(rule_file, "r") as stream: try: object = list(yaml.safe_load_all(stream))[0] return object @@ -27,9 +27,9 @@ def search_rule_splunk(rule_converted, platform: SplunkPlatform, rule_file, para result: dict = platform.run_splunk_search(rule_converted, rule_file) logger.info(f"Successfully searched the rule {rule_file}") - if result['resultCount'] > 0: # If the rule has match + if result["resultCount"] > 0: # If the rule has match job_url = result["jobUrl"] - logger.warning(f'(Splunk) Match found for {rule_file} - {job_url}') + logger.warning(f"(Splunk) Match found for {rule_file} - {job_url}") search_warning = True return error, search_warning else: @@ -49,7 +49,7 @@ def search_rule_sentinel(rule_converted, platform: SentinelPlatform, rule_file, logger.info(f"Successfully searched the rule {rule_file}") if result > 0: # If the rule has match - logger.warning(f'(Sentinel) Match found for {rule_file}') + logger.warning(f"(Sentinel) Match found for {rule_file}") search_warning = True return error, search_warning else: @@ -69,7 +69,7 @@ def search_rule_ms_xdr(rule_converted, platform: MicrosoftXDRPlatform, rule_file logger.info(f"Successfully searched the rule {rule_file}") if result > 0: # If the rule has match - logger.warning(f'{result} Matches found for {rule_file}') + logger.warning(f"{result} Matches found for {rule_file}") search_warning = True return error, search_warning else: @@ -89,7 +89,7 @@ def search_rule_elastic(rule_converted, platform: ElasticPlatform, rule_file, pa logger.info(f"Successfully searched the rule {rule_file}") if result > 0: # If the rule has match - logger.warning(f'(Elastic) {result} hits found for {rule_file}') + logger.warning(f"(Elastic) {result} hits found for {rule_file}") search_warning = True return error, search_warning else: @@ -108,11 +108,11 @@ def search_rule(parameters, rule_content, rule_converted, platform, rule_file, e error = False search_warning = False - if rule_content.get('custom', {}).get('ignore_search', False): + if rule_content.get("custom", {}).get("ignore_search", False): logger.warning(f"Search is ignored for {rule_file}") return error, search_warning - if parameters.platform == 'splunk': + if parameters.platform == "splunk": error, search_warning = search_rule_splunk(rule_converted, platform, rule_file, parameters, logger, error, search_warning) return error, search_warning elif parameters.platform in ["esql", "eql"]: @@ -120,16 +120,16 @@ def search_rule(parameters, rule_content, rule_converted, platform, rule_file, e parameters, logger, error, search_warning, rule_content, parameters.platform) return error, search_warning - elif parameters.platform == 'microsoft_defender': - error, search_warning = search_rule_ms_xdr(rule_converted, platform, rule_file, parameters, logger, error, search_warning) - return error, search_warning - elif 'azure' in parameters.platform: + elif parameters.platform == "microsoft_sentinel" or (parameters.sentinel_xdr and parameters.platform == "microsoft_xdr"): if parameters.mssp: error, search_warning = search_rule_sentinel(rule_converted, platform, rule_file, parameters, logger, error, search_warning, mssp_mode=True) return error, search_warning else: error, search_warning = search_rule_sentinel(rule_converted, platform, rule_file, parameters, logger, error, search_warning, mssp_mode=False) return error, search_warning + elif parameters.platform == "microsoft_xdr": + error, search_warning = search_rule_ms_xdr(rule_converted, platform, rule_file, parameters, logger, error, search_warning) + return error, search_warning def search_rule_raw(parameters: dict, export_config: dict, logger_param: dict): @@ -140,13 +140,15 @@ def search_rule_raw(parameters: dict, export_config: dict, logger_param: dict): path = Path(parameters.rules) - if parameters.platform == 'splunk': + if parameters.platform == "splunk": platform = SplunkPlatform(export_config, logger_param) - elif parameters.platform == 'azure': + elif parameters.platform == "microsoft_sentinel": + platform = SentinelPlatform(export_config, logger_param) + elif parameters.platform == "microsoft_sentinel" and parameters.sentinel_xdr: platform = SentinelPlatform(export_config, logger_param) - elif parameters.platform == 'microsoft_defender': + elif parameters.platform == "microsoft_xdr": platform = MicrosoftXDRPlatform(export_config, logger_param) - elif parameters.platform == 'esql' or parameters.platform == 'eql': + elif parameters.platform == "esql" or parameters.platform == "eql": platform = ElasticPlatform(export_config, logger_param, parameters.platform, raw=True) if path.is_dir(): @@ -154,7 +156,7 @@ def search_rule_raw(parameters: dict, export_config: dict, logger_param: dict): search_warning_i = False for rule_file in path.rglob("*.y*ml"): rule_content = load_rule(rule_file) - rule_converted = rule_content['detection'] + rule_converted = rule_content["detection"] error, search_warning = search_rule(parameters, rule_content, rule_converted, platform, rule_file, error, search_warning, logger_param) if error: error_i = True @@ -169,7 +171,7 @@ def search_rule_raw(parameters: dict, export_config: dict, logger_param: dict): elif path.is_file(): rule_file = path rule_content = load_rule(rule_file) - rule_converted = rule_content['detection'] + rule_converted = rule_content["detection"] error, search_warning = search_rule(parameters, rule_content, rule_converted, platform, rule_file, error, search_warning, logger_param) else: print(f"The path {path} is neither a directory nor a file.")