Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions paimon-python/pypaimon/common/file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import uuid
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional
from typing import Dict, List, Optional

import pyarrow # noqa: F401
import pyarrow.fs as pafs
Expand All @@ -31,19 +31,19 @@ class FileIO(ABC):
"""
File IO interface to read and write files.
"""

@abstractmethod
def new_input_stream(self, path: str):
    """Open *path* for reading and return an input-stream object.

    The concrete stream type is implementation-defined (see subclasses).
    """
    pass

@abstractmethod
def new_output_stream(self, path: str):
    """Open *path* for writing and return an output-stream object.

    NOTE(review): the PyArrow implementation creates missing parent
    directories before opening — presumably all subclasses should.
    """
    pass

@abstractmethod
def get_file_status(self, path: str):
    """Return status metadata for *path*.

    Implementations raise FileNotFoundError when the path does not
    exist (see the PyArrow implementation).
    """
    pass

@abstractmethod
def list_status(self, path: str):
    """List the entries under *path*; element type is implementation-defined."""
    pass
Expand All @@ -52,18 +52,22 @@ def list_status(self, path: str):
def exists(self, path: str) -> bool:
    """Return True when *path* exists (file or directory)."""
    pass

def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
    """Probe every path in *paths* and map it to its existence flag.

    This default implementation simply calls ``exists`` once per path;
    subclasses may override it with a batched backend call.
    """
    results: Dict[str, bool] = {}
    for candidate in paths:
        results[candidate] = self.exists(candidate)
    return results

@abstractmethod
def delete(self, path: str, recursive: bool = False) -> bool:
    """Delete *path*, returning False when it does not exist.

    ``recursive`` must be True to remove a non-empty directory
    (see the PyArrow implementation's directory handling).
    """
    pass

@abstractmethod
def mkdirs(self, path: str) -> bool:
    """Create *path* (and missing parents), returning True on success.

    Implementations raise FileExistsError when *path* is an existing
    regular file (see the PyArrow implementation).
    """
    pass

@abstractmethod
def rename(self, src: str, dst: str) -> bool:
    """Move *src* to *dst*, returning True on success, False otherwise."""
    pass

def delete_quietly(self, path: str):
logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
Expand Down Expand Up @@ -115,7 +119,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
if self.is_dir(path):
return False

temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
Expand Down Expand Up @@ -143,7 +147,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False)

target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent

if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))

Expand Down Expand Up @@ -191,8 +195,8 @@ def to_filesystem_path(self, path: str) -> str:
return path

def parse_location(self, location: str):
from urllib.parse import urlparse
import os
from urllib.parse import urlparse

uri = urlparse(location)
if not uri.scheme:
Expand Down Expand Up @@ -249,10 +253,10 @@ def write_lance(self, path: str, data, **kwargs):
def write_blob(self, path: str, data, **kwargs):
    """Write *data* to *path* in Blob format.

    The base class provides no default Blob writer; concrete FileIO
    subclasses must override this method.
    """
    raise NotImplementedError("write_blob must be implemented by FileIO subclasses")

def close(self):
    """Release resources held by this FileIO; the base implementation is a no-op."""

@staticmethod
def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
"""
Expand All @@ -261,13 +265,13 @@ def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
- PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
"""
from urllib.parse import urlparse

uri = urlparse(path)
scheme = uri.scheme

if not scheme or scheme == "file":
from pypaimon.filesystem.local_file_io import LocalFileIO
return LocalFileIO(path, catalog_options)

from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
return PyArrowFileIO(path, catalog_options or Options({}))
50 changes: 43 additions & 7 deletions paimon-python/pypaimon/common/identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,49 @@ def create(cls, database: str, object: str) -> "Identifier":

