[FLINK-18592][Connectors/FileSystem] StreamingFileSink fails due to truncating HDFS file failure #16927
Changes from all commits: d0fbb5a, eb74618, d2d4f64, 92a72d0, f6eeaac
HadoopRecoverableFsDataOutputStream.java

@@ -35,7 +35,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.viewfs.ViewFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -53,7 +56,9 @@
 @Internal
 class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {

-    private static final long LEASE_TIMEOUT = 100_000L;
+    private static final long LEASE_RECOVERY_TIMEOUT_MS = 900_000L;
+
+    private static final long BLOCK_RECOVERY_TIMEOUT_MS = 300_000L;

     private static Method truncateHandle;
@@ -65,6 +70,9 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream

     private final FSDataOutputStream out;

+    private static final Logger LOG =
+            LoggerFactory.getLogger(HadoopRecoverableFsDataOutputStream.class);
+
     HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile)
             throws IOException {
@@ -160,7 +168,17 @@ private static void safelyTruncateFile(

         ensureTruncateInitialized();

-        revokeLeaseByFileSystem(fileSystem, path);
+        LOG.debug("Truncating file {}.", path);
+        long start = System.currentTimeMillis();
+
+        boolean isLeaseReleased = revokeLeaseByFileSystem(fileSystem, path);
+        Preconditions.checkState(
+                isLeaseReleased,
+                "Failed to release lease on file `%s` in %s ms.",
+                path.toString(),
+                LEASE_RECOVERY_TIMEOUT_MS);
+        long leaseRecoveredTime = System.currentTimeMillis();
+        LOG.debug("Lease recovery for file {} took {} ms.", path, leaseRecoveredTime - start);

         // truncate back and append
         boolean truncated;
@@ -171,10 +189,20 @@ private static void safelyTruncateFile(
         }

         if (!truncated) {
-            // Truncate did not complete immediately, we must wait for
-            // the operation to complete and release the lease.
-            revokeLeaseByFileSystem(fileSystem, path);
+            // Truncate does not complete immediately if block recovery is needed,
+            // thus wait for block recovery.
+            LOG.debug("Waiting for block recovery for file {}.", path);
+            boolean recovered = waitForBlockRecovery(fileSystem, path);
+            if (!recovered) {
+                throw new IOException(
+                        String.format(
+                                "Failed to recover blocks for file %s in %d ms.",
+                                path.toString(), BLOCK_RECOVERY_TIMEOUT_MS));
+            }
         }
+
+        long truncateFinishedTime = System.currentTimeMillis();
+        LOG.debug("File {} recovered in {} ms.", path, truncateFinishedTime - start);
     }

     private static void ensureTruncateInitialized() throws FlinkRuntimeException {
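A note on the truncated flag: DistributedFileSystem#truncate (Hadoop 2.7+) returns true only when the file can be cut to the new length immediately, i.e. the new length falls on a block boundary; otherwise HDFS schedules block recovery on the last block and returns false, and the file must not be appended to until that recovery finishes. A minimal sketch of that contract, assuming a Hadoop 2.7+ client on the classpath (the class and method names here are illustrative, not part of this PR):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

final class TruncateContractSketch {
    /**
     * Returns true if the truncate completed immediately (newLength was on a
     * block boundary); false if HDFS started asynchronous block recovery, in
     * which case the caller must wait (see waitForBlockRecovery later in this
     * diff) before reading the tail or appending.
     */
    static boolean tryTruncate(DistributedFileSystem dfs, Path path, long newLength)
            throws IOException {
        return dfs.truncate(path, newLength);
    }
}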
@@ -338,6 +366,9 @@ private static boolean revokeLeaseByFileSystem(final FileSystem fs, final Path p
      * <p>The lease of the file we are resuming writing/committing to may still belong to the
      * process that failed previously and whose state we are recovering.
      *
+     * <p>Recovery is fast in most cases, but if the file's primary node has crashed, the
+     * NameNode must first wait for a socket timeout, which is 10 minutes by default.
+     *
      * @param path The path to the file we want to resume writing to.
      */
     private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path)
@@ -347,7 +378,7 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
         final DistributedFileSystem dfs = (DistributedFileSystem) fs;
         dfs.recoverLease(path);

-        final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
+        final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_RECOVERY_TIMEOUT_MS));

         boolean isClosed = dfs.isFileClosed(path);
         while (!isClosed && deadline.hasTimeLeft()) {
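The hunk above only shows the changed deadline; for readers unfamiliar with the pattern, here is a self-contained sketch of the same lease-revocation loop against a plain Hadoop 2.7+ client. recoverLease and isFileClosed are the actual DistributedFileSystem methods; the timeout handling is simplified relative to Flink's Deadline helper, and the 500 ms poll interval is illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

final class LeaseRecoverySketch {
    /** Returns true once the NameNode has closed the file, i.e. the lease is released. */
    static boolean revokeLease(DistributedFileSystem dfs, Path path, long timeoutMs)
            throws IOException, InterruptedException {
        dfs.recoverLease(path); // ask the NameNode to start lease recovery
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (dfs.isFileClosed(path)) { // closed => safe to truncate and append
                return true;
            }
            Thread.sleep(500L); // back off between NameNode round trips
        }
        return dfs.isFileClosed(path); // one last check at the deadline
    }
}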
@@ -360,4 +391,37 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
         }
         return isClosed;
     }
+
+    /**
+     * If the last block of the file that the previous execution was writing to is not in
+     * COMPLETE state, HDFS will perform block recovery, which blocks truncate. Thus we have
+     * to wait for block recovery to ensure the truncate succeeds.
+     */
+    private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)
+            throws IOException {
+        Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+        final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+        final Deadline deadline = Deadline.now().plus(Duration.ofMillis(BLOCK_RECOVERY_TIMEOUT_MS));
+
+        boolean success = false;
+        while (deadline.hasTimeLeft()) {
+            String absolutePath = Path.getPathWithoutSchemeAndAuthority(path).toString();
+            LocatedBlocks blocks =
+                    dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE);
+            boolean noLastBlock = blocks.getLastLocatedBlock() == null;
+            if (!blocks.isUnderConstruction() && (noLastBlock || blocks.isLastBlockComplete())) {
+                success = true;
+                break;
+            }
+            try {
+                Thread.sleep(500L);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted when waiting for block recovery for file {}.", path);
+                break;
+            }
+        }
+        return success;
+    }
 }

Review comment on waitForBlockRecovery:
    I have one question: why not judge this by […]? If the path is a very large file, […]

Reply:
    I'm not particularly familiar with this API, but I think it is worth a try.
Review comment:
    Should revokeLeaseByFileSystem() be called after block recovery?

Reply:
    IMHO, we've already triggered a lease recovery, so there's no need to call it
    again; doing so would interrupt the previous call.
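To make the ordering the thread settles on concrete, here is the whole recovery sequence condensed into one sketch. It mirrors the diff above but is not the merged code: timeouts and logging are omitted for brevity, and the poll interval is illustrative. Lease recovery is triggered exactly once, before the truncate; the !truncated branch only waits for block recovery rather than calling recoverLease again, which would interrupt the recovery already in progress.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

final class RecoverThenTruncateSketch {
    static void safelyTruncate(DistributedFileSystem dfs, Path path, long newLength)
            throws IOException, InterruptedException {
        // 1. Trigger lease recovery once and wait for the NameNode to close the file.
        dfs.recoverLease(path);
        while (!dfs.isFileClosed(path)) {
            Thread.sleep(500L);
        }

        // 2. Truncate; false means HDFS started asynchronous block recovery.
        if (!dfs.truncate(path, newLength)) {
            // 3. Wait for the last block to become COMPLETE instead of triggering
            //    lease recovery again, which would interrupt the in-flight recovery.
            String src = Path.getPathWithoutSchemeAndAuthority(path).toString();
            LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(src, 0, Long.MAX_VALUE);
            while (blocks.isUnderConstruction()
                    || (blocks.getLastLocatedBlock() != null && !blocks.isLastBlockComplete())) {
                Thread.sleep(500L);
                blocks = dfs.getClient().getLocatedBlocks(src, 0, Long.MAX_VALUE);
            }
        }
    }
}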