-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Fix flaky test TestFlinkTableSink #2989
Conversation
szehon-ho
commented
Aug 17, 2021
•
edited
Loading
edited
- In my observation, new files may be added if pass checkpoint interval (set to 400)
@stevenzwu @openinx can you guys help check my understanding? Thanks |
@szehon-ho I am not sure why this failed. maybe it is what you said. records were split across two checkpoint/commit cycles. However, this change will defeat the purpose of this unit test. with hash distribution, this unit test is to make sure all records for the same partition key is distributed to a single writer task and hence should generate one file per partition (per commit cycle). Here is probably what we should do to fix the flakiness. we should assert that there is only one file per partition for each commit/snapshot. |
Yea this makes sense, let me look into it. |
…errors (test_partition already exists)
@stevenzwu thanks for the review. I put back the one-file-per-partition, per commit/checkpoint. Also I added some test cleanup as well in the failure case. |
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
Outdated
Show resolved
Hide resolved
List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) { | ||
Types.StructType spec = partitionSpec.partitionType(); | ||
Record partitionRecord = GenericRecord.create(spec).copy(partitionValues); | ||
StructLikeWrapper expected = StructLikeWrapper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PartitionData
implements the equals
method. we can construct PartitionData
using this API from DataFiles
class. not sure if it is better. but at least it is more specific.
public static PartitionData copy(PartitionSpec spec, StructLike partition) {
return copyPartitionData(spec, partition, null);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good idea, but when I tried I found PartitionData is package protected so can't access it here unfortunately.
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
Outdated
Show resolved
Hide resolved
@szehon-ho thx a lot for fixing the flaky tests. overall, it looks good to me. Left a few minor comments. |
dd93f7b
to
3269024
Compare
@stevenzwu thanks again for taking a look, I updated based on the feedback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. @openinx can you take a look too and merge it when it is good from your side.
public static Map<Long, List<DataFile>> snapshotToDataFiles( | ||
Table table) | ||
throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The method format looks quite strange for me, could we format it looks like the following?
public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
List<ManifestFile> manifestFiles = table.currentSnapshot().dataManifests(); | ||
for (ManifestFile manifestFile : manifestFiles) { | ||
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) { | ||
List<DataFile> dataFiles = IteratorUtils.toList(reader.iterator()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a warning in IDE here, using the Lists.newArrayList(reader)
could fix the warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me ! Thanks @szehon-ho for the contribution, and @stevenzwu for the reviewing. I just left few minor comments.
@openinx done, thanks for taking a look! |
LGTM, thanks for the contribution! |