
Commit deeb3a7

#49 Implement Python native read with PyArrow
1 parent 0be8175 commit deeb3a7

Note: this large commit has some content hidden by default; not all 44 changed files are shown below.

44 files changed, with 2486 additions and 20 deletions.

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 .idea/
 !.idea/vcs.xml
+.vscode/

 # paimon-python-java-bridge
 target

dev/dev-requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -21,6 +21,8 @@ setuptools>=18.0
 wheel
 py4j==0.10.9.7
 pyarrow>=5.0.0
+fastavro>=1.9.0
+zstandard>=0.23.0
 pandas>=1.3.0
 numpy>=1.22.4
 python-dateutil>=2.8.0,<3

dev/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar renamed to dev/test_deps/paimon-python-java-bridge-1.0-SNAPSHOT.jar

41.4 MB (binary file not shown)

paimon-python-java-bridge/pom.xml

Lines changed: 3 additions & 3 deletions
@@ -23,22 +23,22 @@

     <groupId>org.apache.paimon</groupId>
     <artifactId>paimon-python-java-bridge</artifactId>
-    <version>0.9-SNAPSHOT</version>
+    <version>1.0-SNAPSHOT</version>
     <name>Paimon : Python-Java Bridge</name>

     <packaging>jar</packaging>
     <inceptionYear>2024</inceptionYear>

     <properties>
-        <paimon.version>0.9.0</paimon.version>
+        <paimon.version>1.0.0</paimon.version>
         <py4j.version>0.10.9.7</py4j.version>
         <slf4j.version>1.7.32</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
         <spotless.version>2.13.0</spotless.version>
         <spotless.delimiter>package</spotless.delimiter>
         <arrow.version>14.0.0</arrow.version>
         <target.java.version>1.8</target.java.version>
-        <paimon.ci.tools.version>0.9.0</paimon.ci.tools.version>
+        <paimon.ci.tools.version>1.0.0</paimon.ci.tools.version>
     </properties>

     <dependencies>

paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java

Lines changed: 2 additions & 2 deletions
@@ -44,11 +44,11 @@ public class BytesWriter {

     public BytesWriter(TableWrite tableWrite, RowType rowType) {
         this.tableWrite = tableWrite;
-        this.arrowBatchReader = new ArrowBatchReader(rowType);
+        this.arrowBatchReader = new ArrowBatchReader(rowType, true);
         this.allocator = new RootAllocator();
         arrowFields =
                 rowType.getFields().stream()
-                        .map(f -> ArrowUtils.toArrowField(f.name(), f.type()))
+                        .map(f -> ArrowUtils.toArrowField(f.name(), f.id(), f.type(), 0))
                         .collect(Collectors.toList());
     }

paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java

Lines changed: 18 additions & 0 deletions
@@ -18,13 +18,22 @@

 package org.apache.paimon.python;

+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.reader.ReaderSupplier;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.TableWrite;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;

+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Call some methods in Python directly will raise py4j.Py4JException: Method method([]) does not
  * exist. This util is a workaround.
@@ -47,4 +56,13 @@ public static ParallelBytesReader createParallelBytesReader(
     public static BytesWriter createBytesWriter(TableWrite tableWrite, RowType rowType) {
         return new BytesWriter(tableWrite, rowType);
     }
+
+    public static RecordReader<InternalRow> createReader(TableRead tableRead, List<Split> splits)
+            throws IOException {
+        List<ReaderSupplier<InternalRow>> readers = new ArrayList();
+        for (Split split : splits) {
+            readers.add(() -> tableRead.createReader(split));
+        }
+        return ConcatRecordReader.create(readers);
+    }
 }

pypaimon/py4j/java_implementation.py

Lines changed: 107 additions & 15 deletions
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+import os

 # pypaimon.api implementation based on Java code & py4j lib

@@ -30,6 +31,11 @@
                          table_commit, Schema, predicate)
 from typing import List, Iterator, Optional, Any, TYPE_CHECKING

+from pypaimon.pynative.common.exception import PyNativeNotImplementedError
+from pypaimon.pynative.common.predicate import PyNativePredicate
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.util.reader_converter import ReaderConverter
+
 if TYPE_CHECKING:
     import ray
     from duckdb.duckdb import DuckDBPyConnection
@@ -72,7 +78,12 @@ def __init__(self, j_table, catalog_options: dict):

     def new_read_builder(self) -> 'ReadBuilder':
         j_read_builder = get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table)
-        return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options)
+        if self._j_table.primaryKeys().isEmpty():
+            primary_keys = None
+        else:
+            primary_keys = [str(key) for key in self._j_table.primaryKeys()]
+        return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options,
+                           primary_keys)

     def new_batch_write_builder(self) -> 'BatchWriteBuilder':
         java_utils.check_batch_write(self._j_table)
@@ -82,16 +93,21 @@ def new_batch_write_builder(self) -> 'BatchWriteBuilder':

 class ReadBuilder(read_builder.ReadBuilder):

-    def __init__(self, j_read_builder, j_row_type, catalog_options: dict):
+    def __init__(self, j_read_builder, j_row_type, catalog_options: dict, primary_keys: List[str]):
         self._j_read_builder = j_read_builder
         self._j_row_type = j_row_type
         self._catalog_options = catalog_options
+        self._primary_keys = primary_keys
+        self._predicate = None
+        self._projection = None

     def with_filter(self, predicate: 'Predicate'):
+        self._predicate = predicate
         self._j_read_builder.withFilter(predicate.to_j_predicate())
         return self

     def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+        self._projection = projection
         field_names = list(map(lambda field: field.name(), self._j_row_type.getFields()))
         int_projection = list(map(lambda p: field_names.index(p), projection))
         gateway = get_gateway()
@@ -111,7 +127,8 @@ def new_scan(self) -> 'TableScan':

     def new_read(self) -> 'TableRead':
         j_table_read = self._j_read_builder.newRead().executeFilter()
-        return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options)
+        return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options,
+                         self._predicate, self._projection, self._primary_keys)

     def new_predicate_builder(self) -> 'PredicateBuilder':
         return PredicateBuilder(self._j_row_type)
@@ -185,14 +202,28 @@ def file_paths(self) -> List[str]:

 class TableRead(table_read.TableRead):

-    def __init__(self, j_table_read, j_read_type, catalog_options):
+    def __init__(self, j_table_read, j_read_type, catalog_options, predicate, projection,
+                 primary_keys: List[str]):
+        self._j_table_read = j_table_read
+        self._j_read_type = j_read_type
+        self._catalog_options = catalog_options
+
+        self._predicate = predicate
+        self._projection = projection
+        self._primary_keys = primary_keys
+
         self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
         self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
             j_table_read, j_read_type, TableRead._get_max_workers(catalog_options))

-    def to_arrow(self, splits):
-        record_batch_reader = self.to_arrow_batch_reader(splits)
-        return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
+    def to_arrow(self, splits: List['Split']) -> pa.Table:
+        record_generator = self.to_record_generator(splits)
+
+        if os.environ.get(constants.IMPLEMENT_MODE, '') != 'py4j' and record_generator is not None:
+            return TableRead._generator_to_pyarrow_table(record_generator, self._arrow_schema)
+        else:
+            record_batch_reader = self.to_arrow_batch_reader(splits)
+            return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)

     def to_arrow_batch_reader(self, splits):
         j_splits = list(map(lambda s: s.to_j_split(), splits))
@@ -219,6 +250,60 @@ def to_ray(self, splits: List[Split]) -> "ray.data.dataset.Dataset":

         return ray.data.from_arrow(self.to_arrow(splits))

+    def to_record_generator(self, splits: List['Split']) -> Optional[Iterator[Any]]:
+        """
+        Returns a generator for iterating over records in the table.
+        If pynative reader is not available, returns None.
+        """
+        try:
+            j_splits = list(s.to_j_split() for s in splits)
+            j_reader = get_gateway().jvm.InvocationUtil.createReader(self._j_table_read, j_splits)
+            converter = ReaderConverter(self._predicate, self._projection, self._primary_keys)
+            pynative_reader = converter.convert_java_reader(j_reader)
+
+            def _record_generator():
+                try:
+                    batch = pynative_reader.read_batch()
+                    while batch is not None:
+                        record = batch.next()
+                        while record is not None:
+                            yield record
+                            record = batch.next()
+                        batch.release_batch()
+                        batch = pynative_reader.read_batch()
+                finally:
+                    pynative_reader.close()
+
+            return _record_generator()
+
+        except PyNativeNotImplementedError as e:
+            print(f"Generating pynative reader failed, will use py4j reader instead, "
+                  f"error message: {str(e)}")
+            return None
+
+    @staticmethod
+    def _generator_to_pyarrow_table(record_generator, arrow_schema):
+        """
+        Converts a record generator into a pyarrow Table using the provided Arrow schema.
+        """
+        record_batches = []
+        current_batch = []
+        batch_size = 1024  # Can be adjusted according to needs for batch size
+
+        for record in record_generator:
+            record_dict = {field: record.get_field(i) for i, field in enumerate(arrow_schema.names)}
+            current_batch.append(record_dict)
+            if len(current_batch) >= batch_size:
+                batch = pa.RecordBatch.from_pylist(current_batch, schema=arrow_schema)
+                record_batches.append(batch)
+                current_batch = []
+
+        if current_batch:
+            batch = pa.RecordBatch.from_pylist(current_batch, schema=arrow_schema)
+            record_batches.append(batch)
+
+        return pa.Table.from_batches(record_batches, schema=arrow_schema)
+
     @staticmethod
     def _get_max_workers(catalog_options):
         # default is sequential
@@ -317,12 +402,16 @@ def close(self):

 class Predicate(predicate.Predicate):

-    def __init__(self, j_predicate_bytes):
+    def __init__(self, py_predicate: PyNativePredicate, j_predicate_bytes):
+        self.py_predicate = py_predicate
         self._j_predicate_bytes = j_predicate_bytes

     def to_j_predicate(self):
         return deserialize_java_object(self._j_predicate_bytes)

+    def test(self, record: InternalRow) -> bool:
+        return self.py_predicate.test(record)
+

 class PredicateBuilder(predicate.PredicateBuilder):

@@ -350,7 +439,8 @@ def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
             index,
             literals
         )
-        return Predicate(serialize_java_object(j_predicate))
+        return Predicate(PyNativePredicate(method, index, field, literals),
+                         serialize_java_object(j_predicate))

     def equal(self, field: str, literal: Any) -> Predicate:
         return self._build('equal', field, [literal])
@@ -396,11 +486,13 @@ def between(self, field: str, included_lower_bound: Any, included_upper_bound: A
         return self._build('between', field, [included_lower_bound, included_upper_bound])

     def and_predicates(self, predicates: List[Predicate]) -> Predicate:
-        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
-        j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(predicates)
-        return Predicate(serialize_java_object(j_predicate))
+        j_predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(j_predicates)
+        return Predicate(PyNativePredicate('and', None, None, predicates),
+                         serialize_java_object(j_predicate))

     def or_predicates(self, predicates: List[Predicate]) -> Predicate:
-        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
-        j_predicate = get_gateway().jvm.PredicationUtil.buildOr(predicates)
-        return Predicate(serialize_java_object(j_predicate))
+        j_predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        j_predicate = get_gateway().jvm.PredicationUtil.buildOr(j_predicates)
+        return Predicate(PyNativePredicate('or', None, None, predicates),
+                         serialize_java_object(j_predicate))
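
For reference, a minimal sketch of how the new read path is driven from user code. It assumes a table handle already obtained from a pypaimon catalog; the plan().splits() call and the field names 'f0'/'f1' are illustrative assumptions, while the ReadBuilder/TableRead methods are the ones shown in the diff above.

# Sketch only: `table` is assumed to come from a pypaimon catalog;
# 'f0' and 'f1' are placeholder field names.
read_builder = table.new_read_builder()
predicate = read_builder.new_predicate_builder().equal('f0', 1)
read_builder = read_builder.with_filter(predicate).with_projection(['f0', 'f1'])

splits = read_builder.new_scan().plan().splits()  # assumed scan API
table_read = read_builder.new_read()

# to_arrow() now prefers the pynative reader and falls back to the py4j
# bytes reader when ReaderConverter raises PyNativeNotImplementedError.
arrow_table = table_read.to_arrow(splits)

# Records can also be consumed one by one; to_record_generator() returns
# None when no native reader could be built.
records = table_read.to_record_generator(splits)
if records is not None:
    for record in records:
        first_column = record.get_field(0)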

pypaimon/py4j/util/constants.py

Lines changed: 1 addition & 0 deletions
@@ -29,3 +29,4 @@

 # ------------------ for tests (Please don't use it) ------------------
 PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
+IMPLEMENT_MODE = '_IMPLEMENT_MODE'
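
The new IMPLEMENT_MODE constant backs the dispatch in TableRead.to_arrow(): when the environment variable it names is set to 'py4j', the original bytes-reader path is used even if a native reader is available. A small sketch, assuming only what the diff above shows:

import os

from pypaimon.py4j.util import constants

# Force to_arrow() down the py4j bytes-reader path, e.g. to compare its
# output against the new pynative reader.
os.environ[constants.IMPLEMENT_MODE] = 'py4j'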

pypaimon/pynative/__init__.py

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################

pypaimon/pynative/common/__init__.py

Whitespace-only changes.
