@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.InternalRow
2424import org .apache .spark .sql .catalyst .expressions ._
2525import org .apache .spark .sql .catalyst .expressions .codegen .GenerateUnsafeProjection
2626import org .apache .spark .sql .catalyst .plans .physical .{ClusteredDistribution , Distribution , Partitioning }
27- import org .apache .spark .sql .types .{LongType , TimestampType }
2827
2928/**
3029 * Used for calculating the session window start and end for each row, so this plan requires
@@ -46,9 +45,6 @@ case class SessionWindowExec(
4645 child : SparkPlan )
4746 extends UnaryExecNode {
4847
49- private final val WINDOW_START = " start"
50- private final val WINDOW_END = " end"
51-
5248 override def requiredChildDistribution : Seq [Distribution ] = {
5349 ClusteredDistribution (sessionSpec) :: Nil
5450 }
@@ -107,17 +103,11 @@ case class SessionWindowExec(
107103 var rowIndexWithinPartition = 0
108104 var lastTime : Long = _
109105 var windowStartTime : Long = _
110- var windowResultWithBoundary = mutable.ArrayBuffer .empty[(Int , CreateNamedStruct )]
106+ var windowResultWithBoundary = mutable.ArrayBuffer .empty[(Int , ( Long , Long ) )]
111107 var windowResultIndex = 0
112108
113109 private [this ] def addWindowValueAndBoundary (rowIndex : Int ) {
114- val windowValue = CreateNamedStruct (
115- Seq (Literal (WINDOW_START ),
116- PreciseTimestampConversion (Literal (windowStartTime), LongType , TimestampType ),
117- Literal (WINDOW_END ),
118- PreciseTimestampConversion (
119- Literal (lastTime + windowGap), LongType , TimestampType )))
120- windowResultWithBoundary.append((rowIndex, windowValue))
110+ windowResultWithBoundary.append((rowIndex, (windowStartTime, lastTime + windowGap)))
121111 }
122112
123113 private [this ] def getTimeFromRow (row : InternalRow ) = getTime.apply(row).getLong(0 )
@@ -196,10 +186,13 @@ case class SessionWindowExec(
196186 }
197187 rowIndexWithinPartition += 1
198188
199- // 'Merge' the input row with the session window struct
200- join(current,
201- UnsafeProjection .create(
202- windowResultWithBoundary(windowResultIndex)._2).apply(InternalRow .empty))
189+ val (currentWindowStart, currentWindowEnd) =
190+ windowResultWithBoundary(windowResultIndex)._2
191+ val currentWindow = InternalRow (currentWindowStart, currentWindowEnd)
192+
193+ // 'Merge' the input row with the session window struct. Since session window
194+ // processed as a column, so wrap the struct with another InternalRow.
195+ join(current, InternalRow (currentWindow))
203196 } else {
204197 throw new NoSuchElementException
205198 }
0 commit comments