fix: Use makeCopy to change relation in FileSourceScanExec
viirya committed Mar 13, 2024
1 parent 9aa42cd · commit 76ab588
Showing 1 changed file with 24 additions and 2 deletions.
@@ -21,6 +21,7 @@ package org.apache.spark.sql.comet
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration.NANOSECONDS
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.rdd.RDD
@@ -439,8 +440,29 @@ case class CometScanExec(
 
 object CometScanExec {
   def apply(scanExec: FileSourceScanExec, session: SparkSession): CometScanExec = {
-    val wrapped = scanExec.copy(relation =
-      scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session))
+    // TreeNode.mapProductIterator is a protected method, so we redefine it here.
+    def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
+      val arr = Array.ofDim[B](product.productArity)
+      var i = 0
+      while (i < arr.length) {
+        arr(i) = f(product.productElement(i))
+        i += 1
+      }
+      arr
+    }
+
+    // Replacing the relation in FileSourceScanExec via `copy` can break on other
+    // Spark distributions if the FileSourceScanExec constructor has changed.
+    // Use `makeCopy`, which resolves the constructor reflectively, to avoid this.
+    // https://github.com/apache/arrow-datafusion-comet/issues/190
+    def transform(arg: Any): AnyRef = arg match {
+      case _: HadoopFsRelation =>
+        scanExec.relation.copy(fileFormat = new CometParquetFileFormat)(session)
+      case other: AnyRef => other
+      case null => null
+    }
+    val newArgs = mapProductIterator(scanExec, transform(_))
+    val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
     val batchScanExec = CometScanExec(
       wrapped.relation,
       wrapped.output,
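
Why `makeCopy` rather than `copy`: the synthesized `copy` method is compiled against one specific constructor signature, so a Comet jar built against stock Spark can fail at runtime (typically a NoSuchMethodError) on a distribution whose FileSourceScanExec constructor has extra or reordered parameters. `TreeNode.makeCopy` instead resolves the constructor reflectively at runtime. The sketch below illustrates the pattern outside Spark under stated assumptions: `Relation` and `ScanNode` are hypothetical stand-ins for HadoopFsRelation and FileSourceScanExec, and this `makeCopy` is a simplified analogue of Spark's, not its actual implementation.

import scala.reflect.ClassTag

// Hypothetical stand-ins for HadoopFsRelation and FileSourceScanExec.
case class Relation(fileFormat: String)

case class ScanNode(relation: Relation, output: Seq[String]) {
  // Simplified analogue of Spark's TreeNode.makeCopy: resolve the
  // constructor reflectively at runtime instead of compiling against
  // a fixed signature.
  def makeCopy(newArgs: Array[AnyRef]): ScanNode = {
    val ctor = getClass.getConstructors.head
    ctor.newInstance(newArgs: _*).asInstanceOf[ScanNode]
  }
}

object MakeCopyDemo extends App {
  // Same helper as in the commit: apply f to each constructor argument.
  def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](product.productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(product.productElement(i))
      i += 1
    }
    arr
  }

  val scan = ScanNode(Relation("parquet"), Seq("a", "b"))
  // Swap the relation by type; every other argument passes through, so this
  // keeps working even if ScanNode later gains extra constructor parameters.
  val newArgs = mapProductIterator[AnyRef](
    scan,
    {
      case _: Relation => Relation("comet-parquet")
      case other: AnyRef => other
      case null => null
    })
  println(scan.makeCopy(newArgs)) // ScanNode(Relation(comet-parquet),List(a, b))
}

The same idea appears in the commit: `transform` replaces only the HadoopFsRelation argument by type, and `makeCopy` rebuilds the node from whatever constructor the running Spark actually has.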
