Skip to content

Commit 19edda0

Browse files
Subscription endpoint (#63)
Co-authored-by: Andrew Chambers <andrew@pingthings.io>
1 parent 8c4fed5 commit 19edda0

File tree

5 files changed

+754
-645
lines changed

5 files changed

+754
-645
lines changed

btrdb/endpoint.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,25 @@ def sql_query(self, stmt, params: typing.List):
386386
for page in self.stub.SQLQuery(request):
387387
check_proto_stat(page.stat)
388388
yield page.SQLQueryRow
389+
390+
@error_handler
391+
def subscribe(self, update_queue):
392+
def updates():
393+
while True:
394+
update = update_queue.get()
395+
if update is None:
396+
return
397+
(to_add, to_remove) = update
398+
if len(to_add) != 0:
399+
yield btrdb_pb2.SubscriptionUpdate(
400+
op=0, uuid=[uu.bytes for uu in to_add]
401+
)
402+
if len(to_remove) != 0:
403+
yield btrdb_pb2.SubscriptionUpdate(
404+
op=1, uuid=[uu.bytes for uu in to_remove]
405+
)
406+
407+
for response in self.stub.Subscribe(updates()):
408+
check_proto_stat(response.stat)
409+
with pa.ipc.open_stream(response.arrowBytes) as reader:
410+
yield uuid.UUID(bytes=response.uuid), reader.read_all()

btrdb/grpcinterface/btrdb.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ service BTrDB {
2828
rpc GetMetadataUsage(MetadataUsageParams) returns (MetadataUsageResponse);
2929
rpc GenerateCSV(GenerateCSVParams) returns (stream GenerateCSVResponse);
3030
rpc SQLQuery(SQLQueryParams) returns (stream SQLQueryResponse);
31+
rpc Subscribe(stream SubscriptionUpdate) returns (stream SubscriptionResp);
3132
//rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
3233
//rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
3334
}
@@ -426,3 +427,19 @@ message ReducedResolutionRange {
426427
int64 End = 2;
427428
uint32 Resolution = 3;
428429
}
430+
431+
enum SubscriptionUpdateOp {
432+
ADD_UUIDS = 0;
433+
REMOVE_UUIDS = 1;
434+
}
435+
436+
message SubscriptionUpdate {
437+
SubscriptionUpdateOp op = 1;
438+
repeated bytes uuid = 2;
439+
}
440+
441+
message SubscriptionResp {
442+
Status stat = 1;
443+
bytes uuid = 2;
444+
bytes arrowBytes = 3;
445+
}

btrdb/grpcinterface/btrdb_pb2.py

Lines changed: 151 additions & 146 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)