
Flink: Watermark read options #9346

Merged: 3 commits merged into apache:main on Jan 9, 2024
Conversation

rodmeneses (Contributor) commented Dec 19, 2023

  1. Adds a read option to specify the watermark column (`watermark-column`) for the Flink Iceberg connector.
  2. Adds a read option to specify the watermark column time unit (`watermark-column-time-unit`) for the Flink Iceberg connector.

It is now possible to pass these options in a SQL statement:

select * from t /*+ OPTIONS('watermark-column'='t2','watermark-column-time-unit'='MILLISECONDS')*/
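As an illustration of how such hint options are shaped, here is a toy parser for the OPTIONS(...) clause. This is not the Flink/Calcite implementation, and the set of accepted time units is an assumption mirroring java.util.concurrent.TimeUnit:

```python
import re

# Toy parser for /*+ OPTIONS('k'='v', ...) */ hints. Illustration only:
# Flink parses hints via Calcite; this merely shows the option shape.
# Accepted time units are assumed to mirror java.util.concurrent.TimeUnit.
TIME_UNITS = {"NANOSECONDS", "MICROSECONDS", "MILLISECONDS",
              "SECONDS", "MINUTES", "HOURS", "DAYS"}

def parse_hint_options(sql):
    """Extract key/value pairs from an OPTIONS hint and validate the unit."""
    body = re.search(r"/\*\+\s*OPTIONS\((.*?)\)\s*\*/", sql).group(1)
    options = dict(re.findall(r"'([^']+)'\s*=\s*'([^']+)'", body))
    unit = options.get("watermark-column-time-unit")
    if unit is not None and unit not in TIME_UNITS:
        raise ValueError("Unknown time unit: " + unit)
    return options

sql = ("select * from t /*+ OPTIONS('watermark-column'='t2',"
       "'watermark-column-time-unit'='MILLISECONDS')*/")
print(parse_hint_options(sql))
# {'watermark-column': 't2', 'watermark-column-time-unit': 'MILLISECONDS'}
```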

@rodmeneses rodmeneses changed the title Water mark flink options Flink: Watermark read options Dec 19, 2023
docs/flink-configuration.md (outdated, resolved)
@rodmeneses rodmeneses force-pushed the waterMarkFlinkOptions branch 2 times, most recently from 3afa06b to e0cf6ef Compare December 20, 2023 17:46
docs/flink-configuration.md (outdated, resolved)
@@ -94,7 +94,7 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
       ConfigOptions.key("table.exec.iceberg.split-assigner-type")
           .enumType(SplitAssignerType.class)
-          .defaultValue(SplitAssignerType.SIMPLE)
+          .defaultValue(null)
Contributor:

Is this intended? This would be a breaking change if a user did not provide a split assigner type before

Contributor Author (rodmeneses):

Good catch. I changed that to null because we previously had this assertion:

      if (splitAssignerFactory != null) {
        Preconditions.checkArgument(
            watermarkColumn == null,
            "Watermark column and SplitAssigner should not be set in the same source");
      }

However, after my work in this PR it is safe to remove the above assertion, since we will override the factory with an OrderedSplitAssignerFactory when watermarkColumn is specified.
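The resulting precedence can be sketched as follows. This is a hypothetical, simplified model of the builder logic being discussed, with string stand-ins for the factory objects, not the actual Iceberg code:

```python
# Simplified model of the split-assigner resolution after this PR.
# Names and string placeholders are illustrative only.
def resolve_split_assigner(watermark_column, split_assigner_factory):
    """Return the assigner factory the source would actually use."""
    if watermark_column is not None:
        # A watermark column forces an ordered assigner, overriding
        # whatever factory the user supplied.
        return "OrderedSplitAssignerFactory(" + watermark_column + ")"
    if split_assigner_factory is not None:
        return split_assigner_factory
    # With the config default changed from SIMPLE to null, the simple
    # assigner is applied as a fallback here instead of by the option.
    return "SimpleSplitAssignerFactory"

print(resolve_split_assigner("t2", "SimpleSplitAssignerFactory"))
# OrderedSplitAssignerFactory(t2)
```

The point of the sketch: because the override happens after both inputs are known, the old "do not set both" assertion is no longer needed.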

Contributor:

Makes sense. It would be good to update the docs to explain how the default is determined.

rodmeneses (Contributor Author) commented Dec 20, 2023:

I'm updating the doc entry for watermark-column:

Specifies the watermark column to use for watermark generation. If this option is present, the splitAssignerFactory will be overridden with OrderedSplitAssignerFactory.

wdyt? @mas-chen @pvary

mas-chen (Contributor) commented Dec 20, 2023:

If users have not opted into the new feature, I would expect their SQL queries to still work. Can you add tests to ensure that?

After some thinking, I'm really not sure about the default change. Couldn't it be possible to use the watermark column without the ordered split assigner factory?

Contributor Author (rodmeneses):

Not according to the code.

      if (watermarkColumn != null) {
        // Column statistics is needed for watermark generation
        context = context.copyWithColumnStat(watermarkColumn);
        SplitWatermarkExtractor watermarkExtractor =
            new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
        emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
        splitAssignerFactory =
            new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
      }

Contributor Author (rodmeneses):

cc: @pvary

Contributor:

If the splits are not ordered, then we will have fluctuating watermarks. We do not emit watermarks that are out of order, but that defeats the purpose of the whole watermark generation feature. Imagine a situation where we are reading time series data and read the latest file first: every other file would then contain late data, which might be dropped.

So while it is technically possible, I would rather not allow the users to shoot themselves in the foot.
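This failure mode can be illustrated with a toy simulation (illustrative only; real Flink watermark emission and lateness handling are more involved): take the watermark to be the maximum event timestamp seen so far, and count any record arriving behind it as late.

```python
# Toy model: splits carry event timestamps; the watermark is the max
# timestamp seen so far, and records older than it are counted as late
# (downstream operators could drop them).
def count_late_records(splits):
    watermark = float("-inf")
    late = 0
    for split in splits:
        late += sum(1 for ts in split if ts < watermark)
        watermark = max(watermark, max(split))
    return late

early, recent = [1, 2, 3], [100, 101, 102]

# Ordered assignment (oldest split first): nothing is late.
assert count_late_records([early, recent]) == 0

# Unordered assignment (latest file read first): every record of the
# older split arrives behind the watermark and is treated as late.
assert count_late_records([recent, early]) == 3
```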

Contributor Author (rodmeneses):

> If users have not opted into the new feature, I would expect their SQL queries to still work. Can you add tests to ensure that?

Hi! I didn't get your suggestion about the unit test. Would you please rephrase? Thanks @mas-chen

@rodmeneses rodmeneses force-pushed the waterMarkFlinkOptions branch 4 times, most recently from 9c73d9e to 0eba14a Compare December 21, 2023 19:54
@rodmeneses rodmeneses force-pushed the waterMarkFlinkOptions branch 2 times, most recently from 6583717 to 344325e Compare January 4, 2024 16:41
@rodmeneses rodmeneses force-pushed the waterMarkFlinkOptions branch 3 times, most recently from ba4204b to 5559cda Compare January 8, 2024 21:38
@rodmeneses rodmeneses force-pushed the waterMarkFlinkOptions branch 2 times, most recently from 4b8c558 to 721ece6 Compare January 8, 2024 21:46
@rodmeneses rodmeneses force-pushed the waterMarkFlinkOptions branch 4 times, most recently from 83144b9 to 185171f Compare January 8, 2024 21:59
recordsDataFile1.add(file1Record1);
recordsDataFile1.add(file1Record2);
DataFile dataFile1 = helper.writeFile(recordsDataFile1);
// File 2 - old timestamps, old longs
Contributor:

This comment is not correct:

// File 2 - late timestamps, old longs

recordsDataFile2.add(file2Record1);
recordsDataFile2.add(file2Record2);

// early1 - early2 -- late1 late 2
Contributor:

I do not get this comment. Maybe something like this (feel free to reword if you prefer):

// Expected records if the splits are ordered
//     - ascending (watermark from t1) - records from the split with early timestamps then records from the split with late timestamps
//     - descending (watermark from t2) - records from the split with old longs then records from the split with new longs

pvary (Contributor) left a comment:

Left some minor comments for comments 😄
Otherwise +1 LGTM

@pvary pvary merged commit 4d34398 into apache:main Jan 9, 2024
13 checks passed
pvary (Contributor) commented Jan 9, 2024:

Thanks @rodmeneses for the PR, and @stevenzwu and @mas-chen for the review!

geruh pushed a commit to geruh/iceberg that referenced this pull request Jan 26, 2024
adnanhemani pushed a commit to adnanhemani/iceberg that referenced this pull request Jan 30, 2024
devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024
4 participants