Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,24 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
conf.getConfString("parquet.crypto.factory.class", "").nonEmpty &&
conf.getConfString("parquet.encryption.kms.client.class", "").nonEmpty

var scanImpl = COMET_NATIVE_SCAN_IMPL.get()

// if scan is auto then pick the best available scan
if (scanImpl == SCAN_AUTO) {
if (encryptionEnabled) {
logInfo(
s"Auto scan mode falling back to $SCAN_NATIVE_COMET because " +
s"$SCAN_NATIVE_ICEBERG_COMPAT does not support reading encrypted Parquet files")
scanImpl = SCAN_NATIVE_COMET
} else {
scanImpl = selectScan(scanExec, r.partitionSchema)
// Derive scan mode and save it to session level to avoid recomputation
val scanImpl = CometScanRule.currentScanImplementation.getOrElse({
var scanModeImpl = COMET_NATIVE_SCAN_IMPL.get()
// if scan is auto then pick the best available scan
if (scanModeImpl == SCAN_AUTO) {
if (encryptionEnabled) {
logInfo(
s"Auto scan mode falling back to $SCAN_NATIVE_COMET because " +
s"$SCAN_NATIVE_ICEBERG_COMPAT does not support reading encrypted Parquet files")
scanModeImpl = SCAN_NATIVE_COMET
} else {
scanModeImpl = selectScan(scanExec, r.partitionSchema)
}
}
}

CometScanRule.currentScanImplementation = Some(scanModeImpl)
scanModeImpl
})

if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
fallbackReasons +=
Expand Down Expand Up @@ -409,7 +414,13 @@ object CometScanRule extends Logging {
* session, but we reset the cache once it reaches a fixed size to prevent it growing
* indefinitely.
*/
val configValidityMapMaxSize = 1024
private val configValidityMapMaxSize = 1024

// Comet derives on fly scan implementation approach and this variable to access the selected value
private var currentScanImplementation: Option[String] = None

// Current selected scan implementation
def getCurrentScanImplementation: Option[String] = currentScanImplementation

def validateObjectStoreConfig(
filePath: String,
Expand Down Expand Up @@ -450,6 +461,5 @@ object CometScanRule extends Logging {
throw e
}
}

}
}
Loading