Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,151 @@ import org.apache.spark.internal.Logging
*/
object IcebergReflection extends Logging {

/**
* Cache for reflection classes and methods to avoid repeated lookups. All Iceberg reflection
* operations are expensive, so we cache them once and reuse.
*/
case class ReflectionCache(
// Iceberg classes
contentScanTaskClass: Class[_],
fileScanTaskClass: Class[_],
contentFileClass: Class[_],
deleteFileClass: Class[_],
schemaParserClass: Class[_],
schemaClass: Class[_],
partitionSpecParserClass: Class[_],
partitionSpecClass: Class[_],
structLikeClass: Class[_],
nestedFieldClass: Class[_],
// ContentScanTask methods
fileMethod: java.lang.reflect.Method,
startMethod: java.lang.reflect.Method,
lengthMethod: java.lang.reflect.Method,
partitionMethod: java.lang.reflect.Method,
residualMethod: java.lang.reflect.Method,
// FileScanTask methods
taskSchemaMethod: java.lang.reflect.Method,
deletesMethod: java.lang.reflect.Method,
specMethod: java.lang.reflect.Method,
// ContentFile methods
fileLocationMethod: java.lang.reflect.Method,
// DeleteFile methods
deleteContentMethod: java.lang.reflect.Method,
deleteSpecIdMethod: java.lang.reflect.Method,
deleteEqualityIdsMethod: java.lang.reflect.Method,
// Schema methods
schemaToJsonMethod: java.lang.reflect.Method,
// PartitionSpec methods
partitionSpecToJsonMethod: java.lang.reflect.Method,
partitionTypeMethod: java.lang.reflect.Method,
// StructType methods
structTypeFieldsMethod: java.lang.reflect.Method,
// NestedField methods
nestedFieldTypeMethod: java.lang.reflect.Method,
nestedFieldIdMethod: java.lang.reflect.Method,
nestedFieldNameMethod: java.lang.reflect.Method,
nestedFieldIsOptionalMethod: java.lang.reflect.Method,
// StructLike methods
structLikeGetMethod: java.lang.reflect.Method)

/**
* Creates a ReflectionCache by loading all classes and methods once.
*/
def createReflectionCache(): ReflectionCache = {
// scalastyle:off classforname
val contentScanTaskClass = Class.forName(ClassNames.CONTENT_SCAN_TASK)
val fileScanTaskClass = Class.forName(ClassNames.FILE_SCAN_TASK)
val contentFileClass = Class.forName(ClassNames.CONTENT_FILE)
val deleteFileClass = Class.forName(ClassNames.DELETE_FILE)
val schemaParserClass = Class.forName(ClassNames.SCHEMA_PARSER)
val schemaClass = Class.forName(ClassNames.SCHEMA)
val partitionSpecParserClass = Class.forName(ClassNames.PARTITION_SPEC_PARSER)
val partitionSpecClass = Class.forName(ClassNames.PARTITION_SPEC)
val structTypeClass = Class.forName(ClassNames.STRUCT_TYPE)
val nestedFieldClass = Class.forName(ClassNames.NESTED_FIELD)
val structLikeClass = Class.forName(ClassNames.STRUCT_LIKE)
// scalastyle:on classforname

// ContentScanTask methods
val fileMethod = contentScanTaskClass.getMethod("file")
val startMethod = contentScanTaskClass.getMethod("start")
val lengthMethod = contentScanTaskClass.getMethod("length")
val partitionMethod = contentScanTaskClass.getMethod("partition")
val residualMethod = contentScanTaskClass.getMethod("residual")

// FileScanTask methods
val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
val deletesMethod = fileScanTaskClass.getMethod("deletes")
val specMethod = fileScanTaskClass.getMethod("spec")

// ContentFile methods - try location() first, fall back to path()
val fileLocationMethod =
try {
contentFileClass.getMethod("location")
} catch {
case _: NoSuchMethodException => contentFileClass.getMethod("path")
}

// DeleteFile methods
val deleteContentMethod = deleteFileClass.getMethod("content")
val deleteSpecIdMethod = deleteFileClass.getMethod("specId")
val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds")

// Schema serialization
val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
schemaToJsonMethod.setAccessible(true)

// PartitionSpec methods
val partitionSpecToJsonMethod =
partitionSpecParserClass.getMethod("toJson", partitionSpecClass)
val partitionTypeMethod = partitionSpecClass.getMethod("partitionType")

// StructType methods
val structTypeFieldsMethod = structTypeClass.getMethod("fields")

// NestedField methods
val nestedFieldTypeMethod = nestedFieldClass.getMethod("type")
val nestedFieldIdMethod = nestedFieldClass.getMethod("fieldId")
val nestedFieldNameMethod = nestedFieldClass.getMethod("name")
val nestedFieldIsOptionalMethod = nestedFieldClass.getMethod("isOptional")

// StructLike methods
val structLikeGetMethod = structLikeClass.getMethod("get", classOf[Int], classOf[Class[_]])

ReflectionCache(
contentScanTaskClass = contentScanTaskClass,
fileScanTaskClass = fileScanTaskClass,
contentFileClass = contentFileClass,
deleteFileClass = deleteFileClass,
schemaParserClass = schemaParserClass,
schemaClass = schemaClass,
partitionSpecParserClass = partitionSpecParserClass,
partitionSpecClass = partitionSpecClass,
structLikeClass = structLikeClass,
nestedFieldClass = nestedFieldClass,
fileMethod = fileMethod,
startMethod = startMethod,
lengthMethod = lengthMethod,
partitionMethod = partitionMethod,
residualMethod = residualMethod,
taskSchemaMethod = taskSchemaMethod,
deletesMethod = deletesMethod,
specMethod = specMethod,
fileLocationMethod = fileLocationMethod,
deleteContentMethod = deleteContentMethod,
deleteSpecIdMethod = deleteSpecIdMethod,
deleteEqualityIdsMethod = deleteEqualityIdsMethod,
schemaToJsonMethod = schemaToJsonMethod,
partitionSpecToJsonMethod = partitionSpecToJsonMethod,
partitionTypeMethod = partitionTypeMethod,
structTypeFieldsMethod = structTypeFieldsMethod,
nestedFieldTypeMethod = nestedFieldTypeMethod,
nestedFieldIdMethod = nestedFieldIdMethod,
nestedFieldNameMethod = nestedFieldNameMethod,
nestedFieldIsOptionalMethod = nestedFieldIsOptionalMethod,
structLikeGetMethod = structLikeGetMethod)
}

/**
* Iceberg class names used throughout Comet.
*/
Expand All @@ -46,6 +191,8 @@ object IcebergReflection extends Logging {
val PARTITION_SPEC = "org.apache.iceberg.PartitionSpec"
val PARTITION_FIELD = "org.apache.iceberg.PartitionField"
val UNBOUND_PREDICATE = "org.apache.iceberg.expressions.UnboundPredicate"
val STRUCT_TYPE = "org.apache.iceberg.types.Types$StructType"
val NESTED_FIELD = "org.apache.iceberg.types.Types$NestedField"
}

/**
Expand Down
Loading
Loading