Skip to content

Commit

Permalink
[HUDI-7099] Providing metrics for archive and defining som string con…
Browse files Browse the repository at this point in the history
…stants
  • Loading branch information
majian1998 committed Nov 15, 2023
1 parent abd3afc commit 40eab84
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,6 +76,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
private final TransactionManager txnManager;

private final LSMTimelineWriter timelineWriter;
private final HoodieMetrics metrics;

public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config;
Expand All @@ -84,6 +87,7 @@ public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O>
Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
this.minInstantsToKeep = minAndMaxInstants.getLeft();
this.maxInstantsToKeep = minAndMaxInstants.getRight();
this.metrics = new HoodieMetrics(config);
}

public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
Expand All @@ -99,6 +103,7 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
final Timer.Context timerContext = metrics.getArchiveCtx();
// Sort again because the cleaning and rollback instants could break the sequence.
List<ActiveAction> instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
boolean success = true;
Expand All @@ -117,6 +122,10 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
} else {
LOG.info("No Instants to archive");
}
if (success && timerContext != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
this.metrics.updateArchiveMetrics(durationMs, instantsToArchive.size());
}
return success;
} finally {
if (acquireLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ public class HoodieMetrics {
public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted";
public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = "totalCorruptedLogBlocks";
public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks";

public static final String DURATION_STR = "duration";
public static final String DELETE_FILES_NUM_STR = "numFilesDeleted";

public static final String TIMER_ACTION = "timer";
public static final String COUNTER_ACTION = "counter";
public static final String ARCHIVE_ACTION = "archive";
public static final String FINALIZE_ACTION = "finalize";
public static final String INDEX_ACTION = "index";

private Metrics metrics;
// Some timers
public String rollbackTimerName = null;
public String cleanTimerName = null;
public String archiveTimerName = null;
public String commitTimerName = null;
public String logCompactionTimerName = null;
public String deltaCommitTimerName = null;
Expand All @@ -74,6 +83,7 @@ public class HoodieMetrics {
private String tableName;
private Timer rollbackTimer = null;
private Timer cleanTimer = null;
private Timer archiveTimer = null;
private Timer commitTimer = null;
private Timer deltaCommitTimer = null;
private Timer finalizeTimer = null;
Expand All @@ -92,20 +102,21 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.tableName = config.getTableName();
if (config.isMetricsOn()) {
metrics = Metrics.getInstance(config);
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
this.replaceCommitTimerName = getMetricsName("timer", HoodieTimeline.REPLACE_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName("timer", HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested");
this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed");
this.rollbackTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.CLEAN_ACTION);
this.archiveTimerName = getMetricsName(TIMER_ACTION, ARCHIVE_ACTION);
this.commitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMMIT_ACTION);
this.deltaCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION);
this.replaceCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName(TIMER_ACTION, FINALIZE_ACTION);
this.compactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION);
this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION, "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName(COUNTER_ACTION, "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName(COUNTER_ACTION, "conflict_resolution.failure");
this.compactionRequestedCounterName = getMetricsName(COUNTER_ACTION, "compaction.requested");
this.compactionCompletedCounterName = getMetricsName(COUNTER_ACTION, "compaction.completed");
}
}

Expand Down Expand Up @@ -152,6 +163,13 @@ public Timer.Context getCleanCtx() {
return cleanTimer == null ? null : cleanTimer.time();
}

public Timer.Context getArchiveCtx() {
if (config.isMetricsOn() && archiveTimer == null) {
archiveTimer = createTimer(archiveTimerName);
}
return archiveTimer == null ? null : archiveTimer.time();
}

public Timer.Context getCommitCtx() {
if (config.isMetricsOn() && commitTimer == null) {
commitTimer = createTimer(commitTimerName);
Expand Down Expand Up @@ -262,41 +280,50 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn
metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
}
metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
metrics.registerGauge(getMetricsName(actionType, DURATION_STR), durationInMs);
}
}

public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateArchiveMetrics(long durationInMs, int numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending archive metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
numFilesFinalized));
metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, "numFilesFinalized"), numFilesFinalized);
}
}

public void updateIndexMetrics(final String action, final long durationInMs) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
metrics.registerGauge(getMetricsName(INDEX_ACTION, String.format("%s.duration", action)), durationInMs);
}
}

Expand All @@ -306,7 +333,7 @@ public String getMetricsName(String action, String metric) {
}

public void updateClusteringFileCreationMetrics(long durationInMs) {
reportMetrics("replacecommit", "fileCreationTime", durationInMs);
reportMetrics(HoodieTimeline.REPLACE_COMMIT_ACTION, "fileCreationTime", durationInMs);
}

/**
Expand Down

0 comments on commit 40eab84

Please sign in to comment.