HIVE-23624: Add metastore metrics to show the compaction status (apache#1064) (Peter Vary reviewed by Laszlo Pinter)
pvary authored Jun 9, 2020
1 parent 2a3a42f commit bfdaeab
Showing 5 changed files with 189 additions and 2 deletions.
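In short, this commit makes the compactor Initiator publish one integer gauge per compaction state on every cycle, named with the compaction_num_ prefix added to MetricsConstants (compaction_num_initiated, compaction_num_working, and so on) and computed from the SHOW COMPACTIONS response. Below is a minimal sketch of reading those gauges back, using only APIs that appear in this diff plus MetastoreConf.newMetastoreConf() and Hadoop's Configuration; the CompactionGaugeProbe class is invented for illustration, and in a fresh process each gauge simply reads 0 until an Initiator run populates it.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStore;

// Hypothetical helper, not part of the commit: prints the compaction-status gauges.
public class CompactionGaugeProbe {
  public static void main(String[] args) {
    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
    Metrics.initialize(conf);

    // One gauge per state in TxnStore.COMPACTION_STATES, e.g. compaction_num_initiated.
    for (String state : TxnStore.COMPACTION_STATES) {
      String gaugeName = MetricsConstants.COMPACTION_STATUS_PREFIX + state;
      // getOrCreateGauge returns the counter the Initiator sets; it reads 0 if never updated in this process.
      System.out.println(gaugeName + " = " + Metrics.getOrCreateGauge(gaugeName).intValue());
    }
  }
}

The tests added below exercise the same read path with Metrics.getOrCreateGauge(...).intValue() before and after Initiator runs.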

@@ -23,7 +23,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,6 +40,8 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -92,6 +95,7 @@ public void run() {
long abortedTimeThreshold = HiveConf
    .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
        TimeUnit.MILLISECONDS);
boolean metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED);

// Make sure we run through the loop once before checking to stop as this makes testing
// much easier. The stop value is only for testing anyway and not used when called from
@@ -109,9 +113,13 @@ public void run() {
long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000;
prevStart = startedAt;

//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());

if (metricsEnabled) {
  // Update compaction metrics based on showCompactions result
  updateCompactionMetrics(currentCompactions);
}

Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold,
    abortedTimeThreshold, compactionInterval)
    .stream()
@@ -487,4 +495,32 @@ private boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse c
    }
    return true;
  }

  @VisibleForTesting
  protected static void updateCompactionMetrics(ShowCompactResponse showCompactResponse) {
    Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();

    // Get the last compaction for each db/table/partition
    for (ShowCompactResponseElement element : showCompactResponse.getCompacts()) {
      String key = element.getDbname() + "/" + element.getTablename() +
          (element.getPartitionname() != null ? "/" + element.getPartitionname() : "");
      // If the key is new, store the element; if one already exists, keep the element with the greater id
      lastElements.compute(key, (k, old) -> (old == null) ? element : (element.getId() > old.getId() ? element : old));
    }

    // Get the current count for each state
    Map<String, Long> counts = lastElements.values().stream()
        .collect(Collectors.groupingBy(e -> e.getState(), Collectors.counting()));

    // Update metrics
    for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
      String key = MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.COMPACTION_STATES[i];
      Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
      if (count != null) {
        Metrics.getOrCreateGauge(key).set(count.intValue());
      } else {
        Metrics.getOrCreateGauge(key).set(0);
      }
    }
  }
}
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java (148 additions, 0 deletions)
@@ -34,6 +34,10 @@
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -50,6 +54,7 @@
 * Tests for the compactor Initiator thread.
 */
public class TestInitiator extends CompactorTest {
  private final String INITIATED_METRICS_KEY = MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE;

  @Test
  public void nothing() throws Exception {
@@ -817,6 +822,149 @@ public void processCompactionCandidatesInParallel() throws Exception {
    Assert.assertEquals(10, compacts.size());
  }

  @Test
  public void testInitiatorMetricsEnabled() throws Exception {
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
    Metrics.initialize(conf);
    int originalValue = Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue();
    Table t = newTable("default", "ime", true);
    List<LockComponent> components = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
      Partition p = newPartition(t, "part" + (i + 1));
      addBaseFile(t, p, 20L, 20);
      addDeltaFile(t, p, 21L, 22L, 2);
      addDeltaFile(t, p, 23L, 24L, 2);

      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("ime");
      comp.setPartitionname("ds=part" + (i + 1));
      comp.setOperationType(DataOperationType.UPDATE);
      components.add(comp);
    }
    burnThroughTransactions("default", "ime", 23);
    long txnid = openTxn();

    LockRequest req = new LockRequest(components, "me", "localhost");
    req.setTxnid(txnid);
    LockResponse res = txnHandler.lock(req);
    Assert.assertEquals(LockState.ACQUIRED, res.getState());

    long writeid = allocateWriteId("default", "ime", txnid);
    Assert.assertEquals(24, writeid);
    txnHandler.commitTxn(new CommitTxnRequest(txnid));

    startInitiator();

    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    Assert.assertEquals(10, compacts.size());

    // The metrics will appear after the next Initiator run
    startInitiator();

    Assert.assertEquals(originalValue + 10,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE).intValue());
  }

  @Test
  public void testInitiatorMetricsDisabled() throws Exception {
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, false);
    Metrics.initialize(conf);
    int originalValue = Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue();
    Table t = newTable("default", "imd", true);
    List<LockComponent> components = new ArrayList<>();

    for (int i = 0; i < 10; i++) {
      Partition p = newPartition(t, "part" + (i + 1));
      addBaseFile(t, p, 20L, 20);
      addDeltaFile(t, p, 21L, 22L, 2);
      addDeltaFile(t, p, 23L, 24L, 2);

      LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
      comp.setTablename("imd");
      comp.setPartitionname("ds=part" + (i + 1));
      comp.setOperationType(DataOperationType.UPDATE);
      components.add(comp);
    }
    burnThroughTransactions("default", "imd", 23);
    long txnid = openTxn();

    LockRequest req = new LockRequest(components, "me", "localhost");
    req.setTxnid(txnid);
    LockResponse res = txnHandler.lock(req);
    Assert.assertEquals(LockState.ACQUIRED, res.getState());

    long writeid = allocateWriteId("default", "imd", txnid);
    Assert.assertEquals(24, writeid);
    txnHandler.commitTxn(new CommitTxnRequest(txnid));

    startInitiator();

    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
    Assert.assertEquals(10, compacts.size());

    // Run the Initiator again; with metrics disabled the gauge must stay unchanged
    startInitiator();

    Assert.assertEquals(originalValue,
        Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue());
  }

  @Test
  public void testUpdateCompactionMetrics() {
    Metrics.initialize(conf);
    ShowCompactResponse scr = new ShowCompactResponse();
    List<ShowCompactResponseElement> elements = new ArrayList<>();
    elements.add(generateElement(1, "db", "tb", null, CompactionType.MAJOR, TxnStore.FAILED_RESPONSE));
    // Check for overwrite
    elements.add(generateElement(2, "db", "tb", null, CompactionType.MAJOR, TxnStore.INITIATED_RESPONSE));
    elements.add(generateElement(3, "db", "tb2", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE));
    elements.add(generateElement(5, "db", "tb3", "p1", CompactionType.MINOR, TxnStore.ATTEMPTED_RESPONSE));
    // Check for overwrite where the order is different
    elements.add(generateElement(4, "db", "tb3", "p1", CompactionType.MINOR, TxnStore.FAILED_RESPONSE));

    elements.add(generateElement(6, "db1", "tb", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE));
    elements.add(generateElement(7, "db1", "tb2", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE));
    elements.add(generateElement(8, "db1", "tb3", null, CompactionType.MINOR, TxnStore.FAILED_RESPONSE));

    elements.add(generateElement(9, "db2", "tb", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
    elements.add(generateElement(10, "db2", "tb2", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
    elements.add(generateElement(11, "db2", "tb3", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));
    elements.add(generateElement(12, "db2", "tb4", null, CompactionType.MINOR, TxnStore.SUCCEEDED_RESPONSE));

    elements.add(generateElement(13, "db3", "tb3", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
    elements.add(generateElement(14, "db3", "tb4", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
    elements.add(generateElement(15, "db3", "tb5", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
    elements.add(generateElement(16, "db3", "tb6", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));
    elements.add(generateElement(17, "db3", "tb7", null, CompactionType.MINOR, TxnStore.WORKING_RESPONSE));

    scr.setCompacts(elements);
    Initiator.updateCompactionMetrics(scr);

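    // Expected counts once only the highest-id entry per db/table/partition is kept:
    // attempted = 1 (db/tb3/p1, id 5 beats id 4), initiated = 2 (db/tb, where id 2 beats id 1, plus db/tb2),
    // failed = 3 (db1/*), succeeded = 4 (db2/*), working = 5 (db3/*), cleaning = 0.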
    Assert.assertEquals(1,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.ATTEMPTED_RESPONSE).intValue());
    Assert.assertEquals(2,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE).intValue());
    Assert.assertEquals(3,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.FAILED_RESPONSE).intValue());
    Assert.assertEquals(4,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.SUCCEEDED_RESPONSE).intValue());
    Assert.assertEquals(5,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.WORKING_RESPONSE).intValue());
    Assert.assertEquals(0,
        Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.CLEANING_RESPONSE).intValue());
  }

  private ShowCompactResponseElement generateElement(long id, String db, String table, String partition,
      CompactionType type, String state) {
    ShowCompactResponseElement element = new ShowCompactResponseElement(db, table, type, state);
    element.setId(id);
    element.setPartitionname(partition);
    return element;
  }

  @Override
  boolean useHive130DeltaDirName() {
    return false;

@@ -20,6 +20,7 @@
public class MetricsConstants {
  public static final String ACTIVE_CALLS = "active_calls_";
  public static final String API_PREFIX = "api_";
  public static final String COMPACTION_STATUS_PREFIX = "compaction_num_";

  public static final String TOTAL_API_CALLS = "total_api_calls";


@@ -56,6 +56,9 @@ enum MUTEX_KEY {
  String SUCCEEDED_RESPONSE = "succeeded";
  String ATTEMPTED_RESPONSE = "attempted";

  String[] COMPACTION_STATES = new String[] {INITIATED_RESPONSE, WORKING_RESPONSE, CLEANING_RESPONSE, FAILED_RESPONSE,
      SUCCEEDED_RESPONSE, ATTEMPTED_RESPONSE};

  int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;

  /**
