
Commit f38157d

Authored by justinGilmer, davidkonigsberg, and jleifnf
Threadpool executor (#22)
* Release v5.15.0
* update protobuf to v4.22.3
* Add threaded streamset calls, using concurrent.futures.ThreadPoolExecutor
* Blacken code
* Update for failing tests
* Ignore flake8 as part of testing (pytest-flake8 seems to have issues with the later versions of flake8, see tholo/pytest-flake8#92)
* Update .gitignore
* Update ignore and remove extra print.
* Remove idea folder (pycharm)

---------

Co-authored-by: David Konigsberg <72822263+davidkonigsberg@users.noreply.github.com>
Co-authored-by: Jeff Lin <42981468+jleifnf@users.noreply.github.com>
1 parent 74e7567 commit f38157d
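The "Add threaded streamset calls" bullet refers to running the per-stream requests on a `concurrent.futures.ThreadPoolExecutor` instead of a serial loop. A minimal sketch of that pattern, assuming stream objects that expose a `values(start, end)` call; the `threaded_values` helper is illustrative rather than the library's actual API:

```python
from concurrent.futures import ThreadPoolExecutor


def threaded_values(streams, start, end, max_workers=None):
    """Fetch values for several streams concurrently, preserving stream order."""

    def fetch(stream):
        # Placeholder for the per-stream RPC, e.g. stream.values(start, end).
        return list(stream.values(start, end))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch, stream) for stream in streams]
        # result() blocks until each task finishes and re-raises worker exceptions.
        return [future.result() for future in futures]
```

Submitting one task per stream keeps the results in stream order while letting the underlying gRPC calls overlap.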

File tree

10 files changed, +1684 and -4008 lines


.gitignore

Lines changed: 5 additions & 0 deletions
```diff
@@ -118,3 +118,8 @@ dmypy.json
 
 # Pyre type checker
 .pyre/
+.idea/misc.xml
+.idea/vcs.xml
+.idea/inspectionProfiles/profiles_settings.xml
+.idea/inspectionProfiles/Project_Default.xml
+/.idea/
```

btrdb/conn.py

Lines changed: 43 additions & 25 deletions
```diff
@@ -20,6 +20,7 @@
 import json
 import certifi
 import uuid as uuidlib
+from concurrent.futures import ThreadPoolExecutor
 
 import grpc
 from grpc._cython.cygrpc import CompressionAlgorithm
@@ -42,8 +43,8 @@
 ## Classes
 ##########################################################################
 
-class Connection(object):
 
+class Connection(object):
     def __init__(self, addrportstr, apikey=None):
         """
         Connects to a BTrDB server
@@ -57,7 +58,7 @@ def __init__(self, addrportstr, apikey=None):
 
         """
         addrport = addrportstr.split(":", 2)
-        chan_ops = [('grpc.default_compression_algorithm', CompressionAlgorithm.gzip)]
+        chan_ops = [("grpc.default_compression_algorithm", CompressionAlgorithm.gzip)]
 
         if len(addrport) != 2:
             raise ValueError("expecting address:port")
@@ -82,40 +83,44 @@ def __init__(self, addrportstr, apikey=None):
             except Exception:
                 if env_bundle != "":
                     # The user has given us something but we can't use it, we need to make noise
-                    raise Exception("BTRDB_CA_BUNDLE(%s) env is defined but could not read file" % ca_bundle)
+                    raise Exception(
+                        "BTRDB_CA_BUNDLE(%s) env is defined but could not read file"
+                        % ca_bundle
+                    )
                 else:
                     contents = None
 
             if apikey is None:
                 self.channel = grpc.secure_channel(
                     addrportstr,
                     grpc.ssl_channel_credentials(contents),
-                    options=chan_ops
+                    options=chan_ops,
                 )
             else:
                 self.channel = grpc.secure_channel(
                     addrportstr,
                     grpc.composite_channel_credentials(
                         grpc.ssl_channel_credentials(contents),
-                        grpc.access_token_call_credentials(apikey)
+                        grpc.access_token_call_credentials(apikey),
                     ),
-                    options=chan_ops
+                    options=chan_ops,
                 )
         else:
             if apikey is not None:
-                raise ValueError("cannot use an API key with an insecure (port 4410) BTrDB API. Try port 4411")
+                raise ValueError(
+                    "cannot use an API key with an insecure (port 4410) BTrDB API. Try port 4411"
+                )
             self.channel = grpc.insecure_channel(addrportstr, chan_ops)
 
 
-
-
 class BTrDB(object):
     """
     The primary server connection object for communicating with a BTrDB server.
     """
 
     def __init__(self, endpoint):
         self.ep = endpoint
+        self._executor = ThreadPoolExecutor()
 
     def query(self, stmt, params=[]):
         """
@@ -153,7 +158,6 @@ def query(self, stmt, params=[]):
             for row in page
         ]
 
-
     def streams(self, *identifiers, versions=None, is_collection_prefix=False):
         """
         Returns a StreamSet object with BTrDB streams from the supplied
@@ -196,20 +200,23 @@ def streams(self, *identifiers, versions=None, is_collection_prefix=False):
                     found = self.streams_in_collection(
                         "/".join(parts[:-1]),
                         is_collection_prefix=is_collection_prefix,
-                        tags={"name": parts[-1]}
+                        tags={"name": parts[-1]},
                     )
                     if len(found) == 1:
                         streams.append(found[0])
                         continue
                     raise StreamNotFoundError(f"Could not identify stream `{ident}`")
 
-            raise ValueError(f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name.")
-
+            raise ValueError(
+                f"Could not identify stream based on `{ident}`. Identifier must be UUID or collection/name."
+            )
 
         obj = StreamSet(streams)
 
         if versions:
-            version_dict = {streams[idx].uuid: versions[idx] for idx in range(len(versions))}
+            version_dict = {
+                streams[idx].uuid: versions[idx] for idx in range(len(versions))
+            }
             obj.pin_versions(version_dict)
 
         return obj
@@ -257,12 +264,14 @@ def create(self, uuid, collection, tags=None, annotations=None):
             annotations = {}
 
         self.ep.create(uuid, collection, tags, annotations)
-        return Stream(self, uuid,
+        return Stream(
+            self,
+            uuid,
             known_to_exist=True,
             collection=collection,
             tags=tags.copy(),
             annotations=annotations.copy(),
-            property_version=0
+            property_version=0,
         )
 
     def info(self):
@@ -279,7 +288,7 @@ def info(self):
         return {
             "majorVersion": info.majorVersion,
             "build": info.build,
-            "proxy": { "proxyEndpoints": [ep for ep in info.proxy.proxyEndpoints] },
+            "proxy": {"proxyEndpoints": [ep for ep in info.proxy.proxyEndpoints]},
         }
 
     def list_collections(self, starts_with=""):
@@ -294,7 +303,9 @@ def list_collections(self, starts_with=""):
         """
         return [c for some in self.ep.listCollections(starts_with) for c in some]
 
-    def streams_in_collection(self, *collection, is_collection_prefix=True, tags=None, annotations=None):
+    def streams_in_collection(
+        self, *collection, is_collection_prefix=True, tags=None, annotations=None
+    ):
         """
         Search for streams matching given parameters
 
@@ -329,16 +340,23 @@ def streams_in_collection(self, *collection, is_collection_prefix=True, tags=Non
             collection = [None]
 
         for item in collection:
-            streams = self.ep.lookupStreams(item, is_collection_prefix, tags, annotations)
+            streams = self.ep.lookupStreams(
+                item, is_collection_prefix, tags, annotations
+            )
             for desclist in streams:
                 for desc in desclist:
                     tagsanns = unpack_stream_descriptor(desc)
-                    result.append(Stream(
-                        self, uuidlib.UUID(bytes = desc.uuid),
-                        known_to_exist=True, collection=desc.collection,
-                        tags=tagsanns[0], annotations=tagsanns[1],
-                        property_version=desc.propertyVersion
-                    ))
+                    result.append(
+                        Stream(
+                            self,
+                            uuidlib.UUID(bytes=desc.uuid),
+                            known_to_exist=True,
+                            collection=desc.collection,
+                            tags=tagsanns[0],
+                            annotations=tagsanns[1],
+                            property_version=desc.propertyVersion,
+                        )
+                    )
 
         return result
 
```
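The functional change in this file is the shared `ThreadPoolExecutor` attached to the connection in `BTrDB.__init__`; the rest of the diff is black reformatting. A sketch of how a connection-level executor like `self._executor` can fan out independent endpoint calls; the `_lookup_concurrently` helper below is hypothetical, not a method added by this commit:

```python
def _lookup_concurrently(self, items, is_collection_prefix, tags, annotations):
    # Submit one lookupStreams call per item so the RPCs run in parallel
    # on the connection's shared thread pool.
    futures = [
        self._executor.submit(
            self.ep.lookupStreams, item, is_collection_prefix, tags, annotations
        )
        for item in items
    ]
    # Gather results in submission order; result() re-raises worker errors.
    return [future.result() for future in futures]
```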

btrdb/grpcinterface/README.md

Lines changed: 6 additions & 0 deletions
````diff
@@ -0,0 +1,6 @@
+To create new protobuf files in this folder:
+```commandline
+python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. ./btrdb.proto
+```
+
+Make sure the proto file is newest.
````
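The `grpc_tools.protoc` module invoked in the new README is provided by the `grpcio-tools` package, so it must be installed before regenerating the stubs. A minimal check that the tool is available (exact version pinning is left to the project):

```commandline
pip install grpcio-tools
python -m grpc_tools.protoc --version
```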
