Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions paimon-python/pypaimon/common/file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from pathlib import Path
from typing import List, Optional

import pyarrow
import pyarrow # noqa: F401
import pyarrow.fs as pafs

from pypaimon.common.options import Options

Expand Down Expand Up @@ -97,7 +98,7 @@ def get_file_size(self, path: str) -> int:

def is_dir(self, path: str) -> bool:
file_info = self.get_file_status(path)
return file_info.type == pyarrow.fs.FileType.Directory
return file_info.type == pafs.FileType.Directory

def check_or_mkdirs(self, path: str):
if self.exists(path):
Expand Down Expand Up @@ -153,7 +154,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False)
def copy_files(self, source_directory: str, target_directory: str, overwrite: bool = False):
file_infos = self.list_status(source_directory)
for file_info in file_infos:
if file_info.type == pyarrow.fs.FileType.File:
if file_info.type == pafs.FileType.File:
source_file = file_info.path
file_name = source_file.split('/')[-1]
target_file = f"{target_directory.rstrip('/')}/{file_name}" if target_directory else file_name
Expand Down
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/filesystem/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

import threading
import pyarrow
import pyarrow.fs as pafs
from pathlib import Path
from pyarrow._fs import LocalFileSystem

Expand All @@ -29,15 +29,15 @@ def rename(self, src, dst):
try:
with PaimonLocalFileSystem.rename_lock:
dst_file_info = self.get_file_info([dst])[0]
if dst_file_info.type != pyarrow.fs.FileType.NotFound:
if dst_file_info.type == pyarrow.fs.FileType.File:
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src).name
dst = str(Path(dst) / src_name)
final_dst_info = self.get_file_info([dst])[0]
if final_dst_info.type != pyarrow.fs.FileType.NotFound:
if final_dst_info.type != pafs.FileType.NotFound:
return False

# Perform atomic move
Expand Down
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/filesystem/local_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from urllib.parse import urlparse

import pyarrow
import pyarrow.fs
import pyarrow.fs as pafs

from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
Expand Down Expand Up @@ -119,9 +119,9 @@ def __init__(self, file_path: Path, original_path: str):
self.original_path = original_path
self.size = stat_info.st_size if file_path.is_file() else None
self.type = (
pyarrow.fs.FileType.Directory if file_path.is_dir()
else pyarrow.fs.FileType.File if file_path.is_file()
else pyarrow.fs.FileType.NotFound
pafs.FileType.Directory if file_path.is_dir()
else pafs.FileType.File if file_path.is_file()
else pafs.FileType.NotFound
)
self.mtime = stat_info.st_mtime

Expand Down
42 changes: 18 additions & 24 deletions paimon-python/pypaimon/filesystem/pyarrow_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from urllib.parse import splitport, urlparse

import pyarrow
import pyarrow.fs as pafs
from packaging.version import parse
from pyarrow._fs import FileSystem

Expand Down Expand Up @@ -80,8 +81,7 @@ def _create_s3_retry_config(
'connect_timeout': connect_timeout
}
try:
from pyarrow.fs import AwsStandardS3RetryStrategy
retry_strategy = AwsStandardS3RetryStrategy(max_attempts=max_attempts)
retry_strategy = pafs.AwsStandardS3RetryStrategy(max_attempts=max_attempts)
config['retry_strategy'] = retry_strategy
except ImportError:
pass
Expand Down Expand Up @@ -110,8 +110,6 @@ def _extract_oss_bucket(self, location) -> str:
return bucket

def _initialize_oss_fs(self, path) -> FileSystem:
from pyarrow.fs import S3FileSystem

