### Apache Iceberg version

1.2.0 (latest release)

### Query engine

Spark

### Please describe the bug 🐞
According to the documentation, when using Iceberg one should set `spark.sql.extensions` to `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions`, but setting this property seems to cause an exception when writing to a day-partitioned Iceberg table with Spark Structured Streaming.
The exception that is thrown is:

```
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]
```
Code to reproduce:
```scala
package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoder, Encoders, SQLContext, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      // Commenting out the next line makes the streaming write succeed.
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Day-partitioned table; the days(ts) transform is what the streaming write fails on.
    spark.sql("CREATE TABLE test_iceberg_table (ts timestamp, a string, b double) USING iceberg PARTITIONED BY (days(ts))")

    implicit val sqlContext: SQLContext = spark.sqlContext
    implicit val encoder: Encoder[Bla] = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]

    val now = System.currentTimeMillis()
    val day = 86400000L // one day in milliseconds
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34)
    ))

    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}
```
The code works as expected when the line that configures `spark.sql.extensions` is commented out.
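
In case it helps with triage: since the failure seems to come from the write requesting a distribution clustered by `days(ts)`, a possible mitigation might be to create the table with the standard Iceberg table property `write.distribution-mode` set to `none`, so the streaming write does not need to resolve the transform. A minimal sketch, which I have not verified against this exact setup:

```scala
// Untested workaround sketch: with write.distribution-mode=none, the writer
// should not request clustering by days(ts); fanout-enabled=true already
// removes the need for input sorted by partition within each task.
spark.sql(
  """CREATE TABLE test_iceberg_table (ts timestamp, a string, b double)
    |USING iceberg
    |PARTITIONED BY (days(ts))
    |TBLPROPERTIES ('write.distribution-mode' = 'none')""".stripMargin)
```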