Skip to content

Commit 038c142

Browse files
authored
Fix arrow inserts (#28)
* Add insert benchmarking methods

  Benchmarking methods added for:
  * stream inserts using tuples of (time, value) data
  * stream inserts using pyarrow tables of timestamp and value columns
  * streamset inserts using a dict mapping streamset stream UUIDs to lists of (time, value) tuples
  * streamset inserts using a dict mapping streamset stream UUIDs to pyarrow tables of timestamps and values

* Include `nullable=False` in pyarrow schema inserts
  * This was the only difference in the schemas between Go and Python.
  * Also use an `io.BytesIO` stream as the sink for the IPC bytes.
1 parent 0a88f69 commit 038c142

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

btrdb/stream.py

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,8 @@
1010
"""
1111
Module for Stream and related classes
1212
"""
13+
import io
14+
1315
##########################################################################
1416
## Imports
1517
##########################################################################
@@ -512,8 +514,8 @@ def arrow_insert(self, data: pa.Table, merge: str = "never") -> int:
512514
logger.debug(f"tmp_table schema: {tmp_table.schema}")
513515
new_schema = pa.schema(
514516
[
515-
(pa.field("time", pa.timestamp(unit="ns", tz="UTC"))),
516-
(pa.field("value", pa.float64())),
517+
(pa.field("time", pa.timestamp(unit="ns", tz="UTC"), nullable=False)),
518+
(pa.field("value", pa.float64(), nullable=False)),
517519
]
518520
)
519521
tmp_table = tmp_table.cast(new_schema)
@@ -1930,10 +1932,12 @@ def _materialize_stream_as_table(arrow_bytes):
19301932

19311933

19321934
def _table_slice_to_feather_bytes(table_slice: pa.Table) -> bytes:
1933-
sink = pa.BufferOutputStream()
1935+
# sink = pa.BufferOutputStream()
1936+
sink = io.BytesIO()
19341937
with pa.ipc.new_stream(sink=sink, schema=table_slice.schema) as writer:
1935-
writer.write_table(table_slice)
1936-
return sink.readall()
1938+
writer.write(table_slice)
1939+
buf = sink.getvalue()
1940+
return buf
19371941

19381942

19391943
def _coalesce_table_deque(tables: deque):

0 commit comments

Comments
 (0)