[FEATURE] Handle Iceberg overwrite and delete snapshots to prevent index refresh failure #708

Open
dai-chen opened this issue Sep 27, 2024 · 1 comment
Labels
DataSource:Iceberg enhancement New feature or request

Comments

@dai-chen
Collaborator

dai-chen commented Sep 27, 2024

Is your feature request related to a problem?

When working with Iceberg tables in Spark streaming jobs, the stream will terminate with an error if there are updated or deleted rows in the Iceberg table. Specifically, Iceberg throws an exception when an overwrite snapshot is encountered, which causes the Spark streaming job to fail.

What solution would you like?

The Flint index refresh job should be able to handle updated or deleted rows gracefully by using the options streaming-skip-overwrite-snapshots=true and streaming-skip-delete-snapshots=true to avoid termination. These options should be set by default for use cases involving streaming and incremental updates, allowing the job to continue processing without manual intervention.

If we pursue this approach, we need to determine how to elegantly configure the source operator when creating a streaming job. Currently, we have the FlintSparkSourceRelationProvider, which is primarily used for query rewriting. Additionally, we should consider configuring other defaults, such as maxFilesPerTrigger, which can help speed up progress and generate results more quickly for Flint materialized view refreshes.
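
One possible shape for that default wiring, sketched in plain Spark code rather than through FlintSparkSourceRelationProvider itself (the IcebergStreamingDefaults object and buildStreamingRelation helper below are hypothetical names for illustration, not existing Flint APIs; the maxFilesPerTrigger value is likewise only illustrative):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: read options the Flint refresh job could apply by default
// when it builds the streaming source for an Iceberg table.
object IcebergStreamingDefaults {
  val options: Map[String, String] = Map(
    "streaming-skip-overwrite-snapshots" -> "true",
    "streaming-skip-delete-snapshots"    -> "true",
    "maxFilesPerTrigger"                 -> "100"  // illustrative value for the other default mentioned above
  )

  // User-supplied extra options take precedence over the defaults.
  def buildStreamingRelation(
      spark: SparkSession,
      tableName: String,
      extraOptions: Map[String, String] = Map.empty): DataFrame =
    spark.readStream
      .options(options ++ extraOptions)
      .table(tableName)
}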

What alternatives have you considered?

Alternatively, ensure that these options are well documented and easily discoverable, so that users can set them manually via extraOptions in the index options of the create index statement and not miss this critical step. Clear guidance would help users avoid job failures caused by unhandled overwrite or delete snapshots.
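
For the documentation route, the manual workaround could look roughly like the spark-shell session below. This is only a sketch: the materialized view name and query are invented, and the exact extra_options JSON shape and option names should be confirmed against the Flint index options documentation.

# Sketch: pass the Iceberg streaming options through extra_options when creating the
# index, so the auto-refresh streaming job reads the source table with them set
scala> sql("""
  CREATE MATERIALIZED VIEW myglue.default.iceberg_test_mv
  AS SELECT id, data, col FROM myglue.default.iceberg_test
  WITH (
    auto_refresh = true,
    extra_options = '{
      "myglue.default.iceberg_test": {
        "streaming-skip-overwrite-snapshots": "true",
        "streaming-skip-delete-snapshots": "true"
      }
    }'
  )
""")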

Do you have any additional context?

  1. Related to [FEATURE] Support refreshing mutable source data in Flint index #700
  2. Example error:
scala> spark.readStream.table("myglue.default.iceberg_test")
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", "s3://checkpoint_1")
  .start()

# Append snapshot can succeed
scala> sql("INSERT INTO myglue.default.iceberg_test values (8, 'h', 8.0)")
res26: org.apache.spark.sql.DataFrame = []

Batch: 2
-------------------------------------------
+---+----+---+
| id|data|col|
+---+----+---+
|  8|   h|8.0|
+---+----+---+

# Overwrite snapshot caused streaming job to terminate
scala> sql("UPDATE myglue.default.iceberg_test SET data = 'hhh' WHERE id = 8")
res27: org.apache.spark.sql.DataFrame = []

24/09/26 21:12:14 ERROR MicroBatchExecution: Query [id = 55630ffa-df96-4b4b-abe0-abe95c658565,
  runId = 5f628a2a-6c8a-4944-bab3-4f2101f4cded] terminated with error
java.lang.IllegalStateException: Cannot process overwrite snapshot: 2626116465374966496, to ignore overwrites,
  set streaming-skip-overwrite-snapshots=true
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:772)
  at org.apache.iceberg.spark.source.SparkMicroBatchStream.shouldProcess(SparkMicroBatchStream.java:271)
  at org.apache.iceberg.spark.source.SparkMicroBatchStream.nextValidSnapshot(SparkMicroBatchStream.java:426)
  at org.apache.iceberg.spark.source.SparkMicroBatchStream.latestOffset(SparkMicroBatchStream.java:395)
  ...

# Start streaming job with ignore options
scala> spark.readStream
  .option("streaming-skip-overwrite-snapshots", true)
  .option("streaming-skip-delete-snapshots", true)
  .table("myglue.default.iceberg_test")
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", "s3://checkpoint_2")
  .start()

# Ignored
scala> sql("UPDATE myglue.default.iceberg_test SET data = 'hhh123' WHERE id = 8")
res47: org.apache.spark.sql.DataFrame = []

scala> sql("INSERT INTO myglue.default.iceberg_test values (10, 'i', 10.0)")
res48: org.apache.spark.sql.DataFrame = []

scala> -------------------------------------------
Batch: 1
-------------------------------------------
+---+----+----+
| id|data| col|
+---+----+----+
| 10|   i|10.0|
+---+----+----+
@dai-chen
Collaborator Author

TODO: Verify if current Flint append mode also has this issue.
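
If "append mode" here means the Spark append output mode used by the Flint streaming refresh, a quick spark-shell check could look like the session below (a sketch reusing the example table above; the checkpoint path is made up):

# Same source table, append output mode, no skip options
scala> spark.readStream.table("myglue.default.iceberg_test")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", "s3://checkpoint_3")
  .start()

# Then update a row and watch the driver log for "Cannot process overwrite snapshot"
scala> sql("UPDATE myglue.default.iceberg_test SET data = 'append_check' WHERE id = 8")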
