Support structured streaming read for Iceberg #2272
Conversation
Force-pushed from afbb6d9 to 7a68114.
Hi @XuQianJin-Stars - is there anything pending in this PR? Please let me know if you need any help to push this. Happy to collaborate & contribute.
Yes, thank you very much. This feature is already available in our internal work, and I want to improve it in the community.
@XuQianJin-Stars - this is great. Is there anything pending in this PR? Or are you waiting on any inputs? @rdblue @RussellSpitzer @aokolnychyi - could you folks please add your review/inputs?
Looks good to me in general, given what the tests cover. I will try running this in a cluster to see if there are any issues and reply back later.
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(),
    scanAllFiles)) {
  CloseableIterator<FileScanTask> taskIter = taskIterable.iterator();
nit: just declaring it as `Iterator` should be enough.
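As a minimal sketch of the suggestion, assuming a small helper shaped like the quoted diff (the method name and the way tasks are consumed here are illustrative, not the PR's actual code):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.CloseableIterable;

class TaskCollector {
  // The CloseableIterable owns the resource lifecycle via try-with-resources, so the iterator
  // pulled from it can be declared as a plain Iterator rather than a CloseableIterator.
  static List<FileScanTask> collect(CloseableIterable<FileScanTask> taskIterable) throws IOException {
    List<FileScanTask> tasks = new ArrayList<>();
    try (CloseableIterable<FileScanTask> iterable = taskIterable) {
      Iterator<FileScanTask> taskIter = iterable.iterator();
      while (taskIter.hasNext()) {
        tasks.add(taskIter.next());
      }
    }
    return tasks;
  }
}
```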
@@ -434,6 +454,35 @@ private static void mergeIcebergHadoopConfs(
    return tasks;
  }

  // An extracted method which will be overridden by StreamingReader. This is because the tasks generated by
  // streaming are produced per batch and cannot be planned beforehand the way Reader plans them.
  protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {
with this, `enableBatchRead` at L331 can be simplified.
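To make the hook concrete, a hedged sketch of the extract-and-override pattern being discussed (class names and the Parquet-only check are assumptions for the example, not the PR's logic):

```java
import java.util.List;

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;

class ReaderSketch {
  // Reader plans all tasks up front, so it can decide once whether vectorized (batch) reads apply.
  protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {
    return taskList.stream()
        .flatMap(combined -> combined.files().stream())
        .allMatch(task -> task.file().format() == FileFormat.PARQUET);
  }
}

class StreamingReaderSketch extends ReaderSketch {
  // Streaming tasks are generated per micro-batch, so the decision is re-evaluated for each batch.
  @Override
  protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {
    return !taskList.isEmpty() && super.checkEnableBatchRead(taskList);
  }
}
```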
    return startingOffset;
  }

  /**
nit: we should provide more meaningful doc, otherwise we should just remove it instead of repeating the method name.
   * @return MicroBatch of list
   */
  @VisibleForTesting
  @SuppressWarnings("checkstyle:HiddenField")
why suppress this warning?
> why suppress this warning?

Task :iceberg-spark2:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /opt/sourcecode/iceberg-src/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java:281:60: 'startOffset' hides a field. [HiddenField]
I think generally we just change the field visibility
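For reference, a self-contained illustration of the checkstyle HiddenField situation behind this thread (the class and methods are made up for the example, not the PR's code):

```java
class HiddenFieldExample {
  private long startOffset;

  // Trips HiddenField: the parameter shadows the startOffset field.
  void seek(long startOffset) {
    this.startOffset = startOffset;
  }

  // Renaming the parameter (or, as suggested above, adjusting how the field is exposed)
  // avoids the warning without @SuppressWarnings("checkstyle:HiddenField").
  void seekTo(long newStartOffset) {
    this.startOffset = newStartOffset;
  }

  long startOffset() {
    return startOffset;
  }
}
```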
Force-pushed from 987c154 to bacf28c.
    Configuration conf = new Configuration(lazyBaseConf());
    Table table = getTableAndResolveHadoopConfiguration(options, conf);
    String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");
Maybe add a comment about why this is resolved from the session conf instead of the merged session conf / source options?
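For context, a hedged sketch of the alternative the reviewer is hinting at — resolving from the per-read source options first and falling back to the session conf (the `case-sensitive` option name is an assumption; `options` and `lazySparkSession()` are taken from the quoted code and assumed to be in scope):

```java
// Merged resolution: a per-read option, if present, would override the session-level setting.
boolean caseSensitive = Boolean.parseBoolean(
    options.get("case-sensitive")
        .orElse(lazySparkSession().conf().get("spark.sql.caseSensitive", "false")));
```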
@@ -127,6 +130,23 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
    return new StreamingWriter(table, io, encryptionManager, options, queryId, mode, appId, writeSchema, dsStruct);
  }

  @Override
  public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
So it seems like we're not using the checkpointLocation here, do we not need to store anything to be able to recover on failure? I think this is the case because as we read data it's not like it gets "consumed" but I just want to make sure.
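For background on why that can be safe, a self-contained illustration (not the PR's `StreamingOffset`): the engine persists whatever the source's `Offset.json()` returns under `<checkpointLocation>/offsets` and hands it back on restart, so as long as the offset fully describes the read position the source itself does not need to write anything to the checkpoint location.

```java
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

// An offset that fully describes the read position (snapshot plus file position within it);
// recovery then only needs the JSON that Spark already stores in the checkpoint.
class SnapshotPositionOffset extends Offset {
  private final long snapshotId;
  private final long position;
  private final boolean scanAllFiles;

  SnapshotPositionOffset(long snapshotId, long position, boolean scanAllFiles) {
    this.snapshotId = snapshotId;
    this.position = position;
    this.scanAllFiles = scanAllFiles;
  }

  @Override
  public String json() {
    return String.format(
        "{\"snapshot_id\":%d,\"position\":%d,\"scan_all_files\":%b}",
        snapshotId, position, scanAllFiles);
  }
}
```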
  }

  @Override
  public void commit(Offset end) {
Would it make sense to clean up the cachedPendingBatches here, or would they already have been removed at this point?
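As a self-contained sketch of the cleanup being asked about (the cache shape and the use of a plain long offset are made up for illustration; the PR's `cachedPendingBatches` may be structured differently):

```java
import java.util.ArrayList;
import java.util.List;

class PendingBatchCache {
  // Each entry pairs a batch's end offset (simplified to a long here) with whatever was planned for it.
  static class PendingBatch {
    final long endOffset;

    PendingBatch(long endOffset) {
      this.endOffset = endOffset;
    }
  }

  private final List<PendingBatch> cachedPendingBatches = new ArrayList<>();

  void add(PendingBatch batch) {
    cachedPendingBatches.add(batch);
  }

  // Called when the engine commits up to an offset: anything at or below it will never be
  // requested again, so it can be dropped to keep memory bounded.
  void pruneCommitted(long committedEndOffset) {
    cachedPendingBatches.removeIf(batch -> batch.endOffset <= committedEndOffset);
  }
}
```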
I really appreciate the time given to the test case :)
Hi @holdenk, thank you very much for your review. I will reply to your comments later.
        (startOffset.shouldScanAllFiles() || isAppend(table.snapshot(startOffset.snapshotId())));
  }

  private static void assertNoOverwrite(Snapshot snapshot) {
Can we please take a configuration for whether or not to assert? Essentially, we want to factor in cases like:
- compaction, which performs harmless `REPLACE` operations
- GDPR deletes, which do not impact the structured streaming result

https://docs.microsoft.com/en-us/azure/databricks/delta/delta-streaming#ignore-updates-and-deletes
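A hedged sketch of the configurability being requested (the option name, method, and error message are assumptions, not part of this PR):

```java
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;

class SnapshotValidation {
  // Hypothetical read option letting users opt into skipping non-append snapshots,
  // similar in spirit to Delta's ignoreDeletes / ignoreChanges.
  static final String IGNORE_NON_APPENDS_OPTION = "streaming-ignore-non-append-snapshots";

  static void validate(Snapshot snapshot, boolean ignoreNonAppends) {
    if (ignoreNonAppends) {
      return;  // e.g. compaction REPLACEs or GDPR deletes: skip the snapshot instead of failing
    }
    if (!DataOperations.APPEND.equals(snapshot.operation())) {
      throw new IllegalStateException(String.format(
          "Snapshot %d has operation '%s'; streaming reads only support appends unless %s=true",
          snapshot.snapshotId(), snapshot.operation(), IGNORE_NON_APPENDS_OPTION));
    }
  }
}
```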
This work is an extension of the idea in issue apache#179 & the Spark2 work done in PR apache#2272 - only that this one is for Spark3.

**In the current implementation:**
* An Iceberg Snapshot is the upper bound for a MicroBatch. A given MicroBatch will only span a single Snapshot; it will not be composed of multiple Snapshots. BatchSize is used to limit the number of files within a given snapshot.
* The streaming reader will error out if it encounters any Snapshot of a type other than `APPEND`.
* Handling `DELETES`, `REPLACE` & `OVERWRITES` is something for the future.
* Columnar reads are not enabled; something for the future.
Force-pushed from 870156b to 3e1cd20.
Hi @SreeramGarlapati @holdenk @jackye1995 @rdblue @RussellSpitzer, sorry it took so long to fix the problem. Do you have time to help continue reviewing this PR?
Hey folks (incl. @XuQianJin-Stars & @RussellSpitzer) -- is this something that people are still open to working on? We're running into a situation with the current limited streaming support where the lack of
& @flyrain - what are your thoughts?
+1, I have a PR out for supporting rate limiting in Spark 3:
cc @holdenk
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
An implementation of Spark Structured Streaming Read that tracks the currently processed files of an Iceberg table. This PR is a split of PR-796, Structured streaming read for Iceberg.
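For orientation, a hedged sketch of how such a streaming read is typically wired up from the Spark side (the table identifier, paths, and the assumption that the source registers under the `iceberg` format name are placeholders; the exact options supported by this PR may differ):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IcebergStreamingReadExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-streaming-read-sketch")
        .master("local[2]")
        .getOrCreate();

    // Incrementally read newly appended files of the Iceberg table as micro-batches.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("db.table");  // placeholder table identifier

    // Spark persists the source's offsets under the checkpoint location, enabling recovery.
    StreamingQuery query = stream.writeStream()
        .format("parquet")
        .option("checkpointLocation", "/tmp/checkpoints/iceberg-demo")
        .option("path", "/tmp/output/iceberg-demo")
        .start();

    query.awaitTermination();
  }
}
```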