Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafBinaryFunction;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -64,6 +67,7 @@
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.TypeUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

Expand Down Expand Up @@ -193,32 +197,89 @@ public InnerTableScan withFilter(Predicate pushdown) {
@Override
public Plan innerPlan() {
SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
if (partitionPredicate != null && partitionPredicate.function() instanceof Equal) {
String partitionStr = partitionPredicate.literals().get(0).toString();
if (partitionStr.startsWith("{")) {
partitionStr = partitionStr.substring(1);
}
if (partitionStr.endsWith("}")) {
partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
}
String[] partFields = partitionStr.split(", ");
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
if (partitionPredicate != null) {
List<String> partitionKeys = fileStoreTable.partitionKeys();
if (partitionKeys.size() != partFields.length) {
return Collections::emptyList;
}
for (int i = 0; i < partitionKeys.size(); i++) {
partSpec.put(partitionKeys.get(i), partFields[i]);
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
if (partitionPredicate.function() instanceof Equal) {
LinkedHashMap<String, String> partSpec =
parsePartitionSpec(
partitionPredicate.literals().get(0).toString(), partitionKeys);
if (partSpec == null) {
return Collections::emptyList;
}
snapshotReader.withPartitionFilter(partSpec);
} else if (partitionPredicate.function() instanceof In) {
List<Predicate> orPredicates = new ArrayList<>();
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
for (Object literal : partitionPredicate.literals()) {
LinkedHashMap<String, String> partSpec =
parsePartitionSpec(literal.toString(), partitionKeys);
if (partSpec == null) {
continue;
}
List<Predicate> andPredicates = new ArrayList<>();
for (int i = 0; i < partitionKeys.size(); i++) {
Object value =
TypeUtils.castFromString(
partSpec.get(partitionKeys.get(i)),
partitionType.getTypeAt(i));
andPredicates.add(partBuilder.equal(i, value));
}
orPredicates.add(PredicateBuilder.and(andPredicates));
}
if (!orPredicates.isEmpty()) {
snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
}
} else if (partitionPredicate.function() instanceof LeafBinaryFunction) {
LinkedHashMap<String, String> partSpec =
parsePartitionSpec(
partitionPredicate.literals().get(0).toString(), partitionKeys);
if (partSpec != null) {
PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
List<Predicate> predicates = new ArrayList<>();
for (int i = 0; i < partitionKeys.size(); i++) {
Object value =
TypeUtils.castFromString(
partSpec.get(partitionKeys.get(i)),
partitionType.getTypeAt(i));
predicates.add(
new LeafPredicate(
partitionPredicate.function(),
partitionType.getTypeAt(i),
i,
partitionKeys.get(i),
Collections.singletonList(value)));
}
snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates));
}
}
snapshotReader.withPartitionFilter(partSpec);
// TODO support range?
}

return () ->
snapshotReader.partitions().stream()
.map(p -> new FilesSplit(p, bucketPredicate, levelPredicate))
.collect(Collectors.toList());
}

/**
 * Parses a partition literal of the form {@code "{v1, v2, ...}"} (the {@code toString()} of a
 * partition row) into an ordered map from partition key name to value string.
 *
 * <p>NOTE(review): this assumes partition values themselves never contain the separator
 * {@code ", "}; a value containing it would be split apart — confirm upstream literals.
 *
 * @param partitionStr partition literal, optionally wrapped in braces
 * @param partitionKeys partition key names of the table, in schema order
 * @return spec mapping each key to its value in key order, or {@code null} when the number of
 *     parsed fields does not match the number of partition keys
 */
@Nullable
private LinkedHashMap<String, String> parsePartitionSpec(
        String partitionStr, List<String> partitionKeys) {
    // Strip the surrounding "{...}" produced by the row's toString().
    if (partitionStr.startsWith("{")) {
        partitionStr = partitionStr.substring(1);
    }
    if (partitionStr.endsWith("}")) {
        partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
    }
    // Limit -1 keeps trailing empty fields; the default limit drops them, which would make a
    // spec with an empty last value fail the arity check below and be treated as non-matching.
    String[] partFields = partitionStr.split(", ", -1);
    if (partitionKeys.size() != partFields.length) {
        return null;
    }
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    for (int i = 0; i < partitionKeys.size(); i++) {
        partSpec.put(partitionKeys.get(i), partFields[i]);
    }
    return partSpec;
}
}

private static class FilesSplit extends SingletonSplit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
Expand All @@ -40,6 +42,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;

Expand Down Expand Up @@ -134,6 +137,58 @@ private List<String> readPartBucketLevel(Predicate predicate) throws IOException
return rows;
}

@Test
public void testReadWithPartitionRange() throws Exception {
    // Two partitions exist after this setup: {1, 10} (three files) and {2, 20} (one compacted file).
    compact(table, row(2, 20), 0);
    write(table, GenericRow.of(3, 1, 10, 1));

    PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
    BinaryString low = BinaryString.fromString("{1, 10}");
    BinaryString high = BinaryString.fromString("{2, 20}");

    // Strictly above the smaller partition: only the larger partition's file remains.
    assertThat(readPartBucketLevel(builder.greaterThan(0, low)))
            .containsExactlyInAnyOrder("{2, 20}-0-5");

    // At-or-above the larger partition: same single file.
    assertThat(readPartBucketLevel(builder.greaterOrEqual(0, high)))
            .containsExactlyInAnyOrder("{2, 20}-0-5");

    // Strictly below the larger partition: all files of the smaller partition.
    assertThat(readPartBucketLevel(builder.lessThan(0, high)))
            .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0");

    // At-or-below the smaller partition: same three files.
    assertThat(readPartBucketLevel(builder.lessOrEqual(0, low)))
            .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0");
}

@Test
public void testReadWithPartitionIn() throws Exception {
    // Two partitions exist after this setup: {1, 10} (three files) and {2, 20} (one compacted file).
    compact(table, row(2, 20), 0);
    write(table, GenericRow.of(3, 1, 10, 1));

    BinaryString p1 = BinaryString.fromString("{1, 10}");
    BinaryString p2 = BinaryString.fromString("{2, 20}");

    // IN over both existing partitions matches every file.
    assertThat(readPartBucketLevel(buildInPredicate(p1, p2)))
            .containsExactlyInAnyOrder(
                    "{1, 10}-0-0", "{1, 10}-0-0", "{1, 10}-1-0", "{2, 20}-0-5");

    // IN over a single partition narrows the scan to that partition's files.
    assertThat(readPartBucketLevel(buildInPredicate(p2)))
            .containsExactlyInAnyOrder("{2, 20}-0-5");

    // A partition that was never written yields no files at all.
    assertThat(readPartBucketLevel(buildInPredicate(BinaryString.fromString("{3, 30}"))))
            .isEmpty();
}

/** Builds an IN predicate over the partition column (field index 0) of the files table. */
private Predicate buildInPredicate(BinaryString... values) {
    // Field 0 of the files-table schema is the partition string column.
    DataField field = FilesTable.TABLE_TYPE.getFields().get(0);
    return new LeafPredicate(
            In.INSTANCE, field.type(), 0, field.name(), Arrays.asList(values));
}

@Test
public void testReadFilesFromLatest() throws Exception {
List<InternalRow> expectedRow = getExpectedResult(2L);
Expand Down