
Support structured streaming read for Iceberg #2272

Closed
wants to merge 6 commits

Conversation

XuQianJin-Stars
Contributor

An implementation of Spark Structured Streaming read that tracks the currently processed files of an Iceberg table. This PR is a split of PR #796, Structured streaming read for Iceberg.
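
For illustration, a minimal sketch of how a structured streaming read of an Iceberg table could be started from Spark; the table location, checkpoint path, and console sink below are hypothetical and not defined by this PR:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class IcebergStreamingReadSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-streaming-read")
        .getOrCreate();

    // "iceberg" is the DataSourceV2 short name; the table location is made up.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("hdfs://nn:8020/warehouse/db/events");

    // Write each micro-batch to the console; the checkpoint path is also made up.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/iceberg-stream-checkpoint")
        .start();

    query.awaitTermination();
  }
}
```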

@SreeramGarlapati
Collaborator

Hi @XuQianJin-Stars - is there anything pending in this PR? Please let me know if you need any help to push this. Happy to collaborate & contribute.

@XuQianJin-Stars
Contributor Author

Hi @XuQianJin-Stars - is there anything pending in this PR? Please let me know if you need any help to push this. Happy to collaborate & contribute.

Yes, thank you very much. This feature is already available in our internal work, and I want to improve it in the community.

@SreeramGarlapati
Collaborator

SreeramGarlapati commented Apr 19, 2021

@XuQianJin-Stars - this is great. Is there anything pending in this PR? Or are you waiting on any inputs?
Thanks a lot for your contribution.

@rdblue @RussellSpitzer @aokolnychyi - could you folks please add your review/inputs?
We are in need of this change - truly appreciate your help.

Contributor

@jackye1995 left a comment


Looks good to me in general given what the tests cover. I will try running this in a cluster to see if there is any issue and reply back later.

CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(),
scanAllFiles)) {
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator();
Contributor


nit: just declaring it as Iterator should be enough.
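
For illustration, a self-contained sketch of the suggestion; the helper method and counting logic are made up, and the only point is that the local variable can be the plain Iterator interface while the CloseableIterable is the resource being managed:

```java
import java.util.Iterator;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.CloseableIterable;

class IteratorDeclarationSketch {
  // The CloseableIterable is what needs closing; the iterator local can be declared
  // as Iterator, since nothing CloseableIterator-specific is used on it.
  long countTasks(CloseableIterable<FileScanTask> taskIterable) throws Exception {
    long count = 0;
    try (CloseableIterable<FileScanTask> tasks = taskIterable) {
      Iterator<FileScanTask> taskIter = tasks.iterator();
      while (taskIter.hasNext()) {
        taskIter.next();
        count++;
      }
    }
    return count;
  }
}
```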

@@ -434,6 +454,35 @@ private static void mergeIcebergHadoopConfs(
return tasks;
}

// An extracted method that will be overridden by StreamingReader, because the tasks generated by streaming
// are per batch and cannot be planned beforehand like in Reader.
protected boolean checkEnableBatchRead(List<CombinedScanTask> taskList) {
Contributor


with this, enableBatchRead at L331 can be simplified.

return startingOffset;
}

/**
Contributor


nit: we should provide a more meaningful doc; otherwise we should just remove it instead of repeating the method name.

* @return MicroBatch of list
*/
@VisibleForTesting
@SuppressWarnings("checkstyle:HiddenField")
Contributor


why suppress this warning?

Contributor Author


why suppress this warning?

Task :iceberg-spark2:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /opt/sourcecode/iceberg-src/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java:281:60: 'startOffset' hides a field. [HiddenField]

Member


I think generally we just change the field visibility
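
As a point of reference, a minimal sketch of the common alternative to suppressing checkstyle's HiddenField rule; the field and setter names are made up, not the PR's code:

```java
class HiddenFieldSketch {
  private long startOffset;

  // Violates HiddenField: the parameter name shadows the field.
  // void setStartOffset(long startOffset) { this.startOffset = startOffset; }

  // Avoids the warning without @SuppressWarnings: rename the parameter.
  void setStartOffset(long newStartOffset) {
    this.startOffset = newStartOffset;
  }
}
```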


Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
String caseSensitive = lazySparkSession().conf().get("spark.sql.caseSensitive");
Contributor


Maybe add a comment about why this is resolved from the session conf instead of the merged session conf / source options?

@@ -127,6 +130,23 @@ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
return new StreamingWriter(table, io, encryptionManager, options, queryId, mode, appId, writeSchema, dsStruct);
}

@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
Contributor


So it seems like we're not using the checkpointLocation here. Do we not need to store anything to be able to recover on failure? I think this is the case, because as we read data it doesn't really get "consumed", but I just want to make sure.
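
For what it's worth, a rough sketch of why a source built on the Spark 2.4 MicroBatchReader API can usually ignore checkpointLocation: Spark itself persists the JSON form of each Offset in the checkpoint's offset log and hands it back on restart, so the source only needs to serialize and parse its offsets. The field names below are invented for illustration and may differ from the PR's StreamingOffset:

```java
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

// Hypothetical offset shape; the real offset class in this PR may differ.
class StreamingOffsetSketch extends Offset {
  private final long snapshotId;
  private final int position;

  StreamingOffsetSketch(long snapshotId, int position) {
    this.snapshotId = snapshotId;
    this.position = position;
  }

  @Override
  public String json() {
    // Spark writes this JSON into <checkpointLocation>/offsets/<batchId> and passes it
    // back through MicroBatchReader#deserializeOffset when the query restarts.
    return String.format("{\"snapshot_id\":%d,\"position\":%d}", snapshotId, position);
  }
}
```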

}

@Override
public void commit(Offset end) {
Contributor


Would it make sense to clean up the cachedPendingBatches here, or would they already have been removed at this point?
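
For discussion, a rough sketch of the kind of cleanup being asked about, assuming the reader keeps pending batches keyed by the end of their offset range; the structure and names below are guesses for illustration, not the PR's actual code:

```java
import java.util.Map;
import java.util.TreeMap;

class PendingBatchCleanupSketch {
  // Hypothetical cache: end-offset position -> planned batch covering that range.
  private final TreeMap<Long, Object> cachedPendingBatches = new TreeMap<>();

  // Called when Spark reports that everything up to endPosition was durably processed.
  void commit(long endPosition) {
    // Drop every cached batch whose end offset is at or before the committed offset;
    // those ranges will never be requested again.
    Map<Long, Object> committed = cachedPendingBatches.headMap(endPosition, true);
    committed.clear();
  }
}
```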

@holdenk
Contributor

holdenk commented May 11, 2021

I really appreciate the time given to the test case :)

@XuQianJin-Stars
Contributor Author

Hi @holdenk, thank you very much for your review. I will reply to your comments later.

(startOffset.shouldScanAllFiles() || isAppend(table.snapshot(startOffset.snapshotId())));
}

private static void assertNoOverwrite(Snapshot snapshot) {
Collaborator

SreeramGarlapati commented May 14, 2021


Can we please take a configuration for whether or not to assert? Essentially, we want to factor in cases like:

  1. compaction, which performs harmless REPLACE operations, and
  2. GDPR deletes

neither of which will impact the structured streaming result.

https://docs.microsoft.com/en-us/azure/databricks/delta/delta-streaming#ignore-updates-and-deletes
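
A hypothetical sketch of what such a configuration gate could look like; the option semantics and class are made up for illustration, and only the DataOperations constants and Snapshot API are existing Iceberg types:

```java
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;

class SnapshotValidationSketch {
  // Hypothetical read option: when true, REPLACE snapshots (e.g. compaction) and
  // DELETE snapshots (e.g. GDPR deletes) are skipped instead of failing the stream.
  private final boolean skipNonAppendSnapshots;

  SnapshotValidationSketch(boolean skipNonAppendSnapshots) {
    this.skipNonAppendSnapshots = skipNonAppendSnapshots;
  }

  boolean shouldProcess(Snapshot snapshot) {
    if (DataOperations.APPEND.equals(snapshot.operation())) {
      return true;
    }
    if (skipNonAppendSnapshots &&
        (DataOperations.REPLACE.equals(snapshot.operation()) ||
         DataOperations.DELETE.equals(snapshot.operation()))) {
      return false;  // ignore the snapshot, but do not fail the query
    }
    throw new IllegalStateException(
        "Cannot process snapshot with operation: " + snapshot.operation());
  }
}
```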

SreeramGarlapati added a commit to SreeramGarlapati/iceberg that referenced this pull request Jun 2, 2021
This work is an extension of the idea in issue apache#179 and the Spark2 work done in PR apache#2272, only that this is for Spark3.

**In the current implementation:**
* An Iceberg snapshot is the upper bound for a MicroBatch. A given MicroBatch will only span a single snapshot; it will not be composed of multiple snapshots. BatchSize is used to limit the number of files within a given snapshot.
* The streaming reader will error out if it encounters any snapshot whose type is not `APPEND`.
* Handling `DELETES`, `REPLACE` & `OVERWRITES` is left for the future.
* Columnar reads are not enabled; also left for the future.
@XuQianJin-Stars
Contributor Author

XuQianJin-Stars commented Jul 15, 2021

Hi @SreeramGarlapati @holdenk @jackye1995 @rdblue @RussellSpitzer, sorry it took so long to fix the problem. Do you have time to help continue reviewing this PR?

@holdenk
Contributor

holdenk commented Sep 13, 2022

Hey folks (incl. @XuQianJin-Stars & @RussellSpitzer) -- is this something that people are still open to working on? We're running into a situation with the current limited streaming support where the lack of maxFilesPerTrigger (or its equivalent), which is included in this PR, keeps us from being able to do combined historical + streaming reads from Iceberg tables.

@holdenk
Contributor

holdenk commented Sep 14, 2022

& @flyrain - what are your thoughts?

@singhpk234
Contributor

is this something that people are still open to working on?

+1, I have a PR out for supporting rate limiting in Spark 3:

cc @holdenk


This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

github-actions bot added the stale label Jul 27, 2024

github-actions bot commented Aug 3, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this Aug 3, 2024
6 participants