Skip to content

HDFS-15549. Improve DISK/ARCHIVE movement if they are on same filesystem #2583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jan 16, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ String[] linkCount(File file) throws IOException {
*/

/**
* Creates a hardlink
* Creates a hardlink.
* @param file - existing source file
* @param linkName - desired target link file
*/
public static void createHardLink(File file, File linkName)
public static void createHardLink(File file, File linkName)
throws IOException {
if (file == null) {
throw new IOException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public static StorageType parseStorageType(String s) {
return StorageType.valueOf(StringUtils.toUpperCase(s));
}

/**
 * Tells whether a storage type may take part in same disk tiering.
 *
 * @param storageType the storage type to check
 * @return true only for {@link StorageType#DISK} and
 *         {@link StorageType#ARCHIVE}
 */
public static boolean allowSameDiskTiering(StorageType storageType) {
  if (storageType == StorageType.DISK) {
    return true;
  }
  return storageType == StorageType.ARCHIVE;
}

private static List<StorageType> getNonTransientTypes() {
List<StorageType> nonTransientTypes = new ArrayList<>();
for (StorageType t : VALUES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
@VisibleForTesting
public void start() {
shouldRun.set(true);
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.HadoopIllegalArgumentException;
Expand Down Expand Up @@ -994,6 +995,20 @@ static File[] copyBlockFiles(long blockId, long genStamp,
smallBufferSize, conf);
}

/**
 * Hard-links the block and meta files of a replica into the block
 * directory derived from the given destination root and block id.
 *
 * @param blockId id of the block, used to locate the destination directory
 * @param genStamp generation stamp used to name the destination meta file
 * @param srcReplica replica whose files are linked
 * @param destRoot root directory under which the block directory is resolved
 * @return the new meta and block files.
 * @throws IOException if either hard link cannot be created
 */
static File[] hardLinkBlockFiles(long blockId, long genStamp,
    ReplicaInfo srcReplica, File destRoot) throws IOException {
  // The linked block file keeps the block name of the source replica.
  final File linkedBlock = new File(
      DatanodeUtil.idToBlockDir(destRoot, blockId),
      srcReplica.getBlockName());
  final File linkedMeta = FsDatasetUtil.getMetaFile(linkedBlock, genStamp);
  return hardLinkBlockFiles(srcReplica, linkedMeta, linkedBlock);
}

static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
File dstFile, boolean calculateChecksum,
int smallBufferSize, final Configuration conf)
Expand Down Expand Up @@ -1026,6 +1041,34 @@ static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
return new File[] {dstMeta, dstFile};
}

/**
 * Hard-links the block file and the metadata file of a replica to the
 * given destination files, creating the destination directory if needed.
 *
 * @param srcReplica replica whose block and metadata files are linked
 * @param dstMeta destination of the metadata file link
 * @param dstFile destination of the block file link
 * @return a two element array: the meta file first, the block file second
 * @throws IOException if either hard link cannot be created
 */
static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
    File dstFile)
    throws IOException {
  // Create parent folder if not exists.
  srcReplica.getFileIoProvider()
      .mkdirs(srcReplica.getVolume(), dstFile.getParentFile());
  try {
    HardLink.createHardLink(
        new File(srcReplica.getBlockURI()), dstFile);
  } catch (IOException e) {
    throw new IOException("Failed to hardLink "
        + srcReplica + " block file to "
        + dstFile, e);
  }
  try {
    HardLink.createHardLink(
        new File(srcReplica.getMetadataURI()), dstMeta);
  } catch (IOException e) {
    throw new IOException("Failed to hardLink "
        + srcReplica + " metadata to "
        + dstMeta, e);
  }
  // Log at the same level the guard checks; the original guarded on
  // debug but emitted the message at info.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
  }
  return new File[]{dstMeta, dstFile};
}

