HDFS-14318: A repaired disk is not recognized by the DataNode (DN), which must be restarted to recognize it #1104
base: trunk
Changes from all commits
ec5c73c
09c21ac
f8469f9
e53e149
919df08
@@ -46,9 +46,14 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_INTERVAL_DEFAULT;

import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY;

import static org.apache.hadoop.util.ExitUtil.terminate;

import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -84,13 +89,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;
@@ -407,6 +406,10 @@ public static InetSocketAddress createSocketAddr(String target) {
private final StorageLocationChecker storageLocationChecker;

private final DatasetVolumeChecker volumeChecker;
volatile FsDatasetSpi<? extends FsVolumeSpi> allData = null;
private ScheduledExecutorService scheduledExecutor;
private int checkDiskInterval = 5*1000;
private List<StorageLocation> errorDisk = null;

private final SocketFactory socketFactory;
@@ -445,6 +448,8 @@ private static Tracer createTracer(Configuration conf) {
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
this.xferService =
    HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
this.checkDiskInterval = conf.getInt(DFS_DATANODE_DISK_CHECK_INTERVAL_KEY,
    DFS_DATANODE_DISK_CHECK_INTERVAL_DEFAULT);
}

/**
@@ -478,6 +483,8 @@ private static Tracer createTracer(Configuration conf) {
this.pipelineSupportECN = conf.getBoolean(
    DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
    DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
this.checkDiskInterval = conf.getInt(DFS_DATANODE_DISK_CHECK_INTERVAL_KEY,
    DFS_DATANODE_DISK_CHECK_INTERVAL_DEFAULT);

confVersion = "core-" +
    conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -801,6 +808,7 @@ private synchronized void refreshVolumes(String newVolumes) throws IOException {
public IOException call() {
  try {
    data.addVolume(location, nsInfos);
    allData.addVolume(location, nsInfos);
  } catch (IOException e) {
    return e;
  }
@@ -822,6 +830,14 @@ public IOException call() {
} else {
  effectiveVolumes.add(volume.toString());
  LOG.info("Successfully added volume: {}", volume);
  if (errorDisk != null && !errorDisk.isEmpty()) {
Review comment: It may not be necessary to check if …

Reply: I don't think so. If refreshVolumes is reached through reconfigurePropertyImpl:

public String reconfigurePropertyImpl(String property, String newVal)
    throws ReconfigurationException {
  switch (property) {
    case DFS_DATANODE_DATA_DIR_KEY: {
      IOException rootException = null;
      try {
        LOG.info("Reconfiguring {} to {}", property, newVal);
        this.refreshVolumes(newVal);
        ...
      }
    LOG.debug("check errorDisk for {} disk ", volume);
    if (errorDisk.contains(volume)) {
      errorDisk.remove(volume);
      LOG.info("Remove {} from errorDisk, " +
          "because of the repaired disk ", volume);
    }
  }
}
} catch (Exception e) {
  errorMessageBuilder.append(
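The thread above concerns the errorDisk bookkeeping in refreshVolumes. The following is a minimal standalone sketch of that idea, not the actual DataNode code (class and method names are hypothetical): refreshVolumes can be entered both from an admin reconfiguration and from the periodic disk checker, errorDisk is created lazily only after a failure has been seen, and a volume that is added back successfully is dropped from the error list.

import java.util.ArrayList;
import java.util.List;

// Toy sketch of the guard discussed above (hypothetical names, not the actual
// DataNode code): refreshVolumes() is reachable both from an admin
// reconfiguration (reconfigurePropertyImpl) and from the periodic disk
// checker, while errorDisk is only created lazily once a failure has been
// seen, so the null/empty check keeps the reconfiguration-only path safe.
public class RefreshVolumesGuardSketch {

  private List<String> errorDisk = null;   // created lazily, may stay null

  // Called by the periodic checker the first time a disk fails.
  void recordFailure(String volume) {
    if (errorDisk == null) {
      errorDisk = new ArrayList<>();
    }
    errorDisk.add(volume);
  }

  // Simplified stand-in for refreshVolumes(): invoked by either caller.
  void refreshVolumes(String volume) {
    System.out.println("Successfully added volume: " + volume);
    if (errorDisk != null && !errorDisk.isEmpty()) {   // the guard in question
      if (errorDisk.remove(volume)) {
        System.out.println("Removed repaired volume from errorDisk: " + volume);
      }
    }
  }

  public static void main(String[] args) {
    RefreshVolumesGuardSketch dn = new RefreshVolumesGuardSketch();
    dn.refreshVolumes("/data/disk1");   // reconfiguration path; errorDisk still null
    dn.recordFailure("/data/disk2");    // periodic check sees a bad disk
    dn.refreshVolumes("/data/disk2");   // disk repaired and added back
  }
}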
@@ -1701,6 +1717,13 @@ void initBlockPool(BPOfferService bpos) throws IOException {
// Exclude failed disks before initializing the block pools to avoid startup
// failures.
checkDiskError();

// start check disk thread.
scheduledExecutor = Executors.newScheduledThreadPool(1);
Runnable checkDisk = new CheckDisk();
scheduledExecutor.scheduleAtFixedRate(checkDisk, checkDiskInterval,
    checkDiskInterval, TimeUnit.SECONDS);

try {
  data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
} catch (AddBlockPoolException e) {
@@ -1775,6 +1798,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException {
synchronized(this) {
  if (data == null) {
    data = factory.newInstance(this, storage, getConf());
    allData = factory.newInstance(this, storage, getConf());
  }
}
}
@@ -2194,6 +2218,26 @@ public void shutdown() {
  tracer.close();
}

private class CheckDisk implements Runnable {

  @Override
  public void run() {
    while (shouldRun) {
      LOG.info("CheckDiskThread running ");
      if (errorDisk != null && !errorDisk.isEmpty()) {
        try {
          checkDiskError();
        } catch (Exception e) {
          LOG.warn("Unexpected exception occurred while" +
              " checking disk error " + e);
          return;
        }
        lastDiskErrorCheck = Time.monotonicNow();
      }
    }
  }
}

/**
 * Check if there is a disk failure asynchronously
 * and if so, handle the error.
@@ -3422,20 +3466,53 @@ public ShortCircuitRegistry getShortCircuitRegistry() {
@VisibleForTesting
public void checkDiskError() throws IOException {
  Set<FsVolumeSpi> unhealthyVolumes;
  Configuration conf = getConf();
  String newDataDirs = null;
  try {
    unhealthyVolumes = volumeChecker.checkAllVolumes(data);
    // check all volume
    unhealthyVolumes = volumeChecker.checkAllVolumes(allData);
    lastDiskErrorCheck = Time.monotonicNow();
  } catch (InterruptedException e) {
    LOG.error("Interruped while running disk check", e);
    LOG.error("Interrupted while running disk check", e);
    throw new IOException("Interrupted while running disk check", e);
  }

  if (unhealthyVolumes.size() > 0) {
    if (errorDisk == null) {
      errorDisk = new ArrayList<>();
    }
    List<StorageLocation> tmpDisk = Lists.newArrayList(errorDisk);
    errorDisk.clear();
    for (FsVolumeSpi vol : unhealthyVolumes) {
      LOG.info("Add error disk {} to errorDisk - {}",
          vol.getStorageLocation(), vol.getStorageLocation());
      errorDisk.add(vol.getStorageLocation());
(hunshenshi marked this conversation as resolved.)

      if (tmpDisk.contains(vol.getStorageLocation())) {
        tmpDisk.remove(vol.getStorageLocation());
      }
    }
    LOG.warn("checkDiskError got {} failed volumes - {}",
        unhealthyVolumes.size(), unhealthyVolumes);
    handleVolumeFailures(unhealthyVolumes);
    if (!tmpDisk.isEmpty()) {
(hunshenshi marked this conversation as resolved.)

      newDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY)
          + "," + Joiner.on(",").join(tmpDisk);
    }
  } else {
    LOG.debug("checkDiskError encountered no failures");
    LOG.debug("checkDiskError encountered no failures," +
        "then check errorDisk");
    if (errorDisk != null && !errorDisk.isEmpty()) {
      newDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY)
          + "," + Joiner.on(",").join(errorDisk);
    }
  }
  if (newDataDirs != null) {
    LOG.debug("Bad disks is repaired, should refreshVolumes disk.");
    try {
      refreshVolumes(newDataDirs);
    } catch (IOException e) {
      LOG.error("Bad disks is repaired, refreshVolumes error : ", e);
    }
  }
}
Review comment: allData = data + errorDisk, right? It seems that allData is a redundant variable.

Reply: allData = data + errorDisk is right. allData will be used in volumeChecker.checkAllVolumes(allData). If there were no allData, we would have to build a variable equal to data + errorDisk before calling volumeChecker.checkAllVolumes. So I think allData is OK.
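To make the point of this exchange concrete, here is a minimal standalone sketch under stated assumptions (hypothetical names and a simulated health check, not the real FsDatasetSpi or DatasetVolumeChecker types): the periodic check has to run over the union of active and previously failed volumes, otherwise a repaired disk is never probed again and only a DataNode restart would pick it up.

import java.util.HashSet;
import java.util.Set;

// Toy sketch of the point above (hypothetical names and a simulated health
// check, not the real FsDatasetSpi / DatasetVolumeChecker types): the periodic
// check must cover the union of active and previously failed volumes
// ("allData"), otherwise a repaired disk is never probed again and only a
// DataNode restart would pick it up.
public class AllDataSketch {

  private final Set<String> data = new HashSet<>();       // active volumes
  private final Set<String> errorDisk = new HashSet<>();  // failed volumes
  private final Set<String> healthyNow = new HashSet<>(); // simulated disk state

  // Rough equivalent of volumeChecker.checkAllVolumes(allData):
  // allData is simply data + errorDisk.
  void periodicCheck() {
    Set<String> allData = new HashSet<>(data);
    allData.addAll(errorDisk);
    for (String vol : allData) {
      if (healthyNow.contains(vol) && errorDisk.remove(vol)) {
        data.add(vol);            // repaired disk re-added (refreshVolumes)
      } else if (!healthyNow.contains(vol) && data.remove(vol)) {
        errorDisk.add(vol);       // newly failed disk excluded
      }
    }
  }

  public static void main(String[] args) {
    AllDataSketch dn = new AllDataSketch();
    dn.data.add("/data/disk1");
    dn.healthyNow.add("/data/disk1");
    dn.errorDisk.add("/data/disk2");   // disk2 failed at some earlier point

    dn.periodicCheck();                // disk2 still broken: stays in errorDisk
    dn.healthyNow.add("/data/disk2");  // disk2 gets repaired
    dn.periodicCheck();                // now detected and moved back into data
    System.out.println("active=" + dn.data + " failed=" + dn.errorDisk);
  }
}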