Skip to content

HDFS-14989. Add a 'swapBlockList' operation to Namenode. #1819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
import org.apache.hadoop.hdfs.server.namenode.SwapBlockListOp.SwapBlockListResult;
import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
Expand Down Expand Up @@ -8498,5 +8499,35 @@ public void checkErasureCodingSupported(String operationName)
throw new UnsupportedActionException(operationName + " not supported.");
}
}

/**
* Namesystem API to swap block list between source and destination files.
*
* @param src source file.
* @param dst destination file.
* @throws IOException on Error.
*/
boolean swapBlockList(final String src, final String dst, long genTimestamp)
throws IOException {
final String operationName = "swapBlockList";
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
SwapBlockListResult res = null;
try {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot swap block list." + src + ", " + dst);
res = SwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp);
} finally {
writeUnlock(operationName);
}
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src, dst, null);
throw e;
}
logAuditEvent(true, operationName, src, dst, res.getDstFileAuditStat());
return res.isSuccess();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ static byte getStoragePolicyID(long header) {
return (byte)STORAGE_POLICY_ID.BITS.retrieve(header);
}

static byte getBlockLayoutPolicy(long header) {
return (byte)BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
}

// Union of all the block type masks. Currently there is only
// BLOCK_TYPE_MASK_STRIPED
static final long BLOCK_TYPE_MASK = 1 << 11;
Expand Down Expand Up @@ -728,6 +732,17 @@ public void clearBlocks() {
this.blocks = BlockInfo.EMPTY_ARRAY;
}

/**
* This method replaces blocks in a file with the supplied blocks.
* @param newBlocks List of new blocks.
*/
void replaceBlocks(BlockInfo[] newBlocks) {
this.blocks = Arrays.copyOf(newBlocks, newBlocks.length);
for (BlockInfo block : blocks) {
block.setBlockCollectionId(getId());
}
}

private void updateRemovedUnderConstructionFiles(
ReclaimContext reclaimContext) {
if (isUnderConstruction() && reclaimContext.removedUCFiles != null) {
Expand Down Expand Up @@ -1257,4 +1272,17 @@ boolean isBlockInLatestSnapshot(BlockInfo block) {
return snapshotBlocks != null &&
Arrays.asList(snapshotBlocks).contains(block);
}

/**
* Update Header with new Block Layout and Redundancy bits.
* @param newBlockLayoutPolicy new block layout policy.
* @param newStoragePolicy new storage policy ID.
*/
void updateHeaderWithNewPolicy(byte newBlockLayoutPolicy,
byte newStoragePolicy) {
this.header = HeaderFormat.toLong(
HeaderFormat.getPreferredBlockSize(header),
newBlockLayoutPolicy,
newStoragePolicy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2667,4 +2667,14 @@ public Long getNextSPSPath() throws IOException {
}
return namesystem.getBlockManager().getSPSManager().getNextPathId();
}

public boolean swapBlockList(String src, String dst, long maxTimestamp)
throws IOException {
checkNNStartup();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.swapBlockList: {} and {}", src, dst);
}
return namesystem.swapBlockList(src, dst, maxTimestamp);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.Time;

/**
* Class to carry out the operation of swapping blocks from one file to another.
* Along with swapping blocks, we can also optionally swap the block layout
* of a file header, which is useful for client operations like converting
* replicated to EC file.
*/
public final class SwapBlockListOp {

private SwapBlockListOp() {
}

static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
String src, String dst, long genTimestamp)
throws IOException {

final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE);
final INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.WRITE);
if (fsd.isPermissionEnabled()) {
fsd.checkAncestorAccess(pc, srcIIP, FsAction.WRITE);
fsd.checkAncestorAccess(pc, dstIIP, FsAction.WRITE);
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.swapBlockList: "
+ srcIIP.getPath() + " and " + dstIIP.getPath());
}
SwapBlockListResult result;
fsd.writeLock();
try {
result = swapBlockList(fsd, srcIIP, dstIIP, genTimestamp);
} finally {
fsd.writeUnlock();
}
return result;
}