/**
* Move block files from one storage to another storage.
* @return Returns the Old replicaInfo
Expand Down Expand Up @@ -1058,12 +1101,30 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
}

FsVolumeReference volumeRef = null;
boolean shouldConsiderSameMountVolume =
shouldConsiderSameMountVolume(replicaInfo.getVolume(),
targetStorageType, targetStorageId);
boolean useVolumeOnSameMount = false;

try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
if (shouldConsiderSameMountVolume) {
volumeRef = volumes.getVolumeByMount(targetStorageType,
((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
block.getNumBytes());
if (volumeRef != null) {
useVolumeOnSameMount = true;
}
}
if (!useVolumeOnSameMount) {
volumeRef = volumes.getNextVolume(
targetStorageType,
targetStorageId,
block.getNumBytes()
);
}
}
try {
moveBlock(block, replicaInfo, volumeRef);
moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
} finally {
if (volumeRef != null) {
volumeRef.close();
Expand All @@ -1074,20 +1135,54 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
return replicaInfo;
}

/**
 * Decides whether a block move should first look for the counterpart
 * volume on the same disk mount as the source volume.
 *
 * @param fsVolume volume currently holding the replica
 * @param targetStorageType storage type the block is moved to
 * @param targetStorageID explicitly requested target storage, may be
 *                        null or empty
 * @return true when no target storage id is given, the source is an
 *         {@link FsVolumeImpl} with a non-empty mount, the source and
 *         target storage types differ, and both types allow same disk
 *         tiering
 */
@VisibleForTesting
boolean shouldConsiderSameMountVolume(FsVolumeSpi fsVolume,
    StorageType targetStorageType, String targetStorageID) {
  // A specific target storage was requested, so do not redirect.
  final boolean targetStoragePinned =
      targetStorageID != null && !targetStorageID.isEmpty();
  if (targetStoragePinned || !(fsVolume instanceof FsVolumeImpl)) {
    return false;
  }
  // Without a known mount there is nothing to match against.
  if (((FsVolumeImpl) fsVolume).getMount().isEmpty()) {
    return false;
  }
  final StorageType sourceStorageType = fsVolume.getStorageType();
  // Only a move between two different tiering-capable types
  // (DISK <-> ARCHIVE) qualifies.
  return sourceStorageType != targetStorageType
      && StorageType.allowSameDiskTiering(sourceStorageType)
      && StorageType.allowSameDiskTiering(targetStorageType);
}

/**
* Moves a block from a given volume to another.
*
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
* @param moveBlockToLocalMount - Whether we use shortcut
* to move block on same mount.
* @return newReplicaInfo
* @throws IOException
*/
@VisibleForTesting
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
volumeRef);
FsVolumeReference volumeRef, boolean moveBlockToLocalMount)
throws IOException {
ReplicaInfo newReplicaInfo;
if (moveBlockToLocalMount) {
newReplicaInfo = moveReplicaToVolumeOnSameMount(block, replicaInfo,
volumeRef);
} else {
newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
volumeRef);
}

finalizeNewReplica(newReplicaInfo, block);
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
return newReplicaInfo;
Expand Down Expand Up @@ -1128,6 +1223,33 @@ ReplicaInfo copyReplicaToVolume(ExtendedBlock block, ReplicaInfo replicaInfo,
return newReplicaInfo;
}

/**
 * Moves a block to a volume on the same mount by hard-linking instead
 * of copying. Intended for moving blocks between storage types that
 * share one disk mount.
 * Two cases need to be considered carefully:
 * 1) A datanode restart in the middle must not cause data loss.
 *    Hard links are used to avoid this.
 * 2) Finalized blocks can be reopened for append.
 *    This is already handled by dataset lock and gen stamp.
 *    See HDFS-12942
 *
 * @param block - Extended Block
 * @param replicaInfo - ReplicaInfo
 * @param volumeRef - Volume Ref - Closed by caller.
 * @return newReplicaInfo new replica object created in specified volume.
 * @throws IOException
 */
@VisibleForTesting
ReplicaInfo moveReplicaToVolumeOnSameMount(ExtendedBlock block,
    ReplicaInfo replicaInfo,
    FsVolumeReference volumeRef) throws IOException {
  final FsVolumeImpl destinationVolume =
      (FsVolumeImpl) volumeRef.getVolume();
  // Link the replica files into the destination volume's temporary
  // location first.
  return destinationVolume.hardLinkBlockToTmpLocation(block, replicaInfo);
}

