Skip to content

implement async create for concurrency without threads #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions btrdb/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import io
import typing
import uuid
import grpc
import asyncio

from btrdb.exceptions import BTrDBError, check_proto_stat, error_handler
from btrdb.exceptions import BTrDBError, check_proto_stat, error_handler, handle_grpc_error
from btrdb.grpcinterface import btrdb_pb2, btrdb_pb2_grpc
from btrdb.point import RawPoint
from btrdb.utils.general import unpack_stream_descriptor
Expand Down Expand Up @@ -221,8 +223,7 @@ def setStreamTags(self, uu, expected, tags, collection):
result = self.stub.SetStreamTags(params)
check_proto_stat(result.stat)

@error_handler
def create(self, uu, collection, tags, annotations):
def _create_params(self, uu, collection, tags, annotations):
tagkvlist = []
for k, v in tags.items():
kv = btrdb_pb2.KeyOptValue(key=k, val=btrdb_pb2.OptValue(value=v))
Expand All @@ -231,12 +232,42 @@ def create(self, uu, collection, tags, annotations):
for k, v in annotations.items():
kv = btrdb_pb2.KeyOptValue(key=k, val=btrdb_pb2.OptValue(value=v))
annkvlist.append(kv)
params = btrdb_pb2.CreateParams(
return btrdb_pb2.CreateParams(
uuid=uu.bytes, collection=collection, tags=tagkvlist, annotations=annkvlist
)

@error_handler
def create(self, uu, collection, tags, annotations):
    """
    Create a new stream on the server (blocking).

    Builds the gRPC ``CreateParams`` message from the supplied uuid,
    collection, tags, and annotations, issues the synchronous ``Create``
    RPC, and raises if the returned status reports a failure.
    """
    request = self._create_params(uu, collection, tags, annotations)
    response = self.stub.Create(request)
    check_proto_stat(response.stat)

async def create_async(self, uu, collection, tags, annotations):
    """
    Create a stream on the server without blocking the running event loop.

    Asynchronous counterpart to ``create``: issues the gRPC ``Create`` call
    through its future interface and bridges completion back onto the
    asyncio event loop, so no worker thread is tied up waiting.

    Parameters
    ----------
    uu : uuid.UUID
        UUID of the stream to create.
    collection : str
        Collection the stream belongs to.
    tags : dict
        Tag key/value pairs for the stream.
    annotations : dict
        Annotation key/value pairs for the stream.

    Returns
    -------
    The gRPC ``Create`` response message.

    Raises
    ------
    Exception
        Whatever ``handle_grpc_error`` maps a ``grpc.RpcError`` to, or the
        original exception for non-gRPC failures.
    """
    loop = asyncio.get_running_loop()
    params = self._create_params(uu, collection, tags, annotations)
    grpc_fut = self.stub.Create.future(params)
    async_fut = loop.create_future()

    # Delivery helpers always run on the loop thread.  The cancelled()
    # guard closes a race: if the awaiting coroutine is cancelled while the
    # gRPC call is completing, calling set_result/set_exception on the
    # already-cancelled future would raise InvalidStateError in the loop.
    def _deliver_result(result):
        if not async_fut.cancelled():
            async_fut.set_result(result)

    def _deliver_exception(exc):
        if not async_fut.cancelled():
            async_fut.set_exception(exc)

    def async_done_cb(async_fut):
        # The awaiter was cancelled: propagate cancellation to the RPC.
        if async_fut.cancelled():
            grpc_fut.cancel()

    def done_cb(fut):
        # Runs on a gRPC-owned thread, so every mutation of async_fut is
        # handed to the event loop via call_soon_threadsafe.
        if fut.cancelled():
            return
        try:
            result = fut.result()
        except grpc.RpcError as e:
            try:
                # handle_grpc_error raises the mapped BTrDB exception;
                # capture it so it is the one delivered to the awaiter.
                handle_grpc_error(e)
            except Exception as mapped:
                e = mapped
            loop.call_soon_threadsafe(_deliver_exception, e)
        except Exception as e:
            loop.call_soon_threadsafe(_deliver_exception, e)
        else:
            loop.call_soon_threadsafe(_deliver_result, result)

    grpc_fut.add_done_callback(done_cb)
    async_fut.add_done_callback(async_done_cb)
    return await async_fut

@error_handler
def listCollections(self, prefix):
"""
Expand Down
2 changes: 0 additions & 2 deletions btrdb/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def wrap(*args, **kwargs):

return wrap


##########################################################################
## gRPC error handling
##########################################################################
Expand Down Expand Up @@ -148,7 +147,6 @@ def check_proto_stat(stat):
raise BTRDBServerError(stat.msg)
raise BTrDBError(stat.msg)


##########################################################################
## BTrDB Exceptions
##########################################################################
Expand Down
29 changes: 3 additions & 26 deletions btrdb/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,13 @@ def arrow_to_series(streamset, agg="mean", name_callable=None):
return [arrow_df[col] for col in arrow_df]


def arrow_to_dataframe(
streamset, columns=None, agg=None, name_callable=None
) -> pd.DataFrame:
def arrow_to_dataframe(streamset, agg=None, name_callable=None) -> pd.DataFrame:
"""
Returns a Pandas DataFrame object indexed by time and using the values of a
stream for each column.

Parameters
----------
columns: sequence
column names to use for DataFrame. Deprecated and not compatible with name_callable.

agg : List[str], default: ["mean"]
Specify the StatPoint fields (e.g. aggregating function) to create the dataframe
from. Must be one or more of "min", "mean", "max", "count", "stddev", or "all". This
Expand All @@ -175,13 +170,6 @@ def arrow_to_dataframe(
raise ImportError(
f"Please install Pandas and pyarrow to use this transformation function. ErrorMessage: {err}"
)
# deprecation warning added in v5.8
if columns:
warn(
"the columns argument is deprecated and will be removed in a future release",
DeprecationWarning,
stacklevel=2,
)

if agg is None:
agg = ["mean"]
Expand Down Expand Up @@ -227,16 +215,13 @@ def arrow_to_dataframe(
return tmp.to_pandas(date_as_object=False, types_mapper=pd.ArrowDtype)


def to_dataframe(streamset, columns=None, agg="mean", name_callable=None):
def to_dataframe(streamset, agg="mean", name_callable=None):
"""
Returns a Pandas DataFrame object indexed by time and using the values of a
stream for each column.

Parameters
----------
columns: sequence
column names to use for DataFrame. Deprecated and not compatible with name_callable.

agg : str, default: "mean"
Specify the StatPoint field (e.g. aggregating function) to create the Series
from. Must be one of "min", "mean", "max", "count", "stddev", or "all". This
Expand All @@ -253,14 +238,6 @@ def to_dataframe(streamset, columns=None, agg="mean", name_callable=None):
except ImportError:
raise ImportError("Please install Pandas to use this transformation function.")

# deprecation warning added in v5.8
if columns:
warn(
"the columns argument is deprecated and will be removed in a future release",
DeprecationWarning,
stacklevel=2,
)

# TODO: allow this at some future point
if agg == "all" and name_callable is not None:
raise AttributeError(
Expand Down Expand Up @@ -288,7 +265,7 @@ def to_dataframe(streamset, columns=None, agg="mean", name_callable=None):
]
df.columns = pd.MultiIndex.from_tuples(stream_names)
else:
df.columns = columns if columns else _stream_names(streamset, name_callable)
df.columns = _stream_names(streamset, name_callable)

return df

Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version = "5.31.0"
authors = [
{name="PingThingsIO", email="support@pingthings.io"},
]
maintainers = [
{name="PingThingsIO", email="support@pingthings.io"},
]
description = "Bindings to interact with the Berkeley Tree Database using gRPC."
readme = "README.md"
license = {file="LICENSE.txt"}
Expand Down Expand Up @@ -65,8 +68,7 @@ all = [
]

[project.urls]
"Homepage" = "https://btrdb.io"
"Docs" = "https://btrdb.readthedocs.io"
"Docs" = "https://btrdb-python.readthedocs.io/"
"Repository" = "https://github.com/pingthingsio/btrdb-python.git"

[build-system]
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
description-file = DESCRIPTION.md
license_file = LICENSE.txt
description_file = DESCRIPTION.md
license_files = LICENSE.txt

[aliases]
test=pytest
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
REPOSITORY = "https://github.com/PingThingsIO/btrdb-python"
PACKAGE = "btrdb"
URL = "http://btrdb.io/"
DOCS_URL = "https://btrdb.readthedocs.io/en/latest/"
DOCS_URL = "https://btrdb-python.readthedocs.io/"

## Define the keywords
KEYWORDS = ("btrdb", "berkeley", "timeseries", "database", "bindings" "gRPC")
Expand Down Expand Up @@ -133,7 +133,6 @@ def get_description_type(path=PKG_DESCRIBE):
"license": LICENSE,
"author": AUTHOR,
"author_email": EMAIL,
"url": URL,
"maintainer": MAINTAINER,
"maintainer_email": EMAIL,
"project_urls": {
Expand Down
21 changes: 7 additions & 14 deletions tests/btrdb/test_transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def test_to_series(self, streamset):

def test_to_series_name_lambda(self, streamset):
"""
assert to_dateframe uses name lambda
assert to_series uses name lambda
"""
result = streamset.to_series(name_callable=lambda s: s.name)
assert [s.name for s in result] == ["stream0", "stream1", "stream2", "stream3"]
Expand Down Expand Up @@ -691,23 +691,16 @@ def test_to_dataframe(self, streamset):
df.set_index("time", inplace=True)
assert to_dataframe(streamset).equals(df)

def test_to_dataframe_column_issues_warning(self, statpoint_streamset):
def test_to_dataframe_column_issues_error(self, statpoint_streamset):
"""
assert to_dateframe with column argument issues warning
assert to_dataframe with column argument raises an error
"""
columns = ["test/cats", "test/dogs", "test/horses", "test/fishes"]
with pytest.deprecated_call():
with pytest.raises(TypeError) as unexpected_key_err:
statpoint_streamset.to_dataframe(columns=columns)

def test_to_dataframe_column(self, statpoint_streamset):
"""
assert to_dateframe with column argument actually renames columns
"""
columns = ["test/cats", "test/dogs", "test/horses", "test/fishes"]
with pytest.deprecated_call():
df = statpoint_streamset.to_dataframe(columns=columns)

assert df.columns.tolist() == columns
assert "got an unexpected keyword argument 'columns'" in str(
unexpected_key_err.value
)

def test_to_dataframe_multindex(self, statpoint_streamset):
"""
Expand Down