
[FLINK-18592][Connectors/FileSystem] StreamingFileSink fails due to truncating HDFS file failure #16927

Closed · wants to merge 5 commits

Changes from all commits
@@ -35,7 +35,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -53,7 +56,9 @@
@Internal
class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {

private static final long LEASE_TIMEOUT = 100_000L;
private static final long LEASE_RECOVERY_TIMEOUT_MS = 900_000L;

private static final long BLOCK_RECOVERY_TIMEOUT_MS = 300_000L;

private static Method truncateHandle;

@@ -65,6 +70,9 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream

private final FSDataOutputStream out;

private static final Logger LOG =
LoggerFactory.getLogger(HadoopRecoverableFsDataOutputStream.class);

HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
throws IOException {

@@ -160,7 +168,17 @@ private static void safelyTruncateFile(

ensureTruncateInitialized();

revokeLeaseByFileSystem(fileSystem, path);
LOG.debug("Truncating file {}.", path);
long start = System.currentTimeMillis();

boolean isLeaseReleased = revokeLeaseByFileSystem(fileSystem, path);
Preconditions.checkState(
isLeaseReleased,
"Failed to release lease on file `%s` in %s ms.",
path.toString(),
LEASE_RECOVERY_TIMEOUT_MS);
long leaseRecoveredTime = System.currentTimeMillis();
LOG.debug("Lease recovery for file {} took {} ms.", path, leaseRecoveredTime - start);

// truncate back and append
boolean truncated;
@@ -171,10 +189,20 @@
}

if (!truncated) {
// Truncate did not complete immediately, we must wait for
// the operation to complete and release the lease.
revokeLeaseByFileSystem(fileSystem, path);
// Truncate does not complete immediately if block recovery is needed,
// thus wait for block recovery
LOG.debug("Waiting for block recovery for file {}.", path);
boolean recovered = waitForBlockRecovery(fileSystem, path);
if (!recovered) {
throw new IOException(
String.format(
"Failed to recover blocks for file %s in %d ms.",
path.toString(), BLOCK_RECOVERY_TIMEOUT_MS));
}
Contributor:
Should revokeLeaseByFileSystem() be called after block recovery?

Contributor Author:
IMHO, we've already triggered a lease recovery, so there's no need to call it again, which would interrupt the previous call.

}

long truncateFinishedTime = System.currentTimeMillis();
LOG.debug("File {} recovered in {} ms.", path, truncateFinishedTime - start);
}

private static void ensureTruncateInitialized() throws FlinkRuntimeException {
@@ -338,6 +366,9 @@ private static boolean revokeLeaseByFileSystem(final FileSystem fs, final Path p
* <p>The lease of the file we are resuming writing/committing to may still belong to the
* process that failed previously and whose state we are recovering.
*
* <p>The recovery process is fast in most cases, but if the file's primary node crashes, the
* NameNode must wait for a socket timeout, which is 10 minutes by default.
*
* @param path The path to the file we want to resume writing to.
*/
private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path)
@@ -347,7 +378,7 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
final DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.recoverLease(path);

final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_RECOVERY_TIMEOUT_MS));

boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
@@ -360,4 +391,37 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
}
return isClosed;
}

/**
* If the last block of the file that the previous execution was writing to is not in COMPLETE
* state, HDFS will perform block recovery which blocks truncate. Thus we have to wait for block
* recovery to ensure the truncate is successful.
*/
private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)

Comment:

I have one question: why not judge this by isFileClosed(path), like:

final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e1) {
        LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e1);
        break;
    }
    isClosed = dfs.isFileClosed(path);
}
return isClosed;

If the path is a very large file, LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE); will be expensive.

Contributor Author (@link3280, Jan 13, 2023):

I'm not particularly familiar with this API, but I think it is worth a try.

throws IOException {
Preconditions.checkState(fs instanceof DistributedFileSystem);

final DistributedFileSystem dfs = (DistributedFileSystem) fs;

final Deadline deadline = Deadline.now().plus(Duration.ofMillis(BLOCK_RECOVERY_TIMEOUT_MS));

boolean success = false;
while (deadline.hasTimeLeft()) {
String absolutePath = Path.getPathWithoutSchemeAndAuthority(path).toString();
LocatedBlocks blocks =
dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE);
boolean noLastBlock = blocks.getLastLocatedBlock() == null;
if (!blocks.isUnderConstruction() && (noLastBlock || blocks.isLastBlockComplete())) {
success = true;
break;
}
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e);
break;
}
}
return success;
}
}
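Both waitUntilLeaseIsRevoked() and the new waitForBlockRecovery() follow the same deadline-bounded polling pattern: check a condition, sleep 500 ms, retry until the timeout expires, and report success via the return value rather than an exception. A minimal self-contained sketch of that pattern, with no Hadoop dependency (the DeadlinePoll class name and the BooleanSupplier stand-in for isFileClosed()/getLocatedBlocks() are illustrative assumptions, not part of this PR):

```java
import java.util.function.BooleanSupplier;

public class DeadlinePoll {

    /**
     * Polls the condition every pollMillis until it holds or timeoutMillis
     * elapses. Returns whether the condition held before the deadline,
     * mirroring the boolean contract of waitUntilLeaseIsRevoked() and
     * waitForBlockRecovery() in the patch.
     */
    static boolean pollUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        boolean ok = condition.getAsBoolean();
        while (!ok && System.currentTimeMillis() < deadline) {
            Thread.sleep(pollMillis);
            ok = condition.getAsBoolean();
        }
        return ok;
    }

    public static void main(String[] args) throws InterruptedException {
        // Condition that flips to true on the third check, mimicking the
        // NameNode finishing lease/block recovery after a short delay.
        final int[] checks = {0};
        boolean recovered = pollUntil(() -> ++checks[0] >= 3, 5_000L, 10L);
        System.out.println(recovered); // true

        // Condition that never holds: the poll gives up at the deadline,
        // as the LEASE_RECOVERY_TIMEOUT_MS / BLOCK_RECOVERY_TIMEOUT_MS paths do.
        boolean timedOut = pollUntil(() -> false, 100L, 10L);
        System.out.println(timedOut); // false
    }
}
```

The design point the diff relies on is that truncate() returning false is not an error: it only means block recovery is still in progress, which is why the patch polls for completion instead of failing immediately.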