Skip to content

Commit 23ed456

Browse files
committed
#49 Implement Python native read with PyArrow
1 parent c741aea commit 23ed456

24 files changed

+204
-414
lines changed

pypaimon/pynative/common/exception.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
class PyNativeNotImplementedError(NotImplementedError):
44
""" Method or function hasn't been implemented by py-native paimon yet. """
5-
pass
5+
pass

pypaimon/pynative/common/other.py

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 71 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,71 @@
1-
# from typing import List, Optional
2-
# from dataclasses import dataclass
3-
#
4-
#
5-
# @dataclass
6-
# class PartitionInfo:
7-
# """分区信息类,用于处理分区字段的映射和值获取"""
8-
#
9-
# def __init__(self, mapping: List[int], partition_type: 'RowType', partition_row: 'BinaryRow'):
10-
# """
11-
# Args:
12-
# mapping: 字段映射数组。
13-
# - 如果mapping[i] > 0,表示字段在数据行中的位置
14-
# - 如果mapping[i] < 0,表示字段在分区中的位置(需要从partition_row获取)
15-
# partition_type: 分区字段的类型信息
16-
# partition_row: 包含分区值的二进制行
17-
# """
18-
# self.mapping = mapping
19-
# self.partition_type = partition_type
20-
# self.partition_row = partition_row
21-
#
22-
# @classmethod
23-
# def from_(cls, mapping: List[int], partition_type: 'RowType',
24-
# partition_row: 'BinaryRow') -> 'PartitionInfo':
25-
# """创建PartitionInfo实例"""
26-
# return cls(mapping, partition_type, partition_row)
27-
#
28-
# def size(self) -> int:
29-
# """返回总字段数(包括数据字段和分区字段)"""
30-
# return len(self.mapping)
31-
#
32-
# def in_partition_row(self, pos: int) -> bool:
33-
# """判断指定位置的字段是否是分区字段
34-
#
35-
# Args:
36-
# pos: 字段位置
37-
#
38-
# Returns:
39-
# 如果是分区字段返回True,否则返回False
40-
# """
41-
# return self.mapping[pos] < 0
42-
#
43-
# def get_real_index(self, pos: int) -> int:
44-
# """获取字段的实际索引
45-
#
46-
# Args:
47-
# pos: 字段位置
48-
#
49-
# Returns:
50-
# - 如果是数据字段,返回在数据行中的位置(mapping[pos] - 1)
51-
# - 如果是分区字段,返回在分区行中的位置(abs(mapping[pos]) - 1)
52-
# """
53-
# mapped = self.mapping[pos]
54-
# return abs(mapped) - 1
55-
#
56-
# def get_type(self, pos: int) -> 'DataType':
57-
# """获取指定位置字段的数据类型
58-
#
59-
# Args:
60-
# pos: 字段位置
61-
#
62-
# Returns:
63-
# 字段的DataType
64-
# """
65-
# if not self.in_partition_row(pos):
66-
# raise ValueError(f"Position {pos} is not in partition row")
67-
# return self.partition_type.get_type_at(self.get_real_index(pos))
68-
#
69-
# def get_partition_row(self) -> 'BinaryRow':
70-
# """获取分区行数据"""
71-
# return self.partition_row
72-
#
73-
#
74-
# class PartitionUtils:
75-
# """分区工具类"""
76-
#
77-
# @staticmethod
78-
# def construct_partition_mapping(data_schema: 'TableSchema',
79-
# data_fields: List['DataField']) -> tuple:
80-
# """构造分区映射
81-
#
82-
# Args:
83-
# data_schema: 表结构
84-
# data_fields: 数据字段列表
85-
#
86-
# Returns:
87-
# (partition_mapping, fields_without_partition) 元组
88-
# - partition_mapping: (mapping数组, 分区RowType)或None
89-
# - fields_without_partition: 不包含分区字段的字段列表
90-
# """
91-
# if not data_schema.partition_keys():
92-
# return None, data_fields
93-
#
94-
# partition_names = data_schema.partition_keys()
95-
# fields_without_partition = []
96-
#
97-
# # 构建映射数组
98-
# mapping = [0] * (len(data_fields) + 1)
99-
# p_count = 0
100-
#
101-
# for i, field in enumerate(data_fields):
102-
# if field.name() in partition_names:
103-
# # 分区字段用负数表示
104-
# mapping[i] = -(partition_names.index(field.name()) + 1)
105-
# p_count += 1
106-
# else:
107-
# # 非分区字段用正数表示
108-
# mapping[i] = (i - p_count) + 1
109-
# fields_without_partition.append(field)
110-
#
111-
# # 如果没有分区字段,返回None
112-
# if len(fields_without_partition) == len(data_fields):
113-
# return None, data_fields
114-
#
115-
# # 创建分区类型
116-
# partition_type = data_schema.projected_logical_row_type(data_schema.partition_keys())
117-
#
118-
# return (mapping, partition_type), fields_without_partition
119-
#
120-
# @staticmethod
121-
# def create(partition_pair: Optional[tuple], binary_row: 'BinaryRow') -> Optional[PartitionInfo]:
122-
# """创建PartitionInfo实例
123-
#
124-
# Args:
125-
# partition_pair: (mapping数组, 分区RowType)元组或None
126-
# binary_row: 分区数据行
127-
#
128-
# Returns:
129-
# PartitionInfo实例或None
130-
# """
131-
# if partition_pair is None:
132-
# return None
133-
# mapping, row_type = partition_pair
134-
# return PartitionInfo(mapping, row_type, binary_row)
1+
from typing import List, Optional
2+
from dataclasses import dataclass
3+
4+
5+
@dataclass
6+
class PartitionInfo:
7+
8+
def __init__(self, mapping: List[int], partition_type: 'RowType', partition_row: 'BinaryRow'):
9+
self.mapping = mapping
10+
self.partition_type = partition_type
11+
self.partition_row = partition_row
12+
13+
@classmethod
14+
def from_(cls, mapping: List[int], partition_type: 'RowType',
15+
partition_row: 'BinaryRow') -> 'PartitionInfo':
16+
return cls(mapping, partition_type, partition_row)
17+
18+
def size(self) -> int:
19+
return len(self.mapping)
20+
21+
def in_partition_row(self, pos: int) -> bool:
22+
return self.mapping[pos] < 0
23+
24+
def get_real_index(self, pos: int) -> int:
25+
mapped = self.mapping[pos]
26+
return abs(mapped) - 1
27+
28+
def get_type(self, pos: int) -> 'DataType':
29+
if not self.in_partition_row(pos):
30+
raise ValueError(f"Position {pos} is not in partition row")
31+
return self.partition_type.get_type_at(self.get_real_index(pos))
32+
33+
def get_partition_row(self) -> 'BinaryRow':
34+
return self.partition_row
35+
36+
37+
class PartitionUtils:
38+
39+
@staticmethod
40+
def construct_partition_mapping(data_schema: 'TableSchema',
41+
data_fields: List['DataField']) -> tuple:
42+
if not data_schema.partition_keys():
43+
return None, data_fields
44+
45+
partition_names = data_schema.partition_keys()
46+
fields_without_partition = []
47+
48+
mapping = [0] * (len(data_fields) + 1)
49+
p_count = 0
50+
51+
for i, field in enumerate(data_fields):
52+
if field.name() in partition_names:
53+
mapping[i] = -(partition_names.index(field.name()) + 1)
54+
p_count += 1
55+
else:
56+
mapping[i] = (i - p_count) + 1
57+
fields_without_partition.append(field)
58+
59+
if len(fields_without_partition) == len(data_fields):
60+
return None, data_fields
61+
62+
partition_type = data_schema.projected_logical_row_type(data_schema.partition_keys())
63+
64+
return (mapping, partition_type), fields_without_partition
65+
66+
@staticmethod
67+
def create(partition_pair: Optional[tuple], binary_row: 'BinaryRow') -> Optional[PartitionInfo]:
68+
if partition_pair is None:
69+
return None
70+
mapping, row_type = partition_pair
71+
return PartitionInfo(mapping, row_type, binary_row)

0 commit comments

Comments
 (0)