From 77ef473981c1af0166ed6307b5a2789ca7caa7ba Mon Sep 17 00:00:00 2001 From: Jian He Date: Thu, 28 Apr 2016 21:54:11 -0700 Subject: [PATCH] YARN-5009. NMLeveldbStateStoreService database can grow substantially leading to longer recovery times. Contributed by Jason Lowe (cherry picked from commit 4a8508501bc753858693dacdafba61d604702f71) --- .../hadoop/yarn/conf/YarnConfiguration.java | 7 +++ .../src/main/resources/yarn-default.xml | 6 +++ .../recovery/NMLeveldbStateStoreService.java | 45 ++++++++++++++++++- .../TestNMLeveldbStateStoreService.java | 25 +++++++++++ 4 files changed, 81 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0cfca20380558..8922e060f8cda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1134,6 +1134,13 @@ private static void addDeprecatedKeys() { public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir"; + /** The time in seconds between full compactions of the NM state database. + * Setting the interval to zero (or any non-positive value) disables the full compaction cycles. 
+ */ + public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS = + NM_RECOVERY_PREFIX + "compaction-interval-secs"; + /** Default interval between full compactions, in seconds (3600). */ public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600; + //////////////////////////////// // Web Proxy Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5664242993a56..4993846062c6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1246,6 +1246,12 @@ yarn.nodemanager.container-metrics.unregister-delay-ms 10000 + + The time in seconds between full compactions of the NM state + database. Setting the interval to zero disables the full compaction + cycles. + yarn.nodemanager.recovery.compaction-interval-secs + 3600 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 0c9901ce9cf72..36b7f81742833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; @@ -35,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -123,6 +126,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private DB db; private boolean isNewlyCreated; + /** Timer that drives the periodic full compactions; stays null unless startCompactionTimer creates it, and is reset to null by closeStorage. */ private Timer compactionTimer; public NMLeveldbStateStoreService() { super(NMLeveldbStateStoreService.class.getName()); @@ -134,6 +138,10 @@ protected void startStorage() throws IOException { @Override protected void closeStorage() throws IOException { + /* Cancel the compaction timer before the database is closed. NOTE(review): Timer.cancel() does not wait for a task that is already running, so a compaction in progress may still be using db when it is closed below -- confirm this is safe. */ if (compactionTimer != null) { + compactionTimer.cancel(); + compactionTimer = null; + } if (db != null) { db.close(); } @@ -942,6 +950,12 @@ private String getLogDeleterKey(ApplicationId appId) { @Override protected void initStorage(Configuration conf) throws IOException { + /* Obtain the database handle, check the stored version, then start the periodic compaction timer. */ db = openDatabase(conf); + checkVersion(); + startCompactionTimer(conf); + } + + /** Returns the DB handle that initStorage stores in db, using the storage dir created from conf. Protected so that a subclass can supply its own DB. Part of the body lies outside this hunk. */ protected DB openDatabase(Configuration conf) throws IOException { Path storeRoot = createStorageDir(conf); Options options = new Options(); options.createIfMissing(false); @@ -966,7 +980,7 @@ protected void initStorage(Configuration conf) throw e; } } - checkVersion(); + return db; } private Path createStorageDir(Configuration conf) throws IOException { @@ -982,6 +996,33 @@ private Path createStorageDir(Configuration conf) throws IOException { return root; } + /** Starts a daemon Timer that runs a CompactionTimerTask once per configured interval (NM_RECOVERY_COMPACTION_INTERVAL_SECS, converted to milliseconds), with the first run one interval after this call. Each run calls db.compactRange(null, null) and logs its duration; a DBException is logged and not rethrown. No timer is created when the configured interval is not positive. */ private void startCompactionTimer(Configuration conf) { + long intervalMsec = conf.getLong( + YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, + YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000; + if (intervalMsec > 0) { + compactionTimer = new Timer( + this.getClass().getSimpleName() + " compaction 
timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + intervalMsec, intervalMsec); + } + } + + + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } private static class LeveldbLogger implements Logger { private static final Log LOG = LogFactory.getLog(LeveldbLogger.class); @@ -1039,7 +1080,7 @@ Version getCurrentVersion() { * throw exception and indicate user to use a separate upgrade tool to * upgrade NM state or remove incompatible old state. */ - private void checkVersion() throws IOException { + protected void checkVersion() throws IOException { Version loadedVersion = loadVersion(); LOG.info("Loaded NM state version info " + loadedVersion); if (loadedVersion.equals(getCurrentVersion())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 41ec2d5555099..f2f43a9118e98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -23,6 +23,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import 
static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import java.io.File; import java.io.IOException; @@ -75,6 +79,7 @@ import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.iq80.leveldb.DB; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -874,6 +879,26 @@ public void testLogDeleterStorage() throws IOException { assertTrue(state.getLogDeleterMap().isEmpty()); } + /** Checks that, with a one second compaction interval, a started store calls compactRange(null, null) on its DB within the 10000 ms verification timeout. The DB is a mock returned from an overridden openDatabase, and checkVersion is stubbed out. */ @Test + public void testCompactionCycle() throws IOException { + final DB mockdb = mock(DB.class); + conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1); + NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() { + @Override + protected void checkVersion() {} + + @Override + protected DB openDatabase(Configuration conf) { + return mockdb; + } + }; + store.init(conf); + store.start(); + verify(mockdb, timeout(10000)).compactRange( + (byte[]) isNull(), (byte[]) isNull()); + store.close(); + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() {