Skip to content

Commit d1fa32e

Browse files
committed
SPARK-23325: Add physical projection in DataSourceV2Strategy.
These projections ensure that rows are converted to UnsafeRow before they are passed to physical operators that require UnsafeRow. These operators are rare and both project and filter operators support InternalRow. When push-down is handled during conversion to a physical plan, filters should be placed below a final projection.
1 parent a0a005b commit d1fa32e

File tree

1 file changed

+5
-8
lines changed

1 file changed

+5
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy {
125125
val filterCondition = postScanFilters.reduceLeftOption(And)
126126
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
127127

128-
val withProjection = if (withFilter.output != project) {
129-
ProjectExec(project, withFilter)
130-
} else {
131-
withFilter
132-
}
133-
134-
withProjection :: Nil
128+
// always add the projection, which will produce unsafe rows required by some operators
129+
ProjectExec(project, withFilter) :: Nil
135130

136131
case r: StreamingDataSourceV2Relation =>
137-
DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
132+
// ensure there is a projection, which will produce unsafe rows required by some operators
133+
ProjectExec(r.output,
134+
DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader)) :: Nil
138135

139136
case WriteToDataSourceV2(writer, query) =>
140137
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

0 commit comments

Comments
 (0)