Commit 85f4b8f

[python] support custom source split target size and split open file cost (apache#6527)
1 parent 0d55170 commit 85f4b8f

File tree

4 files changed: +356 −2 lines

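The two new options let callers tune how the Python reader groups data files into splits: source.split.target-size caps the data volume per split, and source.split.open-file-cost assigns every file a minimum weight so that many tiny files do not all collapse into one split. A minimal usage sketch, assuming pyarrow plus the Schema and catalog API exercised in the tests below (the import location of Schema and the table name are illustrative assumptions):

import pyarrow as pa

from pypaimon import Schema  # assumed import location for Schema
from pypaimon.common.core_options import CoreOptions

pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string())])

# Cap each split at ~32 MB and weight every file at no less than 8 MB.
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '32mb',
        CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '8mb',
    }
)
catalog.create_table('default.my_table', schema, False)  # catalog obtained elsewhere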

paimon-python/pypaimon/common/core_options.py

Lines changed: 20 additions & 0 deletions
@@ -18,6 +18,8 @@
 
 from enum import Enum
 
+from pypaimon.common.memory_size import MemorySize
+
 
 class CoreOptions(str, Enum):
     """Core options for paimon."""
@@ -48,6 +50,8 @@ def __str__(self):
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
+    SOURCE_SPLIT_TARGET_SIZE = "source.split.target-size"
+    SOURCE_SPLIT_OPEN_FILE_COST = "source.split.open-file-cost"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
@@ -56,3 +60,19 @@ def __str__(self):
     @staticmethod
     def get_blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
+
+    @staticmethod
+    def get_split_target_size(options: dict) -> int:
+        """Get split target size from options, default to 128MB."""
+        if CoreOptions.SOURCE_SPLIT_TARGET_SIZE in options:
+            size_str = options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE]
+            return MemorySize.parse(size_str).get_bytes()
+        return MemorySize.of_mebi_bytes(128).get_bytes()
+
+    @staticmethod
+    def get_split_open_file_cost(options: dict) -> int:
+        """Get split open file cost from options, default to 4MB."""
+        if CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST in options:
+            cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
+            return MemorySize.parse(cost_str).get_bytes()
+        return MemorySize.of_mebi_bytes(4).get_bytes()
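
Both getters fall back to binary-unit defaults (128 MiB target, 4 MiB open-file cost) when the option is absent, and otherwise parse the configured string with MemorySize, so unit suffixes are accepted. A small sketch of the resulting behavior, following directly from the code above:

from pypaimon.common.core_options import CoreOptions

# Unset options fall back to the defaults.
assert CoreOptions.get_split_target_size({}) == 128 * 1024 * 1024
assert CoreOptions.get_split_open_file_cost({}) == 4 * 1024 * 1024

# Configured values go through MemorySize.parse, so '512b', '1 kb', '128mb' all work.
opts = {CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '512b',
        CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '1 kb'}
assert CoreOptions.get_split_target_size(opts) == 512
assert CoreOptions.get_split_open_file_cost(opts) == 1024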

paimon-python/pypaimon/common/memory_size.py

Lines changed: 201 additions & 0 deletions
@@ -0,0 +1,201 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Optional


class MemorySize:
    """MemorySize is a representation of a number of bytes, viewable in different units."""

    ZERO = None
    MAX_VALUE = None

    def __init__(self, bytes: int):
        """Constructs a new MemorySize."""
        if bytes < 0:
            raise ValueError("bytes must be >= 0")
        self.bytes = bytes

    @staticmethod
    def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
        return MemorySize(mebi_bytes << 20)

    @staticmethod
    def of_kibi_bytes(kibi_bytes: int) -> 'MemorySize':
        return MemorySize(kibi_bytes << 10)

    @staticmethod
    def of_bytes(bytes: int) -> 'MemorySize':
        return MemorySize(bytes)

    def get_bytes(self) -> int:
        return self.bytes

    def get_kibi_bytes(self) -> int:
        return self.bytes >> 10

    def get_mebi_bytes(self) -> int:
        return self.bytes >> 20

    def get_gibi_bytes(self) -> int:
        return self.bytes >> 30

    def get_tebi_bytes(self) -> int:
        return self.bytes >> 40

    def __eq__(self, other) -> bool:
        return isinstance(other, MemorySize) and self.bytes == other.bytes

    def __hash__(self) -> int:
        return hash(self.bytes)

    def __str__(self) -> str:
        return self.format_to_string()

    def format_to_string(self) -> str:
        ORDERED_UNITS = [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
                         MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]

        highest_integer_unit = MemoryUnit.BYTES
        for idx, unit in enumerate(ORDERED_UNITS):
            if self.bytes % unit.multiplier != 0:
                if idx == 0:
                    highest_integer_unit = ORDERED_UNITS[0]
                else:
                    highest_integer_unit = ORDERED_UNITS[idx - 1]
                break
        else:
            highest_integer_unit = MemoryUnit.BYTES

        return f"{self.bytes // highest_integer_unit.multiplier} {highest_integer_unit.units[1]}"

    def __repr__(self) -> str:
        return f"MemorySize({self.bytes})"

    def __lt__(self, other: 'MemorySize') -> bool:
        return self.bytes < other.bytes

    def __le__(self, other: 'MemorySize') -> bool:
        return self.bytes <= other.bytes

    def __gt__(self, other: 'MemorySize') -> bool:
        return self.bytes > other.bytes

    def __ge__(self, other: 'MemorySize') -> bool:
        return self.bytes >= other.bytes

    @staticmethod
    def parse(text: str) -> 'MemorySize':
        return MemorySize(MemorySize.parse_bytes(text))

    @staticmethod
    def parse_bytes(text: str) -> int:
        if text is None:
            raise ValueError("text cannot be None")

        trimmed = text.strip()
        if not trimmed:
            raise ValueError("argument is an empty- or whitespace-only string")

        pos = 0
        while pos < len(trimmed) and trimmed[pos].isdigit():
            pos += 1

        number_str = trimmed[:pos]
        unit_str = trimmed[pos:].strip().lower()

        if not number_str:
            raise ValueError("text does not start with a number")

        try:
            value = int(number_str)
        except ValueError:
            raise ValueError(
                f"The value '{number_str}' cannot be represented as 64bit number (numeric overflow).")

        unit = MemorySize._parse_unit(unit_str)
        multiplier = unit.multiplier if unit else 1
        result = value * multiplier

        if result // multiplier != value:
            raise ValueError(
                f"The value '{text}' cannot be represented as 64bit number of bytes (numeric overflow).")

        return result

    @staticmethod
    def _parse_unit(unit_str: str) -> Optional['MemoryUnit']:
        if not unit_str:
            return None

        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
            if unit_str in unit.units:
                return unit

        raise ValueError(
            f"Memory size unit '{unit_str}' does not match any of the recognized units: "
            f"{MemoryUnit.get_all_units()}")


class MemoryUnit:
    """Enum which defines memory unit, mostly used to parse value from configuration file."""

    def __init__(self, units: list, multiplier: int):
        self.units = units
        self.multiplier = multiplier

    BYTES = None
    KILO_BYTES = None
    MEGA_BYTES = None
    GIGA_BYTES = None
    TERA_BYTES = None

    @staticmethod
    def get_all_units() -> str:
        all_units = []
        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
            all_units.append("(" + " | ".join(unit.units) + ")")
        return " / ".join(all_units)

    @staticmethod
    def has_unit(text: str) -> bool:
        if text is None:
            raise ValueError("text cannot be None")

        trimmed = text.strip()
        if not trimmed:
            raise ValueError("argument is an empty- or whitespace-only string")

        pos = 0
        while pos < len(trimmed) and trimmed[pos].isdigit():
            pos += 1

        unit = trimmed[pos:].strip().lower()
        return len(unit) > 0


MemoryUnit.BYTES = MemoryUnit(["b", "bytes"], 1)
MemoryUnit.KILO_BYTES = MemoryUnit(["k", "kb", "kibibytes"], 1024)
MemoryUnit.MEGA_BYTES = MemoryUnit(["m", "mb", "mebibytes"], 1024 * 1024)
MemoryUnit.GIGA_BYTES = MemoryUnit(["g", "gb", "gibibytes"], 1024 * 1024 * 1024)
MemoryUnit.TERA_BYTES = MemoryUnit(["t", "tb", "tebibytes"], 1024 * 1024 * 1024 * 1024)

MemorySize.ZERO = MemorySize(0)
MemorySize.MAX_VALUE = MemorySize(2**63 - 1)
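
A short sketch of how the helper behaves, following directly from the code above (all units are binary, and the unit suffix is lower-cased before matching):

from pypaimon.common.memory_size import MemorySize

size = MemorySize.parse('128 mb')
assert size.get_bytes() == 134217728          # 128 * 2**20
assert size.get_mebi_bytes() == 128
assert str(size) == '128 mb'                  # largest unit that divides evenly

assert MemorySize.parse('512b').get_bytes() == 512
assert MemorySize.parse('4kb') == MemorySize.of_kibi_bytes(4)
assert MemorySize.ZERO < MemorySize.of_mebi_bytes(1)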

paimon-python/pypaimon/read/scanner/full_starting_scanner.py

Lines changed: 3 additions & 2 deletions
@@ -54,8 +54,9 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
         self.partition_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.partition_keys)
 
-        self.target_split_size = 128 * 1024 * 1024
-        self.open_file_cost = 4 * 1024 * 1024
+        # Get split target size and open file cost from table options
+        self.target_split_size = CoreOptions.get_split_target_size(self.table.options)
+        self.open_file_cost = CoreOptions.get_split_open_file_cost(self.table.options)
 
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
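
The scanner now reads both knobs from table options instead of hard-coding 128 MB and 4 MB. The packing logic itself is outside this diff, but the behavior the tests below rely on is the usual weighted bin packing: each file is weighted at max(file size, open-file cost), and a split is closed once its accumulated weight reaches the target. A hypothetical sketch of that scheme (pack_files and the greedy next-fit strategy are illustrative, not the scanner's actual code):

from typing import List


def pack_files(file_sizes: List[int], target_split_size: int, open_file_cost: int) -> List[List[int]]:
    """Greedy next-fit packing: every file weighs at least open_file_cost,
    and a split closes once adding a file would exceed the target size."""
    splits: List[List[int]] = []
    current: List[int] = []
    current_weight = 0
    for size in file_sizes:
        weight = max(size, open_file_cost)
        if current and current_weight + weight > target_split_size:
            splits.append(current)
            current, current_weight = [], 0
        current.append(size)
        current_weight += weight
    if current:
        splits.append(current)
    return splits


# Ten 1 MB files: a 64 MB per-file cost allows only two files per 128 MB split,
# while the default 4 MB cost packs all ten into one split.
files = [1 << 20] * 10
print(len(pack_files(files, 128 << 20, 64 << 20)))  # 5
print(len(pack_files(files, 128 << 20, 4 << 20)))   # 1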

paimon-python/pypaimon/tests/reader_base_test.py

Lines changed: 132 additions & 0 deletions
@@ -675,3 +675,135 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols,
                          max_values)
 
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
+
+    def test_split_target_size(self):
+        """Test source.split.target-size configuration effect on split generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with small target_split_size (512B) - should generate more splits
+        schema_small = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '512b'}
+        )
+        self.catalog.create_table('default.test_split_target_size_small', schema_small, False)
+        table_small = self.catalog.get_table('default.test_split_target_size_small')
+
+        for i in range(10):
+            write_builder = table_small.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        read_builder = table_small.new_read_builder()
+        splits_small = read_builder.new_scan().plan().splits()
+
+        schema_default = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.test_split_target_size_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_target_size_default')
+
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default target_split_size
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        self.assertGreater(
+            len(splits_small), len(splits_default),
+            f"Small target_split_size should generate more splits. "
+            f"Got {len(splits_small)} splits with 512B vs "
+            f"{len(splits_default)} splits with default")
+
+    def test_split_open_file_cost(self):
+        """Test source.split.open-file-cost configuration effect on split generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with large open_file_cost (64MB) - should generate more splits
+        schema_large_cost = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb',
+                CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '64mb'
+            }
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_large', schema_large_cost, False)
+        table_large_cost = self.catalog.get_table('default.test_split_open_file_cost_large')
+
+        # Write multiple batches to create multiple files
+        # Write 10 batches, each with 100 rows
+        for i in range(10):
+            write_builder = table_large_cost.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with large open_file_cost
+        read_builder = table_large_cost.new_read_builder()
+        splits_large_cost = read_builder.new_scan().plan().splits()
+
+        # Test with default open_file_cost (4MB) - should generate fewer splits
+        schema_default = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb'}
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_open_file_cost_default')
+
+        # Write same amount of data
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default open_file_cost
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        # With default open_file_cost (4MB), more files can be packed into each split
+        self.assertGreater(
+            len(splits_large_cost), len(splits_default),
+            f"Large open_file_cost should generate more splits. "
+            f"Got {len(splits_large_cost)} splits with 64MB cost vs "
+            f"{len(splits_default)} splits with default")
