Skip to content

Threadpool executor #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,8 @@ dmypy.json

# Pyre type checker
.pyre/
.idea/misc.xml
.idea/vcs.xml
.idea/inspectionProfiles/profiles_settings.xml
.idea/inspectionProfiles/Project_Default.xml
/.idea/
68 changes: 43 additions & 25 deletions btrdb/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import json
import certifi
import uuid as uuidlib
from concurrent.futures import ThreadPoolExecutor

import grpc
from grpc._cython.cygrpc import CompressionAlgorithm
Expand All @@ -42,8 +43,8 @@
## Classes
##########################################################################

class Connection(object):

class Connection(object):
def __init__(self, addrportstr, apikey=None):
"""
Connects to a BTrDB server
Expand All @@ -57,7 +58,7 @@ def __init__(self, addrportstr, apikey=None):

"""
addrport = addrportstr.split(":", 2)
chan_ops = [('grpc.default_compression_algorithm', CompressionAlgorithm.gzip)]
chan_ops = [("grpc.default_compression_algorithm", CompressionAlgorithm.gzip)]

if len(addrport) != 2:
raise ValueError("expecting address:port")
Expand All @@ -82,40 +83,44 @@ def __init__(self, addrportstr, apikey=None):
except Exception:
if env_bundle != "":
# The user has given us something but we can't use it, we need to make noise
raise Exception("BTRDB_CA_BUNDLE(%s) env is defined but could not read file" % ca_bundle)
raise Exception(
"BTRDB_CA_BUNDLE(%s) env is defined but could not read file"
% ca_bundle
)
else:
contents = None

if apikey is None:
self.channel = grpc.secure_channel(
addrportstr,
grpc.ssl_channel_credentials(contents),
options=chan_ops
options=chan_ops,
)
else:
self.channel = grpc.secure_channel(
addrportstr,
grpc.composite_channel_credentials(
grpc.ssl_channel_credentials(contents),
grpc.access_token_call_credentials(apikey)
grpc.access_token_call_credentials(apikey),
),
options=chan_ops
options=chan_ops,
)
else:
if apikey is not None:
raise ValueError("cannot use an API key with an insecure (port 4410) BTrDB API. Try port 4411")
raise ValueError(
"cannot use an API key with an insecure (port 4410) BTrDB API. Try port 4411"
)
self.channel = grpc.insecure_channel(addrportstr, chan_ops)




class BTrDB(object):
"""
The primary server connection object for communicating with a BTrDB server.
"""

def __init__(self, endpoint):
self.ep = endpoint
self._executor = ThreadPoolExecutor()

def query(self, stmt, params=[]):
"""
Expand Down Expand Up @@ -153,7 +158,6 @@ def query(self, stmt, params=[]):
for row in page
]


def streams(self, *identifiers, versions=None, is_collection_prefix=False):
"""
Returns a StreamSet object with BTrDB streams from the supplied
Expand Down Expand Up @@ -196,20 +200,23 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False):
found = self.streams_in_collection(
"/".join(parts[:-1]),
is_collection_prefix=is_collection_prefix,
tags={"name": parts[-1]}
tags={"name": parts[-1]},
)
if len(found) == 1:
streams.append(found[0])
continue
raise StreamNotFoundError(f"Could not identify stream `{ident}`")

raise ValueError(f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name.")

raise ValueError(
f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name."
)

obj = StreamSet(streams)

if versions:
version_dict = {streams[idx].uuid: versions[idx] for idx in range(len(versions))}
version_dict = {
streams[idx].uuid: versions[idx] for idx in range(len(versions))
}
obj.pin_versions(version_dict)

return obj
Expand Down Expand Up @@ -257,12 +264,14 @@ def create(self, uuid, collection, tags=None, annotations=None):
annotations = {}

self.ep.create(uuid, collection, tags, annotations)
return Stream(self, uuid,
return Stream(
self,
uuid,
known_to_exist=True,
collection=collection,
tags=tags.copy(),
annotations=annotations.copy(),
property_version=0
property_version=0,
)

def info(self):
Expand All @@ -279,7 +288,7 @@ def info(self):
return {
"majorVersion": info.majorVersion,
"build": info.build,
"proxy": { "proxyEndpoints": [ep for ep in info.proxy.proxyEndpoints] },
"proxy": {"proxyEndpoints": [ep for ep in info.proxy.proxyEndpoints]},
}

def list_collections(self, starts_with=""):
Expand All @@ -294,7 +303,9 @@ def list_collections(self, starts_with=""):
"""
return [c for some in self.ep.listCollections(starts_with) for c in some]

def streams_in_collection(self, *collection, is_collection_prefix=True, tags=None, annotations=None):
def streams_in_collection(
self, *collection, is_collection_prefix=True, tags=None, annotations=None
):
"""
Search for streams matching given parameters

Expand Down Expand Up @@ -329,16 +340,23 @@ def streams_in_collection(self, *collection, is_collection_prefix=True, tags=Non
collection = [None]

for item in collection:
streams = self.ep.lookupStreams(item, is_collection_prefix, tags, annotations)
streams = self.ep.lookupStreams(
item, is_collection_prefix, tags, annotations
)
for desclist in streams:
for desc in desclist:
tagsanns = unpack_stream_descriptor(desc)
result.append(Stream(
self, uuidlib.UUID(bytes = desc.uuid),
known_to_exist=True, collection=desc.collection,
tags=tagsanns[0], annotations=tagsanns[1],
property_version=desc.propertyVersion
))
result.append(
Stream(
self,
uuidlib.UUID(bytes=desc.uuid),
known_to_exist=True,
collection=desc.collection,
tags=tagsanns[0],
annotations=tagsanns[1],
property_version=desc.propertyVersion,
)
)

return result

Expand Down
6 changes: 6 additions & 0 deletions btrdb/grpcinterface/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
To create new protobuf files in this folder:
```commandline
python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./btrdb.proto
```

Make sure the proto file is newest.
Loading