Skip to content

Fix missing logging import, rerun pre-commit #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ repos:
hooks:
- id: flake8
args: [--config=setup.cfg]
exclude: ^(btrdb/grpcinterface|tests|setup.py|btrdb4|docs)
exclude: ^(btrdb/grpcinterface|tests|setup.py|btrdb4|docs|benchmarks)
- repo: local
hooks:
- id: pytest-check
Expand Down
19 changes: 14 additions & 5 deletions benchmarks/benchmark_stream_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ def main():
"""Run a single run of the benchmarks"""
conn = btrdb.connect(profile="andy")
stream1 = conn.stream_from_uuid(
list(conn.streams_in_collection("andy/7064-6684-5e6e-9e14-ff9ca7bae46e"))[0].uuid
list(conn.streams_in_collection("andy/7064-6684-5e6e-9e14-ff9ca7bae46e"))[
0
].uuid
)
start = stream1.earliest()[0].time
end = stream1.latest()[0].time
Expand All @@ -267,10 +269,17 @@ def main():
print(f"pointwidth of: {pointwidth}")
for f in [time_single_stream_arrow_raw_values, time_single_stream_raw_values]:
print(f(stream1, start, end, 0))
for f in [time_single_stream_arrow_windows_values, time_single_stream_windows_values]:
for f in [
time_single_stream_arrow_windows_values,
time_single_stream_windows_values,
]:
print(f(stream1, start, end, width_ns=width_ns, version=0))
for f in [time_single_stream_arrow_aligned_windows_values, time_single_stream_aligned_windows_values]:
for f in [
time_single_stream_arrow_aligned_windows_values,
time_single_stream_aligned_windows_values,
]:
print(f(stream1, start, end, pointwidth=pointwidth, version=0))

if __name__=="__main__":
main()

