Skip to content

Commit

Permalink
HDFS-16111. Add a configuration to RoundRobinVolumeChoosingPolicy to …
Browse files Browse the repository at this point in the history
…avoid failed volumes at datanodes.

Change-Id: Iead25812d4073e3980893e3e76f7d2b03b57442a
  • Loading branch information
Zhihai Xu committed Jul 27, 2021
1 parent b7431c3 commit 24a19b0
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
public static final float DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT = 0.75f;
public static final String
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY =
"dfs.datanode.round-robin-volume-choosing-policy.additional-available-space";
public static final long
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT =
1024L * 1024L * 1024L; // 1 GB
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY =
HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY;

import java.io.IOException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

Expand All @@ -30,7 +35,7 @@
* Use fine-grained locks to synchronize volume choosing.
*/
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V> {
implements VolumeChoosingPolicy<V>, Configurable {
public static final Logger LOG =
LoggerFactory.getLogger(RoundRobinVolumeChoosingPolicy.class);

Expand All @@ -41,6 +46,9 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
// syncLocks stores the locks for each storage type.
private Object[] syncLocks;

// The required additional available space when choosing a volume.
private long additionalAvailableSpace;

public RoundRobinVolumeChoosingPolicy() {
int numStorageTypes = StorageType.values().length;
curVolumes = new int[numStorageTypes];
Expand All @@ -50,6 +58,23 @@ public RoundRobinVolumeChoosingPolicy() {
}
}

@Override
public void setConf(Configuration conf) {
additionalAvailableSpace = conf.getLong(
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_DEFAULT);

LOG.info("Round robin volume choosing policy initialized: " +
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY +
" = " + additionalAvailableSpace);
}

@Override
public Configuration getConf() {
// Nothing to do. Only added to fulfill the Configurable contract.
return null;
}

@Override
public V chooseVolume(final List<V> volumes, long blockSize, String storageId)
throws IOException {
Expand Down Expand Up @@ -83,7 +108,7 @@ private V chooseVolume(final int curVolumeIndex, final List<V> volumes,
final V volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable();
if (availableVolumeSize > blockSize) {
if (availableVolumeSize > blockSize + additionalAvailableSpace) {
curVolumes[curVolumeIndex] = curVolume;
return volume;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2657,6 +2657,17 @@
</description>
</property>

<property>
<name>dfs.datanode.round-robin-volume-choosing-policy.additional-available-space</name>
<value>1073741824</value> <!-- 1 GB -->
<description>
Only used when the dfs.datanode.fsdataset.volume.choosing.policy is set to
org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.
This setting controls how much additional available space (unit is byte) is needed
when choosing a volume.
</description>
</property>

<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ private void initDefaultConfigurations() {
DEFAULT_DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD);
this.storagesPerDatanode =
FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
conf.setLong(DFSConfigKeys
.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
Expand Down Expand Up @@ -70,7 +73,50 @@ public static void testRR(VolumeChoosingPolicy<FsVolumeSpi> policy)
// Passed.
}
}


// Test the Round-Robin block-volume choosing algorithm with
// additional available space configured.
@Test
@SuppressWarnings("unchecked")
public void testRRWithAdditionalAvailableSpace() throws Exception {
Configuration conf = new Configuration();
// Set the additional available space needed
conf.setLong(
DFS_DATANODE_ROUND_ROBIN_VOLUME_CHOOSING_POLICY_ADDITIONAL_AVAILABLE_SPACE_KEY,
100);
final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy =
ReflectionUtils.newInstance(RoundRobinVolumeChoosingPolicy.class, conf);
testRRWithAdditionalAvailableSpace(policy);
}

public static void testRRWithAdditionalAvailableSpace(
VolumeChoosingPolicy<FsVolumeSpi> policy) throws Exception {
final List<FsVolumeSpi> volumes = new ArrayList<FsVolumeSpi>();

// First volume, with 100 bytes of space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(0).getAvailable()).thenReturn(100L);

// Second volume, with 200 bytes of space.
volumes.add(Mockito.mock(FsVolumeSpi.class));
Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L);

// The first volume has only 100L space, so the policy should choose
// the second one with additional available space configured as 100L.
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0,
null));
Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0,
null));

// Fail if no volume can be chosen?
try {
policy.chooseVolume(volumes, 100, null);
Assert.fail();
} catch (IOException e) {
// Passed.
}
}

// ChooseVolume should throw DiskOutOfSpaceException
// with volume and block sizes in exception message.
@Test
Expand Down

0 comments on commit 24a19b0

Please sign in to comment.