Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support commit metrics #1638

Merged
merged 13 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[core] Add tests for init total- metrics and resolve other comments
  • Loading branch information
schnappi17 committed Oct 19, 2023
commit 9f059ccfab1673e14c88a49c01bc2ddc1e5d5411
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
Expand Down Expand Up @@ -166,8 +165,7 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.manifestFullCompactionThresholdSize(),
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator(),
new CommitMetrics(pathFactory(), fileIO));
newKeyComparator());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,30 @@ public class CommitMetrics {
protected static final String GROUP_NAME = "commit";

private final MetricGroup genericMetricGroup;

private final FileStorePathFactory pathFactory;
private final FileIO fileIO;

private long initTableFilesCount = 0;
private long initChangelogFilesCount = 0;

public CommitMetrics(FileStorePathFactory pathFactory, FileIO fileIO) {
this.pathFactory = pathFactory;
this.genericMetricGroup =
GenericMetricGroup.createGenericMetricGroup(
pathFactory.root().getName(), GROUP_NAME);
initDataFilesCount(pathFactory, fileIO);
this.pathFactory = pathFactory;
this.fileIO = fileIO;
initDataFilesCount();
registerGenericCommitMetrics();
}

private void initDataFilesCount(FileStorePathFactory pathFactory, FileIO fileIO) {
@VisibleForTesting
void initDataFilesCount() {
try {
List<Path> dirs = Arrays.stream(fileIO.listStatus(pathFactory.root())).map(f -> f.getPath()).collect(Collectors.toList());
List<Path> dirs =
Arrays.stream(fileIO.listStatus(pathFactory.root()))
.map(f -> f.getPath())
.collect(Collectors.toList());
boolean hasPartition = true;
for (Path dir : dirs) {
if (dir.getName().startsWith("bucket-")) {
Expand All @@ -73,7 +81,16 @@ private void initDataFilesCount(FileStorePathFactory pathFactory, FileIO fileIO)
List<Path> buckets = new ArrayList<>();
for (Path dir : dirs) {
FileStatus[] fileStatuses = fileIO.listStatus(dir);
buckets.addAll(Arrays.stream(fileStatuses).filter(f -> f.isDir() && f.getPath().getName().startsWith("bucket-")).map(f -> f.getPath()).collect(Collectors.toList()));
buckets.addAll(
Arrays.stream(fileStatuses)
.filter(
f ->
f.isDir()
&& f.getPath()
.getName()
.startsWith("bucket-"))
.map(f -> f.getPath())
.collect(Collectors.toList()));
}
for (Path bucket : buckets) {
FileStatus[] fileStatuses = fileIO.listStatus(bucket);
Expand All @@ -86,13 +103,29 @@ private void initDataFilesCount(FileStorePathFactory pathFactory, FileIO fileIO)
}
}
} catch (IOException ie) {
log.warn("List table files failed, the 'total' prefixed commit metrics will calculate the number of total files since the job started. ");
log.warn(
"List table files failed, the 'total' prefixed commit metrics will calculate the number of total files since the job started. ");
}
}

private void accFilesCount(FileStatus[] fileStatuses) {
initTableFilesCount += Arrays.stream(fileStatuses).filter(f -> f.getPath().getName().startsWith(DataFilePathFactory.DATA_FILE_PREFIX)).count();
initChangelogFilesCount += Arrays.stream(fileStatuses).filter(f -> f.getPath().getName().startsWith(DataFilePathFactory.CHANGELOG_FILE_PREFIX)).count();
initTableFilesCount +=
Arrays.stream(fileStatuses)
.filter(
f ->
f.getPath()
.getName()
.startsWith(DataFilePathFactory.DATA_FILE_PREFIX))
.count();
initChangelogFilesCount +=
Arrays.stream(fileStatuses)
.filter(
f ->
f.getPath()
.getName()
.startsWith(
DataFilePathFactory.CHANGELOG_FILE_PREFIX))
.count();
}

public MetricGroup getGenericMetricGroup() {
Expand Down Expand Up @@ -143,81 +176,49 @@ public MetricGroup getGenericMetricGroup() {

private void registerGenericCommitMetrics() {
genericMetricGroup.gauge(
LAST_COMMIT_DURATION,
() -> {
return latestCommit == null ? 0L : latestCommit.getDuration();
});
LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration());
genericMetricGroup.gauge(
LAST_COMMIT_ATTEMPTS,
() -> {
return latestCommit == null ? 0L : latestCommit.getAttempts();
});
LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L : latestCommit.getAttempts());
genericMetricGroup.gauge(
LAST_GENERATED_SNAPSHOTS,
() -> {
return latestCommit == null ? 0L : latestCommit.getGeneratedSnapshots();
});
() -> latestCommit == null ? 0L : latestCommit.getGeneratedSnapshots());
genericMetricGroup.gauge(
LAST_PARTITIONS_WRITTEN,
() -> {
return latestCommit == null ? 0L : latestCommit.getNumPartitionsWritten();
});
() -> latestCommit == null ? 0L : latestCommit.getNumPartitionsWritten());
genericMetricGroup.gauge(
LAST_BUCKETS_WRITTEN,
() -> {
return latestCommit == null ? 0L : latestCommit.getNumBucketsWritten();
});
() -> latestCommit == null ? 0L : latestCommit.getNumBucketsWritten());
genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram);
genericMetricGroup.gauge(
LAST_TABLE_FILES_ADDED,
() -> {
return latestCommit == null ? 0L : latestCommit.getTableFilesAdded();
});
() -> latestCommit == null ? 0L : latestCommit.getTableFilesAdded());
genericMetricGroup.gauge(
LAST_TABLE_FILES_DELETED,
() -> {
return latestCommit == null ? 0L : latestCommit.getTableFilesDeleted();
});
() -> latestCommit == null ? 0L : latestCommit.getTableFilesDeleted());
genericMetricGroup.gauge(
LAST_TABLE_FILES_APPENDED,
() -> {
return latestCommit == null ? 0L : latestCommit.getTableFilesAppended();
});
() -> latestCommit == null ? 0L : latestCommit.getTableFilesAppended());
genericMetricGroup.gauge(
LAST_TABLE_FILES_COMMIT_COMPACTED,
() -> {
return latestCommit == null ? 0L : latestCommit.getTableFilesCompacted();
});
() -> latestCommit == null ? 0L : latestCommit.getTableFilesCompacted());
genericMetricGroup.gauge(
LAST_CHANGELOG_FILES_APPENDED,
() -> {
return latestCommit == null ? 0L : latestCommit.getChangelogFilesAppended();
});
() -> latestCommit == null ? 0L : latestCommit.getChangelogFilesAppended());
genericMetricGroup.gauge(
LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
() -> {
return latestCommit == null ? 0L : latestCommit.getChangelogFilesCompacted();
});
() -> latestCommit == null ? 0L : latestCommit.getChangelogFilesCompacted());
genericMetricGroup.gauge(
LAST_DELTA_RECORDS_APPENDED,
() -> {
return latestCommit == null ? 0L : latestCommit.getDeltaRecordsAppended();
});
() -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsAppended());
genericMetricGroup.gauge(
LAST_CHANGELOG_RECORDS_APPENDED,
() -> {
return latestCommit == null ? 0L : latestCommit.getChangelogRecordsAppended();
});
() -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsAppended());
genericMetricGroup.gauge(
LAST_DELTA_RECORDS_COMMIT_COMPACTED,
() -> {
return latestCommit == null ? 0L : latestCommit.getDeltaRecordsCompacted();
});
() -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsCompacted());
genericMetricGroup.gauge(
LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
() -> {
return latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted();
});
() -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted());
totalTableFilesCounter = genericMetricGroup.counter(TOTAL_TABLE_FILES);
totalTableFilesCounter.inc(initTableFilesCount);
totalChangelogFilesCounter = genericMetricGroup.counter(TOTAL_CHANGELOG_FILES);
Expand All @@ -232,4 +233,24 @@ public void reportCommit(CommitStats commitStats) {
commitStats.getChangelogFilesAppended() + commitStats.getChangelogFilesCompacted());
durationHistogram.update(commitStats.getDuration());
}

