[FLINK-18592][Connectors/FileSystem] StreamingFileSink fails due to truncating HDFS file failure #16927
base: master
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot; I help the community review your pull request.
Automated Checks: last check on commit d0fbb5a (Sat Aug 21 16:08:14 UTC 2021). Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
It's a bit weird that CI was good on my forked repo. I'm giving it one more shot.
@flinkbot run azure
...p-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
String.format(
        "Failed to recover blocks for file %s in %d ms.",
        path.toString(), BLOCK_RECOVERY_TIMEOUT_MS));
}
Should revokeLeaseByFileSystem() be called after block recovery?
IMHO, we've already triggered a lease recovery, so there's no need to call it again, which would interrupt the previous call.
 * state, HDFS will perform block recovery which blocks truncate. Thus we have to wait for block
 * recovery to ensure the truncate is successful.
 */
private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)
I have one question: why not judge this by isFileClosed(path), such as:
final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e1) {
        LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e1);
        break;
    }
    isClosed = dfs.isFileClosed(path);
}
return isClosed;
If the path points to a very large file, the call LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE); will be expensive.
I'm not particularly familiar with this API, but I think it is worth a try.
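For comparison, here is a minimal sketch of the getLocatedBlocks-based check discussed above; the class and method names, and the timeout parameter, are illustrative assumptions and not the PR's actual code.

import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.time.Deadline;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

final class BlockRecoveryCheckSketch {

    // Sketch only: polls the block state via getLocatedBlocks. The DFSClient call
    // fetches the locations of all blocks of the file, which is why it is costly
    // for very large files compared with a simple isFileClosed(path) check.
    static boolean waitForLastBlockComplete(
            final DistributedFileSystem dfs, final Path path, final long timeoutMs)
            throws IOException {
        final String absolutePath = path.toUri().getPath();
        final Deadline deadline = Deadline.now().plus(Duration.ofMillis(timeoutMs));
        while (deadline.hasTimeLeft()) {
            final LocatedBlocks blocks =
                    dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE);
            if (blocks.isLastBlockComplete() && !blocks.isUnderConstruction()) {
                return true;
            }
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return false;
    }

    private BlockRecoveryCheckSketch() {}
}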
Hi @link3280,
@thebalu Thanks for reaching out. I didn't get this PR merged because the community has no capacity to review it, especially since this PR is hard to test. I would revisit it and make a new PR if possible. Note that this PR is not a complete solution, because it may sometimes take hours for HDFS to recover from node failures. In that case, it would block Flink for a long time, so we had to give up truncating these files and perform a manual truncation after HDFS's recovery.
Thanks for your answer. Let me know if I can help with testing somehow.
@link3280 we can help with the testing/review and give it a push to get this merged, so if you are willing to finish this work, let's collaborate. :) Do you have anything in mind already regarding the missing parts? I think it would make sense to introduce a timeout property for the blocking wait.
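If that direction sounds reasonable, a possible shape for such a property is sketched below using Flink's ConfigOptions; the key names and defaults are assumptions for illustration, not an agreed-upon API.

import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical options: key names and defaults are illustrative only.
public final class HdfsRecoveryOptions {

    // Upper bound on waiting for HDFS lease recovery before truncating a file on restore.
    public static final ConfigOption<Duration> LEASE_RECOVERY_TIMEOUT =
            ConfigOptions.key("fs.hdfs.lease-recovery-timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(5))
                    .withDescription(
                            "Maximum time to wait for HDFS lease recovery before truncating a file on restore.");

    // Upper bound on waiting for block recovery triggered by a truncate call.
    public static final ConfigOption<Duration> BLOCK_RECOVERY_TIMEOUT =
            ConfigOptions.key("fs.hdfs.block-recovery-timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(5))
                    .withDescription(
                            "Maximum time to wait for HDFS block recovery before appending to a restored file.");

    private HdfsRecoveryOptions() {}
}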
What is the purpose of the change
In the case of HDFS, upon job recovery, StreamingFileSink does not wait for lease recovery to complete before truncating a file (currently it tries to truncate the file after a timeout, no matter whether the lease has been revoked or not). This may lead to an IOException, because the file length could lag behind both the actual length and the checkpointed length. What's worse, the job may fall into an endless restart loop, because a new invocation of #recoverLease interrupts the previous one (see HBase's RecoverLeaseFSUtils).
Moreover, we should wait for block recoveries, which may be triggered by truncate calls (as mentioned in the Hadoop FileSystem Javadoc), before appending to the recovered files.
This PR fixes the problem, but with two hard-coded timeout thresholds, since making these timeouts configurable would require interface changes. If the lease recovery or the block recovery fails, an IOException is thrown, which triggers a restart of the job.
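To make the intended sequence concrete, here is a minimal sketch of the recover-then-truncate flow described above, assuming an HDFS DistributedFileSystem; the class, method, and timeout names are illustrative assumptions and not the PR's actual code.

import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.time.Deadline;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Illustrative only: timeouts and method names are assumptions.
final class TruncateAfterRecoverySketch {

    private static final long LEASE_RECOVERY_TIMEOUT_MS = 300_000L;

    static void safelyTruncate(final FileSystem fs, final Path path, final long checkpointedLength)
            throws IOException, InterruptedException {
        if (!(fs instanceof DistributedFileSystem)) {
            // Truncate support outside HDFS is not covered by this sketch.
            throw new IOException("Expected an HDFS file system for " + path);
        }
        final DistributedFileSystem dfs = (DistributedFileSystem) fs;

        // Trigger lease recovery once; re-invoking recoverLease would interrupt
        // the ongoing recovery, which is the endless-restart issue described above.
        dfs.recoverLease(path);

        // Wait (bounded) until HDFS reports the file as closed, i.e. the lease
        // recovery and any pending block recovery have finished.
        final Deadline deadline =
                Deadline.now().plus(Duration.ofMillis(LEASE_RECOVERY_TIMEOUT_MS));
        while (!dfs.isFileClosed(path) && deadline.hasTimeLeft()) {
            Thread.sleep(500L);
        }
        if (!dfs.isFileClosed(path)) {
            throw new IOException("Lease recovery did not finish in time for " + path);
        }

        // Only now is it safe to truncate back to the checkpointed length.
        // truncate() returns false when it triggers block recovery; in that case
        // the caller should wait again before appending to the file.
        dfs.truncate(path, checkpointedLength);
    }
}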
Brief change log
Verifying this change
I simply tested it on an HDFS cluster with 3 nodes, but it may require further tests.
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation