From bfdaeabb85ec4d91a816f50a44927e2e17cb1b5b Mon Sep 17 00:00:00 2001 From: pvary Date: Tue, 9 Jun 2020 09:58:35 +0200 Subject: [PATCH] HIVE-23624: Add metastore metrics to show the compaction status (#1064) (Peter Vary reviewed by Laszlo Pinter) HIVE-23624: Add metastore metrics to show the compaction status (#1064) (Peter Vary reviewed by Laszlo Pinter) --- .../ql/txn/compactor/CompactorThread.java | 1 - .../hive/ql/txn/compactor/Initiator.java | 38 ++++- .../hive/ql/txn/compactor/TestInitiator.java | 148 ++++++++++++++++++ .../metastore/metrics/MetricsConstants.java | 1 + .../hadoop/hive/metastore/txn/TxnStore.java | 3 + 5 files changed, 189 insertions(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index fb23c2feb578..1b0af0e1855e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 791329538002..5b2c9374ca22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,6 +40,8 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -92,6 +95,7 @@ public void run() { long abortedTimeThreshold = HiveConf .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS); + boolean metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED); // Make sure we run through the loop once before checking to stop as this makes testing // much easier. The stop value is only for testing anyway and not used when called from @@ -109,9 +113,13 @@ public void run() { long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000; prevStart = startedAt; - //todo: add method to only get current i.e. skip history - more efficient ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest()); + if (metricsEnabled) { + // Update compaction metrics based on showCompactions result + updateCompactionMetrics(currentCompactions); + } + Set potentials = txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, compactionInterval) .stream() @@ -487,4 +495,32 @@ private boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse c } return true; } + + @VisibleForTesting + protected static void updateCompactionMetrics(ShowCompactResponse showCompactResponse) { + Map lastElements = new HashMap<>(); + + // Get the last compaction for each db/table/partition + for(ShowCompactResponseElement element : showCompactResponse.getCompacts()) { + String key = element.getDbname() + "/" + element.getTablename() + + (element.getPartitionname() != null ? "/" + element.getPartitionname() : ""); + // If new key, add the element, if there is an existing one, change to the element if the element.id is greater than old.id + lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old)); + } + + // Get the current count for each state + Map counts = lastElements.values().stream() + .collect(Collectors.groupingBy(e -> e.getState(), Collectors.counting())); + + // Update metrics + for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) { + String key = MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.COMPACTION_STATES[i]; + Long count = counts.get(TxnStore.COMPACTION_STATES[i]); + if (count != null) { + Metrics.getOrCreateGauge(key).set(count.intValue()); + } else { + Metrics.getOrCreateGauge(key).set(0); + } + } + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 279de19744f0..f3ab5cea1b38 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -34,6 +34,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -50,6 +54,7 @@ * Tests for the compactor Initiator thread. */ public class TestInitiator extends CompactorTest { + private final String INITIATED_METRICS_KEY = MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE; @Test public void nothing() throws Exception { @@ -817,6 +822,149 @@ public void processCompactionCandidatesInParallel() throws Exception { Assert.assertEquals(10, compacts.size()); } + @Test + public void testInitiatorMetricsEnabled() throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true); + Metrics.initialize(conf); + int originalValue = Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue(); + Table t = newTable("default", "ime", true); + List components = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Partition p = newPartition(t, "part" + (i + 1)); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("ime"); + comp.setPartitionname("ds=part" + (i + 1)); + comp.setOperationType(DataOperationType.UPDATE); + components.add(comp); + } + burnThroughTransactions("default", "ime", 23); + long txnid = openTxn(); + + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + Assert.assertEquals(LockState.ACQUIRED, res.getState()); + + long writeid = allocateWriteId("default", "ime", txnid); + Assert.assertEquals(24, writeid); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(10, compacts.size()); + + // The metrics will appear after the next Initiator run + startInitiator(); + + Assert.assertEquals(originalValue + 10, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE).intValue()); + } + + @Test + public void testInitiatorMetricsDisabled() throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, false); + Metrics.initialize(conf); + int originalValue = Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue(); + Table t = newTable("default", "imd", true); + List components = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Partition p = newPartition(t, "part" + (i + 1)); + addBaseFile(t, p, 20L, 20); + addDeltaFile(t, p, 21L, 22L, 2); + addDeltaFile(t, p, 23L, 24L, 2); + + LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default"); + comp.setTablename("imd"); + comp.setPartitionname("ds=part" + (i + 1)); + comp.setOperationType(DataOperationType.UPDATE); + components.add(comp); + } + burnThroughTransactions("default", "imd", 23); + long txnid = openTxn(); + + LockRequest req = new LockRequest(components, "me", "localhost"); + req.setTxnid(txnid); + LockResponse res = txnHandler.lock(req); + Assert.assertEquals(LockState.ACQUIRED, res.getState()); + + long writeid = allocateWriteId("default", "imd", txnid); + Assert.assertEquals(24, writeid); + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + + startInitiator(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List compacts = rsp.getCompacts(); + Assert.assertEquals(10, compacts.size()); + + // The metrics will appear after the next Initiator run + startInitiator(); + + Assert.assertEquals(originalValue, + Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue()); + } + + @Test + public void testUpdateCompactionMetrics() { + Metrics.initialize(conf); + ShowCompactResponse scr = new ShowCompactResponse(); + List elements = new ArrayList<>(); + elements.add(generateElement(1,"db", "tb", null, CompactionType.MAJOR, TxnStore.FAILED_RESPONSE)); + // Check for overwrite + elements.add(generateElement(2,"db", "tb", null, CompactionType.MAJOR, TxnStore.INITIATED_RESPONSE)); + elements.add(generateElement(3,"db", "tb2", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE)); + elements.add(generateElement(5,"db", "tb3", "p1", CompactionType.MINOR, TxnStore.ATTEMPTED_RESPONSE)); + // Check for overwrite where the order is different + elements.add(generateElement(4,"db", "tb3", "p1", CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); + + elements.add(generateElement(6,"db1", "tb", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); + elements.add(generateElement(7,"db1", "tb2", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); + elements.add(generateElement(8,"db1", "tb3", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE)); + + elements.add(generateElement(9,"db2", "tb", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE)); + elements.add(generateElement(10,"db2", "tb2", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE)); + elements.add(generateElement(11,"db2", "tb3", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE)); + elements.add(generateElement(12,"db2", "tb4", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE)); + + elements.add(generateElement(13,"db3", "tb3", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); + elements.add(generateElement(14,"db3", "tb4", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); + elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); + elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); + elements.add(generateElement(17,"db3", "tb7", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE)); + + scr.setCompacts(elements); + Initiator.updateCompactionMetrics(scr); + + Assert.assertEquals(1, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.ATTEMPTED_RESPONSE).intValue()); + Assert.assertEquals(2, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE).intValue()); + Assert.assertEquals(3, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.FAILED_RESPONSE).intValue()); + Assert.assertEquals(4, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.SUCCEEDED_RESPONSE).intValue()); + Assert.assertEquals(5, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.WORKING_RESPONSE).intValue()); + Assert.assertEquals(0, + Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.CLEANING_RESPONSE).intValue()); + } + + private ShowCompactResponseElement generateElement(long id, String db, String table, String partition, + CompactionType type, String state) { + ShowCompactResponseElement element = new ShowCompactResponseElement(db, table, type, state); + element.setId(id); + element.setPartitionname(partition); + return element; + } + @Override boolean useHive130DeltaDirName() { return false; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java index 24c8c4cc3abd..7ae98fe9abee 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java @@ -20,6 +20,7 @@ public class MetricsConstants { public static final String ACTIVE_CALLS = "active_calls_"; public static final String API_PREFIX = "api_"; + public static final String COMPACTION_STATUS_PREFIX = "compaction_num_"; public static final String TOTAL_API_CALLS = "total_api_calls"; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 3e441b559f15..0d5b66988c1b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -56,6 +56,9 @@ enum MUTEX_KEY { String SUCCEEDED_RESPONSE = "succeeded"; String ATTEMPTED_RESPONSE = "attempted"; + String[] COMPACTION_STATES = new String[] {INITIATED_RESPONSE, WORKING_RESPONSE, CLEANING_RESPONSE, FAILED_RESPONSE, + SUCCEEDED_RESPONSE, ATTEMPTED_RESPONSE}; + int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000; /**