@VisibleForTesting
long getInitTableFilesCount() {
return initTableFilesCount;
}

@VisibleForTesting
long getInitChangelogFilesCount() {
return initChangelogFilesCount;
}

@VisibleForTesting
FileStorePathFactory getPathFactory() {
return pathFactory;
}

@VisibleForTesting
FileIO getFileIO() {
return fileIO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/** Statistics for a commit. */
public class CommitStats {
private final long duration;
private final long attempts;
private final int attempts;
private final long tableFilesAdded;
private final long tableFilesAppended;
private final long tableFilesDeleted;
Expand All @@ -51,7 +51,6 @@ public class CommitStats {
private final long generatedSnapshots;
private final long numPartitionsWritten;
private final long numBucketsWritten;
private final Map<BinaryRow, Set<Integer>> bucketsWritten;

public CommitStats(
List<ManifestEntry> appendTableFiles,
Expand All @@ -60,17 +59,16 @@ public CommitStats(
List<ManifestEntry> compactChangelogFiles,
long commitDuration,
int generatedSnapshots,
long attempts) {
int attempts) {
List<ManifestEntry> addedTableFiles = new ArrayList<>(appendTableFiles);
addedTableFiles.addAll(
compactTableFiles.stream()
.filter(f -> FileKind.ADD.equals(f.kind()))
.collect(Collectors.toList()));
List<ManifestEntry> deletedTableFiles =
new ArrayList<>(
compactTableFiles.stream()
.filter(f -> FileKind.DELETE.equals(f.kind()))
.collect(Collectors.toList()));
compactTableFiles.stream()
.filter(f -> FileKind.DELETE.equals(f.kind()))
.collect(Collectors.toList());

this.tableFilesAdded = addedTableFiles.size();
this.tableFilesAppended = appendTableFiles.size();
Expand All @@ -80,7 +78,6 @@ public CommitStats(
this.changelogFilesCompacted = compactChangelogFiles.size();
this.numPartitionsWritten = numChangedPartitions(appendTableFiles, compactTableFiles);
this.numBucketsWritten = numChangedBuckets(appendTableFiles, compactTableFiles);
this.bucketsWritten = changedPartBuckets(appendTableFiles, compactTableFiles);
this.changelogRecordsCompacted = getRowCounts(compactChangelogFiles);
this.deltaRecordsCompacted = getRowCounts(compactTableFiles);
this.changelogRecordsAppended = getRowCounts(appendChangelogFiles);
Expand All @@ -96,8 +93,7 @@ protected static long numChangedPartitions(List<ManifestEntry>... changes) {
.flatMap(Collection::stream)
.map(ManifestEntry::partition)
.distinct()
.collect(Collectors.toList())
.size();
.count();
}

@VisibleForTesting
Expand Down Expand Up @@ -207,7 +203,7 @@ protected long getDuration() {
}

@VisibleForTesting
protected long getAttempts() {
protected int getAttempts() {
return attempts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ public FileStoreCommitImpl(
MemorySize manifestFullCompactionSize,
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator,
CommitMetrics commitMetrics) {
@Nullable Comparator<InternalRow> keyComparator) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
Expand All @@ -153,7 +152,7 @@ public FileStoreCommitImpl(

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = commitMetrics;
this.commitMetrics = new CommitMetrics(pathFactory, fileIO);
}

@Override
Expand Down Expand Up @@ -199,7 +198,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti

long started = System.nanoTime();
int generatedSnapshot = 0;
long attempts = 0;
int attempts = 0;
Snapshot latestSnapshot = null;
Long safeLatestSnapshotId = null;
List<ManifestEntry> baseEntries = new ArrayList<>();
Expand Down Expand Up @@ -236,7 +235,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
latestSnapshot, appendTableFiles, compactTableFiles));
try {
noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, appendTableFiles);
} catch (RuntimeException e) {
} finally {
reportCommit(
Collections.emptyList(),
Collections.emptyList(),
Expand All @@ -245,7 +244,6 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
0,
0,
1);
throw e;
}
safeLatestSnapshotId = latestSnapshot.id();
}
Expand Down Expand Up @@ -274,7 +272,7 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
baseEntries.addAll(appendTableFiles);
try {
noConflictsOrFail(latestSnapshot.commitUser(), baseEntries, compactTableFiles);
} catch (RuntimeException e) {
} finally {
long commitDuration = (System.nanoTime() - started) / 1_000_000;
reportCommit(
appendTableFiles,
Expand All @@ -284,7 +282,6 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
commitDuration,
generatedSnapshot,
attempts + 1);
throw e;
}
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
Expand Down Expand Up @@ -320,7 +317,7 @@ private void reportCommit(
List<ManifestEntry> compactChangelogFiles,
long commitDuration,
int generatedSnapshots,
long attempts) {
int attempts) {
CommitStats commitStats =
new CommitStats(
appendTableFiles,
Expand Down Expand Up @@ -547,7 +544,7 @@ private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, Data
kind, commitMessage.partition(), commitMessage.bucket(), numBucket, file);
}

private long tryCommit(
private int tryCommit(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
List<IndexManifestEntry> indexFiles,
Expand All @@ -556,7 +553,7 @@ private long tryCommit(
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
Long safeLatestSnapshotId) {
long cnt = 0;
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
cnt++;
Expand Down
Loading