
[FLINK-18592][Connectors/FileSystem] StreamingFileSink fails due to truncating HDFS file failure #16927

Open
wants to merge 5 commits into base: master

Conversation

link3280
Contributor

What is the purpose of the change

In the case of HDFS, upon job recovery, StreamingFileSink would not wait for lease recoveries to complete before truncating a file (currently it tries to truncate the file after a timeout, whether or not the lease has been revoked). This may lead to an IOException, because the reported file length could lag behind both the actual length and the checkpointed length. What's worse, the job may fall into an endless restart loop, because a new invocation of #recoverLease will interrupt the previous one (see HBase's RecoverLeaseFSUtils).

Moreover, we should wait for block recoveries, which may be triggered by truncate calls (as mentioned in the Hadoop FileSystem Javadoc), before appending to the recovered files.

This PR fixes the problem, but with two hard-coded timeout thresholds, since making these timeouts configurable would require interface changes. If either the lease recovery or the block recovery fails, an IOException is thrown, which triggers a restart of the job.
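
For illustration, here is a minimal sketch of the wait-for-lease-recovery step, based on the standard DistributedFileSystem#recoverLease and DistributedFileSystem#isFileClosed APIs. This is not the exact code of this PR; the helper name and the LEASE_RECOVERY_TIMEOUT_MS constant are made up:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Hypothetical helper: trigger lease recovery once and poll until HDFS reports
// the file as closed, or the (hard-coded, hypothetical) timeout expires.
private static boolean waitForLeaseRecovery(final DistributedFileSystem dfs, final Path path)
        throws IOException, InterruptedException {
    final long deadline = System.currentTimeMillis() + LEASE_RECOVERY_TIMEOUT_MS;
    // Call recoverLease only once; calling it again later would interrupt the
    // recovery that is already in progress (the cause of the restart loop above).
    boolean recovered = dfs.recoverLease(path);
    while (!recovered && System.currentTimeMillis() < deadline) {
        Thread.sleep(500L);
        // isFileClosed() returns true once the lease is released and the last
        // block is finalized, i.e. the file is safe to truncate.
        recovered = dfs.isFileClosed(path);
    }
    return recovered;
}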

Brief change log

  • Wait for lease recoveries to complete before truncating in-progress files.
  • Wait for possible block recoveries to complete before appending to recovered files.

Verifying this change

I simply tested it on an HDFS cluster with 3 nodes, but it may require further tests.

This change can be verified manually as follows:

  1. Start a Flink job writing recoverable HDFS files.
  2. Manually kill a DataNode that StreamingFileSink is writing to (to trigger lease recovery and block recovery).
  3. Restart the job from the latest successful checkpoint. The files should be properly recovered.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit d0fbb5a (Sat Aug 21 16:08:14 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Aug 21, 2021

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@link3280
Contributor Author

link3280 commented Aug 23, 2021

It's a bit weird that CI was good on my forked repo. I'm giving it one more shot.

@link3280
Contributor Author

@flinkbot run azure

1 similar comment
@link3280
Contributor Author

@flinkbot run azure

        String.format(
                "Failed to recover blocks for file %s in %d ms.",
                path.toString(), BLOCK_RECOVERY_TIMEOUT_MS));
}
Contributor

Should revokeLeaseByFileSystem() be called after block recovery?

Contributor Author

IMHO, we've already triggered a lease recovery, so there's no need to call it again, which would interrupt the previous call.
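
For context, the ordering under discussion roughly corresponds to the following sketch. It illustrates the intended sequence only, not the PR's actual code; fs, path, checkpointedLength and the helper names are assumed:

// 1. Trigger lease recovery exactly once and wait for it to complete.
if (!waitForLeaseRecovery(fs, path)) {
    throw new IOException("Failed to recover the lease for file " + path);
}
// 2. Truncate back to the checkpointed length; FileSystem#truncate returns
//    false if HDFS needs a block recovery to finish the truncation.
final boolean truncated = fs.truncate(path, checkpointedLength);
// 3. Wait for that block recovery before appending, but do NOT trigger lease
//    recovery again, since that would interrupt the recovery already running.
if (!truncated && !waitForBlockRecovery(fs, path)) {
    throw new IOException(
            String.format(
                    "Failed to recover blocks for file %s in %d ms.",
                    path.toString(), BLOCK_RECOVERY_TIMEOUT_MS));
}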

* state, HDFS will perform block recovery which blocks truncate. Thus we have to wait for block
* recovery to ensure the truncate is successful.
*/
private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)


I have one question: why not check this via isFileClosed(path), like this:

final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e1) {
        LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e1);
        break;
    }
    isClosed = dfs.isFileClosed(path);
}
return isClosed;

If the path is a very large file, LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE); will be expensive.

Contributor Author

link3280 commented Jan 13, 2023


I'm not particularly familiar with this API, but I think it is worth a try.

@thebalu

thebalu commented Jan 10, 2023

Hi @link3280,
do you have plans to finish this PR and get it merged? We have run into the same issue, and it would be nice to use your solution if it's completed.

@link3280
Contributor Author

@thebalu Thanks for reaching out. I didn't get this PR merged because the community has no capacity to review it, especially since this PR is hard to test. I would revisit it and make a new PR if possible.

Note that this PR is not a complete solution, because it may take hours for HDFS to recover from node failures sometimes. In that case, it would block Flink for a long time, so we had to give up truncating these files and perform a manual truncation after HDFS's recovery.

@thebalu

thebalu commented Jan 16, 2023

Thanks for your answer. Let me know if I can help with testing somehow.

@ferenc-csaky
Contributor

> @thebalu Thanks for reaching out. I didn't get this PR merged because the community has no capacity to review it, especially since this PR is hard to test. I would revisit it and make a new PR if possible.
>
> Note that this PR is not a complete solution, because it may take hours for HDFS to recover from node failures sometimes. In that case, it would block Flink for a long time, so we had to give up truncating these files and perform a manual truncation after HDFS's recovery.

@link3280 we can help with the testing/review and give this a push to get it merged, so if you are willing to finish this work, let's collaborate. :) Do you already have anything in mind regarding the missing parts? I think it would make sense to introduce a timeout property for the blocking wait.
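
As a rough idea of what such a timeout property could look like, a sketch using Flink's ConfigOptions builder follows. The key name and default value are illustrative only and not part of this PR:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical option for the blocking wait discussed above.
public static final ConfigOption<Long> LEASE_RECOVERY_TIMEOUT_MS =
        ConfigOptions.key("fs.hdfs.lease-recovery-timeout-ms")
                .longType()
                .defaultValue(300_000L)
                .withDescription(
                        "Maximum time in milliseconds to wait for HDFS lease/block recovery "
                                + "before failing the file recovery with an IOException.");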
