Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.idea/
!.idea/vcs.xml
.vscode/

# paimon-python-java-bridge
target
Expand Down
2 changes: 2 additions & 0 deletions dev/dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ setuptools>=18.0
wheel
py4j==0.10.9.7
pyarrow>=5.0.0
fastavro>=1.9.0
zstandard>=0.23.0
pandas>=1.3.0
numpy>=1.22.4
python-dateutil>=2.8.0,<3
Expand Down
Binary file not shown.
6 changes: 3 additions & 3 deletions paimon-python-java-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@

<groupId>org.apache.paimon</groupId>
<artifactId>paimon-python-java-bridge</artifactId>
<version>0.9-SNAPSHOT</version>
<version>1.0-SNAPSHOT</version>
<name>Paimon : Python-Java Bridge</name>

<packaging>jar</packaging>
<inceptionYear>2024</inceptionYear>

<properties>
<paimon.version>0.9.0</paimon.version>
<paimon.version>1.0.0</paimon.version>
<py4j.version>0.10.9.7</py4j.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<spotless.version>2.13.0</spotless.version>
<spotless.delimiter>package</spotless.delimiter>
<arrow.version>14.0.0</arrow.version>
<target.java.version>1.8</target.java.version>
<paimon.ci.tools.version>0.9.0</paimon.ci.tools.version>
<paimon.ci.tools.version>1.0.0</paimon.ci.tools.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public class BytesWriter {

public BytesWriter(TableWrite tableWrite, RowType rowType) {
this.tableWrite = tableWrite;
this.arrowBatchReader = new ArrowBatchReader(rowType);
this.arrowBatchReader = new ArrowBatchReader(rowType, true);
this.allocator = new RootAllocator();
arrowFields =
rowType.getFields().stream()
.map(f -> ArrowUtils.toArrowField(f.name(), f.type()))
.map(f -> ArrowUtils.toArrowField(f.name(), f.id(), f.type(), 0))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@

package org.apache.paimon.python;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.TableWrite;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Calling some methods directly from Python raises py4j.Py4JException: Method method([]) does not
* exist. This utility is a workaround.
Expand All @@ -47,4 +56,17 @@ public static ParallelBytesReader createParallelBytesReader(
public static BytesWriter createBytesWriter(TableWrite tableWrite, RowType rowType) {
return new BytesWriter(tableWrite, rowType);
}

/**
 * Builds one {@link RecordReader} over all given splits by handing a reader supplier per split
 * to {@link ConcatRecordReader#create}.
 *
 * <p>This static helper exists to work around the py4j error 'py4j.Py4JException: Method
 * createReader([class java.util.ArrayList]) does not exist'.
 *
 * @param tableRead the read used to open a reader for each split; each reader is only opened
 *     when its supplier is invoked
 * @param splits the splits to read; suppliers are registered in list order
 * @return the reader returned by {@code ConcatRecordReader.create}
 * @throws IOException declared for failures while creating the reader
 */
public static RecordReader<InternalRow> createReader(TableRead tableRead, List<Split> splits)
        throws IOException {
    // Use the diamond operator rather than a raw ArrayList to avoid an unchecked conversion.
    List<ReaderSupplier<InternalRow>> readers = new ArrayList<>(splits.size());
    for (Split split : splits) {
        readers.add(() -> tableRead.createReader(split));
    }
    return ConcatRecordReader.create(readers);
}
}
123 changes: 108 additions & 15 deletions pypaimon/py4j/java_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

# pypaimon.api implementation based on Java code & py4j lib

Expand All @@ -30,6 +31,11 @@
table_commit, Schema, predicate)
from typing import List, Iterator, Optional, Any, TYPE_CHECKING

from pypaimon.pynative.common.exception import PyNativeNotImplementedError
from pypaimon.pynative.common.predicate import PyNativePredicate
from pypaimon.pynative.common.row.internal_row import InternalRow
from pypaimon.pynative.util.reader_converter import ReaderConverter

if TYPE_CHECKING:
import ray
from duckdb.duckdb import DuckDBPyConnection
Expand Down Expand Up @@ -72,7 +78,12 @@ def __init__(self, j_table, catalog_options: dict):

def new_read_builder(self) -> 'ReadBuilder':
j_read_builder = get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table)
return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options)
if self._j_table.primaryKeys().isEmpty():
primary_keys = None
else:
primary_keys = [str(key) for key in self._j_table.primaryKeys()]
return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options,
primary_keys)

def new_batch_write_builder(self) -> 'BatchWriteBuilder':
java_utils.check_batch_write(self._j_table)
Expand All @@ -82,16 +93,21 @@ def new_batch_write_builder(self) -> 'BatchWriteBuilder':

class ReadBuilder(read_builder.ReadBuilder):

def __init__(self, j_read_builder, j_row_type, catalog_options: dict):
def __init__(self, j_read_builder, j_row_type, catalog_options: dict, primary_keys: List[str]):
self._j_read_builder = j_read_builder
self._j_row_type = j_row_type
self._catalog_options = catalog_options
self._primary_keys = primary_keys
self._predicate = None
self._projection = None

def with_filter(self, predicate: 'Predicate'):
    """
    Stores the predicate on this builder and passes its Java form to the Java read builder.

    Returns this builder so that calls can be chained.
    """
    self._predicate = predicate
    j_predicate = predicate.to_j_predicate()
    self._j_read_builder.withFilter(j_predicate)
    return self

def with_projection(self, projection: List[str]) -> 'ReadBuilder':
self._projection = projection
field_names = list(map(lambda field: field.name(), self._j_row_type.getFields()))
int_projection = list(map(lambda p: field_names.index(p), projection))
gateway = get_gateway()
Expand All @@ -111,7 +127,8 @@ def new_scan(self) -> 'TableScan':

def new_read(self) -> 'TableRead':
j_table_read = self._j_read_builder.newRead().executeFilter()
return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options)
return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options,
self._predicate, self._projection, self._primary_keys)

def new_predicate_builder(self) -> 'PredicateBuilder':
return PredicateBuilder(self._j_row_type)
Expand Down Expand Up @@ -185,14 +202,29 @@ def file_paths(self) -> List[str]:

class TableRead(table_read.TableRead):

def __init__(self, j_table_read, j_read_type, catalog_options):
def __init__(self, j_table_read, j_read_type, catalog_options, predicate, projection,
primary_keys: List[str]):
self._j_table_read = j_table_read
self._j_read_type = j_read_type
self._catalog_options = catalog_options

self._predicate = predicate
self._projection = projection
self._primary_keys = primary_keys

self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
j_table_read, j_read_type, TableRead._get_max_workers(catalog_options))

def to_arrow(self, splits):
record_batch_reader = self.to_arrow_batch_reader(splits)
return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
def to_arrow(self, splits: List['Split']) -> pa.Table:
    """
    Reads the given splits into a pyarrow Table.

    The pynative record generator is used when it is available. The py4j batch reader is used
    when the generator cannot be created, or when the environment variable named by
    constants.IMPLEMENT_MODE is set to 'py4j'.
    """
    # If necessary, set the env constants.IMPLEMENT_MODE to 'py4j' to forcibly use py4j reader.
    # Check the mode before building the pynative reader: building it creates a Java reader,
    # which would never be closed if the generator were discarded unused.
    if os.environ.get(constants.IMPLEMENT_MODE, '') != 'py4j':
        record_generator = self.to_record_generator(splits)
        if record_generator is not None:
            return TableRead._iterator_to_pyarrow_table(record_generator, self._arrow_schema)

    record_batch_reader = self.to_arrow_batch_reader(splits)
    return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)

def to_arrow_batch_reader(self, splits):
j_splits = list(map(lambda s: s.to_j_split(), splits))
Expand All @@ -219,6 +251,60 @@ def to_ray(self, splits: List[Split]) -> "ray.data.dataset.Dataset":

return ray.data.from_arrow(self.to_arrow(splits))

def to_record_generator(self, splits: List['Split']) -> Optional[Iterator[Any]]:
    """
    Returns a generator for iterating over records in the table.
    If pynative reader is not available, returns None.

    The Java reader is created immediately, before the generator is returned. Once iteration
    has started, the pynative reader is closed when the generator is exhausted, fails, or is
    closed. If the conversion raises PyNativeNotImplementedError, the Java reader that was
    already created is closed before None is returned.
    """
    j_reader = None
    try:
        j_splits = [split.to_j_split() for split in splits]
        j_reader = get_gateway().jvm.InvocationUtil.createReader(self._j_table_read, j_splits)
        converter = ReaderConverter(self._predicate, self._projection, self._primary_keys)
        pynative_reader = converter.convert_java_reader(j_reader)
    except PyNativeNotImplementedError as e:
        print(f"Generating pynative reader failed, will use py4j reader instead, "
              f"error message: {str(e)}")
        # The caller falls back to the py4j reader, so nothing else will ever close the
        # Java reader created above. Close it here, best-effort, so the fallback still runs.
        if j_reader is not None:
            try:
                j_reader.close()
            except Exception as close_error:
                print(f"Closing the unused java reader failed, error message: "
                      f"{str(close_error)}")
        return None

    def _record_generator():
        try:
            batch = pynative_reader.read_batch()
            while batch is not None:
                record = batch.next()
                while record is not None:
                    yield record
                    record = batch.next()
                batch.release_batch()
                batch = pynative_reader.read_batch()
        finally:
            pynative_reader.close()

    return _record_generator()

