-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Support nested projection #3991
Conversation
2c76f58
to
2481d05
Compare
2481d05
to
19ca54e
Compare
ping @stevenzwu , cloud you please take a look? |
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataNestProjection.java
Outdated
Show resolved
Hide resolved
a3ecbb5
to
fbd0bbb
Compare
68abcf8
to
08e548d
Compare
97e62e9
to
44206f2
Compare
b966083
to
1805d20
Compare
Hi @openinx, according to the feedback from our team's internal trial, I have updated a new version. When you are free, would you please help to have a look? thank you. 😃 |
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
Outdated
Show resolved
Hide resolved
@@ -208,21 +218,30 @@ public FlinkInputFormat buildFormat() { | |||
|
|||
if (projectedSchema == null) { | |||
contextBuilder.project(icebergSchema); | |||
} else if (projectedFields != null) { | |||
// Push down the nested projection so that don't need to get extra fields when get data from files. | |||
Schema icebergProjectionSchema = projectSchema(icebergSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Push the nesting down so that when try to get data, no longer get extra fields.
// Push down the nested projection so that don't need to get extra fields when get data from files. | ||
Schema icebergProjectionSchema = projectSchema(icebergSchema); | ||
contextBuilder.project(icebergProjectionSchema); | ||
projectedFields = mappingNestProjectedFields(icebergProjectionSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get the mapping between the underlying iceberg nested schema and the Flink source output schema.
return getters[pos].getFieldOrNull(rowData); | ||
return valueCache.computeIfAbsent(pos, key -> { | ||
Object value = null; | ||
if (projectedFields != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fetch data from iceberg nested struct data according to projectedFields
and the field pos in flink output schema
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
Outdated
Show resolved
Hide resolved
hi @openinx , could you please take a look when you are available? |
f00e08f
to
7adb426
Compare
7adb426
to
5065ac1
Compare
cc @stevenzwu @Fokko @chenjunjiedada @singhpk234 , could you please take a look when you get a chance? |
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Supports nested projections in flink.
Add a transform action in RowDataFileScanTaskReade, flatten the nested data obtained from the file based on the field index paths.