-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
222 lines (192 loc) · 8.83 KB
/
Copy pathparser.py
File metadata and controls
222 lines (192 loc) · 8.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import re
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple
# Compiled regex patterns for performance
_BRACKET_LEVEL_PATTERN = re.compile(
r'\[(TRACE|DEBUG|INFO|NOTICE|WARN|WARNING|ERROR|ERR|CRITICAL|ALERT|FATAL|EMERGENCY)\]',
re.IGNORECASE
)
_BRACKETLESS_LEVEL_PATTERN = re.compile(
r'\b(TRACE|DEBUG|INFO|NOTICE|WARN|WARNING|ERROR|ERR|CRITICAL|ALERT|FATAL|EMERGENCY)\b',
re.IGNORECASE
)
# Multiple timestamp patterns for different log formats
_TIMESTAMP_PATTERNS = [
# ISO 8601: 2026-03-21T10:00:00Z or 2026-03-21T10:00:00.123Z or 2026-03-21T10:00:00+00:00
re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)'),
# ISO-like with space: 2026-03-21 10:00:00 or 2026-03-21 10:00:00.123
re.compile(r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?)'),
# Common Log Format / Apache: 21/Mar/2026:10:00:00 +0000 or [21/Mar/2026:10:00:00 +0000]
re.compile(r'(\d{2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2}(?:\s+[+-]\d{4})?)'),
# Syslog-style: Mar 21 10:00:00 (year is assumed current year)
re.compile(r'([A-Za-z]{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})'),
# Unix timestamp: 1711054800 (10 digits for seconds)
re.compile(r'\b(\d{10})\b'),
]
# Month name mapping for parsing
_MONTH_MAP = {
'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}
@dataclass
class LogEntry:
    """One parsed log line plus whatever metadata could be recovered from it."""

    # Severity name; parse_line stores a normalized upper-case level here, or
    # "UNKNOWN" when no level could be determined.
    level: str
    # Human-readable message text.
    message: str
    # The source line the entry was built from.
    raw: str
    # Parsed timestamp, when one was found and could be decoded.
    timestamp: Optional[datetime] = None
    # Observability metadata; parse_line only fills these for JSON log lines.
    service: Optional[str] = None
    trace_id: Optional[str] = None
    span_id: Optional[str] = None
# Level normalization constants
_NORMALIZE_LEVEL_MAP = {
"WARNING": "WARN",
"EMERGENCY": "FATAL",
"ERR": "ERROR",
}
_JSON_LEVEL_KEYS = ("level", "severity", "log.level", "severity_text", "severityText")
_JSON_MESSAGE_KEYS = ("message", "msg", "text", "body", "log")
_JSON_TIMESTAMP_KEYS = ("timestamp", "time", "@timestamp")
_MISSING = object()
def _normalize_level(level: str) -> str:
    """Upper-case *level* and fold known aliases into their canonical name.

    Levels without an entry in ``_NORMALIZE_LEVEL_MAP`` are returned
    upper-cased but otherwise unchanged.
    """
    upper_level = level.upper()
    return _NORMALIZE_LEVEL_MAP.get(upper_level, upper_level)
def _first_json_value(data: dict, keys: Tuple[str, ...]):
"""Return the first present JSON value from a list of common log field names."""
for key in keys:
if key in data:
return data[key]
return _MISSING
def _stringify_json_message(value, raw_line: str) -> str:
    """Turn a JSON message value into display text.

    * ``_MISSING`` (no message field): the raw line itself is used.
    * dict / list: re-serialized as JSON with sorted keys.
    * anything else: ``str(value)`` with trailing CR/LF characters removed.
    """
    if value is _MISSING:
        text = raw_line
    elif isinstance(value, (dict, list)):
        text = json.dumps(value, sort_keys=True)
    else:
        text = str(value).rstrip("\r\n")
    return text
def _extract_json_observability(data: dict) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Pull service / trace / span from common JSON log shapes (K8s, OTel, Docker)."""
k8s = data.get("kubernetes")
k8s_d: dict = k8s if isinstance(k8s, dict) else {}
pod_name = k8s_d.get("pod_name")
if not pod_name and isinstance(k8s_d.get("pod"), dict):
pod_name = k8s_d["pod"].get("name")
resource = data.get("resource")
resource_d: dict = resource if isinstance(resource, dict) else {}
resource_attrs = resource_d.get("attributes")
resource_attrs_d: dict = resource_attrs if isinstance(resource_attrs, dict) else {}
service = (
data.get("service")
or data.get("service.name")
or data.get("service_name")
or data.get("resource.attributes.service.name")
or resource_attrs_d.get("service.name")
or resource_attrs_d.get("service_name")
or pod_name
or k8s_d.get("container_name")
or data.get("container")
or data.get("container.name")
or data.get("logger")
or data.get("logger.name")
)
if service is not None:
service = str(service)
trace_id = data.get("trace_id") or data.get("traceId") or data.get("trace.id")
if not trace_id and isinstance(data.get("trace"), dict):
trace_id = data["trace"].get("id")
if not trace_id and isinstance(data.get("otelTraceID"), str):
trace_id = data["otelTraceID"]
if trace_id is not None:
trace_id = str(trace_id)
span_id = data.get("span_id") or data.get("spanId") or data.get("span.id")
if span_id is not None:
span_id = str(span_id)
return service, trace_id, span_id
def _parse_json_entry(data: dict, line: str) -> LogEntry:
    """Build a LogEntry from an already-decoded JSON log object.

    Args:
        data: The decoded JSON object.
        line: The stripped source text; stored as ``raw`` and used as the
            message when the object has no message-like field.
    """
    level_value = _first_json_value(data, _JSON_LEVEL_KEYS)
    message = _stringify_json_message(_first_json_value(data, _JSON_MESSAGE_KEYS), line)

    # A level field that is absent, null or blank carries no information.
    # (str(None) would otherwise surface as the bogus level "NONE".)
    if level_value is _MISSING or level_value is None:
        level_text = ""
    else:
        level_text = str(level_value).strip()

    if level_text:
        level = _normalize_level(level_text)
    else:
        level = "UNKNOWN"
        # No usable level field: look for a level marker inside the message
        # text itself. Skipped when the message is just the raw line, since
        # that would re-parse the same JSON forever.
        if message != line:
            inner_entry = parse_line(message)
            if inner_entry.level != "UNKNOWN":
                level = inner_entry.level
                message = inner_entry.message

    timestamp = None
    timestamp_value = _first_json_value(data, _JSON_TIMESTAMP_KEYS)
    if timestamp_value is not _MISSING and timestamp_value:
        timestamp_text = str(timestamp_value)
        try:
            timestamp = datetime.fromisoformat(timestamp_text.replace('Z', '+00:00'))
        except ValueError:
            # Not ISO (e.g. epoch seconds): let the generic extractor try
            # its other formats instead of dropping the timestamp.
            timestamp = extract_timestamp(timestamp_text)

    service, trace_id, span_id = _extract_json_observability(data)
    return LogEntry(
        level=level,
        message=message,
        raw=line,
        timestamp=timestamp,
        service=service,
        trace_id=trace_id,
        span_id=span_id,
    )


def parse_line(line: str) -> LogEntry:
    """Parse one log line into a LogEntry.

    The line is stripped of surrounding whitespace, then handled as:

    1. A JSON object (text starting with ``{`` and ending with ``}`` that
       decodes successfully): level, message, timestamp and observability
       fields are read from the object.
    2. Plain text containing a level marker, either bracketed (``[INFO]``)
       or a bare keyword (``ERROR: ...``, ``WARN - ...``). Bracketed markers
       take precedence. The marker is removed from the message, along with
       one leading ``:`` or ``-`` separator left behind.
    3. Anything else: level ``"UNKNOWN"`` with the whole line as message.

    Args:
        line: Raw log line; surrounding whitespace is ignored.

    Returns:
        The parsed entry. ``raw`` holds the stripped line.
    """
    line = line.strip()

    # 1. JSON log object (common in docker / kubernetes / modern APIs).
    if line.startswith('{') and line.endswith('}'):
        try:
            data = json.loads(line)
        except json.JSONDecodeError:
            data = None  # Looks like JSON but is not: treat as plain text.
        if isinstance(data, dict):
            return _parse_json_entry(data, line)

    # 2. Plain-text level markers: "[INFO]" first, then bare "INFO".
    match = _BRACKET_LEVEL_PATTERN.search(line) or _BRACKETLESS_LEVEL_PATTERN.search(line)
    if match:
        level = _normalize_level(match.group(1))
        # Cut out exactly the matched span. Using str.replace() on the
        # matched text would remove its first textual occurrence instead,
        # e.g. turning "ERRORS seen, ERR 42" into "ORS seen, ERR 42".
        message = (line[:match.start()] + line[match.end():]).strip()
        # Drop one separator left behind by forms like "INFO: x" / "INFO - x".
        if message.startswith((':', '-')):
            message = message[1:].strip()
        return LogEntry(level=level, message=message, raw=line, timestamp=extract_timestamp(line))

    # 3. Fallback: no recognisable level.
    return LogEntry(level="UNKNOWN", message=line, raw=line, timestamp=extract_timestamp(line))
# Field-level patterns for taking apart an already-isolated timestamp
# candidate; compiled once rather than looked up on every call.
_ISO_DATE_PREFIX_PATTERN = re.compile(r'\d{4}-\d{2}-\d{2}[T\s]')
_COMPACT_UTC_OFFSET_PATTERN = re.compile(r'([+-]\d{2})(\d{2})$')
_CLF_FIELDS_PATTERN = re.compile(r'(\d{2})/([A-Za-z]{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2})')
_SYSLOG_FIELDS_PATTERN = re.compile(r'([A-Za-z]{3})\s+(\d{1,2})\s+(\d{2}):(\d{2}):(\d{2})')


def _parse_timestamp_candidate(ts_str: str) -> Optional[datetime]:
    """Decode one matched timestamp string, or return None if unrecognised.

    Raises:
        ValueError, OSError, OverflowError: when the text has a recognised
            shape but does not denote a valid instant (e.g. month 13).
    """
    # ISO 8601, with either "T" or whitespace between date and time.
    if _ISO_DATE_PREFIX_PATTERN.match(ts_str):
        # Collapse any whitespace run (tab, several spaces) into a single "T".
        iso_text = 'T'.join(ts_str.split()).replace('Z', '+00:00')
        # "+0000" -> "+00:00": the colon-less offset is accepted by the
        # search pattern but rejected by fromisoformat() before Python 3.11.
        iso_text = _COMPACT_UTC_OFFSET_PATTERN.sub(r'\1:\2', iso_text)
        return datetime.fromisoformat(iso_text)

    # Common Log Format: 21/Mar/2026:10:00:00 +0000
    clf = _CLF_FIELDS_PATTERN.match(ts_str)
    if clf:
        day, month_abbrev, year, hour, minute, second = clf.groups()
        month = _MONTH_MAP.get(month_abbrev.capitalize())
        if month is None:
            return None  # Three letters that are not a month name.
        # NOTE(review): a trailing "+0000"-style offset is matched by the
        # search pattern but not applied; the result is a naive datetime.
        return datetime(int(year), month, int(day), int(hour), int(minute), int(second))

    # Syslog-style: Mar 21 10:00:00 (no year in the text).
    syslog = _SYSLOG_FIELDS_PATTERN.match(ts_str)
    if syslog:
        month_abbrev, day, hour, minute, second = syslog.groups()
        month = _MONTH_MAP.get(month_abbrev.capitalize())
        if month is None:
            # The search pattern accepts any three letters, so text such as
            # "sed 12 10:00:00" lands here; it must not become January.
            return None
        year = datetime.now().year  # Assume the current year.
        return datetime(year, month, int(day), int(hour), int(minute), int(second))

    # Unix epoch seconds.
    if ts_str.isdigit():
        return datetime.fromtimestamp(int(ts_str))

    return None


def extract_timestamp(text: str) -> Optional[datetime]:
    """Extract a timestamp from raw text using several known formats.

    Each entry of ``_TIMESTAMP_PATTERNS`` is tried in list order, looking
    at the first place it matches in *text*. The first candidate that
    decodes to a valid datetime is returned; candidates that match a pattern
    but are not valid dates (or use an unknown month abbreviation) are
    skipped and the next pattern is tried.

    Month abbreviations are matched case-insensitively ("MAR", "mar").

    The result is timezone-aware only for ISO timestamps carrying ``Z`` or a
    UTC offset. All other formats yield naive datetimes; epoch seconds are
    converted with ``datetime.fromtimestamp`` and syslog timestamps are
    given the current year.

    Args:
        text: Any string that may contain a timestamp.

    Returns:
        The decoded datetime, or None when nothing usable was found.
    """
    for pattern in _TIMESTAMP_PATTERNS:
        match = pattern.search(text)
        if not match:
            continue
        try:
            parsed = _parse_timestamp_candidate(match.group(1))
        except (ValueError, OSError, OverflowError):
            # Right shape, impossible value: fall through to the next format.
            continue
        if parsed is not None:
            return parsed
    return None