/**
* Finalizes newReplica by calling finalizeReplica internally.
*
Expand Down Expand Up @@ -1177,7 +1299,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi
}

try {
moveBlock(block, replicaInfo, volumeRef);
moveBlock(block, replicaInfo, volumeRef, false);
} finally {
if (volumeRef != null) {
volumeRef.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,8 @@ long getActualNonDfsUsed() throws IOException {
// should share the same amount of reserved capacity.
// When calculating actual non dfs used,
// exclude DFS used capacity by another volume.
if (enableSameDiskTiering &&
(storageType == StorageType.DISK
|| storageType == StorageType.ARCHIVE)) {
if (enableSameDiskTiering
&& StorageType.allowSameDiskTiering(storageType)) {
StorageType counterpartStorageType = storageType == StorageType.DISK
? StorageType.ARCHIVE : StorageType.DISK;
FsVolumeReference counterpartRef = dataset
Expand Down Expand Up @@ -1529,6 +1528,24 @@ public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
return newReplicaInfo;
}

/**
 * Hard-links the files of a replica into this volume's temporary
 * directory for the block's block pool and builds a TEMPORARY replica
 * describing the linked files.
 *
 * @param block block being moved; supplies block id, generation stamp
 *              and block pool id for the link destination
 * @param replicaInfo source replica whose files are linked
 * @return the new TEMPORARY replica on this volume
 * @throws IOException if the hard links cannot be created
 */
public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
    ReplicaInfo replicaInfo) throws IOException {
  final File[] linkedFiles = FsDatasetImpl.hardLinkBlockFiles(
      block.getBlockId(), block.getGenerationStamp(), replicaInfo,
      getTmpDir(block.getBlockPoolId()));
  // hardLinkBlockFiles returns the meta file first, the block file second.
  final File linkedMeta = linkedFiles[0];
  final File linkedBlock = linkedFiles[1];

  final ReplicaBuilder builder = new ReplicaBuilder(ReplicaState.TEMPORARY)
      .setBlockId(replicaInfo.getBlockId())
      .setGenerationStamp(replicaInfo.getGenerationStamp())
      .setFsVolume(this)
      .setDirectoryToUse(linkedMeta.getParentFile())
      .setBytesToReserve(0);
  final ReplicaInfo tmpReplica = builder.build();
  // The replica length is taken from the linked block file.
  tmpReplica.setNumBytes(linkedBlock.length());
  return tmpReplica;
}

public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
long genStamp,
ReplicaInfo replicaInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
}
}

/**
 * Get volume by disk mount to place a block.
 * This is useful for same disk tiering.
 *
 * @param storageType The desired {@link StorageType}
 * @param mount Disk mount of the volume
 * @param blockSize Free space needed on the volume
 * @return a reference to the matching volume, or null when same disk
 *         tiering is disabled, no volume matches the mount and storage
 *         type, or the volume's available space is not greater than
 *         {@code blockSize}. A non-null reference must be closed by the
 *         caller.
 * @throws IOException if the available space cannot be read or an
 *         unused reference cannot be released
 */
FsVolumeReference getVolumeByMount(StorageType storageType,
    String mount, long blockSize) throws IOException {
  if (!enableSameDiskTiering) {
    return null;
  }
  FsVolumeReference volume = mountVolumeMap
      .getVolumeRefByMountAndStorageType(mount, storageType);
  if (volume == null) {
    return null;
  }
  // Check if volume has enough capacity. If the reference is not handed
  // back to the caller (not enough space, or the check itself failed),
  // release it here so it does not leak.
  boolean hasCapacity = false;
  try {
    hasCapacity = volume.getVolume().getAvailable() > blockSize;
  } finally {
    if (!hasCapacity) {
      volume.close();
    }
  }
  return hasCapacity ? volume : null;
}

/**
* Get next volume.
*
Expand Down Expand Up @@ -354,9 +378,8 @@ private void removeVolume(FsVolumeImpl target) {
* Check if same disk tiering is applied to the volume.
*/
private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
  // Applies only when the feature is enabled and the volume's storage
  // type is one that allows same disk tiering (DISK or ARCHIVE).
  // A second, equivalent return statement that followed this one was
  // unreachable and has been dropped.
  return enableSameDiskTiering
      && StorageType.allowSameDiskTiering(target.getStorageType());
}

/**
Expand Down
Loading