-
Notifications
You must be signed in to change notification settings - Fork 7.9k
/
Copy pathmr_s3_log_parser.py
137 lines (110 loc) · 4.39 KB
/
mr_s3_log_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import re
import time

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol
from mrjob.step import MRStep
class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

    Aggregates a user's daily requests by user agent and operation.

    Outputs one tab-delimited line per distinct key:
    date_time, requester, user_agent, operation, count
    """

    # Exactly one capturing group per log field, so that the tuple returned
    # by ``match.groups()`` can be indexed directly with the S3_LOG_*
    # constants below.  A nested group inside the quoted fields would shift
    # the numbering of every later field (e.g. the user agent index would
    # land on the referer).  Quoted fields are captured with their
    # surrounding double quotes, or as a bare "-" when absent.
    LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
              r'(\S+) (\S+) (\S+) ("[^"]+"|-) ' \
              r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
              r'("[^"]+"|-) ("[^"]+"|-)'
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    # Zero-based positions of each field within a parsed log line.
    # NOTE: S3_LOG_HTTP_METHOD holds the whole quoted request line
    # (e.g. "GET /key HTTP/1.1"), not just the HTTP verb.
    (S3_LOG_BUCKET_OWNER,
     S3_LOG_BUCKET,
     S3_LOG_DATE_TIME,
     S3_LOG_IP,
     S3_LOG_REQUESTER_ID,
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION,
     S3_LOG_KEY,
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS,
     S3_LOG_S3_ERROR,
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE,
     S3_LOG_TOTAL_TIME,
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER,
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Converts entry 22/Jul/2013:21:04:17 +0000 to the format
        'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into
        a database such as Redshift or RDS

        Note: requires the chars "[ ]" to be stripped prior to input

        Returns a tuple (date, date_time, time_zone), e.g.
        ('2013-07-22', '2013-07-22 21:04:17', '+0000').

        Raises ValueError if the date portion is not in %d/%b/%Y form;
        the caller is expected to handle it.

        TODO: Needs to combine timezone with date as one field
        """
        # The first ":" separates the date from the time of day and the
        # first space after it separates the time from the UTC offset.
        # Splitting on the separators (instead of searching for "+") also
        # handles negative offsets such as "-0700".
        date_parsed, _, remainder = raw_date_time_zone.partition(":")
        time_parsed, _, time_zone_parsed = remainder.partition(" ")

        date_struct = time.strptime(date_parsed, "%d/%b/%Y")
        converted_date = time.strftime("%Y-%m-%d", date_struct)
        date_time = converted_date + " " + time_parsed
        return converted_date, date_time, time_zone_parsed.strip()

    def _parse_error_record(self, line):
        """Builds the (key, count) record emitted for an unparsable line."""
        # The key stays a tuple because the reducer joins the elements of
        # the key with DELIMITER.
        return ("Error while parsing line: %s" % line,), 1

    def mapper(self, _, line):
        """Emits ((date_time, requester, user_agent, operation), 1) for
        each log line, with date_time truncated to midnight so that the
        reducer aggregates per day.

        Lines that do not match the S3 log format, or whose timestamp
        cannot be parsed, are emitted under an error key instead of
        aborting the job.
        """
        line = line.strip()
        match = self.logpat.search(line)
        if match is None:
            yield self._parse_error_record(line)
            return

        fields = match.groups()
        try:
            date = self.clean_date_time_zone(
                fields[self.S3_LOG_DATE_TIME])[0]
        except ValueError:
            yield self._parse_error_record(line)
            return

        # Leave the following line of code if
        # you want to aggregate by date
        date_time = date + " 00:00:00"

        yield ((date_time,
                fields[self.S3_LOG_REQUESTER_ID],
                fields[self.S3_LOG_USER_AGENT],
                fields[self.S3_LOG_OPERATION]), 1)

    def reducer(self, key, values):
        """Sums the counts for a key and emits one raw output line made of
        the key's elements followed by the total, joined by DELIMITER.
        """
        columns = list(key) + [str(sum(values))]
        yield None, self.DELIMITER.join(columns)

    def steps(self):
        """Defines the job as a single map/reduce step."""
        # MRStep replaces the older self.mr() helper in mrjob.
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]
if __name__ == '__main__':
    # Run the job only when executed as a script, not when imported.
    MrS3LogParser.run()