if __name__ == "__main__":
main()
43 changes: 33 additions & 10 deletions benchmarks/benchmark_streamset_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ def time_streamset_arrow_raw_values(


def time_streamset_windows_values(
streamset: btrdb.stream.StreamSet, start: int, end: int, width_ns: int, version: int = 0
streamset: btrdb.stream.StreamSet,
start: int,
end: int,
width_ns: int,
version: int = 0,
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the streamset windows values query

Expand Down Expand Up @@ -139,7 +143,11 @@ def time_streamset_windows_values(


def time_streamset_arrow_windows_values(
streamset: btrdb.stream.StreamSet, start: int, end: int, width_ns: int, version: int = 0
streamset: btrdb.stream.StreamSet,
start: int,
end: int,
width_ns: int,
version: int = 0,
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the streamset arrow window values query

Expand Down Expand Up @@ -180,7 +188,11 @@ def time_streamset_arrow_windows_values(


def time_streamset_aligned_windows_values(
streamset: btrdb.stream.StreamSet, start: int, end: int, pointwidth: int, version: int = 0
streamset: btrdb.stream.StreamSet,
start: int,
end: int,
pointwidth: int,
version: int = 0,
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the streamset window values query

Expand Down Expand Up @@ -217,7 +229,11 @@ def time_streamset_aligned_windows_values(


def time_streamset_arrow_aligned_windows_values(
streamset: btrdb.stream.StreamSet, start: int, end: int, pointwidth: int, version: int = 0
streamset: btrdb.stream.StreamSet,
start: int,
end: int,
pointwidth: int,
version: int = 0,
) -> Dict[str, Union[str, int, float]]:
"""Return the elapsed time for the streamset arrow aligned window values query

Expand Down Expand Up @@ -264,7 +280,6 @@ def _create_streamset_result_dict(
return {
"n_streams": len(streamset),
"total_points": point_count,

"total_time_seconds": total_time,
"streamset_version": version,
}
Expand All @@ -274,9 +289,15 @@ def main():
"""Run a single run of the benchmarks"""
conn = btrdb.connect(profile="andy")
stream1 = conn.stream_from_uuid(
list(conn.streams_in_collection("andy/7064-6684-5e6e-9e14-ff9ca7bae46e"))[0].uuid
list(conn.streams_in_collection("andy/7064-6684-5e6e-9e14-ff9ca7bae46e"))[
0
].uuid
)
stream2 = conn.stream_from_uuid(
list(conn.streams_in_collection("andy/30e6-d72f-5cc7-9966-bc1579dc4a72"))[
0
].uuid
)
stream2 = conn.stream_from_uuid(list(conn.streams_in_collection("andy/30e6-d72f-5cc7-9966-bc1579dc4a72"))[0].uuid)
streamset = btrdb.stream.StreamSet([stream1, stream2])
start = max(stream1.earliest()[0].time, stream2.earliest()[0].time)
end = min(stream1.latest()[0].time, stream2.latest()[0].time)
Expand All @@ -286,7 +307,7 @@ def main():
res_list = []
for f in [time_streamset_raw_values, time_streamset_arrow_raw_values]:
res = f(streamset, start, end, 0)
res["func"] = f.__name__
res["func"] = f.__name__
# for f in [time_streamset_windows_values, time_streamset_arrow_windows_values]:
# res = f(streamset, start, end, width_ns=width_ns, version=0)
# res["func"] = f.__name__
Expand All @@ -295,6 +316,8 @@ def main():
# res["func"] = res

return res
if __name__=="__main__":


if __name__ == "__main__":
results = main()
print(pandas.DataFrame(results))
print(pandas.DataFrame(results))
5 changes: 4 additions & 1 deletion btrdb/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
##########################################################################

import json
import logging
import os
import re
import uuid as uuidlib
Expand Down Expand Up @@ -113,6 +114,7 @@ def __init__(self, addrportstr, apikey=None):
)
self.channel = grpc.insecure_channel(addrportstr, chan_ops)


def _is_arrow_enabled(info):
info = {
"majorVersion": info.majorVersion,
Expand All @@ -127,6 +129,7 @@ def _is_arrow_enabled(info):
else:
return False


class BTrDB(object):
"""
The primary server connection object for communicating with a BTrDB server.
Expand All @@ -135,7 +138,7 @@ class BTrDB(object):
def __init__(self, endpoint):
self.ep = endpoint
self._executor = ThreadPoolExecutor()
self._ARROW_ENABLED = True #_is_arrow_enabled(self.ep.info())
self._ARROW_ENABLED = True # _is_arrow_enabled(self.ep.info())
logger.debug(f"ARROW ENABLED: {self._ARROW_ENABLED}")

def query(self, stmt, params=[]):
Expand Down
2 changes: 1 addition & 1 deletion btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def arrowMultiValues(self, uu_list, start, end, version_list, snap_periodNS):
start=start,
end=end,
versionMajor=[ver for ver in version_list],
snapPeriodNs=int(snap_periodNS)
snapPeriodNs=int(snap_periodNS),
)
for result in self.stub.ArrowMultiValues(params):
check_proto_stat(result.stat)
Expand Down
15 changes: 9 additions & 6 deletions btrdb/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import pyarrow as pa

import btrdb.grpcinterface.btrdb_pb2_grpc
from btrdb.exceptions import (
BTrDBError,
BTRDBTypeError,
Expand Down Expand Up @@ -1169,7 +1168,7 @@ def versions(self):
self._pinned_versions if self._pinned_versions else self._latest_versions()
)

def count(self, precise:bool=False):
def count(self, precise: bool = False):
"""
Compute the total number of points in the streams using filters.

Expand All @@ -1196,10 +1195,14 @@ def count(self, precise:bool=False):
start = params.get("start", MINIMUM_TIME)
end = params.get("end", MAXIMUM_TIME)

versions = self._pinned_versions if self._pinned_versions else self._latest_versions()
versions = (
self._pinned_versions if self._pinned_versions else self._latest_versions()
)

my_counts_gen = self._btrdb._executor.map(
lambda s: s.count(start, end, version=versions.get(s.uuid, 0), precise=precise),
lambda s: s.count(
start, end, version=versions.get(s.uuid, 0), precise=precise
),
self._streams,
)

Expand Down Expand Up @@ -1655,7 +1658,7 @@ def _arrow_streamset_data(self):
)
data = main_table
return data

def rows(self):
"""
Returns a materialized list of tuples where each tuple contains the
Expand Down Expand Up @@ -1703,7 +1706,7 @@ def rows(self):
def arrow_rows(self):
"""Return tuples of rows from arrow table"""
raise NotImplementedError(
f"arrow_rows has not been implemented yet, please use `rows` if you need this functionality."
"arrow_rows has not been implemented yet, please use `rows` if you need this functionality."
)

def insert(self, data_map: dict, merge: str = "never") -> dict:
Expand Down
14 changes: 6 additions & 8 deletions btrdb/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import pandas as pd


##########################################################################
## Helper Functions
##########################################################################
Expand Down Expand Up @@ -119,7 +118,7 @@ def arrow_to_series(streamset, agg="mean", name_callable=None):
"""
if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
f"arrow_to_series requires an arrow-enabled BTrDB server."
"arrow_to_series requires an arrow-enabled BTrDB server."
)
arrow_df = arrow_to_dataframe(
streamset=streamset, agg=agg, name_callable=name_callable
Expand Down Expand Up @@ -150,7 +149,7 @@ def arrow_to_dataframe(
"""
if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
f"arrow_to_dataframe requires an arrow-enabled BTrDB server."
"arrow_to_dataframe requires an arrow-enabled BTrDB server."
)

try:
Expand Down Expand Up @@ -182,7 +181,6 @@ def arrow_to_dataframe(
if not callable(name_callable):
name_callable = lambda s: s.collection + "/" + s.name
tmp_table = streamset.arrow_values()
my_cols = [c for c in tmp_table.column_names]
col_names = _stream_names(streamset, name_callable)
cols = []
for name in col_names:
Expand Down Expand Up @@ -299,7 +297,7 @@ def arrow_to_polars(streamset, agg="mean", name_callable=None):
"""
if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
f"arrow_to_polars requires an arrow-enabled BTrDB server."
"arrow_to_polars requires an arrow-enabled BTrDB server."
)
try:
import polars as pl
Expand All @@ -314,7 +312,7 @@ def arrow_to_polars(streamset, agg="mean", name_callable=None):
def arrow_to_arrow_table(streamset):
if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
f"arrow_to_arrow_table requires an arrow-enabled BTrDB server."
"arrow_to_arrow_table requires an arrow-enabled BTrDB server."
)
return streamset.arrow_values()

Expand Down Expand Up @@ -427,7 +425,7 @@ def arrow_to_numpy(streamset, agg="mean"):
"""
if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
f"arrow_to_numpy requires an arrow-enabled BTrDB server."
"arrow_to_numpy requires an arrow-enabled BTrDB server."
)
arrow_df = arrow_to_dataframe(streamset=streamset, agg=agg, name_callable=None)
return arrow_df.values
Expand Down Expand Up @@ -496,7 +494,7 @@ def arrow_to_dict(streamset, agg="mean", name_callable=None):
"""
if not streamset._btrdb._ARROW_ENABLED:
raise NotImplementedError(
f"arrow_to_dict requires an arrow-enabled BTrDB server."
"arrow_to_dict requires an arrow-enabled BTrDB server."
)
arrow_df = arrow_to_dataframe(
streamset=streamset, agg=agg, name_callable=name_callable
Expand Down
7 changes: 4 additions & 3 deletions tests/btrdb/test_arrow_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@pytest.fixture
def stream1():
uu = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a')
uu = uuid.UUID("0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a")
stream = Mock(Stream)
stream.version = Mock(return_value=11)
stream.uuid = Mock(return_value=uu)
Expand All @@ -28,7 +28,7 @@ def stream1():

@pytest.fixture
def stream2():
uu = uuid.UUID('17dbe387-89ea-42b6-864b-f505cdb483f5')
uu = uuid.UUID("17dbe387-89ea-42b6-864b-f505cdb483f5")
stream = Mock(Stream)
stream.version = Mock(return_value=22)
stream.uuid = Mock(return_value=uu)
Expand All @@ -42,5 +42,6 @@ def stream2():
stream._btrdb._ARROW_ENABLED = Mock(return_value=True)
return stream


class TestArrowStreams(object):
pass
pass
15 changes: 8 additions & 7 deletions tests/btrdb/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@
Testing package for the btrdb stream module
"""
import concurrent.futures

##########################################################################
## Imports
##########################################################################

import datetime
import json
import re
Expand All @@ -35,13 +30,16 @@
InvalidOperation,
NoSuchPoint,
StreamNotFoundError,
InvalidCollection,
NoSuchPoint,
)
from btrdb.grpcinterface import btrdb_pb2
from btrdb.point import RawPoint, StatPoint
from btrdb.stream import INSERT_BATCH_SIZE, Stream, StreamFilter, StreamSet

##########################################################################
## Imports
##########################################################################


RawPointProto = btrdb_pb2.RawPoint
StatPointProto = btrdb_pb2.StatPoint
EST = pytz.timezone("America/New_York")
Expand Down Expand Up @@ -85,6 +83,7 @@ def stream2():
stream._btrdb._ARROW_ENABLED = Mock(return_value=False)
return stream


@pytest.fixture
def arrow_stream3():
uu = uuid.UUID("17dbe387-89ea-42b6-864b-f505cdb483f5")
Expand All @@ -101,6 +100,7 @@ def arrow_stream3():
stream._btrdb._ARROW_ENABLED = Mock(return_value=True)
return stream


##########################################################################
## Stream Tests
##########################################################################
Expand Down Expand Up @@ -1195,6 +1195,7 @@ def test_earliest(self, stream1, stream2):
RawPoint(time=10, value=1),
RawPoint(time=20, value=1),
)

def test_latest(self, stream1, stream2):
"""
Assert latest returns correct time code
Expand Down