Skip to content

Commit 8bcb3c1

Browse files
committed
add dask to requirements and fix missing pattern for unit filter
1 parent a9a39cd commit 8bcb3c1

File tree

2 files changed

+46
-31
lines changed

2 files changed

+46
-31
lines changed

btrdb/stream.py

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def count(
210210
pointwidth = min(
211211
pointwidth,
212212
pw.from_nanoseconds(to_nanoseconds(end) - to_nanoseconds(start)) - 1,
213-
)
213+
)
214214
points = self.aligned_windows(start, end, pointwidth, version)
215215
return sum([point.count for point, _ in points])
216216

@@ -583,7 +583,7 @@ def insert(self, data, merge="never"):
583583
version = 0
584584
i = 0
585585
while i < len(data):
586-
thisBatch = data[i: i + INSERT_BATCH_SIZE]
586+
thisBatch = data[i : i + INSERT_BATCH_SIZE]
587587
version = self._btrdb.ep.insert(self._uuid, thisBatch, merge)
588588
i += INSERT_BATCH_SIZE
589589
return version
@@ -1435,20 +1435,22 @@ def __repr__(self):
14351435
## StreamSet Classes
14361436
##########################################################################
14371437

1438+
14381439
@delayed
14391440
def get_metadata(stream):
1440-
columns = ['collection', 'tags', 'annotations', 'stream', 'uuid']
1441+
columns = ["collection", "tags", "annotations", "stream", "uuid"]
14411442
stream.refresh_metadata()
1442-
metadata = {c: (getattr(stream, f"_{c}") if c != 'stream' else stream)
1443-
for c in columns}
1443+
metadata = {
1444+
c: (getattr(stream, f"_{c}") if c != "stream" else stream) for c in columns
1445+
}
14441446
return metadata
14451447

1448+
14461449
class StreamSetBase(Sequence):
14471450
"""
14481451
A lightweight wrapper around a list of stream objects
14491452
"""
14501453

1451-
14521454
def __init__(self, streams):
14531455
self._streams = streams
14541456
if len(self._streams) < 1:
@@ -1467,11 +1469,11 @@ def __init__(self, streams):
14671469
self.depth = 0
14681470

14691471
# create a DataFrame to store the metadata for filtering
1470-
_columns = ['collection', 'tags', 'annotations', 'stream', 'uuid']
14711472
_metadata = compute([get_metadata(s) for s in self._streams])[0]
14721473
_metadata = pd.DataFrame(_metadata)
1473-
self._metadata = (_metadata.join(pd.json_normalize(_metadata['tags']))
1474-
.drop(columns=['tags', 'annotations']))
1474+
self._metadata = _metadata.join(pd.json_normalize(_metadata["tags"])).drop(
1475+
columns=["tags", "annotations"]
1476+
)
14751477

