Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 77 additions & 84 deletions prometheus_client/parser.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import unicode_literals

import re

try:
import StringIO
except ImportError:
Expand All @@ -19,97 +21,88 @@ def text_string_to_metric_families(text):
for metric_family in text_fd_to_metric_families(StringIO.StringIO(text)):
yield metric_family


ESCAPE_SEQUENCES = {
'\\\\': '\\',
'\\n': '\n',
'\\"': '"',
}


def replace_escape_sequence(match):
return ESCAPE_SEQUENCES[match.group(0)]


HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
ESCAPING_RE = re.compile(r'\\[\\n"]')


def _replace_help_escaping(s):
return s.replace("\\n", "\n").replace('\\\\', '\\')
return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)


def _replace_escaping(s):
return s.replace("\\n", "\n").replace('\\\\', '\\').replace('\\"', '"')
return ESCAPING_RE.sub(replace_escape_sequence, s)


# Matches one ``name="value"`` pair together with the separator after it.
#
# Groups:
#   1 - label name
#   2 - label value exactly as written (escape sequences not yet decoded)
#   3 - the last backslash escape found inside the value, or None when the
#       value contains no escape at all (lets callers skip unescaping)
#
# The value uses the "unrolled loop" form: every repetition must start with
# exactly one backslash escape.  Repeating a group that itself starts with
# ``(\\.)+`` would allow a run of escapes to be split in exponentially many
# ways, which makes a non-matching input (e.g. a missing closing quote after
# many escapes) take exponential time.
LABEL_AND_VALUE_RE = re.compile(
    r"""
    \s*                    # - skip initial whitespace
    ([^=\s]+)              # - label name
    \s*=\s*                # - equal sign ignoring all whitespace around it
    "(                     # - open label value
      [^"\\]*              # - match any number of non-special characters
      (?:(\\.)[^"\\]*)*    # - match one slash-escaped char followed by any
                           #   number of non-special chars, any number of times
    )"                     # - close label value
    \s*                    # - skip whitespace
    (?:,|$)                # - end on a comma or end of string
    """,
    re.VERBOSE,
)


def _parse_labels(labels_string):
labels = {}
# Return if we don't have valid labels
if "=" not in labels_string:
return labels

escaping = False
if "\\" in labels_string:
escaping = True

# Copy original labels
sub_labels = labels_string
try:
# Process one label at a time
while sub_labels:
# The label name is before the equal
value_start = sub_labels.index("=")
label_name = sub_labels[:value_start]
sub_labels = sub_labels[value_start + 1:].lstrip()
# Find the first quote after the equal
quote_start = sub_labels.index('"') + 1
value_substr = sub_labels[quote_start:]

# Find the last unescaped quote
i = 0
while i < len(value_substr):
i = value_substr.index('"', i)
if value_substr[i - 1] != "\\":
break
i += 1

# The label value is inbetween the first and last quote
quote_end = i + 1
label_value = sub_labels[quote_start:quote_end]
# Replace escaping if needed
if escaping:
label_value = _replace_escaping(label_value)
labels[label_name.strip()] = label_value.strip()

# Remove the processed label from the sub-slice for next iteration
sub_labels = sub_labels[quote_end + 1:]
next_comma = sub_labels.find(",") + 1
sub_labels = sub_labels[next_comma:].lstrip()

return labels

except ValueError:
raise ValueError("Invalid labels: %s" % labels_string)


# If we have multiple values only consider the first
def _parse_value(s):
s = s.lstrip()
separator = " "
if separator not in s:
separator = "\t"
i = s.find(separator)
if i == -1:
return s
return s[:i]


def _parse_sample(text):
# Detect the labels in the text
try:
label_start, label_end = text.index("{"), text.rindex("}")
# The name is before the labels
name = text[:label_start].strip()
# We ignore the starting curly brace
label = text[label_start + 1:label_end]
# The value is after the label end (ignoring curly brace and space)
value = float(_parse_value(text[label_end + 2:]))
return name, _parse_labels(label), value

# We don't have labels
except ValueError:
# Detect what separator is used
separator = " "
if separator not in text:
separator = "\t"
name_end = text.index(separator)
name = text[:name_end]
# The value is after the name
value = float(_parse_value(text[name_end:]))
return name, {}, value
pos = 0
labels_string_len = len(labels_string)
while pos < labels_string_len:
m = LABEL_AND_VALUE_RE.match(labels_string, pos=pos)
try:
label_name, label_value, escaped_chars = m.groups()
except AttributeError:
if m is None:
remaining = labels_string[pos:].strip()
# One trailing comma is consumed by LABEL_AND_VALUE_RE, so the
# remaining string should always be whitespace-only unless there
# were no matches.
comma_is_allowed = pos == 0
if not remaining or (comma_is_allowed and remaining == ','):
return labels
raise ValueError("Invalid labels: %s" % labels_string)
if escaped_chars is not None:
label_value = _replace_escaping(label_value)
labels[label_name] = label_value
pos = m.end()
return labels