private static SwapBlockListResult swapBlockList(FSDirectory fsd,
final INodesInPath srcIIP,
final INodesInPath dstIIP,
long genTimestamp)
throws IOException {

assert fsd.hasWriteLock();
validateInode(srcIIP);
validateInode(dstIIP);
fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);

final String src = srcIIP.getPath();
final String dst = dstIIP.getPath();
if (dst.equals(src)) {
throw new FileAlreadyExistsException("The source " + src +
" and destination " + dst + " are the same");
}

INodeFile srcINodeFile = srcIIP.getLastINode().asFile();
INodeFile dstINodeFile = dstIIP.getLastINode().asFile();

String errorPrefix = "DIR* FSDirectory.swapBlockList: ";
String error = "Swap Block List destination file ";
BlockInfo lastBlock = dstINodeFile.getLastBlock();
if (lastBlock != null && lastBlock.getGenerationStamp() != genTimestamp) {
error += dstIIP.getPath() +
" has last block with different gen timestamp.";
NameNode.stateChangeLog.warn(errorPrefix + error);
throw new IOException(error);
}

long mtime = Time.now();
BlockInfo[] dstINodeFileBlocks = dstINodeFile.getBlocks();
dstINodeFile.replaceBlocks(srcINodeFile.getBlocks());
srcINodeFile.replaceBlocks(dstINodeFileBlocks);

long srcHeader = srcINodeFile.getHeaderLong();
long dstHeader = dstINodeFile.getHeaderLong();

byte dstBlockLayoutPolicy =
HeaderFormat.getBlockLayoutPolicy(dstHeader);
byte srcBlockLayoutPolicy =
HeaderFormat.getBlockLayoutPolicy(srcHeader);

byte dstStoragePolicyID = HeaderFormat.getStoragePolicyID(dstHeader);
byte srcStoragePolicyID = HeaderFormat.getStoragePolicyID(srcHeader);

dstINodeFile.updateHeaderWithNewPolicy(srcBlockLayoutPolicy,
srcStoragePolicyID);
dstINodeFile.setModificationTime(mtime);

srcINodeFile.updateHeaderWithNewPolicy(dstBlockLayoutPolicy,
dstStoragePolicyID);
srcINodeFile.setModificationTime(mtime);

return new SwapBlockListResult(true,
fsd.getAuditFileInfo(srcIIP),
fsd.getAuditFileInfo(dstIIP));
}

private static void validateInode(INodesInPath srcIIP)
throws IOException {

String errorPrefix = "DIR* FSDirectory.swapBlockList: ";
String error = "Swap Block List input ";

INode srcInode = FSDirectory.resolveLastINode(srcIIP);

// Check if INode is a file and NOT a directory.
if (!srcInode.isFile()) {
error += srcIIP.getPath() + " is not a file.";
NameNode.stateChangeLog.warn(errorPrefix + error);
throw new IOException(error);
}

// Check if file is under construction.
INodeFile iNodeFile = (INodeFile) srcIIP.getLastINode();
if (iNodeFile.isUnderConstruction()) {
error += srcIIP.getPath() + " is under construction.";
NameNode.stateChangeLog.warn(errorPrefix + error);
throw new IOException(error);
}

// Check if any parent directory is in a snapshot.
if (srcIIP.getLatestSnapshotId() != Snapshot.CURRENT_STATE_ID) {
error += srcIIP.getPath() + " is in a snapshot directory.";
NameNode.stateChangeLog.warn(errorPrefix + error);
throw new IOException(error);
}
}

static class SwapBlockListResult {
private final boolean success;
private final FileStatus srcFileAuditStat;
private final FileStatus dstFileAuditStat;

SwapBlockListResult(boolean success,
FileStatus srcFileAuditStat,
FileStatus dstFileAuditStat) {
this.success = success;
this.srcFileAuditStat = srcFileAuditStat;
this.dstFileAuditStat = dstFileAuditStat;
}

public boolean isSuccess() {
return success;
}

public FileStatus getDstFileAuditStat() {
return dstFileAuditStat;
}

public FileStatus getSrcFileAuditStat() {
return srcFileAuditStat;
}
}
}
Loading