-
Notifications
You must be signed in to change notification settings - Fork 81
/
calculateAveragePypy.py
157 lines (127 loc) · 4.34 KB
/
calculateAveragePypy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# time pypy3 calculateAveragePypy.py
import os
import multiprocessing as mp
def get_file_chunks(
    file_name: str,
    max_cpu: int = 8,
) -> tuple:
    """Split a file into newline-aligned chunks, one per worker process.

    Args:
        file_name: Path of the file to split.
        max_cpu: Upper bound on the number of chunks / worker processes.

    Returns:
        Tuple ``(cpu_count, start_end)`` where ``start_end`` is a list of
        ``(file_name, chunk_start, chunk_end)`` tuples that exactly cover
        the file and whose boundaries fall on line starts.
    """
    cpu_count = min(max_cpu, mp.cpu_count())
    file_size = os.path.getsize(file_name)
    chunk_size = file_size // cpu_count
    start_end = []
    # Open read-only ("rb", not "r+b"): the file is never written, and
    # "r+b" would fail on files without write permission.
    with open(file_name, "rb") as f:

        def is_new_line(position):
            # A chunk boundary is only valid at the start of a line.
            if position == 0:
                return True
            else:
                f.seek(position - 1)
                return f.read(1) == b"\n"

        def next_line(position):
            # Advance past the current line; return the offset just after "\n".
            f.seek(position)
            f.readline()
            return f.tell()

        chunk_start = 0
        while chunk_start < file_size:
            chunk_end = min(file_size, chunk_start + chunk_size)
            # Pull the tentative end back to the previous line boundary...
            while not is_new_line(chunk_end):
                chunk_end -= 1
            # ...unless the chunk collapsed to zero length (a single line
            # longer than chunk_size); then extend past that line instead.
            if chunk_start == chunk_end:
                chunk_end = next_line(chunk_end)
            start_end.append(
                (
                    file_name,
                    chunk_start,
                    chunk_end,
                )
            )
            chunk_start = chunk_end
    return (
        cpu_count,
        start_end,
    )
def _process_file_chunk(
file_name: str,
chunk_start: int,
chunk_end: int,
blocksize: int = 1024 * 1024,
) -> dict:
"""Process each file chunk in a different process"""
result = dict()
with open(file_name, "r+b") as fh:
fh.seek(chunk_start)
tail = b""
location = None
byte_count = chunk_end - chunk_start
while byte_count > 0:
if blocksize > byte_count:
blocksize = byte_count
byte_count -= blocksize
index = 0
data = tail + fh.read(blocksize)
while data:
if location is None:
try:
semicolon = data.index(b";", index)
except ValueError:
tail = data[index:]
break
location = data[index:semicolon]
index = semicolon + 1
try:
newline = data.index(b"\n", index)
except ValueError:
tail = data[index:]
break
value = float(data[index:newline])
index = newline + 1
if location not in result:
result[location] = [
value,
value,
value,
1,
] # min, max, sum, count
else:
if value < result[location][0]:
result[location][0] = value
if value > result[location][1]:
result[location][1] = value
result[location][2] += value
result[location][3] += 1
location = None
return result
def process_file(
    cpu_count: int,
    start_end: list,
) -> dict:
    """Fan chunk aggregation out to a worker pool and merge the results.

    Args:
        cpu_count: Number of worker processes to use.
        start_end: List of (file_name, chunk_start, chunk_end) argument
            tuples, one per chunk.

    Returns:
        Dict mapping decoded station name -> "min/mean/max" string
        (one decimal place each), inserted in sorted station order.
    """
    with mp.Pool(cpu_count) as pool:
        # Each worker aggregates one chunk independently.
        partials = pool.starmap(
            _process_file_chunk,
            start_end,
        )
        # Fold every per-chunk stats dict into a single one.
        merged = {}
        for partial in partials:
            for station, stats in partial.items():
                current = merged.get(station)
                if current is None:
                    merged[station] = stats
                else:
                    current[0] = min(current[0], stats[0])
                    current[1] = max(current[1], stats[1])
                    current[2] += stats[2]
                    current[3] += stats[3]
        # Render each station as "min/mean/max", sorted by name.
        formatted = {}
        for station, (low, high, total, count) in sorted(merged.items()):
            mean = (total / count) if count != 0 else 0
            formatted[station.decode("utf-8")] = (
                f"{low:.1f}/{mean:.1f}/{high:.1f}"
            )
        return formatted
if __name__ == "__main__":
    # Split the input once, then aggregate the chunks in parallel.
    cpu_count, chunk_spans = get_file_chunks("measurements.txt")
    print(process_file(cpu_count, chunk_spans))