HDFS-17542. EC: Optimize the EC block reconstruction. #6915

Open · wants to merge 6 commits into trunk

ErasureCodingReplicationWork.java (new file)
@@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
import org.apache.hadoop.net.Node;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;

public class ErasureCodingReplicationWork extends BlockReconstructionWork {

private final Byte[] srcIndices;

public ErasureCodingReplicationWork(BlockInfo block, BlockCollection bc,
DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, Byte[] srcIndices,
int priority) {
super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired,
priority);
this.srcIndices = srcIndices;
BlockManager.LOG.debug("Creating an ErasureCodingReplicationWork to reconstruct {}", block);
}

@Override
void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes) {
// TODO: new placement policy for EC considering multiple writers
DatanodeStorageInfo[] chosenTargets = null;
// HDFS-14720. If the block has been deleted, its size becomes
// BlockCommand.NO_ACK (Long.MAX_VALUE); such blocks don't need to be
// sent for replication or reconstruction.
if (!getBlock().isDeleted()) {
chosenTargets = blockplacement.chooseTarget(
getSrcPath(), getAdditionalReplRequired(), getSrcNodes()[0],
getLiveReplicaStorages(), false, excludedNodes, getBlockSize(),
storagePolicySuite.getPolicy(getStoragePolicyID()), null);
} else {
LOG.warn("ErasureCodingReplicationWork does not need to choose targets for deleted block {}", getBlock());
}
setTargets(chosenTargets);
}

@Override
boolean addTaskToDatanode(NumberReplicas numberReplicas) {
for (int i = 0; i < getSrcNodes().length && i < getTargets().length; i++) {
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
final byte blockIndex = srcIndices[i];
final DatanodeDescriptor source = getSrcNodes()[i];
final DatanodeStorageInfo target = getTargets()[i];
final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
stripedBlk.getDataBlockNum(), blockIndex);
final Block targetBlk = new Block(stripedBlk.getBlockId() + blockIndex,
internBlkLen, stripedBlk.getGenerationStamp());
source.addECBlockToBeReplicated(targetBlk, new DatanodeStorageInfo[] {target});
LOG.debug("Add replication task from source {} to "
+ "target {} for EC block {}", source, target, targetBlk);
}
return true;
}

@VisibleForTesting
public Byte[] getSrcIndices() {
return srcIndices;
}

static byte chooseSource4SimpleReplication(NumberReplicasStriped numberReplicas) {
Map<String, List<Byte>> map = new HashMap<>();
for (int i = 0; i < numberReplicas.getSize(); i++) {
if (numberReplicas.exist(i)) {
DatanodeStorageInfo storage = numberReplicas.getStorage(i);
StoredReplicaState state = numberReplicas.getState(i);
if ((state == LIVE || state == DECOMMISSIONING) && !numberReplicas.isBusy(i)) {
final String rack = storage.getDatanodeDescriptor().getNetworkLocation();
List<Byte> dnList = map.get(rack);
if (dnList == null) {
dnList = new ArrayList<>();
map.put(rack, dnList);
}
dnList.add((byte) i);
}
}
}
List<Byte> max = null;
for (Map.Entry<String, List<Byte>> entry : map.entrySet()) {
if (max == null || entry.getValue().size() > max.size()) {
max = entry.getValue();
}
}
return max != null ? max.get(0) : (byte) -1;
}
}
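
For illustration, the rack-majority pick in chooseSource4SimpleReplication can be exercised in isolation. The sketch below mirrors that logic with plain rack strings standing in for storages; RackMajorityDemo and pickSource are illustrative names, not part of this patch:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RackMajorityDemo {
  // Group candidate block indices by rack, then return the first index
  // from the rack holding the most candidates (as the method above does).
  static byte pickSource(String[] rackOfIndex) {
    Map<String, List<Byte>> byRack = new HashMap<>();
    for (int i = 0; i < rackOfIndex.length; i++) {
      if (rackOfIndex[i] == null) {
        continue; // no live, non-busy replica stored for this index
      }
      byRack.computeIfAbsent(rackOfIndex[i], r -> new ArrayList<>())
          .add((byte) i);
    }
    List<Byte> max = null;
    for (List<Byte> group : byRack.values()) {
      if (max == null || group.size() > max.size()) {
        max = group;
      }
    }
    return max != null ? max.get(0) : (byte) -1;
  }

  public static void main(String[] args) {
    // Indices 0 and 2 live on /rack-a, index 1 on /rack-b, index 3 unusable.
    String[] racks = {"/rack-a", "/rack-b", "/rack-a", null};
    System.out.println(pickSource(racks)); // 0: /rack-a has the most candidates
  }
}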
ErasureCodingWork.java

@@ -17,21 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.Node;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndices;
private final byte[] liveBusyBlockIndices;
private final byte[] excludeReconstructedIndices;
private final String blockPoolId;

@@ -41,13 +35,12 @@ public ErasureCodingWork(String blockPoolId, BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired, int priority,
byte[] liveBlockIndices, byte[] liveBusyBlockIndices,
byte[] liveBlockIndices,
byte[] excludeReconstructedIndices) {
super(block, bc, srcNodes, containingNodes,
liveReplicaStorages, additionalReplRequired, priority);
this.blockPoolId = blockPoolId;
this.liveBlockIndices = liveBlockIndices;
this.liveBusyBlockIndices = liveBusyBlockIndices;
this.excludeReconstructedIndices = excludeReconstructedIndices;
LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
block);
@@ -77,132 +70,19 @@ void chooseTargets(BlockPlacementPolicy blockplacement,
setTargets(chosenTargets);
}

/**
* @return true if the current source nodes cover all the internal blocks.
* I.e., we only need to have more racks.
*/
private boolean hasAllInternalBlocks() {
final BlockInfoStriped block = (BlockInfoStriped) getBlock();
if (liveBlockIndices.length
+ liveBusyBlockIndices.length < block.getRealTotalBlockNum()) {
return false;
}
BitSet bitSet = new BitSet(block.getTotalBlockNum());
for (byte index : liveBlockIndices) {
bitSet.set(index);
}
for (byte busyIndex: liveBusyBlockIndices) {
bitSet.set(busyIndex);
}
for (int i = 0; i < block.getRealDataBlockNum(); i++) {
if (!bitSet.get(i)) {
return false;
}
}
for (int i = block.getDataBlockNum(); i < block.getTotalBlockNum(); i++) {
if (!bitSet.get(i)) {
return false;
}
}
return true;
}

/**
* We have all the internal blocks but not enough racks. Thus we do not
* need to decode; we simply make an extra copy of an internal block. In
* this scenario, use this method to choose the source datanode for
* simple replication.
* @return The index of the source datanode.
*/
private int chooseSource4SimpleReplication() {
Map<String, List<Integer>> map = new HashMap<>();
for (int i = 0; i < getSrcNodes().length; i++) {
final String rack = getSrcNodes()[i].getNetworkLocation();
List<Integer> dnList = map.get(rack);
if (dnList == null) {
dnList = new ArrayList<>();
map.put(rack, dnList);
}
dnList.add(i);
}
List<Integer> max = null;
for (Map.Entry<String, List<Integer>> entry : map.entrySet()) {
if (max == null || entry.getValue().size() > max.size()) {
max = entry.getValue();
}
}
assert max != null;
return max.get(0);
}

@Override
boolean addTaskToDatanode(NumberReplicas numberReplicas) {
final DatanodeStorageInfo[] targets = getTargets();
assert targets.length > 0;
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
boolean flag = true;
if (hasNotEnoughRack()) {
// if we already have all the internal blocks, but not enough racks,
// we only need to replicate one internal block to a new rack
int sourceIndex = chooseSource4SimpleReplication();
createReplicationWork(sourceIndex, targets[0]);
} else if ((numberReplicas.decommissioning() > 0 ||
numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
hasAllInternalBlocks()) {
List<Integer> leavingServiceSources = findLeavingServiceSources();
// decommissioningSources.size() should be >= targets.length
final int num = Math.min(leavingServiceSources.size(), targets.length);
if (num == 0) {
flag = false;
}
for (int i = 0; i < num; i++) {
createReplicationWork(leavingServiceSources.get(i), targets[i]);
}
} else {
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
}
return flag;
}

private void createReplicationWork(int sourceIndex,
DatanodeStorageInfo target) {
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
final byte blockIndex = liveBlockIndices[sourceIndex];
final DatanodeDescriptor source = getSrcNodes()[sourceIndex];
final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
stripedBlk.getDataBlockNum(), blockIndex);
final Block targetBlk = new Block(stripedBlk.getBlockId() + blockIndex,
internBlkLen, stripedBlk.getGenerationStamp());
source.addECBlockToBeReplicated(targetBlk,
new DatanodeStorageInfo[] {target});
LOG.debug("Add replication task from source {} to "
+ "target {} for EC block {}", source, target, targetBlk);
}

@Override
boolean addTaskToDatanode(NumberReplicas numberReplicas) {
final DatanodeStorageInfo[] targets = getTargets();
assert targets.length > 0;
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
return true;
}

private List<Integer> findLeavingServiceSources() {
// Mark the block indices that are held by in-service (normal) nodes.
BlockInfoStriped block = (BlockInfoStriped)getBlock();
BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
for (int i = 0; i < getSrcNodes().length; i++) {
if (getSrcNodes()[i].isInService()) {
bitSet.set(liveBlockIndices[i]);
}
}
// If the block index lives on a node that is decommissioning or
// entering maintenance, and no in-service node holds it, add that
// node to the source list.
List<Integer> srcIndices = new ArrayList<>();
for (int i = 0; i < getSrcNodes().length; i++) {
if ((getSrcNodes()[i].isDecommissionInProgress() ||
(getSrcNodes()[i].isEnteringMaintenance() &&
getSrcNodes()[i].isAlive())) &&
!bitSet.get(liveBlockIndices[i])) {
srcIndices.add(i);
}
}
return srcIndices;
}

@VisibleForTesting
public byte[] getExcludeReconstructedIndices() {
return excludeReconstructedIndices;
}
}
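
Both the new ErasureCodingReplicationWork.addTaskToDatanode and the removed createReplicationWork derive the replication target block the same way: the internal block ID is the block group ID plus the block index, and the length comes from StripedBlockUtil.getInternalBlockLength. Below is a minimal sketch of the ID arithmetic, with an illustrative group ID (HDFS striped block group IDs are negative and keep their low four bits zero so that groupId + index is unique per internal block):

public class InternalBlockIdDemo {
  public static void main(String[] args) {
    // Illustrative EC block group ID with its low four bits clear.
    long blockGroupId = Long.MIN_VALUE + (1L << 16);
    int dataBlocks = 6;
    int parityBlocks = 3; // RS-6-3 layout
    for (byte index = 0; index < dataBlocks + parityBlocks; index++) {
      // Same arithmetic as "stripedBlk.getBlockId() + blockIndex" above.
      long internalBlockId = blockGroupId + index;
      String kind = index < dataBlocks ? "data" : "parity";
      System.out.println(kind + " block, index " + index
          + " -> internal block id " + internalBlockId);
    }
  }
}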
NumberReplicas.java

@@ -37,29 +37,61 @@
public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaState> {

public enum StoredReplicaState {
// live replicas. for a striped block, this value excludes redundant
// replicas for the same internal block
LIVE,
READONLY,
// decommissioning replicas. for a striped block, this value excludes
// redundant and live replicas for the same internal block.
DECOMMISSIONING,
DECOMMISSIONED,
// live replicas
LIVE((byte) 1),
READONLY((byte) 2),
// decommissioning replicas.
DECOMMISSIONING((byte) 5),
DECOMMISSIONED((byte) 11),
// We need live ENTERING_MAINTENANCE nodes to continue serving read
// requests while they are being transitioned to live IN_MAINTENANCE,
// if these are the only replicas left.
// MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
// live ENTERING_MAINTENANCE.
MAINTENANCE_NOT_FOR_READ,
MAINTENANCE_NOT_FOR_READ((byte) 4),
// Live ENTERING_MAINTENANCE nodes to serve read requests.
MAINTENANCE_FOR_READ,
CORRUPT,
MAINTENANCE_FOR_READ((byte) 3),
CORRUPT((byte) 13),
// excess replicas already tracked by blockmanager's excess map
EXCESS,
STALESTORAGE,
EXCESS((byte) 12),
STALESTORAGE(Byte.MAX_VALUE),
// for striped blocks only. number of redundant internal block replicas
// that have not been tracked by blockmanager yet (i.e., not in excess)
REDUNDANT
REDUNDANT(Byte.MAX_VALUE);

/*
* Priority is used to handle the situation where some block index in
* an EC block has multiple replicas. The smaller the value, the higher
* the priority and the healthier the storage; 1 is the highest priority.
* For example, if a block index has two replicas, one in the LIVE state
* and the other in the CORRUPT state, the state will be stored as LIVE.
*
* The priorities are classified as follows:
* (1) States that can be read normally: LIVE, READONLY,
* MAINTENANCE_FOR_READ, MAINTENANCE_NOT_FOR_READ, and DECOMMISSIONING.
* These have the highest priorities, ranging from 1 to 5. LIVE has the
* highest priority, READONLY the second highest, and
* MAINTENANCE_FOR_READ ranks above MAINTENANCE_NOT_FOR_READ.
* DECOMMISSIONING ranks below both MAINTENANCE states, mainly because
* we can tolerate the temporary loss of replicas that are in the
* MAINTENANCE state.
* (2) States that cannot be read normally: DECOMMISSIONED, EXCESS, and
* CORRUPT. Their priorities are the lowest, ranging from 11 to 13.
* (3) Special states, STALESTORAGE and REDUNDANT, are only used for
* statistical calculations.
*/
private final byte priority;

StoredReplicaState(byte priority) {
this.priority = priority;
}

public byte getPriority() {
return priority;
}
}

public NumberReplicas() {
@@ -134,4 +166,9 @@ public int outOfServiceReplicas() {
public int liveEnteringMaintenanceReplicas() {
return (int)get(MAINTENANCE_FOR_READ);
}

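// Extension hook that also carries the block index and its storage;
// striped-aware subclasses (e.g. NumberReplicasStriped) are expected to
// override it, while this base implementation just counts the state.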
public void add(final NumberReplicas.StoredReplicaState e, final long value, int blockIndex,
DatanodeStorageInfo storage, boolean allowReplaceIfExist) {
super.add(e, value);
}
}
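
To make the priority scheme concrete: when the same internal block index is reported by more than one storage, the state with the smaller priority value should win. The sketch below uses a hypothetical resolve helper and a trimmed copy of the enum; in the patch itself this decision is expected to live in NumberReplicasStriped behind the add(...) hook above:

public class ReplicaStateResolutionDemo {
  // Trimmed copy of StoredReplicaState with the priorities declared above.
  enum State {
    LIVE((byte) 1), READONLY((byte) 2), DECOMMISSIONING((byte) 5), CORRUPT((byte) 13);
    final byte priority;
    State(byte priority) { this.priority = priority; }
  }

  // Keep the healthier state (smaller priority value) for a block index.
  static State resolve(State current, State reported) {
    if (current == null) {
      return reported;
    }
    return reported.priority < current.priority ? reported : current;
  }

  public static void main(String[] args) {
    State s = null;
    s = resolve(s, State.CORRUPT);         // first report for this index
    s = resolve(s, State.LIVE);            // a healthier replica appears
    s = resolve(s, State.DECOMMISSIONING); // does not displace LIVE
    System.out.println(s); // LIVE, matching the example in the comment
  }
}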