Skip to content

Commit

Permalink
YARN-5009. NMLeveldbStateStoreService database can grow substantially…
Browse files Browse the repository at this point in the history
… leading to longer recovery times. Contributed by Jason Lowe

(cherry picked from commit 4a85085)
  • Loading branch information
jian-he committed Apr 29, 2016
1 parent d7b2da6 commit 77ef473
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,13 @@ private static void addDeprecatedKeys() {

public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";

/** The time in seconds between full compactions of the NM state database.
* Setting the interval to zero disables the full compaction cycles.
*/
public static final String NM_RECOVERY_COMPACTION_INTERVAL_SECS =
NM_RECOVERY_PREFIX + "compaction-interval-secs";
public static final int DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS = 3600;

////////////////////////////////
// Web Proxy Configs
////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,12 @@
</description>
<name>yarn.nodemanager.container-metrics.unregister-delay-ms</name>
<value>10000</value>

<description>The time in seconds between full compactions of the NM state
database. Setting the interval to zero disables the full compaction
cycles.</description>
<name>yarn.nodemanager.recovery.compaction-interval-secs</name>
<value>3600</value>
</property>

<!--Docker configuration-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -123,6 +126,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {

private DB db;
private boolean isNewlyCreated;
private Timer compactionTimer;

public NMLeveldbStateStoreService() {
super(NMLeveldbStateStoreService.class.getName());
Expand All @@ -134,6 +138,10 @@ protected void startStorage() throws IOException {

@Override
protected void closeStorage() throws IOException {
if (compactionTimer != null) {
compactionTimer.cancel();
compactionTimer = null;
}
if (db != null) {
db.close();
}
Expand Down Expand Up @@ -942,6 +950,12 @@ private String getLogDeleterKey(ApplicationId appId) {
@Override
protected void initStorage(Configuration conf)
throws IOException {
db = openDatabase(conf);
checkVersion();
startCompactionTimer(conf);
}

protected DB openDatabase(Configuration conf) throws IOException {
Path storeRoot = createStorageDir(conf);
Options options = new Options();
options.createIfMissing(false);
Expand All @@ -966,7 +980,7 @@ protected void initStorage(Configuration conf)
throw e;
}
}
checkVersion();
return db;
}

private Path createStorageDir(Configuration conf) throws IOException {
Expand All @@ -982,6 +996,33 @@ private Path createStorageDir(Configuration conf) throws IOException {
return root;
}

private void startCompactionTimer(Configuration conf) {
long intervalMsec = conf.getLong(
YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS,
YarnConfiguration.DEFAULT_NM_RECOVERY_COMPACTION_INTERVAL_SECS) * 1000;
if (intervalMsec > 0) {
compactionTimer = new Timer(
this.getClass().getSimpleName() + " compaction timer", true);
compactionTimer.schedule(new CompactionTimerTask(),
intervalMsec, intervalMsec);
}
}


private class CompactionTimerTask extends TimerTask {
@Override
public void run() {
long start = Time.monotonicNow();
LOG.info("Starting full compaction cycle");
try {
db.compactRange(null, null);
} catch (DBException e) {
LOG.error("Error compacting database", e);
}
long duration = Time.monotonicNow() - start;
LOG.info("Full compaction cycle completed in " + duration + " msec");
}
}

private static class LeveldbLogger implements Logger {
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
Expand Down Expand Up @@ -1039,7 +1080,7 @@ Version getCurrentVersion() {
* throw exception and indicate user to use a separate upgrade tool to
* upgrade NM state or remove incompatible old state.
*/
private void checkVersion() throws IOException {
protected void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded NM state version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -75,6 +79,7 @@
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.iq80.leveldb.DB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -874,6 +879,26 @@ public void testLogDeleterStorage() throws IOException {
assertTrue(state.getLogDeleterMap().isEmpty());
}

@Test
public void testCompactionCycle() throws IOException {
final DB mockdb = mock(DB.class);
conf.setInt(YarnConfiguration.NM_RECOVERY_COMPACTION_INTERVAL_SECS, 1);
NMLeveldbStateStoreService store = new NMLeveldbStateStoreService() {
@Override
protected void checkVersion() {}

@Override
protected DB openDatabase(Configuration conf) {
return mockdb;
}
};
store.init(conf);
store.start();
verify(mockdb, timeout(10000)).compactRange(
(byte[]) isNull(), (byte[]) isNull());
store.close();
}

private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {
Expand Down

0 comments on commit 77ef473

Please sign in to comment.