Structured streaming writes to partitioned table fails when spark.sql.extensions is set to IcebergSparkSessionExtensions #7226

Open
@adigerber

Description

Apache Iceberg version

1.2.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

According to the documentation, when using Iceberg one should set spark.sql.extensions to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions, but setting this property causes an exception to be thrown when writing to a partitioned Iceberg table with Spark Structured Streaming.

The exception that is thrown is:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: days(ts) is not currently supported
=== Streaming Query ===
Identifier: [id = cfb83943-cd87-4c84-bf25-a290e8891e19, runId = ddf71690-7e5d-41f6-8a8e-84c425683a26]
Current Committed Offsets: {}
Current Available Offsets: {MemoryStream[ts#3,a#4,b#5]: 0}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource spark_catalog.default.test_iceberg_table, cfb83943-cd87-4c84-bf25-a290e8891e19, [path=test_iceberg_table, fanout-enabled=true, checkpointLocation=/tmp/spark-checkpoint-16659193840247202419], Append
+- StreamingDataSourceV2Relation [ts#3, a#4, b#5], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@4533d1cf, MemoryStream[ts#3,a#4,b#5]

Code to reproduce:

package com.example

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Encoders, SparkSession}

import java.nio.file.Files
import java.sql.Timestamp

case class Bla(ts: Timestamp, a: String, b: Double)

object MinEx {
  def main(args: Array[String]): Unit = {
    val warehouseDir = Files.createTempDirectory("spark-warehouse-iceberg-").toString
    val checkpointDir = Files.createTempDirectory("spark-checkpoint-").toString
    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("create table test_iceberg_table(ts timestamp, a string, b double) using iceberg partitioned by (days(ts))")

    implicit val sqlContext = spark.sqlContext
    implicit val encoder = Encoders.product[Bla]
    val memStream = MemoryStream[Bla]
    val now = System.currentTimeMillis()
    val day = 86400000
    memStream.addData(List(
      Bla(new Timestamp(now), "test", 12.34),
      Bla(new Timestamp(now - 1 * day), "test 1d", 33.34),
      Bla(new Timestamp(now - 3 * day), "test 3d", 44.34),
      Bla(new Timestamp(now - 2 * day), "test 2d", 55.34),
    ))

    memStream.toDF()
      .writeStream
      .format("iceberg")
      .outputMode("append")
      .option("path", "test_iceberg_table")
      .option("fanout-enabled", true)
      .option("checkpointLocation", checkpointDir)
      .trigger(Trigger.Once())
      .start()
      .awaitTermination()
  }
}

The code works as expected when the statement that configures spark.sql.extensions is commented out.
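
For clarity, a sketch of the working variant: it is identical to the repro above, only the extensions line in the session builder is commented out.

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", warehouseDir)
      .config("spark.sql.warehouse.dir", warehouseDir)
      // .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .appName("BugRepro")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()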
