
Commit c07a5be

Multistream read bench insert bench (#26)
* Fix multistream endpoint bugs:
  * The streamset was passing the incorrect params to the endpoint.
  * The endpoint does not return a `version` in its response, just `stat` and `arrowBytes`. The params have been updated, and a NoneType is passed around to ignore the lack of version info, which lets us use the same logic for all bytes decoding.
* Add multistream benchmark methods for timesnap and no timesnap.
1 parent f41f711 commit c07a5be

File tree

3 files changed: +97 -2 lines changed

3 files changed

+97
-2
lines changed

benchmarks/benchmark_streamset_reads.py

Lines changed: 93 additions & 0 deletions
@@ -288,6 +288,92 @@ def time_streamset_arrow_aligned_windows_values(
     return results


+def time_streamset_arrow_multistream_raw_values_non_timesnapped(
+    streamset: btrdb.stream.StreamSet,
+    start: int,
+    end: int,
+    version: int = 0,
+    sampling_frequency: int = None,
+) -> Dict[str, Union[str, int, float]]:
+    """Use the arrow multistream endpoint, which joins the stream data on-server before sending it to the client.
+
+    We set the sampling rate to 0 to ensure that we do not time snap and instead perform a full outer join on
+    the streams.
+
+    Parameters
+    ----------
+    streamset : btrdb.stream.StreamSet, required
+        The streamset to perform the multistream query on.
+    start : int, required
+        The start time (in nanoseconds) to query raw data from.
+    end : int, required
+        The end time (in nanoseconds), exclusive, to query raw data to.
+    version : int, optional, default=0
+        The version of the stream to pin against; currently unused.
+    sampling_frequency : int, optional, ignored
+        The sampling frequency of the data stream in Hz.
+
+    Notes
+    -----
+    sampling_frequency is not used here; it is set manually to 0.
+    """
+    streamset = streamset.filter(start=start, end=end, sampling_frequency=0)
+    versions = {s.uuid: 0 for s in streamset}
+    streamset = streamset.pin_versions(versions)
+    tic = perf_counter()
+    vals = streamset.arrow_values()
+    toc = perf_counter()
+    queried_points = vals.num_rows * len(streamset)
+    run_time = toc - tic
+    results = _create_streamset_result_dict(
+        streamset=streamset, total_time=run_time, point_count=queried_points, version=0
+    )
+    return results
+
+
+def time_streamset_arrow_multistream_raw_values_timesnapped(
+    streamset: btrdb.stream.StreamSet,
+    start: int,
+    end: int,
+    sampling_frequency: int,
+    version: int = 0,
+) -> Dict[str, Union[str, int, float]]:
+    """Use the arrow multistream endpoint, which joins the stream data on-server before sending it to the client.
+
+    We set a sampling rate to ensure that the returned data is time snapped.
+
+    Parameters
+    ----------
+    streamset : btrdb.stream.StreamSet, required
+        The streamset to perform the multistream query on.
+    start : int, required
+        The start time (in nanoseconds) to query raw data from.
+    end : int, required
+        The end time (in nanoseconds), exclusive, to query raw data to.
+    sampling_frequency : int, required
+        The common sampling frequency (in Hz) to snap the data points to.
+    version : int, optional, default=0
+        The version of the stream to pin against; currently unused.
+    """
+    streamset = streamset.filter(
+        start=start, end=end, sampling_frequency=sampling_frequency
+    )
+    versions = {s.uuid: 0 for s in streamset}
+    streamset = streamset.pin_versions(versions)
+    tic = perf_counter()
+    vals = streamset.arrow_values()
+    toc = perf_counter()
+    queried_points = vals.num_rows * len(streamset)
+    run_time = toc - tic
+    results = _create_streamset_result_dict(
+        streamset=streamset, total_time=run_time, point_count=queried_points, version=0
+    )
+    return results
+
+
 def _create_streamset_result_dict(
     streamset: btrdb.stream.StreamSet,
     point_count: int,
@@ -337,6 +423,13 @@ def main():
         res = f(streamset, start, end, pointwidth=pointwidth, version=0)
         res["func"] = f.__name__
         res_list.append(res)
+    for f in [
+        time_streamset_arrow_multistream_raw_values_non_timesnapped,
+        time_streamset_arrow_multistream_raw_values_timesnapped,
+    ]:
+        res = f(streamset, start, end, sampling_frequency=2, version=0)
+        res["func"] = f.__name__
+        res_list.append(res)

     return res_list
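
For context, a minimal usage sketch of the two new benchmark helpers outside of main(); the connection string, stream UUIDs, and time window below are hypothetical placeholders, and the import path assumes the repository root is on the Python path:

    import btrdb
    from benchmarks.benchmark_streamset_reads import (
        time_streamset_arrow_multistream_raw_values_non_timesnapped,
        time_streamset_arrow_multistream_raw_values_timesnapped,
    )

    # Placeholder connection string and stream UUIDs; substitute real ones.
    conn = btrdb.connect("localhost:4410")
    streamset = conn.streams(
        "11111111-1111-1111-1111-111111111111",
        "22222222-2222-2222-2222-222222222222",
    )
    start = 1_500_000_000_000_000_000   # nanoseconds
    end = start + 60 * 10**9            # a 60 second window

    # Full outer join (no time snapping) vs. snapping to a common 2 Hz grid.
    res_raw = time_streamset_arrow_multistream_raw_values_non_timesnapped(
        streamset, start, end, version=0
    )
    res_snapped = time_streamset_arrow_multistream_raw_values_timesnapped(
        streamset, start, end, sampling_frequency=2, version=0
    )
    print(res_raw)
    print(res_snapped)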

btrdb/endpoint.py

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ def arrowMultiValues(self, uu_list, start, end, version_list, snap_periodNS):
        )
        for result in self.stub.ArrowMultiValues(params):
            check_proto_stat(result.stat)
-            yield result.arrowBytes, result.versionMajor
+            yield result.arrowBytes, None

    @error_handler
    def arrowInsertValues(self, uu: uuid.UUID, values: bytearray, policy: str):
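
To illustrate the commit message's point that yielding None in place of the missing version keeps the byte-decoding logic uniform, here is a rough sketch (the helper name tables_from_endpoint is hypothetical and pyarrow is assumed; the library's actual materialization code differs in detail):

    import pyarrow as pa

    def tables_from_endpoint(byte_version_pairs):
        """Decode (arrowBytes, version) pairs into pyarrow Tables.

        version is None for the multistream endpoint and an int for the
        single-stream endpoints; either way it is not needed to decode the
        bytes, so a single code path handles both.
        """
        for arrow_bytes, _version in byte_version_pairs:
            reader = pa.ipc.open_stream(arrow_bytes)
            yield reader.read_all()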

btrdb/stream.py

Lines changed: 3 additions & 1 deletion
@@ -1837,8 +1837,10 @@ def _arrow_multivalues(self, period_ns: int):
        params = self._params_from_filters()
        versions = self.versions()
        params["uu_list"] = [s.uuid for s in self._streams]
-        params["versions"] = [versions[s.uuid] for s in self._streams]
+        params["version_list"] = [versions[s.uuid] for s in self._streams]
        params["snap_periodNS"] = period_ns
+        # dict.pop(key, default) returns default when the key is missing
+        _ = params.pop("sampling_frequency", None)
        arr_bytes = self._btrdb.ep.arrowMultiValues(**params)
        # exhausting the generator from above
        bytes_materialized = list(arr_bytes)
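
A small standalone sketch (not library code; the function and literal values are illustrative only) of why sampling_frequency must be popped before the keyword expansion: arrowMultiValues has no such parameter, so a leftover key from the filter would raise a TypeError.

    # Stand-in with the same parameter names as the endpoint method above.
    def arrow_multi_values(uu_list, start, end, version_list, snap_periodNS):
        return uu_list, start, end, version_list, snap_periodNS

    params = {
        "uu_list": ["uuid-a", "uuid-b"],   # placeholder identifiers
        "start": 0,
        "end": 100,
        "version_list": [0, 0],
        "snap_periodNS": 500_000_000,
        "sampling_frequency": 2,           # left over from StreamSet.filter()
    }
    params.pop("sampling_frequency", None)  # without this, the call below raises TypeError
    arrow_multi_values(**params)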
