Skip to content

[FR] Add white space checking for KQL parse #3789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion lib/kql/kql/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from kql.errors import KqlParseError
from .ast import * # noqa: F403
from .utils import check_whitespace, collect_token_positions


STRING_FIELDS = ("keyword", "text")
Expand Down Expand Up @@ -376,7 +377,14 @@ def lark_parse(text):
walker = BaseKqlParser(text)

try:
return lark_parser.parse(text)
tree = lark_parser.parse(text)

# Check for whitespace around "and" and "or" tokens
lines = text.split('\n')
check_whitespace(collect_token_positions(tree, "and"), 'and', lines)
check_whitespace(collect_token_positions(tree, "or"), 'or', lines)

return tree
except UnexpectedEOF:
raise KqlParseError("Unexpected EOF", len(walker.lines), len(walker.lines[-1].strip()), walker.lines[-1])
except LarkError as exc:
Expand Down
53 changes: 53 additions & 0 deletions lib/kql/kql/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

import re

from lark import Token # noqa: F401
from lark import Tree

from typing import List
from kql.errors import KqlParseError


def check_whitespace(token_positions: List, token: str, lines: List[str]) -> None:
    """Ensure every listed occurrence of ``token`` is delimited by whitespace.

    A line boundary counts as whitespace on either side, so a token that opens
    a line, closes a line, or sits alone on its own line is accepted.

    Args:
        token_positions: ``(line, column)`` pairs, both 1-based, one per occurrence.
        token: The token text; only its length is used, to locate the character
            that follows the occurrence.
        lines: The query split into lines; ``lines[line - 1]`` holds the occurrence.

    Raises:
        KqlParseError: For the first occurrence whose neighbour on either side
            is a non-whitespace character.
    """
    for line_num, column in token_positions:
        line = lines[line_num - 1]
        start = column - 1
        end = start + len(token)

        # The beginning of a line stands in for the newline that precedes it.
        space_before = start == 0 or bool(re.match(r"\s", line[start - 1]))
        # Likewise, a token that runs exactly to the end of the line is followed by a newline.
        space_after = end == len(line) or (end < len(line) and bool(re.match(r"\s", line[end])))

        if space_before and space_after:
            continue

        raise KqlParseError(
            error_msg=f"Missing whitespace around '{token}' token",
            line=line_num,
            column=column,
            source=line,
            width=len(token),
            trailer=None
        )


def collect_token_positions(tree: Tree, token: str) -> List:
    """Return the ``(line, column)`` of each token in ``tree`` matching ``token``.

    The tree is walked depth-first, left to right. A token matches when its
    lower-cased value equals ``token``; children that are neither ``Token`` nor
    ``Tree`` are ignored.
    """
    found = []
    # Explicit stack in place of recursion; children are pushed reversed so
    # they are popped in their original left-to-right order.
    pending = list(reversed(tree.children))
    while pending:
        node = pending.pop()
        if isinstance(node, Token):
            if node.value.lower() == token:
                found.append((node.line, node.column))
        elif isinstance(node, Tree):
            pending.extend(reversed(node.children))
    return found
2 changes: 1 addition & 1 deletion lib/kql/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection-rules-kql"
version = "0.1.8"
version = "0.1.9"
description = "Kibana Query Language parser for Elastic Detection Rules"
license = {text = "Elastic License v2"}
keywords = ["Elastic", "sour", "Detection Rules", "Security", "Elasticsearch", "kql"]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.3.9"
version = "1.3.10"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
7 changes: 7 additions & 0 deletions tests/kuery/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,10 @@ def test_optimization(self):
"{'match': {'destination.ip': '169.254.169.254'}}]}}]}}"
)
self.assertEqual(dsl_str, good_case, "DSL string does not match the good case, optimization failed.")

def test_blank_space(self):
    """Operators need surrounding whitespace; a newline counts as whitespace."""
    # Only the malformed query belongs inside assertRaises: the context manager
    # exits at the first exception, so any statement after it would never run.
    with self.assertRaises(kql.KqlParseError):
        kql.lark_parse('"Test-ServiceDaclPermission" or"Update-ExeFunctions"')

    # Well-formed variants must parse without raising.
    kql.lark_parse('"Test-ServiceDaclPermission" or "Update-ExeFunctions"')
    kql.lark_parse('"Test-ServiceDaclPermission" \nor "Update-ExeFunctions"')
    kql.lark_parse('"Test-ServiceDaclPermission" or\n "Update-ExeFunctions"')
Loading