Skip to content

Commit ed2c3d7

Browse files
committed
[core] Support partition range predicate pushdown for FilesTable
1 parent 339e8c6 commit ed2c3d7

File tree

2 files changed

+133
-17
lines changed

2 files changed

+133
-17
lines changed

paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java

Lines changed: 78 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -33,9 +33,12 @@
3333
import org.apache.paimon.io.DataFilePathFactory;
3434
import org.apache.paimon.manifest.FileSource;
3535
import org.apache.paimon.predicate.Equal;
36+
import org.apache.paimon.predicate.In;
37+
import org.apache.paimon.predicate.LeafBinaryFunction;
3638
import org.apache.paimon.predicate.LeafPredicate;
3739
import org.apache.paimon.predicate.LeafPredicateExtractor;
3840
import org.apache.paimon.predicate.Predicate;
41+
import org.apache.paimon.predicate.PredicateBuilder;
3942
import org.apache.paimon.reader.RecordReader;
4043
import org.apache.paimon.schema.SchemaManager;
4144
import org.apache.paimon.schema.TableSchema;
@@ -64,6 +67,7 @@
6467
import org.apache.paimon.utils.ProjectedRow;
6568
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
6669
import org.apache.paimon.utils.SerializationUtils;
70+
import org.apache.paimon.utils.TypeUtils;
6771

6872
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
6973

@@ -193,32 +197,89 @@ public InnerTableScan withFilter(Predicate pushdown) {
193197
@Override
194198
public Plan innerPlan() {
195199
SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
196-
if (partitionPredicate != null && partitionPredicate.function() instanceof Equal) {
197-
String partitionStr = partitionPredicate.literals().get(0).toString();
198-
if (partitionStr.startsWith("{")) {
199-
partitionStr = partitionStr.substring(1);
200-
}
201-
if (partitionStr.endsWith("}")) {
202-
partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
203-
}
204-
String[] partFields = partitionStr.split(", ");
205-
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
200+
if (partitionPredicate != null) {
206201
List<String> partitionKeys = fileStoreTable.partitionKeys();
207-
if (partitionKeys.size() != partFields.length) {
208-
return Collections::emptyList;
209-
}
210-
for (int i = 0; i < partitionKeys.size(); i++) {
211-
partSpec.put(partitionKeys.get(i), partFields[i]);
202+
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
203+
if (partitionPredicate.function() instanceof Equal) {
204+
LinkedHashMap<String, String> partSpec =
205+
parsePartitionSpec(
206+
partitionPredicate.literals().get(0).toString(), partitionKeys);
207+
if (partSpec == null) {
208+
return Collections::emptyList;
209+
}
210+
snapshotReader.withPartitionFilter(partSpec);
211+
} else if (partitionPredicate.function() instanceof In) {
212+
List<Predicate> orPredicates = new ArrayList<>();
213+
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
214+
for (Object literal : partitionPredicate.literals()) {
215+
LinkedHashMap<String, String> partSpec =
216+
parsePartitionSpec(literal.toString(), partitionKeys);
217+
if (partSpec == null) {
218+
continue;
219+
}
220+
List<Predicate> andPredicates = new ArrayList<>();
221+
for (int i = 0; i < partitionKeys.size(); i++) {
222+
Object value =
223+
TypeUtils.castFromString(
224+
partSpec.get(partitionKeys.get(i)),
225+
partitionType.getTypeAt(i));
226+
andPredicates.add(partBuilder.equal(i, value));
227+
}
228+
orPredicates.add(PredicateBuilder.and(andPredicates));
229+
}
230+
if (!orPredicates.isEmpty()) {
231+
snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
232+
}
233+
} else if (partitionPredicate.function() instanceof LeafBinaryFunction) {
234+
LinkedHashMap<String, String> partSpec =
235+
parsePartitionSpec(
236+
partitionPredicate.literals().get(0).toString(), partitionKeys);
237+
if (partSpec != null) {
238+
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
239+
List<Predicate> predicates = new ArrayList<>();
240+
for (int i = 0; i < partitionKeys.size(); i++) {
241+
Object value =
242+
TypeUtils.castFromString(
243+
partSpec.get(partitionKeys.get(i)),
244+
partitionType.getTypeAt(i));
245+
predicates.add(
246+
new LeafPredicate(
247+
partitionPredicate.function(),
248+
partitionType.getTypeAt(i),
249+
i,
250+
partitionKeys.get(i),
251+
Collections.singletonList(value)));
252+
}
253+
snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates));
254+
}
212255
}
213-
snapshotReader.withPartitionFilter(partSpec);
214-
// TODO support range?
215256
}
216257

217258
return () ->
218259
snapshotReader.partitions().stream()
219260
.map(p -> new FilesSplit(p, bucketPredicate, levelPredicate))
220261
.collect(Collectors.toList());
221262
}
263+
264+
@Nullable
265+
private LinkedHashMap<String, String> parsePartitionSpec(
266+
String partitionStr, List<String> partitionKeys) {
267+
if (partitionStr.startsWith("{")) {
268+
partitionStr = partitionStr.substring(1);
269+
}
270+
if (partitionStr.endsWith("}")) {
271+
partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
272+
}
273+
String[] partFields = partitionStr.split(", ");
274+
if (partitionKeys.size() != partFields.length) {
275+
return null;
276+
}
277+
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
278+
for (int i = 0; i < partitionKeys.size(); i++) {
279+
partSpec.put(partitionKeys.get(i), partFields[i]);
280+
}
281+
return partSpec;
282+
}
222283
}
223284

