File tree Expand file tree Collapse file tree 2 files changed +58
-0
lines changed Expand file tree Collapse file tree 2 files changed +58
-0
lines changed Original file line number Diff line number Diff line change @@ -37,3 +37,8 @@ def _register_keras():
3737@partial (register_serialization_lazy , "sparse" )
3838def _register_sparse ():
3939 from . import sparse
40+
41+
42+ @partial (register_serialization_lazy , "arrow" )
43+ def _register_arrow ():
44+ from . import arrow
Original file line number Diff line number Diff line change 1+ from __future__ import print_function , division , absolute_import
2+
3+ from .serialize import register_serialization
4+
5+
6+ def serialize_batch (batch ):
7+ import pyarrow as pa
8+ sink = pa .BufferOutputStream ()
9+ writer = pa .RecordBatchStreamWriter (sink , batch .schema )
10+ writer .write_batch (batch )
11+ writer .close ()
12+ buf = sink .get_result ()
13+ header = {}
14+ frames = [buf .to_pybytes ()]
15+ return header , frames
16+
17+
18+ def deserialize_batch (header , frames ):
19+ import pyarrow as pa
20+ blob = frames [0 ]
21+ reader = pa .RecordBatchStreamReader (pa .BufferReader (blob ))
22+ return reader .read_next_batch ()
23+
24+
25+ def serialize_table (tbl ):
26+ import pyarrow as pa
27+ sink = pa .BufferOutputStream ()
28+ writer = pa .RecordBatchStreamWriter (sink , tbl .schema )
29+ writer .write_table (tbl )
30+ writer .close ()
31+ buf = sink .get_result ()
32+ header = {}
33+ frames = [buf .to_pybytes ()]
34+ return header , frames
35+
36+
37+ def deserialize_table (header , frames ):
38+ import pyarrow as pa
39+ blob = frames [0 ]
40+ reader = pa .RecordBatchStreamReader (pa .BufferReader (blob ))
41+ return reader .read_all ()
42+
43+
44+ register_serialization (
45+ 'pyarrow.lib.RecordBatch' ,
46+ serialize_batch ,
47+ deserialize_batch
48+ )
49+ register_serialization (
50+ 'pyarrow.lib.Table' ,
51+ serialize_table ,
52+ deserialize_table
53+ )
You can’t perform that action at this time.
0 commit comments