13 changes: 11 additions & 2 deletions docs/source/user-guide/latest/iceberg.md
@@ -189,7 +189,9 @@ The native Iceberg reader supports the following features:

**Table specifications:**

- Iceberg table spec v1 and v2 (v3 will fall back to Spark)
- Iceberg table spec v1, v2, and v3 (basic read support)
- V3 tables using V2-compatible features are fully accelerated
- Scans of tables that use V3-specific features (Deletion Vectors, new data types) gracefully fall back to Spark

**Schema and data types:**

@@ -266,10 +268,17 @@ scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()

The following scenarios will fall back to Spark's native Iceberg reader:

- Iceberg table spec v3 scans
- Iceberg writes (reads are accelerated, writes use Spark)
- Tables backed by Avro or ORC data files (only Parquet is accelerated)
- Tables partitioned on `BINARY` or `DECIMAL` (with precision >28) columns
- Scans with residual filters using `truncate`, `bucket`, `year`, `month`, `day`, or `hour`
transform functions (partition pruning still works, but row-level filtering of these
transforms falls back)

**V3-specific limitations (graceful fallback to Spark):**

- Deletion Vectors (DVs): V3's efficient bitmap-based deletes stored in Puffin files
- V3-only data types: `timestamp_ns`, `timestamptz_ns`, `variant`, `geometry`, `geography`

Note: V3 tables that use only V2-compatible features (position deletes, equality deletes,
standard types) are fully accelerated by Comet.
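
For example, a V3 table created with only standard features stays on the accelerated read path (a minimal sketch, assuming the `rest_cat` catalog configured earlier in this guide; the table name and columns are placeholders):

```scala
// Minimal sketch: create a spec-v3 table that only uses V2-compatible features.
// `rest_cat` is the REST catalog from the earlier example; the table is illustrative.
spark.sql(
  "CREATE TABLE rest_cat.db.v3_demo (id INT, name STRING) USING iceberg " +
    "TBLPROPERTIES ('format-version' = '3')")
spark.sql("INSERT INTO rest_cat.db.v3_demo VALUES (1, 'a'), (2, 'b')")

// This scan is served by Comet's native reader, since the table has no
// Deletion Vectors and no V3-only column types.
spark.sql("SELECT * FROM rest_cat.db.v3_demo WHERE id > 1").show()
```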
159 changes: 159 additions & 0 deletions spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -705,6 +705,165 @@ object IcebergReflection extends Logging {
}
None
}

// ============================================================================
// Iceberg V3 Feature Detection
// ============================================================================

/** V3-only types not yet supported by Comet. */
object V3Types {
val TIMESTAMP_NS = "timestamp_ns"
val TIMESTAMPTZ_NS = "timestamptz_ns"
val VARIANT = "variant"
val GEOMETRY = "geometry"
val GEOGRAPHY = "geography"

val UNSUPPORTED_V3_TYPES: Set[String] =
Set(TIMESTAMP_NS, TIMESTAMPTZ_NS, VARIANT, GEOMETRY, GEOGRAPHY)
}

/** V3 Deletion Vector content type. */
object V3ContentTypes {
val DELETION_VECTOR = "DELETION_VECTOR"
}

/** Checks if any delete files use Deletion Vectors (V3 feature). */
def hasDeletionVectors(tasks: java.util.List[_]): Boolean = {
import scala.jdk.CollectionConverters._

try {
val deleteFiles = getDeleteFiles(tasks)

deleteFiles.asScala.exists { deleteFile =>
try {
// scalastyle:off classforname
val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
// scalastyle:on classforname
val contentMethod = contentFileClass.getMethod("content")
val content = contentMethod.invoke(deleteFile)
content.toString == V3ContentTypes.DELETION_VECTOR
} catch {
case _: Exception => false
}
}
} catch {
case _: Exception => false
}
}

/** Finds V3-only types in a schema not yet supported by Comet. */
def findUnsupportedV3Types(schema: Any): Set[String] = {
import scala.jdk.CollectionConverters._

val unsupportedTypes = scala.collection.mutable.Set[String]()

try {
val columnsMethod = schema.getClass.getMethod("columns")
val columns = columnsMethod.invoke(schema).asInstanceOf[java.util.List[_]]

columns.asScala.foreach { column =>
try {
val typeMethod = column.getClass.getMethod("type")
val icebergType = typeMethod.invoke(column)
val typeStr = icebergType.toString.toLowerCase(java.util.Locale.ROOT)

V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}

checkNestedTypesForV3(icebergType, unsupportedTypes)
} catch {
case _: Exception => // Skip columns where we can't determine type
}
}
} catch {
case e: Exception =>
logWarning(s"Failed to scan schema for V3 types: ${e.getMessage}")
}

unsupportedTypes.toSet
}

/** Recursively checks nested types for V3-only types. */
private def checkNestedTypesForV3(
icebergType: Any,
unsupportedTypes: scala.collection.mutable.Set[String]): Unit = {
import scala.jdk.CollectionConverters._

try {
val typeClass = icebergType.getClass

if (typeClass.getSimpleName.contains("StructType")) {
try {
val fieldsMethod = typeClass.getMethod("fields")
val fields = fieldsMethod.invoke(icebergType).asInstanceOf[java.util.List[_]]

fields.asScala.foreach { field =>
try {
val fieldTypeMethod = field.getClass.getMethod("type")
val fieldType = fieldTypeMethod.invoke(field)
val typeStr = fieldType.toString.toLowerCase(java.util.Locale.ROOT)

V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}

checkNestedTypesForV3(fieldType, unsupportedTypes)
} catch {
case _: Exception =>
}
}
} catch {
case _: Exception =>
}
}

if (typeClass.getSimpleName.contains("ListType")) {
try {
val elementTypeMethod = typeClass.getMethod("elementType")
val elementType = elementTypeMethod.invoke(icebergType)
val typeStr = elementType.toString.toLowerCase(java.util.Locale.ROOT)

V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}

checkNestedTypesForV3(elementType, unsupportedTypes)
} catch {
case _: Exception =>
}
}

if (typeClass.getSimpleName.contains("MapType")) {
try {
val keyTypeMethod = typeClass.getMethod("keyType")
val valueTypeMethod = typeClass.getMethod("valueType")
val keyType = keyTypeMethod.invoke(icebergType)
val valueType = valueTypeMethod.invoke(icebergType)

Seq(keyType, valueType).foreach { mapType =>
val typeStr = mapType.toString.toLowerCase(java.util.Locale.ROOT)
V3Types.UNSUPPORTED_V3_TYPES.foreach { v3Type =>
if (typeStr == v3Type || typeStr.startsWith(s"$v3Type(")) {
unsupportedTypes += v3Type
}
}
checkNestedTypesForV3(mapType, unsupportedTypes)
}
} catch {
case _: Exception =>
}
}
} catch {
case _: Exception =>
}
}
}

/**
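For readers following the reflection code above, this is a rough usage sketch of the two new helpers; the `schema()` lookup and the visibility of `IcebergReflection` outside Comet's planner rules are assumptions made for illustration only:

```scala
// Hypothetical sketch -- not part of this change. Assumes an Iceberg Table handle
// (`table`) and the planned scan tasks (`tasks`) are already available.
import org.apache.comet.iceberg.IcebergReflection

def describeV3Support(table: Any, tasks: java.util.List[_]): Unit = {
  // Reflectively fetch the table schema and look for V3-only column types.
  val schema = table.getClass.getMethod("schema").invoke(table)
  val unsupported = IcebergReflection.findUnsupportedV3Types(schema)
  if (unsupported.nonEmpty) {
    println(s"V3-only types present, scan will fall back: ${unsupported.mkString(", ")}")
  }

  // Check whether any delete file in the planned tasks is a Deletion Vector.
  if (IcebergReflection.hasDeletionVectors(tasks)) {
    println("Deletion Vectors present, scan will fall back")
  }
}
```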
42 changes: 39 additions & 3 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -441,10 +441,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com

val formatVersionSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) =>
if (formatVersion > 2) {
if (formatVersion > 3) {
fallbackReasons += "Iceberg table format version " +
s"$formatVersion is not supported. " +
"Comet only supports Iceberg table format V1 and V2"
"Comet supports Iceberg table format V1, V2, and V3"
false
} else {
true
@@ -621,10 +621,46 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
!hasUnsupportedDeletes
}

val v3FeaturesSupported = IcebergReflection.getFormatVersion(metadata.table) match {
case Some(formatVersion) if formatVersion >= 3 =>
var allV3FeaturesSupported = true

try {
if (IcebergReflection.hasDeletionVectors(metadata.tasks)) {
fallbackReasons += "Iceberg V3 Deletion Vectors are not yet supported. " +
"Tables using Deletion Vectors will fall back to Spark"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"Deletion Vectors: ${e.getMessage}"
allV3FeaturesSupported = false
}

try {
val unsupportedTypes = IcebergReflection.findUnsupportedV3Types(metadata.scanSchema)
if (unsupportedTypes.nonEmpty) {
fallbackReasons += "Iceberg V3 types not yet supported: " +
s"${unsupportedTypes.mkString(", ")}. " +
"Tables with these types will fall back to Spark"
allV3FeaturesSupported = false
}
} catch {
case e: Exception =>
fallbackReasons += "Iceberg reflection failure: Could not check for " +
s"V3 types: ${e.getMessage}"
allV3FeaturesSupported = false
}

allV3FeaturesSupported
case _ => true
}

if (schemaSupported && fileIOCompatible && formatVersionSupported && allParquetFiles &&
allSupportedFilesystems && partitionTypesSupported &&
complexTypePredicatesSupported && transformFunctionsSupported &&
deleteFileTypesSupported) {
deleteFileTypesSupported && v3FeaturesSupported) {
CometBatchScanExec(
scanExec.clone().asInstanceOf[BatchScanExec],
runtimeFilters = scanExec.runtimeFilters,
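A quick way to check which path a V3 scan actually took is to inspect the physical plan; this is a sketch only, and the exact operator name printed by `EXPLAIN` may differ across Spark and Comet versions:

```scala
// Sketch: inspect the physical plan of a query against the (illustrative) V3 table
// to see whether the scan was replaced by Comet's native Iceberg reader.
val plan = spark.sql("EXPLAIN SELECT COUNT(*) FROM rest_cat.db.v3_demo")
  .collect()
  .map(_.getString(0))
  .mkString("\n")

if (plan.contains("CometBatchScan")) {
  println("Scan is accelerated by Comet")
} else {
  // The accumulated fallback reasons (e.g. Deletion Vectors, V3-only types)
  // are typically visible in the driver log.
  println("Scan fell back to Spark")
}
```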
48 changes: 48 additions & 0 deletions spark/src/test/resources/sql-tests/iceberg/v3_aggregations.sql
@@ -0,0 +1,48 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Iceberg V3 aggregations and grouping

statement
CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db

statement
CREATE TABLE iceberg_cat.db.v3_orders (order_id INT, customer_id INT, amount DOUBLE, category STRING) USING iceberg TBLPROPERTIES ('format-version' = '3')

statement
INSERT INTO iceberg_cat.db.v3_orders VALUES (1, 1, 100.0, 'A'), (2, 1, 200.0, 'B'), (3, 2, 150.0, 'A'), (4, 3, 300.0, 'B'), (5, 2, 50.0, 'A')

query spark_answer_only
SELECT COUNT(*) FROM iceberg_cat.db.v3_orders

query spark_answer_only
SELECT SUM(amount) FROM iceberg_cat.db.v3_orders

query spark_answer_only
SELECT AVG(amount) FROM iceberg_cat.db.v3_orders

query spark_answer_only
SELECT MIN(amount), MAX(amount) FROM iceberg_cat.db.v3_orders

query spark_answer_only
SELECT customer_id, SUM(amount) as total FROM iceberg_cat.db.v3_orders GROUP BY customer_id ORDER BY customer_id

query spark_answer_only
SELECT category, COUNT(*) as cnt, AVG(amount) as avg_amt FROM iceberg_cat.db.v3_orders GROUP BY category ORDER BY category

query spark_answer_only
SELECT customer_id, SUM(amount) as total FROM iceberg_cat.db.v3_orders GROUP BY customer_id HAVING SUM(amount) > 200 ORDER BY customer_id
42 changes: 42 additions & 0 deletions spark/src/test/resources/sql-tests/iceberg/v3_basic.sql
@@ -0,0 +1,42 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Iceberg V3 basic table operations

statement
CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db

statement
CREATE TABLE iceberg_cat.db.v3_basic (id INT, name STRING, value DOUBLE) USING iceberg TBLPROPERTIES ('format-version' = '3')

statement
INSERT INTO iceberg_cat.db.v3_basic VALUES (1, 'Alice', 10.5), (2, 'Bob', 20.3), (3, 'Charlie', 30.7)

query spark_answer_only
SELECT * FROM iceberg_cat.db.v3_basic ORDER BY id

query spark_answer_only
SELECT * FROM iceberg_cat.db.v3_basic WHERE id > 1 ORDER BY id

query spark_answer_only
SELECT * FROM iceberg_cat.db.v3_basic WHERE name = 'Alice'

query spark_answer_only
SELECT COUNT(*) FROM iceberg_cat.db.v3_basic

query spark_answer_only
SELECT id, name FROM iceberg_cat.db.v3_basic WHERE value > 15.0 ORDER BY id
42 changes: 42 additions & 0 deletions spark/src/test/resources/sql-tests/iceberg/v3_joins.sql
@@ -0,0 +1,42 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Iceberg V3 join operations

statement
CREATE NAMESPACE IF NOT EXISTS iceberg_cat.db

statement
CREATE TABLE iceberg_cat.db.v3_customers (id INT, name STRING) USING iceberg TBLPROPERTIES ('format-version' = '3')

statement
CREATE TABLE iceberg_cat.db.v3_orders_join (order_id INT, customer_id INT, amount DOUBLE) USING iceberg TBLPROPERTIES ('format-version' = '3')

statement
INSERT INTO iceberg_cat.db.v3_customers VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')

statement
INSERT INTO iceberg_cat.db.v3_orders_join VALUES (1, 1, 100.0), (2, 1, 200.0), (3, 2, 150.0), (4, 3, 300.0)

query spark_answer_only
SELECT o.order_id, c.name, o.amount FROM iceberg_cat.db.v3_orders_join o JOIN iceberg_cat.db.v3_customers c ON o.customer_id = c.id ORDER BY o.order_id

query spark_answer_only
SELECT c.name, SUM(o.amount) as total FROM iceberg_cat.db.v3_orders_join o JOIN iceberg_cat.db.v3_customers c ON o.customer_id = c.id GROUP BY c.name ORDER BY c.name

query spark_answer_only
SELECT c.name, COUNT(o.order_id) as order_count FROM iceberg_cat.db.v3_customers c LEFT JOIN iceberg_cat.db.v3_orders_join o ON c.id = o.customer_id GROUP BY c.name ORDER BY c.name