Skip to content

Commit

Permalink
[HUDI-7099] Providing metrics for archive and defining some string co…
Browse files Browse the repository at this point in the history
…nstants (apache#10101)
  • Loading branch information
majian1998 authored Nov 20, 2023
1 parent 3225625 commit 979132b
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -783,9 +783,14 @@ protected void archive(HoodieTable table) {
return;
}
try {
final Timer.Context timerContext = metrics.getArchiveCtx();
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
archiver.archiveIfRequired(context, true);
int instantsToArchive = archiver.archiveIfRequired(context, true);
if (timerContext != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
this.metrics.updateArchiveMetrics(durationMs, instantsToArchive);
}
} catch (IOException ioe) {
throw new HoodieIOException("Failed to archive", ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
private final TransactionManager txnManager;

private final LSMTimelineWriter timelineWriter;
private final HoodieMetrics metrics;

public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config;
Expand All @@ -84,24 +86,24 @@ public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O>
Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
this.minInstantsToKeep = minAndMaxInstants.getLeft();
this.maxInstantsToKeep = minAndMaxInstants.getRight();
this.metrics = new HoodieMetrics(config);
}

public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
public int archiveIfRequired(HoodieEngineContext context) throws IOException {
return archiveIfRequired(context, false);
}

/**
* Check if commits need to be archived. If yes, archive commits.
*/
public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
try {
if (acquireLock) {
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
// Sort again because the cleaning and rollback instants could break the sequence.
List<ActiveAction> instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty()) {
LOG.info("Archiving instants " + instantsToArchive);
Consumer<Exception> exceptionHandler = e -> {
Expand All @@ -111,13 +113,13 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
};
this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive, context);
deleteArchivedInstants(instantsToArchive, context);
// triggers compaction and cleaning only after archiving action
this.timelineWriter.compactAndClean(context);
} else {
LOG.info("No Instants to archive");
}
return success;
return instantsToArchive.size();
} finally {
if (acquireLock) {
txnManager.endTransaction(Option.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,28 @@ public class HoodieMetrics {
public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted";
public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = "totalCorruptedLogBlocks";
public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks";

public static final String DURATION_STR = "duration";
public static final String DELETE_FILES_NUM_STR = "numFilesDeleted";
public static final String DELETE_INSTANTS_NUM_STR = "numInstantsArchived";
public static final String FINALIZED_FILES_NUM_STR = "numFilesFinalized";
public static final String CONFLICT_RESOLUTION_STR = "conflict_resolution";
public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs";
public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs";
public static final String COMMIT_TIME_STR = "commitTime";
public static final String SUCCESS_EXTENSION = ".success";
public static final String FAILURE_EXTENSION = ".failure";

public static final String TIMER_ACTION = "timer";
public static final String COUNTER_ACTION = "counter";
public static final String ARCHIVE_ACTION = "archive";
public static final String FINALIZE_ACTION = "finalize";
public static final String INDEX_ACTION = "index";

private Metrics metrics;
// Some timers
public String rollbackTimerName = null;
public String cleanTimerName = null;
public String archiveTimerName = null;
public String commitTimerName = null;
public String logCompactionTimerName = null;
public String deltaCommitTimerName = null;
Expand All @@ -74,6 +91,7 @@ public class HoodieMetrics {
private String tableName;
private Timer rollbackTimer = null;
private Timer cleanTimer = null;
private Timer archiveTimer = null;
private Timer commitTimer = null;
private Timer deltaCommitTimer = null;
private Timer finalizeTimer = null;
Expand All @@ -92,20 +110,21 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.tableName = config.getTableName();
if (config.isMetricsOn()) {
metrics = Metrics.getInstance(config);
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
this.replaceCommitTimerName = getMetricsName("timer", HoodieTimeline.REPLACE_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName("timer", HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested");
this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed");
this.rollbackTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.CLEAN_ACTION);
this.archiveTimerName = getMetricsName(TIMER_ACTION, ARCHIVE_ACTION);
this.commitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMMIT_ACTION);
this.deltaCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION);
this.replaceCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName(TIMER_ACTION, FINALIZE_ACTION);
this.compactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION);
this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION, CONFLICT_RESOLUTION_STR);
this.conflictResolutionSuccessCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + SUCCESS_EXTENSION);
this.conflictResolutionFailureCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + FAILURE_EXTENSION);
this.compactionRequestedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.REQUESTED_EXTENSION);
this.compactionCompletedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.COMPLETED_EXTENSION);
}
}

Expand Down Expand Up @@ -152,6 +171,13 @@ public Timer.Context getCleanCtx() {
return cleanTimer == null ? null : cleanTimer.time();
}

public Timer.Context getArchiveCtx() {
if (config.isMetricsOn() && archiveTimer == null) {
archiveTimer = createTimer(archiveTimerName);
}
return archiveTimer == null ? null : archiveTimer.time();
}

public Timer.Context getCommitCtx() {
if (config.isMetricsOn() && commitTimer == null) {
commitTimer = createTimer(commitTimerName);
Expand Down Expand Up @@ -255,48 +281,60 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
if (eventTimePairMinMax.getLeft().isPresent()) {
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_LATENCY_IN_MS_STR), commitLatencyInMs);
}
if (eventTimePairMinMax.getRight().isPresent()) {
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_FRESHNESS_IN_MS_STR), commitFreshnessInMs);
}
metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_TIME_STR), commitEpochTimeInMs);
metrics.registerGauge(getMetricsName(actionType, DURATION_STR), durationInMs);
}
}

public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
String.format("Sending rollback metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_FILES_NUM_STR, numFilesDeleted));
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
String.format("Sending clean metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_FILES_NUM_STR, numFilesDeleted));
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateArchiveMetrics(long durationInMs, int numInstantsArchived) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending archive metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_INSTANTS_NUM_STR, numInstantsArchived));
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DELETE_INSTANTS_NUM_STR), numInstantsArchived);
}
}

public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
numFilesFinalized));
metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
LOG.info(String.format("Sending finalize write metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
FINALIZED_FILES_NUM_STR, numFilesFinalized));
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, FINALIZED_FILES_NUM_STR), numFilesFinalized);
}
}

public void updateIndexMetrics(final String action, final long durationInMs) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
LOG.info(String.format("Sending index metrics (%s.%s, %d)", action, DURATION_STR, durationInMs));
metrics.registerGauge(getMetricsName(INDEX_ACTION, String.format("%s.%s", action, DURATION_STR)), durationInMs);
}
}

Expand All @@ -306,7 +344,7 @@ public String getMetricsName(String action, String metric) {
}

public void updateClusteringFileCreationMetrics(long durationInMs) {
reportMetrics("replacecommit", "fileCreationTime", durationInMs);
reportMetrics(HoodieTimeline.REPLACE_COMMIT_ACTION, "fileCreationTime", durationInMs);
}

/**
Expand Down
Loading

0 comments on commit 979132b

Please sign in to comment.