@classmethod
def from_string(cls, full_name: str) -> "Identifier":
    """Parse a 'database.object' identifier, with optional backtick quoting.

    Raises ValueError for empty input or when no separator is found.
    """
    if not full_name or not full_name.strip():
        raise ValueError("fullName cannot be null or empty")

    # Quoted form: delegate to the backtick-aware parser.
    if '`' in full_name:
        return cls._parse_with_backticks(full_name)

    # Unquoted form: split on the FIRST period only (Java-compatible),
    # so the object part may itself contain periods.
    database, separator, obj = full_name.partition(".")
    if not separator:
        raise ValueError(
            f"Cannot get splits from '{full_name}' to get database and object"
        )
    return cls(database, obj)

@classmethod
def _parse_with_backticks(cls, full_name: str) -> "Identifier":
    """Split *full_name* on '.' separators that lie outside backtick quotes.

    Backticks themselves are stripped; exactly two segments must result.
    """
    segments = []
    buf = []          # characters of the segment currently being built
    quoted = False    # True while inside an open backtick pair

    for ch in full_name:
        if ch == '`':
            # A backtick flips quoting on/off and is never kept.
            quoted = not quoted
        elif ch == '.' and not quoted:
            segments.append(''.join(buf))
            buf = []
        else:
            buf.append(ch)

    # Append the trailing segment only when non-empty.
    if buf:
        segments.append(''.join(buf))

    if quoted:
        raise ValueError(f"Unclosed backtick in identifier: {full_name}")

    if len(segments) != 2:
        raise ValueError(f"Invalid identifier format: {full_name}")

    return cls(segments[0], segments[1])

def get_full_name(self) -> str:
if self.branch:
Expand Down
52 changes: 34 additions & 18 deletions paimon-python/pypaimon/filesystem/pyarrow_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.schema.data_types import (AtomicType, DataField,
PyarrowFieldParser)
from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
Expand Down Expand Up @@ -189,7 +190,7 @@ def new_output_stream(self, path: str):
parent_dir = '/'.join(path_str.split('/')[:-1])
else:
parent_dir = ''

if parent_dir and not self.exists(parent_dir):
self.mkdirs(parent_dir)
else:
Expand All @@ -214,10 +215,10 @@ def _get_file_info(self, path_str: str):
def get_file_status(self, path: str):
    """Return the pyarrow ``FileInfo`` for *path*.

    Raises FileNotFoundError when the resolved path does not exist.
    """
    path_str = self.to_filesystem_path(path)
    info = self._get_file_info(path_str)
    # Success path first: any type other than NotFound is a real entry.
    if info.type != pafs.FileType.NotFound:
        return info
    raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")

def list_status(self, path: str):
Expand All @@ -233,13 +234,25 @@ def exists(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
return self._get_file_info(path_str).type != pafs.FileType.NotFound

def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
    """Check existence of multiple paths in a single batched API call.

    Args:
        paths: logical paths (possibly scheme-prefixed) to probe.

    Returns:
        Mapping from each original path to True when it exists.
    """
    if not paths:
        return {}

    path_strs = [self.to_filesystem_path(p) for p in paths]
    # One get_file_info call for the whole batch; results come back
    # in the same order as the input list.
    file_infos = self.filesystem.get_file_info(path_strs)
    # Use the `pafs` module alias for consistency with the rest of
    # this class (get_file_status, delete, mkdirs all use it).
    return {
        path: info.type != pafs.FileType.NotFound
        for path, info in zip(paths, file_infos)
    }

def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self._get_file_info(path_str)

if file_info.type == pafs.FileType.NotFound:
return False

if file_info.type == pafs.FileType.Directory:
if not recursive:
selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
Expand All @@ -258,15 +271,15 @@ def delete(self, path: str, recursive: bool = False) -> bool:
def mkdirs(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self._get_file_info(path_str)

if file_info.type == pafs.FileType.NotFound:
self.filesystem.create_dir(path_str, recursive=True)
return True
if file_info.type == pafs.FileType.Directory:
return True
elif file_info.type == pafs.FileType.File:
raise FileExistsError(f"Path exists but is not a directory: {path}")

self.filesystem.create_dir(path_str, recursive=True)
return True

Expand All @@ -275,13 +288,13 @@ def rename(self, src: str, dst: str) -> bool:
dst_parent = Path(dst_str).parent
if str(dst_parent) and not self.exists(str(dst_parent)):
self.mkdirs(str(dst_parent))

src_str = self.to_filesystem_path(src)

try:
if hasattr(self.filesystem, 'rename'):
return self.filesystem.rename(src_str, dst_str)

dst_file_info = self._get_file_info(dst_str)
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
Expand All @@ -293,7 +306,7 @@ def rename(self, src: str, dst: str) -> bool:
final_dst_info = self._get_file_info(dst_str)
if final_dst_info.type != pafs.FileType.NotFound:
return False

self.filesystem.move(src_str, dst_str)
return True
except FileNotFoundError:
Expand Down Expand Up @@ -331,7 +344,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool:
file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.Directory:
return False

temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
Expand All @@ -349,7 +362,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False)
source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent

if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))

