Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions paimon-python/pypaimon/manifest/schema/data_file_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ def copy_without_stats(self) -> 'DataFileMeta':
file_path=self.file_path
)

@staticmethod
def is_blob_file(file_name: str) -> bool:
return file_name.endswith(".blob")

def assign_first_row_id(self, first_row_id: int) -> 'DataFileMeta':
"""Create a new DataFileMeta with the assigned first_row_id."""
return DataFileMeta(
Expand Down
7 changes: 1 addition & 6 deletions paimon-python/pypaimon/read/reader/field_bunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(self, expected_row_count: int):

def add(self, file: DataFileMeta) -> None:
"""Add a blob file to this bunch."""
if not self._is_blob_file(file.file_name):
if not DataFileMeta.is_blob_file(file.file_name):
raise ValueError("Only blob file can be added to a blob bunch.")

if file.first_row_id == self.latest_first_row_id:
Expand Down Expand Up @@ -113,8 +113,3 @@ def row_count(self) -> int:

def files(self) -> List[DataFileMeta]:
return self._files

@staticmethod
def _is_blob_file(file_name: str) -> bool:
"""Check if a file is a blob file based on its extension."""
return file_name.endswith('.blob')
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def sort_key(manifest_entry: ManifestEntry) -> tuple:
if manifest_entry.file.first_row_id is not None
else float('-inf')
)
is_blob = 1 if self._is_blob_file(manifest_entry.file.file_name) else 0
is_blob = 1 if DataFileMeta.is_blob_file(manifest_entry.file.file_name) else 0
max_seq = manifest_entry.file.max_sequence_number
return first_row_id, is_blob, -max_seq

Expand Down Expand Up @@ -341,7 +341,7 @@ def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]
split_by_row_id.append([file])
continue

if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
if not DataFileMeta.is_blob_file(file.file_name) and first_row_id != last_row_id:
if current_split:
split_by_row_id.append(current_split)

Expand Down Expand Up @@ -382,7 +382,7 @@ def _compute_slice_split_file_idx_map(
current_pos = file_end_pos
data_file_infos = []
for file in split.files:
if self._is_blob_file(file.file_name):
if DataFileMeta.is_blob_file(file.file_name):
continue
file_begin_pos = current_pos
current_pos += file.row_count
Expand All @@ -402,7 +402,7 @@ def _compute_slice_split_file_idx_map(

# Second pass: only blob files (data files already in shard_file_idx_map from first pass)
for file in split.files:
if not self._is_blob_file(file.file_name):
if not DataFileMeta.is_blob_file(file.file_name):
continue
blob_first_row_id = file.first_row_id if file.first_row_id is not None else 0
data_file_range = None
Expand Down Expand Up @@ -489,7 +489,7 @@ def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
row_id_end = -1

for file in files:
if not DataEvolutionSplitGenerator._is_blob_file(file.file_name):
if not DataFileMeta.is_blob_file(file.file_name):
if file.first_row_id is not None:
row_id_start = file.first_row_id
row_id_end = file.first_row_id + file.row_count
Expand Down
5 changes: 0 additions & 5 deletions paimon-python/pypaimon/read/scanner/split_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,3 @@ def _compute_file_range(
return -1, -1
# File is completely within the shard range
return None

@staticmethod
def _is_blob_file(file_name: str) -> bool:
"""Check if a file is a blob file."""
return file_name.endswith('.blob')
13 changes: 4 additions & 9 deletions paimon-python/pypaimon/read/split_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]
# Sort files by firstRowId and then by maxSequenceNumber
def sort_key(file: DataFileMeta) -> tuple:
first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
is_blob = 1 if self._is_blob_file(file.file_name) else 0
is_blob = 1 if DataFileMeta.is_blob_file(file.file_name) else 0
max_seq = file.max_sequence_number
return (first_row_id, is_blob, -max_seq)

Expand All @@ -493,7 +493,7 @@ def sort_key(file: DataFileMeta) -> tuple:
split_by_row_id.append([file])
continue

if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
if not DataFileMeta.is_blob_file(file.file_name) and first_row_id != last_row_id:
if current_split:
split_by_row_id.append(current_split)
if first_row_id < check_row_id_start:
Expand Down Expand Up @@ -541,7 +541,7 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe
first_file = bunch.files()[0]

# Get field IDs for this bunch
if self._is_blob_file(first_file.file_name):
if DataFileMeta.is_blob_file(first_file.file_name):
# For blob files, we need to get the field ID from the write columns
field_ids = [self._get_field_id_from_write_cols(first_file)]
elif first_file.write_cols:
Expand Down Expand Up @@ -615,7 +615,7 @@ def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[Fie
row_count = -1

for file in need_merge_files:
if self._is_blob_file(file.file_name):
if DataFileMeta.is_blob_file(file.file_name):
field_id = self._get_field_id_from_write_cols(file)
if field_id not in blob_bunch_map:
blob_bunch_map[field_id] = BlobBunch(row_count)
Expand Down Expand Up @@ -650,10 +650,5 @@ def _get_field_ids_from_write_cols(self, write_cols: List[str]) -> List[int]:
field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
return field_ids

@staticmethod
def _is_blob_file(file_name: str) -> bool:
"""Check if a file is a blob file based on its extension."""
return file_name.endswith('.blob')

def _get_all_data_fields(self):
return SpecialFields.row_type_with_row_tracking(self.table.fields)
8 changes: 2 additions & 6 deletions paimon-python/pypaimon/write/file_store_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
Expand Down Expand Up @@ -648,7 +649,7 @@ def _assign_row_tracking_meta(self, first_row_id_start: int, commit_entries: Lis
entry.file.file_source == 0 and # APPEND file source
entry.file.first_row_id is None): # No existing first_row_id

if self._is_blob_file(entry.file.file_name):
if DataFileMeta.is_blob_file(entry.file.file_name):
# Handle blob files specially
if blob_start >= start:
raise RuntimeError(
Expand All @@ -669,8 +670,3 @@ def _assign_row_tracking_meta(self, first_row_id_start: int, commit_entries: Lis
row_id_assigned.append(entry)

return row_id_assigned, start

@staticmethod
def _is_blob_file(file_name: str) -> bool:
"""Check if a file is a blob file based on its extension."""
return file_name.endswith('.blob')