Skip to content

Commit 13f54a9

Browse files
committed
Rewrite Alias in StreamExecution if necessary
1 parent 3bc2eff commit 13f54a9

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
2929
import org.apache.spark.internal.Logging
3030
import org.apache.spark.sql._
3131
import org.apache.spark.sql.catalyst.encoders.RowEncoder
32-
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
32+
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
3333
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
3434
import org.apache.spark.sql.execution.QueryExecution
3535
import org.apache.spark.sql.execution.command.ExplainCommand
@@ -495,8 +495,13 @@ class StreamExecution(
495495

496496
// Rewire the plan to use the new attributes that were returned by the source.
497497
val replacementMap = AttributeMap(replacements)
498+
val exprIdMap =
499+
replacements.map { case (oldAttr, newAttr) => (oldAttr.exprId, newAttr.exprId)}.toMap
498500
val triggerLogicalPlan = withNewSources transformAllExpressions {
499501
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
502+
case a: Alias if exprIdMap.contains(a.exprId) =>
503+
// Also rewrite `Alias`es, since an `Alias` may carry the same `exprId` as a replaced `Attribute`.
504+
Alias(a.child, a.name)(exprIdMap(a.exprId), a.qualifier, a.explicitMetadata, a.isGenerated)
500505
case ct: CurrentTimestamp =>
501506
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
502507
ct.dataType)

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,32 @@ class StreamSuite extends StreamTest {
304304
q.stop()
305305
}
306306
}
307+
308+
test("SPARK-19065 Alias should be replaced as well") {
309+
withTempPath { testPath =>
310+
val data = Seq((1, 2), (2, 3), (3, 4))
311+
data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath)
312+
val schema = spark.read.json(testPath.getCanonicalPath).schema
313+
val query = spark
314+
.readStream
315+
.schema(schema)
316+
.json(testPath.getCanonicalPath)
317+
.dropDuplicates("_1") // dropDuplicates will create an Alias using the same exprId.
318+
.writeStream
319+
.format("memory")
320+
.queryName("testquery")
321+
.outputMode("complete")
322+
.start()
323+
try {
324+
query.processAllAvailable()
325+
if (query.exception.isDefined) {
326+
throw query.exception.get
327+
}
328+
} finally {
329+
query.stop()
330+
}
331+
}
332+
}
307333
}
308334

309335
/**

0 commit comments

Comments
 (0)