Skip to content

Commit

Permalink
[HUDI-7099] Providing metrics for archive and defining som string con…
Browse files Browse the repository at this point in the history
…stants
  • Loading branch information
majian1998 committed Nov 16, 2023
1 parent 2f97634 commit 1dddd78
Showing 1 changed file with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ public class HoodieMetrics {
public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks";
public static final String DURATION_STR = "duration";
public static final String DELETE_FILES_NUM_STR = "numFilesDeleted";
public static final String DELETE_INSTANTS_NUM_STR = "numInstantsArchived";
public static final String FINALIZED_FILES_NUM_STR = "numFilesFinalized";
public static final String CONFLICT_RESOLUTION_STR = "conflict_resolution";
public static final String COMMIT_LATENCY_STR = "commitLatencyInMs";
public static final String COMMIT_FRESHNESS_STR = "commitFreshnessInMs";
public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs";
public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs";
public static final String COMMIT_TIME_STR = "commitTime";
public static final String SUCCESS_EXTENSION = ".success";
public static final String FAILURE_EXTENSION = ".failure";

public static final String TIMER_ACTION = "timer";
public static final String COUNTER_ACTION = "counter";
public static final String ARCHIVE_ACTION = "archive";
Expand Down Expand Up @@ -117,10 +121,10 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.logCompactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION);
this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION, CONFLICT_RESOLUTION_STR);
this.conflictResolutionSuccessCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + ".success");
this.conflictResolutionFailureCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + ".failure");
this.compactionRequestedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + ".requested");
this.compactionCompletedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + ".completed");
this.conflictResolutionSuccessCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + SUCCESS_EXTENSION);
this.conflictResolutionFailureCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + FAILURE_EXTENSION);
this.compactionRequestedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.REQUESTED_EXTENSION);
this.compactionCompletedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.COMPLETED_EXTENSION);
}
}

Expand Down Expand Up @@ -277,11 +281,11 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
if (eventTimePairMinMax.getLeft().isPresent()) {
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
metrics.registerGauge(getMetricsName(actionType, COMMIT_LATENCY_STR), commitLatencyInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_LATENCY_IN_MS_STR), commitLatencyInMs);
}
if (eventTimePairMinMax.getRight().isPresent()) {
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
metrics.registerGauge(getMetricsName(actionType, COMMIT_FRESHNESS_STR), commitFreshnessInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_FRESHNESS_IN_MS_STR), commitFreshnessInMs);
}
metrics.registerGauge(getMetricsName(actionType, COMMIT_TIME_STR), commitEpochTimeInMs);
metrics.registerGauge(getMetricsName(actionType, DURATION_STR), durationInMs);
Expand All @@ -291,7 +295,8 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn
public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
String.format("Sending rollback metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_FILES_NUM_STR, numFilesDeleted));
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
Expand All @@ -300,33 +305,35 @@ public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
String.format("Sending clean metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_FILES_NUM_STR, numFilesDeleted));
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateArchiveMetrics(long durationInMs, int numFilesDeleted) {
public void updateArchiveMetrics(long durationInMs, int numInstantsArchived) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending archive metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
String.format("Sending archive metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_INSTANTS_NUM_STR, numInstantsArchived));
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DELETE_INSTANTS_NUM_STR), numInstantsArchived);
}
}

public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
numFilesFinalized));
LOG.info(String.format("Sending finalize write metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
FINALIZED_FILES_NUM_STR, numFilesFinalized));
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, FINALIZED_FILES_NUM_STR), numFilesFinalized);
}
}

public void updateIndexMetrics(final String action, final long durationInMs) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
LOG.info(String.format("Sending index metrics (%s.%s, %d)", action, DURATION_STR, durationInMs));
metrics.registerGauge(getMetricsName(INDEX_ACTION, String.format("%s.%s", action, DURATION_STR)), durationInMs);
}
}
Expand Down

0 comments on commit 1dddd78

Please sign in to comment.