Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Fix flaky test TestFlinkTableSink #2989

Merged
merged 4 commits into from
Sep 4, 2021
Merged

Conversation

szehon-ho
Copy link
Collaborator

@szehon-ho szehon-ho commented Aug 17, 2021

  • In my observation, new files may be added if pass checkpoint interval (set to 400)

@szehon-ho
Copy link
Collaborator Author

@stevenzwu @openinx can you guys help check my understanding? Thanks

@stevenzwu
Copy link
Contributor

stevenzwu commented Aug 18, 2021

@szehon-ho I am not sure why this failed. maybe it is what you said. records were split across two checkpoint/commit cycles. However, this change will defeat the purpose of this unit test. with hash distribution, this unit test is to make sure all records for the same partition key is distributed to a single writer task and hence should generate one file per partition (per commit cycle).

Here is probably what we should do to fix the flakiness. we should assert that there is only one file per partition for each commit/snapshot.

@szehon-ho
Copy link
Collaborator Author

Yea this makes sense, let me look into it.

@szehon-ho
Copy link
Collaborator Author

@stevenzwu thanks for the review. I put back the one-file-per-partition, per commit/checkpoint. Also I added some test cleanup as well in the failure case.

List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
Types.StructType spec = partitionSpec.partitionType();
Record partitionRecord = GenericRecord.create(spec).copy(partitionValues);
StructLikeWrapper expected = StructLikeWrapper
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionData implements the equals method. we can construct PartitionData using this API from DataFiles class. not sure if it is better. but at least it is more specific.

  public static PartitionData copy(PartitionSpec spec, StructLike partition) {
    return copyPartitionData(spec, partition, null);
  }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, but when I tried I found PartitionData is package protected so can't access it here unfortunately.

@stevenzwu
Copy link
Contributor

@szehon-ho thx a lot for fixing the flaky tests. overall, it looks good to me. Left a few minor comments.

@szehon-ho
Copy link
Collaborator Author

@stevenzwu thanks again for taking a look, I updated based on the feedback

Copy link
Contributor

@stevenzwu stevenzwu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @openinx can you take a look too and merge it when it is good from your side.

Comment on lines 269 to 271
public static Map<Long, List<DataFile>> snapshotToDataFiles(
Table table)
throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The method format looks quite strange for me, could we format it looks like the following?

  public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests();
for (ManifestFile manifestFile : manifestFiles) {
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
List<DataFile> dataFiles = IteratorUtils.toList(reader.iterator());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a warning in IDE here, using the Lists.newArrayList(reader) could fix the warning.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, done

Copy link
Member

@openinx openinx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me ! Thanks @szehon-ho for the contribution, and @stevenzwu for the reviewing. I just left few minor comments.

@szehon-ho
Copy link
Collaborator Author

@openinx done, thanks for taking a look!

@openinx
Copy link
Member

openinx commented Sep 4, 2021

LGTM, thanks for the contribution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants