
Commit 535d712

HDFS-17542. EC: Optimize the EC block reconstruction.
1 parent 134dcf1 commit 535d712

File tree

7 files changed: +921 -304 lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 182 additions & 99 deletions
Large diffs are not rendered by default.
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingReplicationWork.java

Lines changed: 118 additions & 0 deletions

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;
+
+public class ErasureCodingReplicationWork extends BlockReconstructionWork {
+
+  private final Byte[] srcIndices;
+
+  public ErasureCodingReplicationWork(BlockInfo block, BlockCollection bc,
+      DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes,
+      List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, Byte[] srcIndices,
+      int priority) {
+    super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired,
+        priority);
+    this.srcIndices = srcIndices;
+    BlockManager.LOG.debug("Creating an ErasureCodingReplicationWork to reconstruct {}", block);
+  }
+
+  @Override
+  void chooseTargets(BlockPlacementPolicy blockplacement,
+      BlockStoragePolicySuite storagePolicySuite,
+      Set<Node> excludedNodes) {
+    // TODO: new placement policy for EC considering multiple writers
+    DatanodeStorageInfo[] chosenTargets = null;
+    // HDFS-14720. If the block is deleted, the block size will become
+    // BlockCommand.NO_ACK (LONG.MAX_VALUE). This kind of block does not need
+    // to be sent for replication or reconstruction.
+    if (!getBlock().isDeleted()) {
+      chosenTargets = blockplacement.chooseTarget(
+          getSrcPath(), getAdditionalReplRequired(), getSrcNodes()[0],
+          getLiveReplicaStorages(), false, excludedNodes, getBlockSize(),
+          storagePolicySuite.getPolicy(getStoragePolicyID()), null);
+    } else {
+      LOG.warn("ErasureCodingReplicationWork does not need to choose targets"
+          + " for deleted block {}", getBlock());
+    }
+    setTargets(chosenTargets);
+  }
+
+  @Override
+  void addTaskToDatanode(NumberReplicas numberReplicas) {
+    for (int i = 0; i < getSrcNodes().length && i < getTargets().length; i++) {
+      BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
+      final byte blockIndex = srcIndices[i];
+      final DatanodeDescriptor source = getSrcNodes()[i];
+      final DatanodeStorageInfo target = getTargets()[i];
+      final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
+          stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
+          stripedBlk.getDataBlockNum(), blockIndex);
+      final Block targetBlk = new Block(stripedBlk.getBlockId() + blockIndex,
+          internBlkLen, stripedBlk.getGenerationStamp());
+      source.addECBlockToBeReplicated(targetBlk, new DatanodeStorageInfo[] {target});
+      LOG.debug("Add replication task from source {} to "
+          + "target {} for EC block {}", source, target, targetBlk);
+    }
+  }
+
+  @VisibleForTesting
+  public Byte[] getSrcIndices() {
+    return srcIndices;
+  }
+
+  static byte chooseSource4SimpleReplication(NumberReplicasStriped numberReplicas) {
+    Map<String, List<Byte>> map = new HashMap<>();
+    for (int i = 0; i < numberReplicas.getSize(); i++) {
+      if (numberReplicas.exist(i)) {
+        DatanodeStorageInfo storage = numberReplicas.getStorage(i);
+        StoredReplicaState state = numberReplicas.getState(i);
+        if ((state == LIVE || state == DECOMMISSIONING) && !numberReplicas.isBusy(i)) {
+          final String rack = storage.getDatanodeDescriptor().getNetworkLocation();
+          List<Byte> dnList = map.get(rack);
+          if (dnList == null) {
+            dnList = new ArrayList<>();
+            map.put(rack, dnList);
+          }
+          dnList.add((byte) i);
+        }
+      }
+    }
+    List<Byte> max = null;
+    for (Map.Entry<String, List<Byte>> entry : map.entrySet()) {
+      if (max == null || entry.getValue().size() > max.size()) {
+        max = entry.getValue();
+      }
+    }
+    return max != null ? max.get(0) : (byte) -1;
+  }
+}
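
For context, chooseSource4SimpleReplication picks a copy source by grouping the usable internal blocks (LIVE or DECOMMISSIONING, and not busy) by rack, then taking the first index from the rack that holds the most of them. Below is a minimal, self-contained sketch of just that selection rule; the class and method names (RackGroupingSketch, pick) and the rack strings are illustrative, not part of the commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RackGroupingSketch {

  // Mirror of the grouping rule in chooseSource4SimpleReplication: group the
  // usable block indices by rack, then return the first index from the rack
  // holding the most of them (or -1 if none are usable).
  static byte pick(String[] rackOfIndex) {
    Map<String, List<Byte>> byRack = new HashMap<>();
    for (int i = 0; i < rackOfIndex.length; i++) {
      if (rackOfIndex[i] == null) {
        continue; // this block index has no usable replica
      }
      byRack.computeIfAbsent(rackOfIndex[i], r -> new ArrayList<>()).add((byte) i);
    }
    List<Byte> max = null;
    for (List<Byte> group : byRack.values()) {
      if (max == null || group.size() > max.size()) {
        max = group;
      }
    }
    return max != null ? max.get(0) : (byte) -1;
  }

  public static void main(String[] args) {
    // Indices 0, 1, and 4 sit on /rack1, index 2 on /rack2, index 3 is
    // unusable: /rack1 is the largest group, so index 0 is chosen. Prints 0.
    System.out.println(pick(new String[] {"/rack1", "/rack1", "/rack2", null, "/rack1"}));
  }
}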

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java

Lines changed: 8 additions & 125 deletions
@@ -17,21 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.Node;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 class ErasureCodingWork extends BlockReconstructionWork {
   private final byte[] liveBlockIndices;
-  private final byte[] liveBusyBlockIndices;
   private final byte[] excludeReconstructedIndices;
   private final String blockPoolId;
 
@@ -41,13 +35,12 @@ public ErasureCodingWork(String blockPoolId, BlockInfo block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> liveReplicaStorages,
       int additionalReplRequired, int priority,
-      byte[] liveBlockIndices, byte[] liveBusyBlockIndices,
+      byte[] liveBlockIndices,
       byte[] excludeReconstrutedIndices) {
     super(block, bc, srcNodes, containingNodes,
         liveReplicaStorages, additionalReplRequired, priority);
     this.blockPoolId = blockPoolId;
     this.liveBlockIndices = liveBlockIndices;
-    this.liveBusyBlockIndices = liveBusyBlockIndices;
     this.excludeReconstructedIndices = excludeReconstrutedIndices;
     LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
         block);
@@ -77,128 +70,18 @@ void chooseTargets(BlockPlacementPolicy blockplacement,
     setTargets(chosenTargets);
   }
 
-  /**
-   * @return true if the current source nodes cover all the internal blocks.
-   * I.e., we only need to have more racks.
-   */
-  private boolean hasAllInternalBlocks() {
-    final BlockInfoStriped block = (BlockInfoStriped) getBlock();
-    if (liveBlockIndices.length
-        + liveBusyBlockIndices.length < block.getRealTotalBlockNum()) {
-      return false;
-    }
-    BitSet bitSet = new BitSet(block.getTotalBlockNum());
-    for (byte index : liveBlockIndices) {
-      bitSet.set(index);
-    }
-    for (byte busyIndex: liveBusyBlockIndices) {
-      bitSet.set(busyIndex);
-    }
-    for (int i = 0; i < block.getRealDataBlockNum(); i++) {
-      if (!bitSet.get(i)) {
-        return false;
-      }
-    }
-    for (int i = block.getDataBlockNum(); i < block.getTotalBlockNum(); i++) {
-      if (!bitSet.get(i)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * We have all the internal blocks but not enough racks. Thus we do not need
-   * to do decoding but only simply make an extra copy of an internal block. In
-   * this scenario, use this method to choose the source datanode for simple
-   * replication.
-   * @return The index of the source datanode.
-   */
-  private int chooseSource4SimpleReplication() {
-    Map<String, List<Integer>> map = new HashMap<>();
-    for (int i = 0; i < getSrcNodes().length; i++) {
-      final String rack = getSrcNodes()[i].getNetworkLocation();
-      List<Integer> dnList = map.get(rack);
-      if (dnList == null) {
-        dnList = new ArrayList<>();
-        map.put(rack, dnList);
-      }
-      dnList.add(i);
-    }
-    List<Integer> max = null;
-    for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
-      if (max == null || entry.getValue().size() > max.size()) {
-        max = entry.getValue();
-      }
-    }
-    assert max != null;
-    return max.get(0);
-  }
-
   @Override
   void addTaskToDatanode(NumberReplicas numberReplicas) {
     final DatanodeStorageInfo[] targets = getTargets();
     assert targets.length > 0;
     BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
-
-    if (hasNotEnoughRack()) {
-      // if we already have all the internal blocks, but not enough racks,
-      // we only need to replicate one internal block to a new rack
-      int sourceIndex = chooseSource4SimpleReplication();
-      createReplicationWork(sourceIndex, targets[0]);
-    } else if ((numberReplicas.decommissioning() > 0 ||
-        numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
-        hasAllInternalBlocks()) {
-      List<Integer> leavingServiceSources = findLeavingServiceSources();
-      // decommissioningSources.size() should be >= targets.length
-      final int num = Math.min(leavingServiceSources.size(), targets.length);
-      for (int i = 0; i < num; i++) {
-        createReplicationWork(leavingServiceSources.get(i), targets[i]);
-      }
-    } else {
-      targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
-          new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
-          liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
-    }
+    targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+        new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
+        liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
   }
 
-  private void createReplicationWork(int sourceIndex,
-      DatanodeStorageInfo target) {
-    BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
-    final byte blockIndex = liveBlockIndices[sourceIndex];
-    final DatanodeDescriptor source = getSrcNodes()[sourceIndex];
-    final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
-        stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
-        stripedBlk.getDataBlockNum(), blockIndex);
-    final Block targetBlk = new Block(stripedBlk.getBlockId() + blockIndex,
-        internBlkLen, stripedBlk.getGenerationStamp());
-    source.addECBlockToBeReplicated(targetBlk,
-        new DatanodeStorageInfo[] {target});
-    LOG.debug("Add replication task from source {} to "
-        + "target {} for EC block {}", source, target, targetBlk);
-  }
-
-  private List<Integer> findLeavingServiceSources() {
-    // Mark the block in normal node.
-    BlockInfoStriped block = (BlockInfoStriped)getBlock();
-    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
-    for (int i = 0; i < getSrcNodes().length; i++) {
-      if (getSrcNodes()[i].isInService()) {
-        bitSet.set(liveBlockIndices[i]);
-      }
-    }
-    // If the block is on the node which is decommissioning or
-    // entering_maintenance, and it doesn't exist on other normal nodes,
-    // we just add the node into source list.
-    List<Integer> srcIndices = new ArrayList<>();
-    for (int i = 0; i < getSrcNodes().length; i++) {
-      if ((getSrcNodes()[i].isDecommissionInProgress() ||
-          (getSrcNodes()[i].isEnteringMaintenance() &&
-          getSrcNodes()[i].isAlive())) &&
-          !bitSet.get(liveBlockIndices[i])) {
-        srcIndices.add(i);
-      }
-    }
-    return srcIndices;
+  @VisibleForTesting
+  public byte[] getExcludeReconstructedIndices() {
+    return excludeReconstructedIndices;
   }
 }
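
Per this diff, the deleted branches are not lost: the per-index copy logic that was in createReplicationWork now lives in ErasureCodingReplicationWork.addTaskToDatanode (the new file above), and a rack-aware source choice survives there as the static chooseSource4SimpleReplication, so ErasureCodingWork is left responsible only for full EC reconstruction.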

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java

Lines changed: 51 additions & 14 deletions
@@ -37,29 +37,61 @@
 public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaState> {
 
   public enum StoredReplicaState {
-    // live replicas. for a striped block, this value excludes redundant
-    // replicas for the same internal block
-    LIVE,
-    READONLY,
-    // decommissioning replicas. for a striped block ,this value excludes
-    // redundant and live replicas for the same internal block.
-    DECOMMISSIONING,
-    DECOMMISSIONED,
+    // live replicas
+    LIVE((byte) 1),
+    READONLY((byte) 2),
+    // decommissioning replicas
+    DECOMMISSIONING((byte) 5),
+    DECOMMISSIONED((byte) 11),
     // We need live ENTERING_MAINTENANCE nodes to continue
     // to serve read request while it is being transitioned to live
     // IN_MAINTENANCE if these are the only replicas left.
     // MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
     // Live ENTERING_MAINTENANCE.
-    MAINTENANCE_NOT_FOR_READ,
+    MAINTENANCE_NOT_FOR_READ((byte) 4),
     // Live ENTERING_MAINTENANCE nodes to serve read requests.
-    MAINTENANCE_FOR_READ,
-    CORRUPT,
+    MAINTENANCE_FOR_READ((byte) 3),
+    CORRUPT((byte) 13),
     // excess replicas already tracked by blockmanager's excess map
-    EXCESS,
-    STALESTORAGE,
+    EXCESS((byte) 12),
+    STALESTORAGE(Byte.MAX_VALUE),
     // for striped blocks only. number of redundant internal block replicas
     // that have not been tracked by blockmanager yet (i.e., not in excess)
-    REDUNDANT
+    REDUNDANT(Byte.MAX_VALUE);
+
+    /*
+     * Priority is used to handle the situation where a block index in an
+     * EC block group has multiple replicas: the lower the number, the
+     * healthier the storage, and 1 is the highest priority. For example,
+     * if a block index has two replicas, one in the LIVE state and the
+     * other in the CORRUPT state, the state is stored as LIVE.
+     *
+     * The priorities are classified as follows:
+     * (1) States that can be read normally: LIVE, READONLY,
+     * MAINTENANCE_FOR_READ, MAINTENANCE_NOT_FOR_READ, and DECOMMISSIONING.
+     * These have the highest priorities, ranging from 1 to 5. LIVE ranks
+     * first, READONLY second, and MAINTENANCE_FOR_READ must rank above
+     * MAINTENANCE_NOT_FOR_READ. DECOMMISSIONING ranks below both
+     * MAINTENANCE states, mainly because we can tolerate the temporary
+     * loss of replicas in the MAINTENANCE state.
+     * (2) States that cannot be read normally: DECOMMISSIONED, EXCESS,
+     * and CORRUPT. Their priorities are the lowest, ranging from 11 to 13.
+     * (3) Special states used only for statistical calculations:
+     * STALESTORAGE and REDUNDANT.
+     */
+    private final byte priority;
+
+    StoredReplicaState(byte priority) {
+      this.priority = priority;
+    }
+
+    public byte getPriority() {
+      return priority;
+    }
   }
 
   public NumberReplicas() {
@@ -134,4 +166,9 @@ public int outOfServiceReplicas() {
   public int liveEnteringMaintenanceReplicas() {
     return (int)get(MAINTENANCE_FOR_READ);
   }
+
+  public void add(final NumberReplicas.StoredReplicaState e, final long value, int blockIndex,
+      DatanodeStorageInfo storage, boolean allowReplaceIfExist) {
+    super.add(e, value);
+  }
 }
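
To make the priority scheme concrete, here is a minimal, self-contained sketch of the resolution rule the comment above describes. The enum below only mirrors the priorities added in this diff; the class and method names (PriorityResolutionSketch, resolve) are illustrative, and the actual per-index replace logic presumably lives in the NumberReplicasStriped class referenced elsewhere in this commit, which is not rendered in this excerpt.

public class PriorityResolutionSketch {

  // Mirrors the priorities assigned to StoredReplicaState in this commit.
  enum State {
    LIVE((byte) 1), READONLY((byte) 2), MAINTENANCE_FOR_READ((byte) 3),
    MAINTENANCE_NOT_FOR_READ((byte) 4), DECOMMISSIONING((byte) 5),
    DECOMMISSIONED((byte) 11), EXCESS((byte) 12), CORRUPT((byte) 13);

    final byte priority;

    State(byte priority) {
      this.priority = priority;
    }
  }

  // Keep the healthier state for a block index: a lower priority number wins.
  static State resolve(State current, State reported) {
    if (current == null || reported.priority < current.priority) {
      return reported;
    }
    return current;
  }

  public static void main(String[] args) {
    // A block index reported CORRUPT on one storage and LIVE on another is
    // counted as LIVE, since priority 1 beats priority 13. Prints LIVE.
    System.out.println(resolve(State.CORRUPT, State.LIVE));
  }
}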
