Skip to content

HBASE-28721 AsyncFSWAL is broken when running against hadoop 3.4.0 #6270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
Expand Down Expand Up @@ -121,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final String src;

private final long fileId;
private HdfsFileStatus stat;

private final ExtendedBlock block;

Expand Down Expand Up @@ -354,14 +355,14 @@ private void setupReceiver(int timeoutMs) {
}

FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
this.namenode = namenode;
this.fileId = fileId;
this.stat = stat;
this.clientName = clientName;
this.src = src;
this.block = locatedBlock.getBlock();
Expand Down Expand Up @@ -592,7 +593,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
buf = null;
}
closeDataNodeChannelsAndAwait();
endFileLease(client, fileId);
endFileLease(client, stat);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
Expand All @@ -607,7 +608,7 @@ public void close() throws IOException {
state = State.CLOSED;
closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
completeFile(client, namenode, src, clientName, block, stat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {

private interface LeaseManager {

void begin(DFSClient client, long inodeId);
void begin(DFSClient client, HdfsFileStatus stat);

void end(DFSClient client, long inodeId);
void end(DFSClient client, HdfsFileStatus stat);
}

private static final LeaseManager LEASE_MANAGER;
Expand Down Expand Up @@ -202,7 +202,58 @@ public boolean isClientRunning(DFSClient client) {
};
}

private static LeaseManager createLeaseManager() throws NoSuchMethodException {
private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
endFileLeaseMethod.setAccessible(true);
Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
getConfigurationMethod.setAccessible(true);
Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");

return new LeaseManager() {

private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
"dfs.client.output.stream.uniq.default.key";
private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

private String getUniqId(DFSClient client, HdfsFileStatus stat)
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
// Copied from DFSClient in Hadoop 3.4.0
long fileId = stat.getFileId();
String namespace = (String) getNamespaceMehtod.invoke(stat);
if (namespace == null) {
Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
return defaultKey + "_" + fileId;
} else {
return namespace + "_" + fileId;
}
}

@Override
public void begin(DFSClient client, HdfsFileStatus stat) {
try {
beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
public void end(DFSClient client, HdfsFileStatus stat) {
try {
endFileLeaseMethod.invoke(client, getUniqId(client, stat));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}

private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
Expand All @@ -211,25 +262,35 @@ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
return new LeaseManager() {

@Override
public void begin(DFSClient client, long inodeId) {
public void begin(DFSClient client, HdfsFileStatus stat) {
try {
beginFileLeaseMethod.invoke(client, inodeId, null);
beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
public void end(DFSClient client, long inodeId) {
public void end(DFSClient client, HdfsFileStatus stat) {
try {
endFileLeaseMethod.invoke(client, inodeId);
endFileLeaseMethod.invoke(client, stat.getFileId());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}

private static LeaseManager createLeaseManager() throws NoSuchMethodException {
try {
return createLeaseManager3_4();
} catch (NoSuchMethodException e) {
LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
}

return createLeaseManager3();
}

private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
Expand Down Expand Up @@ -320,12 +381,12 @@ public boolean progress() {
}
}

static void beginFileLease(DFSClient client, long inodeId) {
LEASE_MANAGER.begin(client, inodeId);
static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
LEASE_MANAGER.begin(client, stat);
}

static void endFileLease(DFSClient client, long inodeId) {
LEASE_MANAGER.end(client, inodeId);
static void endFileLease(DFSClient client, HdfsFileStatus stat) {
LEASE_MANAGER.end(client, stat);
}

static DataChecksum createChecksum(DFSClient client) {
Expand Down Expand Up @@ -552,7 +613,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
throw new NameNodeException(e);
}
}
beginFileLease(client, stat.getFileId());
beginFileLease(client, stat);
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
Expand All @@ -576,8 +637,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down Expand Up @@ -616,7 +677,7 @@ public void operationComplete(Future<Channel> future) throws Exception {
});
}
}
endFileLease(client, stat.getFileId());
endFileLease(client, stat);
}
}
}
Expand Down Expand Up @@ -654,11 +715,11 @@ public static boolean shouldRetryCreate(RemoteException e) {
}

static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
ExtendedBlock block, long fileId) {
ExtendedBlock block, HdfsFileStatus stat) {
for (int retry = 0;; retry++) {
try {
if (namenode.complete(src, clientName, block, fileId)) {
endFileLease(client, fileId);
if (namenode.complete(src, clientName, block, stat.getFileId())) {
endFileLease(client, stat);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);
Expand Down