14761478
@property
14771479
def allow_window(self):
@@ -1735,55 +1737,67 @@ def filter(
17351737
# filter by collection
17361738
if collection is not None:
17371739
if isinstance(collection, RE_PATTERN):
1738-
tf = (tf & obj._metadata.collection.str.contains(collection.pattern, case=False, regex=True))
1740+
tf = tf & obj._metadata.collection.str.contains(
1741+
collection.pattern, case=False, regex=True
1742+
)
17391743
elif isinstance(collection, str):
1740-
tf = (tf & obj._metadata.collection.str.contains(collection, case=False, regex=False))
1744+
tf = tf & obj._metadata.collection.str.contains(
1745+
collection, case=False, regex=False
1746+
)
17411747
else:
17421748
raise BTRDBTypeError("collection must be string or compiled regex")
17431749

17441750
# filter by name
17451751
if name is not None:
17461752
if isinstance(name, RE_PATTERN):
1747-
tf = (tf & obj._metadata.name.str.contains(name.pattern, case=False, regex=True))
1753+
tf = tf & obj._metadata.name.str.contains(
1754+
name.pattern, case=False, regex=True
1755+
)
17481756
elif isinstance(name, str):
1749-
tf = (tf & obj._metadata.name.str.contains(name, case=False, regex=False))
1757+
tf = tf & obj._metadata.name.str.contains(name, case=False, regex=False)
17501758
else:
17511759
raise BTRDBTypeError("name must be string or compiled regex")
17521760

17531761
# filter by unit
17541762
if unit is not None:
17551763
if isinstance(unit, RE_PATTERN):
1756-
tf = (tf & obj._metadata.unit.str.contains(unit, case=False, regex=True))
1764+
tf = tf & obj._metadata.unit.str.contains(
1765+
unit.pattern, case=False, regex=True
1766+
)
17571767
elif isinstance(unit, str):
1758-
tf = (tf & obj._metadata.name.str.contains(unit, case=False, regex=False))
1768+
tf = tf & obj._metadata.name.str.contains(unit, case=False, regex=False)
17591769
else:
17601770
raise BTRDBTypeError("unit must be string or compiled regex")
17611771

17621772
# filter by tags
17631773
if tags:
1764-
tf = (tf & obj._metadata.loc[:, obj._metadata.columns.isin(tags.keys())]
1765-
.apply(lambda x: x.str.contains(tags[x.name], case=False, regex=False))
1766-
.all(axis=1))
1774+
tf = tf & obj._metadata.loc[
1775+
:, obj._metadata.columns.isin(tags.keys())
1776+
].apply(
1777+
lambda x: x.str.contains(tags[x.name], case=False, regex=False)
1778+
).all(
1779+
axis=1
1780+
)
17671781
obj._metadata = obj._metadata[tf]
17681782

17691783
# filter by annotations
17701784
if annotations:
1771-
_annotations = pd.json_normalize(obj._metadata['annotations'])
1785+
_annotations = pd.json_normalize(obj._metadata["annotations"])
17721786
if not _annotations.columns.isin(annotations.keys()).any():
17731787
raise BTRDBValueError("annotations key not found")
1774-
_metadata = obj._metadata.join(
1775-
_annotations,
1776-
rsuffix='_annotations'
1777-
).drop(columns=['annotations'])
1788+
obj._metadata = obj._metadata.join(
1789+
_annotations, rsuffix="_annotations"
1790+
).drop(columns=["annotations"])
17781791

1779-
_columns = list(annotations.keys()) + list(map(lambda s: "".join([s,'_annotations']), annotations.keys()))
1792+
_columns = list(annotations.keys()) + list(
1793+
map(lambda s: "".join([s, "_annotations"]), annotations.keys())
1794+
)
17801795
# filters if the subset of the annotations matches the given annotations
1781-
tf = (tf
1782-
& obj._metadata.loc[:, obj._metadata.columns.isin(_columns)]
1783-
.apply(lambda x: x.str.contains(annotations[x.name], case=False, regex=False))
1784-
.all(axis=1))
1796+
tf = tf & obj._metadata.loc[:, obj._metadata.columns.isin(_columns)].apply(
1797+
lambda x: x.str.contains(annotations[x.name], case=False, regex=False)
1798+
).all(axis=1)
17851799
obj._metadata = obj._metadata[tf]
1786-
obj._streams = obj._metadata['stream']
1800+
obj._streams = obj._metadata["stream"]
17871801
return obj
17881802

17891803
def clone(self):
@@ -2195,7 +2209,7 @@ def arrow_values(
21952209
pa.field(str(s.uuid), pa.float64(), nullable=False)
21962210
for s in self._streams
21972211
],
2198-
)
2212+
)
21992213
data = pa.Table.from_arrays(
22002214
[pa.array([]) for i in range(1 + len(self._streams))], schema=schema
22012215
)
@@ -2277,4 +2291,4 @@ def _coalesce_table_deque(tables: deque):
22772291
main_table = main_table.join(
22782292
t2, "time", join_type="full outer", right_suffix=f"_{idx}"
22792293
)
2280-
return main_table
2294+
return main_table

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# pip-compile --output-file=requirements.txt --resolver=backtracking pyproject.toml
66
#
77
# This file was modified to remove version pins.
8+
dask
89
certifi
910
# via btrdb (pyproject.toml)
1011
grpcio

0 commit comments

Comments (0)