Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support commit metrics #1638

Merged
merged 13 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Core] Only create commit metrics from TableCommitImpl and won't clos…
…e metrics group from other commit objs
  • Loading branch information
schnappi17 committed Oct 19, 2023
commit 068a09986087ae98b58bbb711b2651eeedc32ed9
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.groups.GenericMetricGroup;
import org.apache.paimon.utils.FileStorePathFactory;

/** Metrics to measure a commit. */
public class CommitMetrics {
Expand All @@ -32,10 +31,8 @@ public class CommitMetrics {

private final AbstractMetricGroup genericMetricGroup;

public CommitMetrics(FileStorePathFactory pathFactory) {
this.genericMetricGroup =
GenericMetricGroup.createGenericMetricGroup(
pathFactory.root().getName(), GROUP_NAME);
public CommitMetrics(String table) {
this.genericMetricGroup = GenericMetricGroup.createGenericMetricGroup(table, GROUP_NAME);
registerGenericCommitMetrics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;
Expand Down Expand Up @@ -79,6 +80,6 @@ void overwrite(
/** Abort an unsuccessful commit. The data files will be deleted. */
void abort(List<CommitMessage> commitMessages);

/** Close the commit. */
void close();
/** With metrics to measure commits. */
FileStoreCommit withMetrics(CommitMetrics metrics);
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Lock lock;
private boolean ignoreEmptyCommit;

private final CommitMetrics commitMetrics;
private CommitMetrics commitMetrics;

public FileStoreCommitImpl(
FileIO fileIO,
Expand Down Expand Up @@ -153,7 +153,7 @@ public FileStoreCommitImpl(

this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = new CommitMetrics(pathFactory);
this.commitMetrics = null;
}

@Override
Expand Down Expand Up @@ -281,14 +281,16 @@ public void commit(ManifestCommittable committable, Map<String, String> properti
}
} finally {
long commitDuration = (System.nanoTime() - started) / 1_000_000;
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
if (this.commitMetrics != null) {
reportCommit(
appendTableFiles,
appendChangelog,
compactTableFiles,
compactChangelog,
commitDuration,
generatedSnapshot,
attempts);
}
}
}

Expand Down Expand Up @@ -476,8 +478,9 @@ public void abort(List<CommitMessage> commitMessages) {
}

@Override
public void close() {
commitMetrics.getMetricGroup().close();
public FileStoreCommit withMetrics(CommitMetrics metrics) {
this.commitMetrics = metrics;
return this;
}

private void collectChanges(
Expand Down Expand Up @@ -1002,10 +1005,6 @@ private void cleanUpTmpManifests(
}
}

public CommitMetrics getCommitMetrics() {
return commitMetrics;
}

private static class LevelIdentifier {

private final BinaryRow partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ private void doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
if (expired.size() > 0) {
commit.dropPartitions(expired, commitIdentifier);
}
commit.close();
}

private Map<String, String> toPartitionString(Object[] array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ public TableCommitImpl newCommit(String commitUser) {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
coreOptions().snapshotExpireExecutionMode());
coreOptions().snapshotExpireExecutionMode(),
name());
}

private List<CommitCallback> createCommitCallbacks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.commit.CommitMetrics;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class TableCommitImpl implements InnerTableCommit {

private ExecutorService expireMainExecutor;
private AtomicReference<Throwable> expireError;
private final CommitMetrics commitMetrics;

public TableCommitImpl(
FileStoreCommit commit,
Expand All @@ -85,8 +87,10 @@ public TableCommitImpl(
Lock lock,
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode) {
commit.withLock(lock);
ExpireExecutionMode expireExecutionMode,
String table) {
schnappi17 marked this conversation as resolved.
Show resolved Hide resolved
this.commitMetrics = new CommitMetrics(table);
commit.withLock(lock).withMetrics(commitMetrics);
if (expire != null) {
expire.withLock(lock);
}
Expand Down Expand Up @@ -261,7 +265,7 @@ public void close() throws Exception {
}
IOUtils.closeQuietly(lock);
expireMainExecutor.shutdownNow();
commit.close();
commitMetrics.getMetricGroup().close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ public Snapshot dropPartitions(List<Map<String, String>> partitions) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commit.dropPartitions(partitions, Long.MAX_VALUE);
commit.close();

Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
assertThat(snapshotIdAfterCommit).isNotNull();
Expand Down Expand Up @@ -300,7 +299,6 @@ public List<Snapshot> commitDataImpl(
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commitFunction.accept(commit, committable);
commit.close();

Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
if (snapshotIdAfterCommit == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@

package org.apache.paimon.metrics.commit;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.Metrics;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -45,7 +39,6 @@

/** Tests for {@link CommitMetrics}. */
public class CommitMetricsTest {
@TempDir static java.nio.file.Path tempDir;
private static final String TABLE_NAME = "myTable";

private CommitMetrics commitMetrics;
Expand Down Expand Up @@ -278,13 +271,6 @@ private void reportAgain(CommitMetrics commitMetrics) {
}

private CommitMetrics getCommitMetrics() {
Path path = new Path(tempDir.toString(), TABLE_NAME);
FileStorePathFactory pathFactory =
new FileStorePathFactory(
path,
RowType.of(new IntType()),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
return new CommitMetrics(pathFactory);
return new CommitMetrics(TABLE_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public void testMultiPartitions() throws Exception {
partitionSpec.put("hr", "8");
commit.overwrite(
partitionSpec, new ManifestCommittable(commitIdentifier++), Collections.emptyMap());
commit.close();
// step 4: generate snapshot 4 by cleaning dt=0402/hr=12/bucket-0
BinaryRow partition = partitions.get(7);
cleanBucket(store, partition, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public static void commitData(
}

commit.commit(committable, Collections.emptyMap());
commit.close();

writers.values().stream()
.flatMap(m -> m.values().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public void run() {
throw new RuntimeException(e);
}
}
commit.close();
}

private void doCommit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,5 @@ public DropPartitionAction(
@Override
public void run() throws Exception {
commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public String[] call(
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
FileStoreCommit commit = fileStoreTable.store().newCommit(UUID.randomUUID().toString());
commit.dropPartitions(getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER);
commit.close();

return new String[] {"Success"};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,16 @@ public boolean applyDeleteFilters(List<ResolvedExpression> list) {
public Optional<Long> executeDeletion() {
FileStoreCommit commit =
((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
try {
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
}
} finally {
commit.close();
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.purgeTable(identifier);
return Optional.empty();
} else if (deleteIsDropPartition()) {
commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
return Optional.empty();
} else {
return Optional.of(
TableUtils.deleteWhere(table, Collections.singletonList(deletePredicate)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement {
commit.dropPartitions(
Collections.singletonList(partitionMap),
BatchWriteBuilder.COMMIT_IDENTIFIER)
commit.close();
case _ =>
throw new UnsupportedOperationException(
"Only AbstractFileStoreTable supports drop partitions.")
Expand Down