client_kwargs = {
"access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
"secret_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
Expand All @@ -129,11 +127,9 @@ def _initialize_oss_fs(self, path) -> FileSystem:
retry_config = self._create_s3_retry_config()
client_kwargs.update(retry_config)

return S3FileSystem(**client_kwargs)
return pafs.S3FileSystem(**client_kwargs)

def _initialize_s3_fs(self) -> FileSystem:
from pyarrow.fs import S3FileSystem

client_kwargs = {
"endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
"access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
Expand All @@ -147,11 +143,9 @@ def _initialize_s3_fs(self) -> FileSystem:
retry_config = self._create_s3_retry_config()
client_kwargs.update(retry_config)

return S3FileSystem(**client_kwargs)
return pafs.S3FileSystem(**client_kwargs)

def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
from pyarrow.fs import HadoopFileSystem

if 'HADOOP_HOME' not in os.environ:
raise RuntimeError("HADOOP_HOME environment variable is not set.")
if 'HADOOP_CONF_DIR' not in os.environ:
Expand All @@ -170,7 +164,7 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
os.environ['CLASSPATH'] = class_paths.stdout.strip()

host, port_str = splitport(netloc)
return HadoopFileSystem(
return pafs.HadoopFileSystem(
host=host,
port=int(port_str),
user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
Expand Down Expand Up @@ -204,35 +198,35 @@ def get_file_status(self, path: str):
file_infos = self.filesystem.get_file_info([path_str])
file_info = file_infos[0]

if file_info.type == pyarrow.fs.FileType.NotFound:
if file_info.type == pafs.FileType.NotFound:
raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")

return file_info

def list_status(self, path: str):
path_str = self.to_filesystem_path(path)
selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
return self.filesystem.get_file_info(selector)

def list_directories(self, path: str):
file_infos = self.list_status(path)
return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
return [info for info in file_infos if info.type == pafs.FileType.Directory]

def exists(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
return file_info.type != pyarrow.fs.FileType.NotFound
return file_info.type != pafs.FileType.NotFound

def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]

if file_info.type == pyarrow.fs.FileType.NotFound:
if file_info.type == pafs.FileType.NotFound:
return False

if file_info.type == pyarrow.fs.FileType.Directory:
if file_info.type == pafs.FileType.Directory:
if not recursive:
selector = pyarrow.fs.FileSelector(path_str, recursive=False, allow_not_found=True)
selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
dir_contents = self.filesystem.get_file_info(selector)
if len(dir_contents) > 0:
raise OSError(f"Directory {path} is not empty")
Expand All @@ -249,9 +243,9 @@ def mkdirs(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]

if file_info.type == pyarrow.fs.FileType.Directory:
if file_info.type == pafs.FileType.Directory:
return True
elif file_info.type == pyarrow.fs.FileType.File:
elif file_info.type == pafs.FileType.File:
raise FileExistsError(f"Path exists but is not a directory: {path}")

self.filesystem.create_dir(path_str, recursive=True)
Expand All @@ -270,15 +264,15 @@ def rename(self, src: str, dst: str) -> bool:
return self.filesystem.rename(src_str, dst_str)

dst_file_info = self.filesystem.get_file_info([dst_str])[0]
if dst_file_info.type != pyarrow.fs.FileType.NotFound:
if dst_file_info.type == pyarrow.fs.FileType.File:
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src_str).name
dst_str = str(Path(dst_str) / src_name)
final_dst_info = self.filesystem.get_file_info([dst_str])[0]
if final_dst_info.type != pyarrow.fs.FileType.NotFound:
if final_dst_info.type != pafs.FileType.NotFound:
return False

self.filesystem.move(src_str, dst_str)
Expand Down Expand Up @@ -316,7 +310,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
if file_info.type == pyarrow.fs.FileType.Directory:
if file_info.type == pafs.FileType.Directory:
return False

temp_path = path + str(uuid.uuid4()) + ".tmp"
Expand Down
34 changes: 15 additions & 19 deletions paimon-python/pypaimon/table/format/format_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ def _read_file_to_arrow(
) -> pyarrow.Table:
path = split.data_path()
csv_read_options = None
if fmt == Format.CSV and hasattr(pyarrow, "csv"):
csv_read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)
if fmt == Format.CSV:
import pyarrow.csv as csv
csv_read_options = csv.ReadOptions(block_size=1 << 20)
try:
with file_io.new_input_stream(path) as stream:
chunks = []
Expand All @@ -71,25 +72,25 @@ def _read_file_to_arrow(

if fmt == Format.PARQUET:
import io
import pyarrow.parquet as pq
data = (
bytes(data) if not isinstance(data, bytes) else data
)
if len(data) < 4 or data[:4] != b"PAR1":
return pyarrow.table({})
try:
tbl = pyarrow.parquet.read_table(io.BytesIO(data))
tbl = pq.read_table(io.BytesIO(data))
except pyarrow.ArrowInvalid:
return pyarrow.table({})
elif fmt == Format.CSV:
if hasattr(pyarrow, "csv"):
tbl = pyarrow.csv.read_csv(
import pyarrow.csv as csv
try:
tbl = csv.read_csv(
pyarrow.BufferReader(data),
read_options=csv_read_options,
)
else:
import io
df = pandas.read_csv(io.BytesIO(data))
tbl = pyarrow.Table.from_pandas(df)
except Exception:
return pyarrow.table({})
elif fmt == Format.JSON:
import json
text = data.decode("utf-8") if isinstance(data, bytes) else data
Expand All @@ -103,17 +104,12 @@ def _read_file_to_arrow(
tbl = pyarrow.Table.from_pylist(records)
elif fmt == Format.ORC:
import io
import pyarrow.orc as orc
data = bytes(data) if not isinstance(data, bytes) else data
if hasattr(pyarrow, "orc"):
try:
tbl = pyarrow.orc.read_table(io.BytesIO(data))
except Exception:
return pyarrow.table({})
else:
raise ValueError(
"Format table read for ORC requires PyArrow with ORC support "
"(pyarrow.orc)"
)
try:
tbl = orc.read_table(io.BytesIO(data))
except Exception:
return pyarrow.table({})
elif fmt == Format.TEXT:
text = data.decode("utf-8") if isinstance(data, bytes) else data
lines = (
Expand Down
28 changes: 10 additions & 18 deletions paimon-python/pypaimon/table/format/format_table_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,15 @@ def _write_single_batch(
fmt = self._file_format
tbl = pyarrow.Table.from_batches([data])
if fmt == Format.PARQUET:
import pyarrow.parquet as pq
buf = io.BytesIO()
pyarrow.parquet.write_table(tbl, buf, compression="zstd")
pq.write_table(tbl, buf, compression="zstd")
raw = buf.getvalue()
elif fmt == Format.CSV:
if hasattr(pyarrow, "csv"):
buf = io.BytesIO()
pyarrow.csv.write_csv(tbl, buf)
raw = buf.getvalue()
else:
buf = io.StringIO()
tbl.to_pandas().to_csv(buf, index=False)
raw = buf.getvalue().encode("utf-8")
import pyarrow.csv as csv
buf = io.BytesIO()
csv.write_csv(tbl, buf)
raw = buf.getvalue()
elif fmt == Format.JSON:
import json
lines = []
Expand All @@ -201,15 +198,10 @@ def _write_single_batch(
lines.append(json.dumps(row) + "\n")
raw = "".join(lines).encode("utf-8")
elif fmt == Format.ORC:
if hasattr(pyarrow, "orc"):
buf = io.BytesIO()
pyarrow.orc.write_table(tbl, buf)
raw = buf.getvalue()
else:
raise ValueError(
"Format table write for ORC requires PyArrow with ORC "
"support (pyarrow.orc)"
)
import pyarrow.orc as orc
buf = io.BytesIO()
orc.write_table(tbl, buf)
raw = buf.getvalue()
elif fmt == Format.TEXT:
partition_keys = self.table.partition_keys
if partition_keys:
Expand Down
10 changes: 5 additions & 5 deletions paimon-python/pypaimon/tests/file_io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from unittest.mock import MagicMock, patch

import pyarrow
from pyarrow.fs import S3FileSystem
import pyarrow.fs as pafs

from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
Expand All @@ -37,7 +37,7 @@ class FileIOTest(unittest.TestCase):
def test_s3_filesystem_path_conversion(self):
"""Test S3FileSystem path conversion with various formats."""
file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
self.assertIsInstance(file_io.filesystem, S3FileSystem)
self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem)

# Test bucket and path
self.assertEqual(file_io.to_filesystem_path("s3://my-bucket/path/to/file.txt"),
Expand Down Expand Up @@ -75,7 +75,7 @@ def test_s3_filesystem_path_conversion(self):
expected_path = (
"path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
self.assertEqual(got, expected_path)
nf = MagicMock(type=pyarrow.fs.FileType.NotFound)
nf = MagicMock(type=pafs.FileType.NotFound)
mock_fs = MagicMock()
mock_fs.get_file_info.side_effect = [[nf], [nf]]
mock_fs.create_dir = MagicMock()
Expand Down Expand Up @@ -244,7 +244,7 @@ def test_exists_does_not_catch_exception(self):
}))
mock_fs = MagicMock()
mock_fs.get_file_info.return_value = [
MagicMock(type=pyarrow.fs.FileType.NotFound)]
MagicMock(type=pafs.FileType.NotFound)]
oss_io.filesystem = mock_fs
self.assertFalse(oss_io.exists("oss://test-bucket/path/to/file.txt"))
finally:
Expand Down Expand Up @@ -331,7 +331,7 @@ def test_get_file_status_raises_error_when_file_not_exists(self):
f.write("test content")

file_info = file_io.get_file_status(f"file://{test_file}")
self.assertEqual(file_info.type, pyarrow.fs.FileType.File)
self.assertEqual(file_info.type, pafs.FileType.File)
self.assertIsNotNone(file_info.size)

with self.assertRaises(FileNotFoundError) as context:
Expand Down