[core] Data files with delete records should not be upgraded directly to max level (apache#2962)
tsreaper authored and zhu3pang committed Mar 29, 2024
1 parent a0817bd commit 731f3ef
Showing 21 changed files with 175 additions and 40 deletions.
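The heart of the change lands in MergeTreeCompactTask.upgrade(), further down: a compaction "upgrade" is a metadata-only move that re-tags a file with a higher level without rewriting its contents, so a file that still carries delete records must not be re-tagged straight to the max level, where a delete record has nothing left beneath it to cancel and would otherwise survive indefinitely. A minimal sketch of the decision rule the diff implements (illustrative names, not the actual Paimon code):

static boolean canUpgradeDirectly(int outputLevel, int maxLevel, Long deleteRowCount) {
    // Below the max level, delete records may legitimately remain.
    if (outputLevel != maxLevel) {
        return true;
    }
    // At the max level, only files known to hold zero delete records qualify;
    // null means the file predates the new metadata, so rewrite to be safe.
    return deleteRowCount != null && deleteRowCount == 0;
}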
57 changes: 47 additions & 10 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -32,6 +32,8 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

+ import javax.annotation.Nullable;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -58,6 +60,8 @@ public class DataFileMeta {

private final String fileName;
private final long fileSize;

+ // total number of rows (including add & delete) in this file
private final long rowCount;

private final BinaryRow minKey;
@@ -73,6 +77,12 @@ public class DataFileMeta {
private final List<String> extraFiles;
private final Timestamp creationTime;

+ // rowCount = addRowCount + deleteRowCount
+ // Why don't we keep addRowCount and deleteRowCount?
+ // Because in previous versions of DataFileMeta, we only keep rowCount.
+ // We have to keep the compatibility.
+ private final @Nullable Long deleteRowCount;

public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -92,7 +102,8 @@ public static DataFileMeta forAppend(
minSequenceNumber,
maxSequenceNumber,
schemaId,
- DUMMY_LEVEL);
+ DUMMY_LEVEL,
+ 0L);
}

public DataFileMeta(
@@ -106,7 +117,8 @@ public DataFileMeta(
long minSequenceNumber,
long maxSequenceNumber,
long schemaId,
- int level) {
+ int level,
+ @Nullable Long deleteRowCount) {
this(
fileName,
fileSize,
@@ -120,7 +132,8 @@ public DataFileMeta(
schemaId,
level,
Collections.emptyList(),
- Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp());
+ Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
+ deleteRowCount);
}

public DataFileMeta(
@@ -136,9 +149,11 @@ public DataFileMeta(
long schemaId,
int level,
List<String> extraFiles,
- Timestamp creationTime) {
+ Timestamp creationTime,
+ @Nullable Long deleteRowCount) {
this.fileName = fileName;
this.fileSize = fileSize;

this.rowCount = rowCount;

this.minKey = minKey;
@@ -152,6 +167,8 @@ public DataFileMeta(
this.schemaId = schemaId;
this.extraFiles = Collections.unmodifiableList(extraFiles);
this.creationTime = creationTime;

+ this.deleteRowCount = deleteRowCount;
}

public String fileName() {
@@ -166,6 +183,14 @@ public long rowCount() {
return rowCount;
}

+ public Optional<Long> addRowCount() {
+ return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c);
+ }
+
+ public Optional<Long> deleteRowCount() {
+ return Optional.ofNullable(deleteRowCount);
+ }

public BinaryRow minKey() {
return minKey;
}
@@ -250,7 +275,8 @@ public DataFileMeta upgrade(int newLevel) {
schemaId,
newLevel,
extraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
@@ -274,11 +300,15 @@ public DataFileMeta copy(List<String> newExtraFiles) {
schemaId,
level,
newExtraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}

@Override
public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
if (!(o instanceof DataFileMeta)) {
return false;
}
@@ -295,7 +325,8 @@ public boolean equals(Object o) {
&& schemaId == that.schemaId
&& level == that.level
&& Objects.equals(extraFiles, that.extraFiles)
- && Objects.equals(creationTime, that.creationTime);
+ && Objects.equals(creationTime, that.creationTime)
+ && Objects.equals(deleteRowCount, that.deleteRowCount);
}

@Override
@@ -313,13 +344,17 @@ public int hashCode() {
schemaId,
level,
extraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}

@Override
public String toString() {
return String.format(
"{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s, %s}",
"{fileName: %s, fileSize: %d, rowCount: %d, "
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}",
fileName,
fileSize,
rowCount,
@@ -332,7 +367,8 @@ public String toString() {
schemaId,
level,
extraFiles,
- creationTime);
+ creationTime,
+ deleteRowCount);
}

public static RowType schema() {
@@ -350,6 +386,7 @@ public static RowType schema() {
fields.add(new DataField(10, "_LEVEL", new IntType(false)));
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
+ fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
return new RowType(fields);
}

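Since deleteRowCount is nullable for compatibility, both new accessors return Optional rather than a primitive. A self-contained illustration of the derived add-count (hypothetical class, not part of the commit):

import java.util.Optional;

public class RowCountExample {
    // Mirrors DataFileMeta.addRowCount(): the split into add/delete rows is
    // only known when the delete count was recorded at write time.
    static Optional<Long> addRowCount(long rowCount, Long deleteRowCount) {
        return Optional.ofNullable(deleteRowCount).map(d -> rowCount - d);
    }

    public static void main(String[] args) {
        System.out.println(addRowCount(100L, 30L));  // Optional[70]
        System.out.println(addRowCount(100L, null)); // Optional.empty (legacy file)
    }
}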
@@ -53,7 +53,8 @@ public InternalRow toRow(DataFileMeta meta) {
meta.schemaId(),
meta.level(),
toStringArrayData(meta.extraFiles()),
- meta.creationTime());
+ meta.creationTime(),
+ meta.deleteRowCount().orElse(null));
}

@Override
@@ -71,6 +72,7 @@ public DataFileMeta fromRow(InternalRow row) {
row.getLong(9),
row.getInt(10),
fromStringArrayData(row.getArray(11)),
- row.getTimestamp(12, 3));
+ row.getTimestamp(12, 3),
+ row.isNullAt(13) ? null : row.getLong(13));
}
}
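The round trip above is deliberately symmetric: toRow stores Optional.empty() as NULL via orElse(null), and fromRow turns NULL back into an absent value through isNullAt(13), so the "unknown" state of manifests written by older versions survives serialization. A tiny sketch of that symmetry (hypothetical helper names):

import java.util.Optional;

class NullableRoundTrip {
    // Optional.empty() <-> NULL: absence is preserved in both directions.
    static Long toNullable(Optional<Long> deleteRowCount) {
        return deleteRowCount.orElse(null);
    }

    static Optional<Long> fromNullable(Long stored) {
        return Optional.ofNullable(stored);
    }
}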
@@ -67,6 +67,7 @@ public class KeyValueDataFileWriter
private InternalRow maxKey = null;
private long minSeqNumber = Long.MAX_VALUE;
private long maxSeqNumber = Long.MIN_VALUE;
+ private long deleteRecordCount = 0;

public KeyValueDataFileWriter(
FileIO fileIO,
@@ -111,6 +112,10 @@ public void write(KeyValue kv) throws IOException {
updateMinSeqNumber(kv);
updateMaxSeqNumber(kv);

+ if (kv.valueKind().isRetract()) {
+ deleteRecordCount++;
+ }
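// Hedged aside: valueKind() is the row's RowKind, and isRetract() should be
// true for DELETE and UPDATE_BEFORE messages, so deleteRecordCount tallies
// every retraction, exactly the set of rows that a max-level rewrite with
// dropDelete enabled will physically remove.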

if (LOG.isDebugEnabled()) {
LOG.debug("Write to Path " + path + " key value " + kv.toString(keyType, valueType));
}
@@ -162,6 +167,7 @@ public DataFileMeta result() throws IOException {
minSeqNumber,
maxSeqNumber,
schemaId,
- level);
+ level,
+ deleteRecordCount);
}
}
@@ -50,11 +50,11 @@ public Levels(
this.keyComparator = keyComparator;

// in case the num of levels is not specified explicitly
- int restoredMaxLevel =
+ int restoredNumLevels =
Math.max(
numLevels,
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
- checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
+ checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
this.level0 =
new TreeSet<>(
(a, b) -> {
@@ -70,7 +70,7 @@
}
});
this.levels = new ArrayList<>();
- for (int i = 1; i < restoredMaxLevel; i++) {
+ for (int i = 1; i < restoredNumLevels; i++) {
levels.add(SortedRun.empty());
}

@@ -108,6 +108,10 @@ public int numberOfLevels() {
return levels.size() + 1;
}

+ public int maxLevel() {
+ return levels.size();
+ }

public int numberOfSortedRuns() {
int numberOfSortedRuns = level0.size();
for (SortedRun run : levels) {
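A note on the two accessors, because the off-by-one is easy to miss: level 0 is stored outside the levels list, so levels.size() is simultaneously the count of non-zero levels and the index of the deepest level. A worked example with the same arithmetic (numbers invented):

int numLevels = 5;                       // configured levels: 0, 1, 2, 3, 4
int levelsListSize = numLevels - 1;      // 'levels' holds runs for levels 1..4
int numberOfLevels = levelsListSize + 1; // 5, i.e. levels.size() + 1
int maxLevel = levelsListSize;           // 4, i.e. levels.size(), the new accessor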
@@ -182,6 +182,7 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
rewriter,
unit,
dropDelete,
+ levels.maxLevel(),
metricsReporter);
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -44,6 +44,7 @@ public class MergeTreeCompactTask extends CompactTask {
private final List<List<SortedRun>> partitioned;

private final boolean dropDelete;
+ private final int maxLevel;

// metric
private int upgradeFilesNum;
@@ -54,13 +55,15 @@ public MergeTreeCompactTask(
CompactRewriter rewriter,
CompactUnit unit,
boolean dropDelete,
+ int maxLevel,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
this.dropDelete = dropDelete;
+ this.maxLevel = maxLevel;

this.upgradeFilesNum = 0;
}
@@ -107,10 +110,20 @@ protected String logMetric(
}

private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
- if (file.level() != outputLevel) {
+ if (file.level() == outputLevel) {
+ return;
+ }
+
+ if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
toUpdate.merge(upgradeResult);
upgradeFilesNum++;
+ } else {
+ // files with delete records should not be upgraded directly to max level
+ List<List<SortedRun>> candidate = new ArrayList<>();
+ candidate.add(new ArrayList<>());
+ candidate.get(0).add(SortedRun.fromSingle(file));
+ rewriteImpl(candidate, toUpdate);
+ }
}

@@ -130,6 +143,11 @@ private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) th
return;
}
}
+ rewriteImpl(candidate, toUpdate);
+ }
+
+ private void rewriteImpl(List<List<SortedRun>> candidate, CompactResult toUpdate)
+ throws Exception {
CompactResult rewriteResult = rewriter.rewrite(outputLevel, dropDelete, candidate);
toUpdate.merge(rewriteResult);
candidate.clear();
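To see the two paths side by side, a walk-through in comments (scenario invented; it assumes the manager passes dropDelete = true when the output level is the max level):

// Invented scenario: maxLevel = 4, outputLevel = 4.
//
// fileA: level 2, deleteRowCount = 0
//   -> known to hold no delete records, so rewriter.upgrade(4, fileA)
//      re-tags the metadata and the data file itself is untouched.
//
// fileB: level 2, deleteRowCount = 3
//   -> wrapped as a single-file, single-run candidate and sent through
//      rewriteImpl, where rewriter.rewrite(4, dropDelete, candidate)
//      writes a new file without the 3 retraction rows.
//
// fileC: level 2, deleteRowCount = null (written before this commit)
//   -> orElse(false) treats the unknown count as "may contain deletes",
//      so fileC is rewritten as well; correctness beats the cheaper upgrade.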
@@ -185,6 +185,7 @@ private DataFileMeta newFile(long fileSize) {
0,
0,
0,
- 0);
+ 0,
+ 0L);
}
}
@@ -132,7 +132,8 @@ private static DataFileMeta newFile(long timeMillis) {
Timestamp.fromLocalDateTime(
Instant.ofEpochMilli(timeMillis)
.atZone(ZoneId.systemDefault())
- .toLocalDateTime()));
+ .toLocalDateTime()),
+ 0L);
}

private Pair<InternalRow, Integer> row(int pt, int col, int pk, int bucket) {
@@ -160,7 +160,8 @@ private Data createDataFile(List<KeyValue> kvs, int level, BinaryRow partition,
minSequenceNumber,
maxSequenceNumber,
0,
- level),
+ level,
+ kvs.stream().filter(kv -> kv.valueKind().isRetract()).count()),
kvs);
}

@@ -40,7 +40,7 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) {
return new DataFileMeta(
"",
maxSeq - minSeq + 1,
- maxSeq - minSeq + 1,
+ 0L,
DataFileMeta.EMPTY_MIN_KEY,
DataFileMeta.EMPTY_MAX_KEY,
DataFileMeta.EMPTY_KEY_STATS,
@@ -50,7 +50,8 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) {
0L,
DataFileMeta.DUMMY_LEVEL,
Collections.emptyList(),
- Timestamp.fromEpochMillis(100));
+ Timestamp.fromEpochMillis(100),
+ maxSeq - minSeq + 1);
}

public static DataFileMeta newFile() {
@@ -65,7 +66,8 @@ public static DataFileMeta newFile() {
0,
0,
0,
- 0);
+ 0,
+ 0L);
}

public static DataFileMeta newFile(
Expand All @@ -81,7 +83,8 @@ public static DataFileMeta newFile(
0,
maxSequence,
0,
- level);
+ level,
+ 0L);
}

public static BinaryRow row(int i) {
@@ -113,6 +113,7 @@ public static DataFileMeta newFile(int name, int level) {
0,
1,
0,
- level);
+ level,
+ 0L);
}
}