@staticmethod
def _iterator_to_pyarrow_table(record_generator, arrow_schema, batch_size=1024):
    """
    Converts a record generator into a pyarrow Table using the provided Arrow schema.

    :param record_generator: iterable of records; get_field(i) is called on each record with
        the position of every name in arrow_schema.names.
    :param arrow_schema: schema used for every RecordBatch and for the resulting Table.
    :param batch_size: maximum number of records per RecordBatch, 1024 by default.
    :return: a pyarrow Table built from the collected record batches.
    :raises ValueError: if batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    # The column names are the same for every record, so read them once.
    field_names = arrow_schema.names
    record_batches = []
    current_batch = []

    for record in record_generator:
        current_batch.append(
            {name: record.get_field(i) for i, name in enumerate(field_names)})
        if len(current_batch) >= batch_size:
            record_batches.append(
                pa.RecordBatch.from_pylist(current_batch, schema=arrow_schema))
            current_batch = []

    # Flush the trailing partial batch, if any.
    if current_batch:
        record_batches.append(pa.RecordBatch.from_pylist(current_batch, schema=arrow_schema))

    return pa.Table.from_batches(record_batches, schema=arrow_schema)

@staticmethod
def _get_max_workers(catalog_options):
# default is sequential
Expand Down Expand Up @@ -317,12 +403,16 @@ def close(self):

class Predicate(predicate.Predicate):

def __init__(self, j_predicate_bytes):
def __init__(self, py_predicate: PyNativePredicate, j_predicate_bytes):
self.py_predicate = py_predicate
self._j_predicate_bytes = j_predicate_bytes

def to_j_predicate(self):
return deserialize_java_object(self._j_predicate_bytes)

def test(self, record: InternalRow) -> bool:
    """Returns the result of testing the given record with the wrapped py-native predicate."""
    py_predicate = self.py_predicate
    return py_predicate.test(record)


class PredicateBuilder(predicate.PredicateBuilder):

Expand Down Expand Up @@ -350,7 +440,8 @@ def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
index,
literals
)
return Predicate(serialize_java_object(j_predicate))
return Predicate(PyNativePredicate(method, index, field, literals),
serialize_java_object(j_predicate))

def equal(self, field: str, literal: Any) -> Predicate:
return self._build('equal', field, [literal])
Expand Down Expand Up @@ -396,11 +487,13 @@ def between(self, field: str, included_lower_bound: Any, included_upper_bound: A
return self._build('between', field, [included_lower_bound, included_upper_bound])

def and_predicates(self, predicates: List[Predicate]) -> Predicate:
predicates = list(map(lambda p: p.to_j_predicate(), predicates))
j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(predicates)
return Predicate(serialize_java_object(j_predicate))
j_predicates = list(map(lambda p: p.to_j_predicate(), predicates))
j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(j_predicates)
return Predicate(PyNativePredicate('and', None, None, predicates),
serialize_java_object(j_predicate))

def or_predicates(self, predicates: List[Predicate]) -> Predicate:
predicates = list(map(lambda p: p.to_j_predicate(), predicates))
j_predicate = get_gateway().jvm.PredicationUtil.buildOr(predicates)
return Predicate(serialize_java_object(j_predicate))
j_predicates = list(map(lambda p: p.to_j_predicate(), predicates))
j_predicate = get_gateway().jvm.PredicationUtil.buildOr(j_predicates)
return Predicate(PyNativePredicate('or', None, None, predicates),
serialize_java_object(j_predicate))
1 change: 1 addition & 0 deletions pypaimon/py4j/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@

# ------------------ for tests (Please don't use it) ------------------
PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
IMPLEMENT_MODE = '_IMPLEMENT_MODE'
17 changes: 17 additions & 0 deletions pypaimon/pynative/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
17 changes: 17 additions & 0 deletions pypaimon/pynative/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
21 changes: 21 additions & 0 deletions pypaimon/pynative/common/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

class PyNativeNotImplementedError(NotImplementedError):
    """Raised when a method or function is not yet implemented by py-native paimon."""
Loading