CometSparkSessionExtensions.scala

@@ -190,7 +190,7 @@ class CometSparkSessionExtensions
 
       // data source V1
       case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
             _: Seq[_],
             requiredSchema,
             _,
@@ -199,14 +199,15 @@ class CometSparkSessionExtensions
             _,
             _,
             _)
-          if CometScanExec.isSchemaSupported(requiredSchema)
+          if CometScanExec.isFileFormatSupported(fileFormat)
+            && CometScanExec.isSchemaSupported(requiredSchema)
             && CometScanExec.isSchemaSupported(partitionSchema) =>
         logInfo("Comet extension enabled for v1 Scan")
         CometScanExec(scanExec, session)
 
       // data source v1 not supported case
       case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
             _: Seq[_],
             requiredSchema,
             _,
@@ -216,12 +217,15 @@
             _,
             _) =>
         val info1 = createMessage(
+          !CometScanExec.isFileFormatSupported(fileFormat),
+          s"File format $fileFormat is not supported")
+        val info2 = createMessage(
           !CometScanExec.isSchemaSupported(requiredSchema),
           s"Schema $requiredSchema is not supported")
-        val info2 = createMessage(
+        val info3 = createMessage(
           !CometScanExec.isSchemaSupported(partitionSchema),
           s"Partition schema $partitionSchema is not supported")
-        withInfo(scanExec, Seq(info1, info2).flatten.mkString(","))
+        withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
         scanExec
     }
   }
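The fallback branch builds one `Option[String]` per unsupported feature and joins whatever is present into a single reason string for `withInfo`. A minimal, self-contained sketch of that pattern; the local `createMessage` here is an illustrative stand-in that simply mirrors how the real helper is used above (produce `Some(message)` only when the unsupported condition holds):

```scala
object FallbackInfoSketch {
  // Illustrative stand-in: emit a message only when the condition is true.
  def createMessage(condition: Boolean, message: => String): Option[String] =
    if (condition) Some(message) else None

  def main(args: Array[String]): Unit = {
    val fileFormatSupported = false
    val schemaSupported = true
    val partitionSchemaSupported = false

    val info1 = createMessage(!fileFormatSupported, "File format X is not supported")
    val info2 = createMessage(!schemaSupported, "Schema Y is not supported")
    val info3 = createMessage(!partitionSchemaSupported, "Partition schema Z is not supported")

    // flatten drops the Nones, so only the actual reasons end up in the message
    println(Seq(info1, info2, info3).flatten.mkString(","))
    // prints: File format X is not supported,Partition schema Z is not supported
  }
}
```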
CometScanExec.scala

@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.comet.shims.ShimCometScanExec
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.types._
@@ -503,4 +503,10 @@ object CometScanExec extends DataTypeSupport {
     scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
     batchScanExec
   }
+
+  def isFileFormatSupported(fileFormat: FileFormat): Boolean = {
+    // Only support Spark's built-in Parquet scans, not others such as Delta which use a subclass
+    // of ParquetFileFormat.
+    fileFormat.getClass().equals(classOf[ParquetFileFormat])
+  }
 }
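The new check compares runtime classes for equality rather than using `isInstanceOf`, so subclasses of `ParquetFileFormat` (such as the one Delta provides, per the comment above) are rejected and fall back to Spark. A small standalone sketch of the difference, using plain stand-in classes instead of Spark's:

```scala
// Stand-ins for ParquetFileFormat and a third-party subclass of it.
class BaseFormat
class CustomFormat extends BaseFormat

object ExactClassCheckSketch {
  // Mirrors the isFileFormatSupported logic: accept only the exact class, not subclasses.
  def isSupported(format: BaseFormat): Boolean =
    format.getClass == classOf[BaseFormat]

  def main(args: Array[String]): Unit = {
    println(isSupported(new BaseFormat))                  // true
    println(isSupported(new CustomFormat))                // false: subclasses are rejected
    println((new CustomFormat).isInstanceOf[BaseFormat])  // true: isInstanceOf would have accepted it
  }
}
```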
CometExecSuite.scala

@@ -39,6 +39,7 @@ import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHas
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
@@ -1883,6 +1884,14 @@ class CometExecSuite extends CometTestBase {
       }
     }
   }
+
+  test("Supported file formats for CometScanExec") {
+    assert(CometScanExec.isFileFormatSupported(new ParquetFileFormat()))
+
+    class CustomParquetFileFormat extends ParquetFileFormat {}
+
+    assert(!CometScanExec.isFileFormatSupported(new CustomParquetFileFormat()))
+  }
 }
 
 case class BucketedTableTestSpec(