Skip to content

Commit 354a13a

Browse files
justinGilmerdavidkonigsbergjleifnf
authored
Threaded arrow (#23)
* Release v5.15.0 * update protobuf to v4.22.3 * Add threaded streamset calls Using concurrent.futures.ThreadPoolExecutor * Blacken code * Update for failing tests * Ignore flake8 as part of testing pytest-flake8 seems to have issues with the later versions of flake8 tholo/pytest-flake8#92 * Update .gitignore * Update proto definitions. * Update endpoint to support arrow methods * Support arrow endpoints * Additional arrow updates * Update transformers, add polars conversion * Update .gitignore * Update ignore and remove extra print. * Remove idea folder (pycharm) * Update requirements.txt * Update btrdb/transformers.py * Update the way to check for arrow-enabled btrdb This has not been "turned on" yet though, since we dont know the version number this will be enabled for. The method is currently commented out, but can be re-enabled pretty easily. * Use IPC streams to send the arrow bytes for insert Instead of writing out feather files to an `io.BytesIO` stream and then sending the feather files over the wire, this creates a buffered outputstream and then sends that data back as bytes to btrdb. * Create arrow specific stream methods. * Update test conn object to support minor version * Update tests and migrate arrow code. * Arrow and standard streamset insert * Create basic arrow to dataframe transformer * Support multirawvalues, arrow transformers * Multivalue arrow queries, in progress * Update stream filter to properly filter for sampling frequency * Update arrow values queries for multivalues * Update param passing for sampling frequency * Update index passing, and ignore depth * benchmark raw values queries for arrow and current api * Add aligned windows and run func * Streamset read benchmarks (WIP) In addition: * update streamset.count to support the `precise` boolean flag. * Update mock return value for versionMajor * In progress validation of stream benchs --------- Co-authored-by: David Konigsberg <72822263+davidkonigsberg@users.noreply.github.com> Co-authored-by: Jeff Lin <42981468+jleifnf@users.noreply.github.com>
1 parent f38157d commit 354a13a

17 files changed

+2468
-547
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ dmypy.json
118118

119119
# Pyre type checker
120120
.pyre/
121+
122+
# arrow parquet files
123+
*.parquet
124+
125+
.idea
121126
.idea/misc.xml
122127
.idea/vcs.xml
123128
.idea/inspectionProfiles/profiles_settings.xml

benchmarks/benchmark_stream_reads.py

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
import uuid
2+
from time import perf_counter
3+
from typing import Dict, Union
4+
5+
import btrdb
6+
7+
8+
def time_single_stream_raw_values(
9+
stream: btrdb.stream.Stream, start: int, end: int, version: int = 0
10+
) -> Dict[str, Union[int, str]]:
11+
"""Return the elapsed time for the stream raw values query
12+
13+
Parameters
14+
----------
15+
stream : btrdb.Stream, required
16+
The data stream to return raw values.
17+
start : int, required
18+
The start time (in nanoseconds) to return raw values (inclusive).
19+
end : int, required
20+
The end time (in nanoseconds) to return raw values (exclusive)
21+
version : int, optional, default : 0
22+
The version of the stream to query for points.
23+
24+
Notes
25+
-----
26+
The data points returned will be [start, end)
27+
28+
Returns
29+
-------
30+
results : dict
31+
The performance results of the stream method
32+
"""
33+
expected_count = stream.count(start, end, version=version, precise=True)
34+
tic = perf_counter()
35+
vals = stream.values(start, end, version=version)
36+
toc = perf_counter()
37+
# minus 1 to account for the exclusive end time
38+
queried_points = len(vals)
39+
assert queried_points == expected_count
40+
# time in seconds to run
41+
run_time = toc - tic
42+
results = _create_stream_result_dict(
43+
stream.uuid, point_count=queried_points, total_time=run_time, version=version
44+
)
45+
return results
46+
47+
48+
def time_single_stream_arrow_raw_values(
49+
stream: btrdb.stream.Stream, start: int, end: int, version: int = 0
50+
) -> Dict[str, Union[str, int, float]]:
51+
"""Return the elapsed time for the stream arrow raw values query
52+
53+
Parameters
54+
----------
55+
stream : btrdb.Stream, required
56+
The data stream to return the raw data as an arrow table.
57+
start : int, required
58+
The start time (in nanoseconds) to return raw values (inclusive).
59+
end : int, required
60+
The end time (in nanoseconds) to return raw values (exclusive)
61+
version : int, optional, default : 0
62+
The version of the stream to query for points.
63+
64+
Notes
65+
-----
66+
The data points returned will be [start, end)
67+
68+
Returns
69+
-------
70+
results : dict
71+
The performance results of the stream method
72+
"""
73+
# minus 1 to account for the exclusive end time for the values query
74+
expected_count = stream.count(start, end, version=version, precise=True)
75+
tic = perf_counter()
76+
vals = stream.arrow_values(start, end, version=version)
77+
toc = perf_counter()
78+
# num of rows
79+
queried_points = vals.num_rows
80+
assert queried_points == expected_count
81+
# time in seconds to run
82+
run_time = toc - tic
83+
results = _create_stream_result_dict(
84+
stream.uuid, point_count=queried_points, total_time=run_time, version=version
85+
)
86+
return results
87+
88+
89+
def time_single_stream_windows_values(
90+
stream: btrdb.stream.Stream, start: int, end: int, width_ns: int, version: int = 0
91+
) -> Dict[str, Union[str, int, float]]:
92+
"""Return the elapsed time for the stream windows values query
93+
94+
Parameters
95+
----------
96+
stream : btrdb.Stream, required
97+
The data stream to return the windowed data as a list of statpoints
98+
start : int, required
99+
The start time (in nanoseconds) to return statpoint values
100+
end : int, required
101+
The end time (in nanoseconds) to return statpoint values
102+
width_ns : int, required
103+
The window width (in nanoseconds) for the statpoints
104+
version : int, optional, default : 0
105+
The version of the stream to query for points.
106+
107+
Returns
108+
-------
109+
results : dict
110+
The performance results of the stream method
111+
"""
112+
tic = perf_counter()
113+
vals = stream.windows(start, end, width=width_ns, version=version)
114+
toc = perf_counter()
115+
# num of statpoints
116+
queried_points = len(vals)
117+
assert queried_points != 0
118+
# time in seconds to run
119+
run_time = toc - tic
120+
results = _create_stream_result_dict(
121+
stream.uuid, point_count=queried_points, total_time=run_time, version=version
122+
)
123+
return results
124+
125+
126+
def time_single_stream_arrow_windows_values(
127+
stream: btrdb.stream.Stream, start: int, end: int, width_ns: int, version: int = 0
128+
) -> Dict[str, Union[str, int, float]]:
129+
"""Return the elapsed time for the stream arrow window values query
130+
131+
Parameters
132+
----------
133+
stream : btrdb.Stream, required
134+
The data stream to return the windowed data as an arrow table.
135+
start : int, required
136+
The start time (in nanoseconds) to return statpoint values
137+
end : int, required
138+
The end time (in nanoseconds) to return statpoint values
139+
width_ns : int, required
140+
The window width (in nanoseconds) for the statpoints
141+
version : int, optional, default : 0
142+
The version of the stream to query for points.
143+
144+
Notes
145+
-----
146+
The data points returned will be [start, end)
147+
148+
Returns
149+
-------
150+
results : dict
151+
The performance results of the stream method
152+
"""
153+
tic = perf_counter()
154+
vals = stream.arrow_windows(start, end, width=width_ns, version=version)
155+
toc = perf_counter()
156+
# num of statpoints
157+
queried_points = vals.num_rows
158+
assert queried_points != 0
159+
# time in seconds to run
160+
run_time = toc - tic
161+
results = _create_stream_result_dict(
162+
stream.uuid, point_count=queried_points, total_time=run_time, version=version
163+
)
164+
return results
165+
166+
167+
def time_single_stream_aligned_windows_values(
168+
stream: btrdb.stream.Stream, start: int, end: int, pointwidth: int, version: int = 0
169+
) -> Dict[str, Union[str, int, float]]:
170+
"""Return the elapsed time for the stream window values query
171+
172+
Parameters
173+
----------
174+
stream : btrdb.Stream, required
175+
The data stream to return the windowed data as a list of statpoints
176+
start : int, required
177+
The start time (in nanoseconds) to return statpoint values
178+
end : int, required
179+
The end time (in nanoseconds) to return statpoint values
180+
pointwidth : int, required
181+
The level of the tree to return statpoints (the exponent k in 2**k)
182+
version : int, optional, default : 0
183+
The version of the stream to query for points.
184+
185+
Returns
186+
-------
187+
results : dict
188+
The performance results of the stream method
189+
"""
190+
tic = perf_counter()
191+
vals = stream.aligned_windows(start, end, pointwidth=pointwidth, version=version)
192+
toc = perf_counter()
193+
# num of statpoints
194+
queried_points = len(vals)
195+
assert queried_points != 0
196+
# time in seconds to run
197+
run_time = toc - tic
198+
results = _create_stream_result_dict(
199+
stream.uuid, point_count=queried_points, total_time=run_time, version=version
200+
)
201+
return results
202+
203+
204+
def time_single_stream_arrow_aligned_windows_values(
205+
stream: btrdb.stream.Stream, start: int, end: int, pointwidth: int, version: int = 0
206+
) -> Dict[str, Union[str, int, float]]:
207+
"""Return the elapsed time for the stream arrow aligned window values query
208+
209+
Parameters
210+
----------
211+
stream : btrdb.Stream, required
212+
The data stream to return the windowed data as an arrow table.
213+
start : int, required
214+
The start time (in nanoseconds) to return statpoint values
215+
end : int, required
216+
The end time (in nanoseconds) to return statpoint values
217+
pointwidth : int, required
218+
The level of the tree to return statpoints (the exponent k in 2**k)
219+
version : int, optional, default : 0
220+
The version of the stream to query for points.
221+
222+
Returns
223+
-------
224+
results : dict
225+
The performance results of the stream method
226+
"""
227+
tic = perf_counter()
228+
vals = stream.arrow_aligned_windows(
229+
start, end, pointwidth=pointwidth, version=version
230+
)
231+
toc = perf_counter()
232+
# num of statpoints
233+
queried_points = vals.num_rows
234+
assert queried_points != 0
235+
# time in seconds to run
236+
run_time = toc - tic
237+
results = _create_stream_result_dict(
238+
stream.uuid, point_count=queried_points, total_time=run_time, version=version
239+
)
240+
return results
241+
242+
243+
def _create_stream_result_dict(
244+
uu: uuid.UUID,
245+
point_count: int,
246+
total_time: float,
247+
version: int,
248+
) -> Dict[str, Union[str, int, float]]:
249+
return {
250+
"uuid": str(uu),
251+
"total_points": point_count,
252+
"total_time_seconds": total_time,
253+
"stream_version": version,
254+
}
255+
256+
257+
def main():
258+
"""Run a single run of the benchmarks"""
259+
conn = btrdb.connect(profile="andy")
260+
stream1 = conn.stream_from_uuid(
261+
list(conn.streams_in_collection("andy/7064-6684-5e6e-9e14-ff9ca7bae46e"))[0].uuid
262+
)
263+
start = stream1.earliest()[0].time
264+
end = stream1.latest()[0].time
265+
width_ns = btrdb.utils.timez.ns_delta(minutes=1)
266+
pointwidth = btrdb.utils.general.pointwidth(38)
267+
print(f"pointwidth of: {pointwidth}")
268+
for f in [time_single_stream_arrow_raw_values, time_single_stream_raw_values]:
269+
print(f(stream1, start, end, 0))
270+
for f in [time_single_stream_arrow_windows_values, time_single_stream_windows_values]:
271+
print(f(stream1, start, end, width_ns=width_ns, version=0))
272+
for f in [time_single_stream_arrow_aligned_windows_values, time_single_stream_aligned_windows_values]:
273+
print(f(stream1, start, end, pointwidth=pointwidth, version=0))
274+
275+
if __name__=="__main__":
276+
main()

0 commit comments

Comments
 (0)