# Splits one sample line into (metric name, raw label text or None, value).
# The pattern is a raw string so that ``\s`` / ``\S`` reach the regex engine
# unchanged instead of relying on Python passing unknown string escapes
# through (which emits a warning on current interpreters).  The label part
# is greedy, so it extends to the last ``}`` on the line; anything after the
# value token is not consumed.
SAMPLE_RE = re.compile(r"""
    \s*                # skip initial whitespace
    ([^{\s]+)          # metric name: everything up to the first '{' or whitespace
    (?:\s*\{(.*)\})?   # optional labels with optional whitespace in front
    \s+(\S+)           # value
""", re.VERBOSE)


def _parse_sample(text, match=SAMPLE_RE.match):
    """Split one sample line into ``(name, labels, value)``.

    :param text: a single sample line.
    :param match: callable applied to ``text`` that returns a match object
        with three groups (name, raw labels or None, value) or None;
        defaults to ``SAMPLE_RE.match``.
    :returns: tuple of the metric name, a dict of labels (empty when the
        line has no label section) and the value converted with ``float``.
    :raises ValueError: if ``match`` returns None; ``_parse_labels`` and
        ``float`` may raise it as well for malformed parts.
    """
    found = match(text)
    if found is None:
        raise ValueError('Invalid sample string: %s' % text)

    name, raw_labels, raw_value = found.groups()
    # Labels are parsed before the value is converted.
    if raw_labels is None:
        label_map = {}
    else:
        label_map = _parse_labels(raw_labels)
    return name, label_map, float(raw_value)


def text_fd_to_metric_families(fd):
Expand Down
63 changes: 62 additions & 1 deletion tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,32 @@ def test_commas(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a help
a{foo="bar",} 1
a{foo="baz", } 1
# TYPE b counter
# HELP b help
b{,} 2
# TYPE c counter
# HELP c help
c{ ,} 3
# TYPE d counter
# HELP d help
d{, } 4
""")
a = CounterMetricFamily("a", "help", labels=["foo"])
a.add_metric(["bar"], 1)
a.add_metric(["baz"], 1)
b = CounterMetricFamily("b", "help", value=2)
self.assertEqual([a, b], list(families))
c = CounterMetricFamily("c", "help", value=3)
d = CounterMetricFamily("d", "help", value=4)
self.assertEqual([a, b, c, d], list(families))

def test_multiple_trailing_commas(self):
    """More than one trailing comma in a label list raises ValueError."""
    exposition = """# TYPE a counter
# HELP a help
a{foo="bar",, } 1
"""

    def consume():
        # The parser is a generator, so it has to be drained to fail.
        return list(text_string_to_metric_families(exposition))

    self.assertRaises(ValueError, consume)

def test_empty_brackets(self):
families = text_string_to_metric_families("""# TYPE a counter
Expand All @@ -200,6 +218,49 @@ def test_empty_label(self):
metric_family.add_metric([""], 2)
self.assertEqual([metric_family], list(families))

def test_label_escaping(self):
    """Check how escape sequences inside a label value are decoded."""
    # (value as written in the exposition text, value expected after parsing)
    cases = [
        ('foo', 'foo'),
        ('\\foo', '\\foo'),
        ('\\\\foo', '\\foo'),
        ('foo\\\\', 'foo\\'),
        ('\\n', '\n'),
        ('\\\\n', '\\n'),
        ('\\\\\\n', '\\\n'),
        ('\\"', '"'),
        ('\\\\\\"', '\\"'),
    ]
    for written, expected_val in cases:
        parsed = list(text_string_to_metric_families("""
# TYPE a counter
# HELP a help
a{foo="%s",bar="baz"} 1
""" % written))
        expected_family = CounterMetricFamily(
            "a", "help", labels=["foo", "bar"])
        expected_family.add_metric([expected_val, "baz"], 1)
        self.assertEqual([expected_family], parsed)

def test_help_escaping(self):
    """Check how escape sequences inside HELP text are decoded."""
    # (help text as written, documentation expected after parsing); a
    # backslash before a double quote is expected to stay as it is.
    cases = [
        ('foo', 'foo'),
        ('\\foo', '\\foo'),
        ('\\\\foo', '\\foo'),
        ('foo\\', 'foo\\'),
        ('foo\\\\', 'foo\\'),
        ('\\n', '\n'),
        ('\\\\n', '\\n'),
        ('\\\\\\n', '\\\n'),
        ('\\"', '\\"'),
        ('\\\\"', '\\"'),
        ('\\\\\\"', '\\\\"'),
    ]
    for written, expected_help in cases:
        parsed = list(text_string_to_metric_families("""
# TYPE a counter
# HELP a %s
a{foo="bar"} 1
""" % written))
        expected_family = CounterMetricFamily(
            "a", expected_help, labels=["foo"])
        expected_family.add_metric(["bar"], 1)
        self.assertEqual([expected_family], parsed)

def test_escaping(self):
families = text_string_to_metric_families("""# TYPE a counter
# HELP a he\\n\\\\l\\tp
Expand Down