Skip to content

Commit

Permalink
HDFS-16566 Erasure Coding: Recovery may causes excess replicas when b…
Browse files Browse the repository at this point in the history
…usy DN exsits (#4252)
  • Loading branch information
RuinanGu authored Jul 15, 2022
1 parent 2835174 commit 9376b65
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ message BlockECReconstructionInfoProto {
required StorageTypesProto targetStorageTypes = 5;
required bytes liveBlockIndices = 6;
required ErasureCodingPolicyProto ecPolicy = 7;
optional bytes excludeReconstructedIndices = 8;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,11 +1049,17 @@ public static BlockECReconstructionInfo convertBlockECReconstructionInfo(

byte[] liveBlkIndices = blockEcReconstructionInfoProto.getLiveBlockIndices()
.toByteArray();
byte[] excludeReconstructedIndices =
blockEcReconstructionInfoProto.hasExcludeReconstructedIndices() ?
blockEcReconstructionInfoProto.getExcludeReconstructedIndices()
.toByteArray() : new byte[0];
ErasureCodingPolicy ecPolicy =
PBHelperClient.convertErasureCodingPolicy(
blockEcReconstructionInfoProto.getEcPolicy());
return new BlockECReconstructionInfo(block, sourceDnInfos, targetDnInfos,
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
return new BlockECReconstructionInfo(
block, sourceDnInfos, targetDnInfos,
targetStorageUuids, convertStorageTypes, liveBlkIndices,
excludeReconstructedIndices, ecPolicy);
}

public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo(
Expand All @@ -1079,6 +1085,10 @@ public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo(
byte[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
builder.setLiveBlockIndices(PBHelperClient.getByteString(liveBlockIndices));

byte[] excludeReconstructedIndices = blockEcRecoveryInfo.getExcludeReconstructedIndices();
builder.setExcludeReconstructedIndices(
PBHelperClient.getByteString(excludeReconstructedIndices));

builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
blockEcRecoveryInfo.getErasureCodingPolicy()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
// source node returned is not used
chooseSourceDatanodes(blockInfo, containingNodes,
containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
new ArrayList<Byte>(), new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);

// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
Expand Down Expand Up @@ -2195,9 +2195,10 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
NumberReplicas numReplicas = new NumberReplicas();
List<Byte> liveBlockIndices = new ArrayList<>();
List<Byte> liveBusyBlockIndices = new ArrayList<>();
List<Byte> excludeReconstructed = new ArrayList<>();
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
containingNodes, liveReplicaNodes, numReplicas,
liveBlockIndices, liveBusyBlockIndices, priority);
liveBlockIndices, liveBusyBlockIndices, excludeReconstructed, priority);
short requiredRedundancy = getExpectedLiveRedundancyNum(block,
numReplicas);
if(srcNodes == null || srcNodes.length == 0) {
Expand Down Expand Up @@ -2267,9 +2268,13 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
busyIndices[i] = liveBusyBlockIndices.get(i);
}
byte[] excludeReconstructedIndices = new byte[excludeReconstructed.size()];
for (int i = 0; i < excludeReconstructed.size(); i++) {
excludeReconstructedIndices[i] = excludeReconstructed.get(i);
}
return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
priority, newIndices, busyIndices);
priority, newIndices, busyIndices, excludeReconstructedIndices);
} else {
return new ReplicationWork(block, bc, srcNodes,
containingNodes, liveReplicaNodes, additionalReplRequired,
Expand Down Expand Up @@ -2517,7 +2522,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas, List<Byte> liveBlockIndices,
List<Byte> liveBusyBlockIndices, int priority) {
List<Byte> liveBusyBlockIndices, List<Byte> excludeReconstructed, int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
Expand Down Expand Up @@ -2587,6 +2592,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
if (isStriped && (state == StoredReplicaState.LIVE
|| state == StoredReplicaState.DECOMMISSIONING)) {
liveBusyBlockIndices.add(blockIndex);
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
excludeReconstructed.add(blockIndex);
}
continue; // already reached replication limit
}
Expand All @@ -2595,6 +2602,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
if (isStriped && (state == StoredReplicaState.LIVE
|| state == StoredReplicaState.DECOMMISSIONING)) {
liveBusyBlockIndices.add(blockIndex);
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
excludeReconstructed.add(blockIndex);
}
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,10 +683,10 @@ public void addBlockToBeReplicated(Block block,
*/
void addBlockToBeErasureCoded(ExtendedBlock block,
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
byte[] liveBlockIndices, byte[] excludeReconstrutedIndices, ErasureCodingPolicy ecPolicy) {
assert (block != null && sources != null && sources.length > 0);
BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
sources, targets, liveBlockIndices, ecPolicy);
sources, targets, liveBlockIndices, excludeReconstrutedIndices, ecPolicy);
erasurecodeBlocks.offer(task);
BlockManager.LOG.debug("Adding block reconstruction task " + task + "to "
+ getName() + ", current queue size is " + erasurecodeBlocks.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndices;
private final byte[] liveBusyBlockIndices;
private final byte[] excludeReconstructedIndices;
private final String blockPoolId;

public ErasureCodingWork(String blockPoolId, BlockInfo block,
Expand All @@ -40,12 +41,14 @@ public ErasureCodingWork(String blockPoolId, BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired, int priority,
byte[] liveBlockIndices, byte[] liveBusyBlockIndices) {
byte[] liveBlockIndices, byte[] liveBusyBlockIndices,
byte[] excludeReconstrutedIndices) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.blockPoolId = blockPoolId;
this.liveBlockIndices = liveBlockIndices;
this.liveBusyBlockIndices = liveBusyBlockIndices;
this.excludeReconstructedIndices=excludeReconstrutedIndices;
LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
}
Expand Down Expand Up @@ -147,7 +150,7 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
} else {
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
getLiveBlockIndices(), stripedBlk.getErasureCodingPolicy());
liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void processErasureCodingTasks(
reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
reconInfo.getTargetStorageIDs());
reconInfo.getTargetStorageIDs(), reconInfo.getExcludeReconstructedIndices());
// It may throw IllegalArgumentException from task#stripedReader
// constructor.
final StripedBlockReconstructor task =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,28 @@ public class StripedReconstructionInfo {
private final DatanodeInfo[] targets;
private final StorageType[] targetStorageTypes;
private final String[] targetStorageIds;
private final byte[] excludeReconstructedIndices;

public StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
byte[] targetIndices) {
this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null,
null, null);
null, null, new byte[0]);
}

StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
DatanodeInfo[] targets, StorageType[] targetStorageTypes,
String[] targetStorageIds) {
String[] targetStorageIds, byte[] excludeReconstructedIndices) {
this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
targetStorageTypes, targetStorageIds);
targetStorageTypes, targetStorageIds, excludeReconstructedIndices);
}

private StripedReconstructionInfo(ExtendedBlock blockGroup,
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
byte[] targetIndices, DatanodeInfo[] targets,
StorageType[] targetStorageTypes, String[] targetStorageIds) {
StorageType[] targetStorageTypes, String[] targetStorageIds,
byte[] excludeReconstructedIndices) {

this.blockGroup = blockGroup;
this.ecPolicy = ecPolicy;
Expand All @@ -70,6 +72,7 @@ private StripedReconstructionInfo(ExtendedBlock blockGroup,
this.targets = targets;
this.targetStorageTypes = targetStorageTypes;
this.targetStorageIds = targetStorageIds;
this.excludeReconstructedIndices = excludeReconstructedIndices;
}

ExtendedBlock getBlockGroup() {
Expand Down Expand Up @@ -104,5 +107,9 @@ String[] getTargetStorageIds() {
return targetStorageIds;
}

byte[] getExcludeReconstructedIndices() {
return excludeReconstructedIndices;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ abstract class StripedReconstructor {
private final CachingStrategy cachingStrategy;
private long maxTargetLength = 0L;
private final BitSet liveBitSet;
private final BitSet excludeBitSet;

// metrics
private AtomicLong bytesRead = new AtomicLong(0);
Expand All @@ -137,6 +138,12 @@ abstract class StripedReconstructor {
for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) {
liveBitSet.set(stripedReconInfo.getLiveIndices()[i]);
}
excludeBitSet = new BitSet(
ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
for (int i = 0; i < stripedReconInfo.getExcludeReconstructedIndices().length; i++) {
excludeBitSet.set(stripedReconInfo.getExcludeReconstructedIndices()[i]);
}

blockGroup = stripedReconInfo.getBlockGroup();
stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
cachingStrategy = CachingStrategy.newDefaultStrategy();
Expand Down Expand Up @@ -261,6 +268,10 @@ BitSet getLiveBitSet() {
return liveBitSet;
}

BitSet getExcludeBitSet(){
return excludeBitSet;
}

long getMaxTargetLength() {
return maxTargetLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,14 @@ void init() throws IOException {

private void initTargetIndices() {
BitSet bitset = reconstructor.getLiveBitSet();
BitSet excludebitset=reconstructor.getExcludeBitSet();

int m = 0;
hasValidTargets = false;
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
if (!bitset.get(i)) {
if (reconstructor.getBlockLen(i) > 0) {
if (m < targets.length) {
if (m < targets.length && !excludebitset.get(i)) {
targetIndices[m++] = (short)i;
hasValidTargets = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,31 @@ public static class BlockECReconstructionInfo {
private String[] targetStorageIDs;
private StorageType[] targetStorageTypes;
private final byte[] liveBlockIndices;
private final byte[] excludeReconstructedIndices;
private final ErasureCodingPolicy ecPolicy;

public BlockECReconstructionInfo(ExtendedBlock block,
DatanodeInfo[] sources, DatanodeStorageInfo[] targetDnStorageInfo,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
byte[] liveBlockIndices, byte[] excludeReconstructedIndices, ErasureCodingPolicy ecPolicy) {
this(block, sources, DatanodeStorageInfo
.toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo
.toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo
.toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy);
.toStorageTypes(targetDnStorageInfo), liveBlockIndices,
excludeReconstructedIndices, ecPolicy);
}

public BlockECReconstructionInfo(ExtendedBlock block,
DatanodeInfo[] sources, DatanodeInfo[] targets,
String[] targetStorageIDs, StorageType[] targetStorageTypes,
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
byte[] liveBlockIndices, byte[] excludeReconstructedIndices, ErasureCodingPolicy ecPolicy) {
this.block = block;
this.sources = sources;
this.targets = targets;
this.targetStorageIDs = targetStorageIDs;
this.targetStorageTypes = targetStorageTypes;
this.liveBlockIndices = liveBlockIndices == null ?
new byte[]{} : liveBlockIndices;
this.excludeReconstructedIndices = excludeReconstructedIndices;
this.ecPolicy = ecPolicy;
}

Expand Down Expand Up @@ -127,6 +130,10 @@ public byte[] getLiveBlockIndices() {
return liveBlockIndices;
}

public byte[] getExcludeReconstructedIndices() {
return excludeReconstructedIndices;
}

public ErasureCodingPolicy getErasureCodingPolicy() {
return ecPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public void testProcessErasureCodingTasksSubmitionShouldSucceed()

BlockECReconstructionInfo invalidECInfo = new BlockECReconstructionInfo(
new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices,
ecPolicy);
new byte[0], ecPolicy);
List<BlockECReconstructionInfo> ecTasks = new ArrayList<>();
ecTasks.add(invalidECInfo);
dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,9 +749,10 @@ public void testBlockECRecoveryCommand() {
DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
targetDnInfos_0, targetDnInfos_1 };
byte[] liveBlkIndices0 = new byte[2];
byte[] excludeReconstructedIndices0=new byte[2];
BlockECReconstructionInfo blkECRecoveryInfo0 = new BlockECReconstructionInfo(
new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
liveBlkIndices0, StripedFileTestUtil.getDefaultECPolicy());
liveBlkIndices0, excludeReconstructedIndices0, StripedFileTestUtil.getDefaultECPolicy());
DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
Expand All @@ -763,9 +764,10 @@ public void testBlockECRecoveryCommand() {
DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
targetDnInfos_2, targetDnInfos_3 };
byte[] liveBlkIndices1 = new byte[2];
byte[] excludeReconstructedIndices = new byte[2];
BlockECReconstructionInfo blkECRecoveryInfo1 = new BlockECReconstructionInfo(
new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
liveBlkIndices1, StripedFileTestUtil.getDefaultECPolicy());
liveBlkIndices1, excludeReconstructedIndices, StripedFileTestUtil.getDefaultECPolicy());
List<BlockECReconstructionInfo> blkRecoveryInfosList = new ArrayList<BlockECReconstructionInfo>();
blkRecoveryInfosList.add(blkECRecoveryInfo0);
blkRecoveryInfosList.add(blkECRecoveryInfo1);
Expand Down
Loading

0 comments on commit 9376b65

Please sign in to comment.