224285
private static class FilesSplit extends SingletonSplit {

paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@
3030
import org.apache.paimon.manifest.FileKind;
3131
import org.apache.paimon.manifest.ManifestEntry;
3232
import org.apache.paimon.operation.FileStoreScan;
33+
import org.apache.paimon.predicate.In;
34+
import org.apache.paimon.predicate.LeafPredicate;
3335
import org.apache.paimon.predicate.Predicate;
3436
import org.apache.paimon.predicate.PredicateBuilder;
3537
import org.apache.paimon.schema.Schema;
@@ -40,6 +42,7 @@
4042
import org.apache.paimon.table.FileStoreTableFactory;
4143
import org.apache.paimon.table.TableTestBase;
4244
import org.apache.paimon.table.source.ReadBuilder;
45+
import org.apache.paimon.types.DataField;
4346
import org.apache.paimon.types.DataTypes;
4447
import org.apache.paimon.utils.SnapshotManager;
4548

@@ -134,6 +137,58 @@ private List<String> readPartBucketLevel(Predicate predicate) throws IOException
134137
return rows;
135138
}
136139

140+
@Test
141+
public void testReadWithPartitionRange() throws Exception {
142+
compact(table, row(2, 20), 0);
143+
write(table, GenericRow.of(3, 1, 10, 1));
144+
145+
PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
146+
147+
assertThat(readPartBucketLevel(builder.greaterThan(0, BinaryString.fromString("{1, 10}"))))
148+
.containsExactlyInAnyOrder("{2, 20}-0-5");
149+
150+
assertThat(
151+
readPartBucketLevel(
152+
builder.greaterOrEqual(0, BinaryString.fromString("{2, 20}"))))
153+
.containsExactlyInAnyOrder("{2, 20}-0-5");
154+
155+
assertThat(readPartBucketLevel(builder.lessThan(0, BinaryString.fromString("{2, 20}"))))
156+
.containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0");
157+
158+
assertThat(readPartBucketLevel(builder.lessOrEqual(0, BinaryString.fromString("{1, 10}"))))
159+
.containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0");
160+
}
161+
162+
@Test
163+
public void testReadWithPartitionIn() throws Exception {
164+
compact(table, row(2, 20), 0);
165+
write(table, GenericRow.of(3, 1, 10, 1));
166+
167+
assertThat(
168+
readPartBucketLevel(
169+
buildInPredicate(
170+
BinaryString.fromString("{1, 10}"),
171+
BinaryString.fromString("{2, 20}"))))
172+
.containsExactlyInAnyOrder(
173+
"{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0", "{2, 20}-0-5");
174+
175+
assertThat(readPartBucketLevel(buildInPredicate(BinaryString.fromString("{2, 20}"))))
176+
.containsExactlyInAnyOrder("{2, 20}-0-5");
177+
178+
assertThat(readPartBucketLevel(buildInPredicate(BinaryString.fromString("{3, 30}"))))
179+
.isEmpty();
180+
}
181+
182+
private Predicate buildInPredicate(BinaryString... values) {
183+
DataField partitionField = FilesTable.TABLE_TYPE.getFields().get(0);
184+
return new LeafPredicate(
185+
In.INSTANCE,
186+
partitionField.type(),
187+
0,
188+
partitionField.name(),
189+
Arrays.asList(values));
190+
}
191+
137192
@Test
138193
public void testReadFilesFromLatest() throws Exception {
139194
List<InternalRow> expectedRow = getExpectedResult(2L);

0 commit comments

Comments
 (0)