Commit 4f38e3b

Don't inherit expression id in dropDuplicates
1 parent 13f54a9 commit 4f38e3b
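
For context: dropDuplicates is planned as an Aggregate that groups on the key columns and keeps every other column through a first(...) alias, and this commit stops that alias from inheriting the child attribute's expression id. A user-level sketch of the plan shape involved (the session setup and sample data are illustrative, not part of the commit):

    import org.apache.spark.sql.SparkSession

    object DropDuplicatesSketch extends App {
      val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
      import spark.implicits._

      // dropDuplicates("_1") becomes an Aggregate grouping on _1; column _2 survives
      // through an Alias over first(_2). Before this commit that Alias reused _2's
      // exprId, leaving two named expressions in the plan with the same id.
      val ds = Seq(("a", 1), ("a", 2), ("b", 1)).toDS().dropDuplicates("_1")
      ds.explain(extended = true) // the aggregate's aliases now carry fresh exprIds

      spark.stop()
    }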

File tree

4 files changed: +12, -41 lines

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 1 addition & 4 deletions

@@ -2003,10 +2003,7 @@ class Dataset[T] private[sql](
       if (groupColExprIds.contains(attr.exprId)) {
         attr
       } else {
-        // Removing duplicate rows should not change output attributes. We should keep
-        // the original exprId of the attribute. Otherwise, to select a column in original
-        // dataset will cause analysis exception due to unresolved attribute.
-        Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
+        Alias(new First(attr).toAggregateExpression(), attr.name)()
       }
     }
     Aggregate(groupCols, aggCols, logicalPlan)
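
The fix in isolation: leaving the second parameter list empty makes the Alias constructor fall back to its exprId = NamedExpression.newExprId default instead of copying the child attribute's id. A minimal sketch against catalyst's internal API (the AttributeReference here is fabricated for illustration):

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
    import org.apache.spark.sql.catalyst.expressions.aggregate.First
    import org.apache.spark.sql.types.IntegerType

    val attr = AttributeReference("_2", IntegerType)()

    // Old behavior: the alias reused the child attribute's id.
    val inherited = Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
    assert(inherited.exprId == attr.exprId)

    // New behavior: the empty second parameter list allocates a fresh id,
    // so the aggregate's output no longer collides with the child attribute.
    val fresh = Alias(new First(attr).toAggregateExpression(), attr.name)()
    assert(fresh.exprId != attr.exprId)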

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 6 deletions

@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -495,13 +495,8 @@ class StreamExecution(
 
     // Rewire the plan to use the new attributes that were returned by the source.
     val replacementMap = AttributeMap(replacements)
-    val exprIdMap =
-      replacements.map { case (oldAttr, newAttr) => (oldAttr.exprId, newAttr.exprId)}.toMap
     val triggerLogicalPlan = withNewSources transformAllExpressions {
       case a: Attribute if replacementMap.contains(a) => replacementMap(a)
-      case a: Alias if exprIdMap.contains(a.exprId) =>
-        // Also rewrite `Alias`s as they may use the same `exprId` of `Attribute`s.
-        Alias(a.child, a.name)(exprIdMap(a.exprId), a.qualifier, a.explicitMetadata, a.isGenerated)
       case ct: CurrentTimestamp =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType)
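
With dropDuplicates no longer emitting duplicate ids, the Alias rewrite above has nothing left to match, so the whole exprIdMap pass can go: AttributeMap keys its lookups on exprId, and rewriting Attributes alone rewires the plan. A small sketch of that lookup behavior (the attributes are fabricated for illustration):

    import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
    import org.apache.spark.sql.types.IntegerType

    // Two attributes with the same name but distinct exprIds, as a restarted
    // streaming source would produce; AttributeMap matches on exprId, not name.
    val oldAttr = AttributeReference("value", IntegerType)()
    val newAttr = AttributeReference("value", IntegerType)()
    val replacementMap = AttributeMap(Seq(oldAttr -> newAttr))

    assert(replacementMap.contains(oldAttr))  // hit: same exprId
    assert(!replacementMap.contains(newAttr)) // miss: different exprId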

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 10 additions & 5 deletions

@@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
@@ -898,11 +899,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       (1, 2), (1, 1), (2, 1), (2, 2))
   }
 
-  test("dropDuplicates should not change child plan output") {
-    val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
-    checkDataset(
-      ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
-      ("a", 1), ("b", 1))
+  test("SPARK-19065 dropDuplicates should not create expressions using the same id") {
+    val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS().dropDuplicates("_1")
+    var exprs = Set.empty[NamedExpression]
+    ds.logicalPlan.transformAllExpressions { case e: NamedExpression =>
+      exprs += e
+      e
+    }
+    val duplicatedExprs = exprs.groupBy(expr => expr.exprId).filter(_._2.size > 1).values
+    assert(duplicatedExprs.isEmpty)
   }
 
   test("SPARK-16097: Encoders.tuple should handle null object correctly") {

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 0 additions & 26 deletions

@@ -304,32 +304,6 @@ class StreamSuite extends StreamTest {
       q.stop()
     }
   }
-
-  test("SPARK-19065 Alia should be replaced as well") {
-    withTempPath { testPath =>
-      val data = Seq((1, 2), (2, 3), (3, 4))
-      data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath)
-      val schema = spark.read.json(testPath.getCanonicalPath).schema
-      val query = spark
-        .readStream
-        .schema(schema)
-        .json(testPath.getCanonicalPath)
-        .dropDuplicates("_1") // dropDuplicates will create an Alias using the same exprId.
-        .writeStream
-        .format("memory")
-        .queryName("testquery")
-        .outputMode("complete")
-        .start()
-      try {
-        query.processAllAvailable()
-        if (query.exception.isDefined) {
-          throw query.exception.get
-        }
-      } finally {
-        query.stop()
-      }
-    }
-  }
 }
 
 /**
