
Spark: Inject DataSourceV2Relation when missing #7910

Closed
wants to merge 9 commits

Conversation

@Fokko (Contributor) commented Jun 26, 2023

When you start a structured streaming query using `.start()`, there is no `DataSourceV2Relation` reference. As a result, the catalog functions aren't available, and the query fails with:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported

When the relation is missing, we simply create one ourselves, since we know the table.

Resolves #7226
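
For context, a minimal reproduction sketch (not from the PR itself): it assumes an Iceberg table `demo.db.logs` created with `PARTITIONED BY (days(ts))`; all table names, catalogs, and paths below are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("repro")
  .getOrCreate()

// The built-in rate source continuously emits (timestamp, value) rows.
val stream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr("timestamp AS ts", "value AS data")

// Assumption: demo.db.logs is an Iceberg table partitioned by days(ts).
// Before this change, starting the query left the DataSourceV2Relation unset,
// so resolving the days(ts) transform failed with the exception above.
val query = stream.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/repro")
  .option("path", "demo.db.logs")
  .start()
```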

@Fokko requested a review from @aokolnychyi on June 26, 2023 06:43
@Fokko force-pushed the fd-structured-streaming branch 3 times, most recently from de721ad to caa73de on June 26, 2023 07:46
@Marcus-Rosti

Is this fix only for Spark 3.4+?

@Fokko (Contributor, Author) commented Aug 8, 2023

@Marcus-Rosti I first wanted to get some feedback on this before backporting it to older versions of Spark.

@aokolnychyi (Contributor) commented:

I should have some time to review this week. Sorry for the delay!

@aokolnychyi (Contributor) commented:

It seems like we are trying to fix a bug in Spark, which is beyond Iceberg's control. While I don't mind that, since we would otherwise have to wait for a new Spark release, it would be nice to look for a proper fix in Spark that would work without Iceberg extensions.

Let me take a closer look at the Spark side.

```scala
// When the micro-batch write is missing its relation, recover the table
// identifier by parsing table.name() with the session's SQL parser.
case p: WriteToMicroBatchDataSource if p.relation.isEmpty =>
  import spark.sessionState.analyzer.CatalogAndIdentifier
  val originalMultipartIdentifier = spark.sessionState.sqlParser
    .parseMultipartIdentifier(p.table.name())
```
@aokolnychyi (Contributor) commented Aug 22, 2023
I have doubts that this is generally safe, as we assume `table.name()` will include the catalog name. That's currently the case, but it feels fragile. I mentioned a potential fix on the Spark side here. Let me come back with fresh eyes tomorrow.

@Fokko, let me know what you think about fixing it in Spark.
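
To illustrate the concern, a sketch with hypothetical identifiers: `parseMultipartIdentifier` only splits the string, so whether a usable catalog name comes out depends entirely on what `table.name()` returns.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val parser = spark.sessionState.sqlParser

// If table.name() happens to include the catalog, resolution works as intended:
parser.parseMultipartIdentifier("my_catalog.db.tbl") // Seq("my_catalog", "db", "tbl")

// If it ever returns only namespace and table, the CatalogAndIdentifier extractor
// would fall back to the session's current catalog, which may not be the intended one:
parser.parseMultipartIdentifier("db.tbl") // Seq("db", "tbl")
```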

@Fokko (Contributor, Author) commented:
I see your concern, but as the test shows, it works quite well. The smoke test passes for `table`, `catalog.schema.table`, and `s3://bucket/wh/path`. Fixing it in Spark could also work, but then I'd need more pointers on where to start; I looked into the comment on the issue, but it wasn't directly obvious to me. Given how many people are bumping into this, I think we should fix it.

@aokolnychyi (Contributor) commented:
I'd say let's just use the new API in Spark and not worry about it. I think you already updated the docs to cover that.

@Fokko closed this on Oct 12, 2023