Expand All @@ -373,13 +386,14 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
zstd_level: int = 1, **kwargs):
try:
"""Write ORC file using PyArrow ORC writer.

Note: PyArrow's ORC writer doesn't support compression_level parameter.
ORC files will use zstd compression with default level
(which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c)
instead of the specified level.
"""
import sys

import pyarrow.orc as orc

data = self._cast_time_columns_for_orc(data)
Expand Down Expand Up @@ -434,7 +448,7 @@ def record_generator():
'zstd': 'zstandard', # zstd is commonly used in Paimon
}
compression_lower = compression.lower()

codec = codec_map.get(compression_lower)
if codec is None:
raise ValueError(
Expand All @@ -450,6 +464,7 @@ def record_generator():
def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
try:
import lance

from pypaimon.read.reader.lance_utils import to_lance_specified
file_path_for_lance, storage_options = to_lance_specified(self, path)

Expand Down Expand Up @@ -516,9 +531,10 @@ def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
raise RuntimeError(f"Failed to write blob file {path}: {e}") from e

def to_filesystem_path(self, path: str) -> str:
from pyarrow.fs import S3FileSystem
import re

from pyarrow.fs import S3FileSystem

parsed = urlparse(path)
normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''

Expand Down
16 changes: 15 additions & 1 deletion paimon-python/pypaimon/manifest/manifest_list_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from typing import List, Optional

import fastavro

from pypaimon.manifest.schema.manifest_file_meta import (
MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
from pypaimon.manifest.schema.simple_stats import SimpleStats
Expand All @@ -41,6 +40,7 @@ def __init__(self, table):
self.file_io = self.table.file_io

def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
"""Read base + delta manifest lists for full file state."""
if snapshot is None:
return []
manifest_files = []
Expand All @@ -50,10 +50,24 @@ def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
manifest_files.extend(delta_manifests)
return manifest_files

def read_base(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
    """Read only the base manifest list for the given snapshot."""
    base_list_name = snapshot.base_manifest_list
    return self.read(base_list_name)

def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
    """Read only the delta manifest list for the given snapshot."""
    delta_list_name = snapshot.delta_manifest_list
    return self.read(delta_list_name)

def read_changelog(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
    """Read changelog manifest files from snapshot, or empty list if none."""
    changelog_list_name = snapshot.changelog_manifest_list
    # Snapshots without a changelog have no list file at all.
    return [] if changelog_list_name is None else self.read(changelog_list_name)

def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
    """Read one manifest list file by name, delegating to the storage reader."""
    return self._read_from_storage(manifest_list_name)

def _read_from_storage(self, manifest_list_name: str) -> List[ManifestFileMeta]:
"""Read manifest list from storage."""
manifest_files = []

manifest_list_path = f"{self.manifest_path}/{manifest_list_name}"
Expand Down
Loading
Loading