
Commit 65fed3a

huanliwang-db authored and anishshri-db committed
[SPARK-53303][SS][CONNECT] Use the empty state encoder when the initial state is not provided in TWS
### What changes were proposed in this pull request?

`agnosticEncoderFor[S]` returns the wrong encoder when no initial state is provided in transformWithState (TWS). We should create an empty state encoder and use that instead.

### Why are the changes needed?

Fixes the incorrect behavior for TWS without an initial state in Spark Connect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A: unfortunately, this field is not used so far and it is not easy to provide test coverage for it.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52056 from huanliwang-db/huanliwang-db/fix-tws.

Authored-by: huanliwang-db <huanli.wang@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
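For illustration, here is a minimal standalone sketch of the encoder-selection pattern this change introduces. The `InitialStateEncoderSketch` object, its `pickInitialStateEncoder` helper, the `hasInitialState` flag, and the example package are hypothetical and not part of the patch; the sketch also assumes it lives under the `org.apache.spark.sql` package, since the catalyst encoder helpers it calls are Spark-internal APIs.

```scala
package org.apache.spark.sql.example // hypothetical package, kept under org.apache.spark.sql
                                     // because the encoder helpers below are internal APIs

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.agnosticEncoderFor

object InitialStateEncoderSketch {

  // Zero-field sentinel mirroring the EmptyInitialStateStruct case class added by the patch.
  case class EmptyInitialStateStruct()

  // Hypothetical helper (not part of the patch): resolve the encoder for S only when an
  // initial state was actually supplied; otherwise fall back to the empty sentinel encoder.
  // This mirrors the branch the fix adds to transformWithStateHelper.
  def pickInitialStateEncoder[S: Encoder](hasInitialState: Boolean): AgnosticEncoder[_] = {
    if (hasInitialState) {
      agnosticEncoderFor[S]
    } else {
      ScalaReflection.encoderFor[EmptyInitialStateStruct]
    }
  }
}
```

In the patched `transformWithStateHelper`, the equivalent condition is `initialState.isDefined`, and the chosen encoder replaces the old `stateEncoder` in the `inputEncoders` sequence.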
1 parent 528f3a7 · commit 65fed3a

File tree

1 file changed (+20 -2 lines)


sql/connect/common/src/main/scala/org/apache/spark/sql/connect/KeyValueGroupedDataset.scala

Lines changed: 20 additions & 2 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.connect.proto
 import org.apache.spark.sql
 import org.apache.spark.sql.{Column, Encoder, TypedColumn}
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{agnosticEncoderFor, ProductEncoder, StructEncoder}
 import org.apache.spark.sql.connect.ColumnNodeToProtoConverter.{toExpr, toExprWithTransformation, toTypedExpr}
@@ -658,8 +659,14 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
       initialState: Option[sql.KeyValueGroupedDataset[K, S]] = None,
       eventTimeColumnName: String = ""): Dataset[U] = {
     val outputEncoder = agnosticEncoderFor[U]
-    val stateEncoder = agnosticEncoderFor[S]
-    val inputEncoders: Seq[AgnosticEncoder[_]] = Seq(kEncoder, stateEncoder, ivEncoder)
+    val initialStateEncoder = if (initialState.isDefined) {
+      agnosticEncoderFor[S]
+    } else {
+      // Cannot use `agnosticEncoderFor[S]` here because it points to incorrect encoder
+      // when the initial state is not provided. Using an empty state encoder instead.
+      ScalaReflection.encoderFor[EmptyInitialStateStruct]
+    }
+    val inputEncoders: Seq[AgnosticEncoder[_]] = Seq(kEncoder, initialStateEncoder, ivEncoder)
 
     // SparkUserDefinedFunction is creating a udfPacket where the input function are
     // being java serialized into bytes; we pass in `statefulProcessor` as function so it can be
@@ -780,3 +787,14 @@ private object KeyValueGroupedDatasetImpl {
     case _ => false
   }
 }
+
+/**
+ * A marker case class used as a placeholder type for initial state encoders when no actual
+ * initial state is provided to stateful streaming operations.
+ *
+ * In the `transformWithStateHelper` method, when `initialState` is not provided, we cannot use
+ * `agnosticEncoderFor[S]` for the initial state encoder because it would incorrectly point to the
+ * other encoders. Instead, we use `EmptyStruct` as a sentinel type to create a proper encoder
+ * that represents the absence of initial state data.
+ */
+case class EmptyInitialStateStruct()
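As a sanity check on the fallback, here is a small sketch of what `ScalaReflection.encoderFor` derives for a zero-field case class. The wrapper object, the local copy of the sentinel class, and the example package are hypothetical and not part of the patch, and the printed schema shape is an expectation rather than verified output.

```scala
package org.apache.spark.sql.example // hypothetical package; ScalaReflection is a Spark-internal API

import org.apache.spark.sql.catalyst.ScalaReflection

object EmptyStateEncoderCheck {
  // Local stand-in mirroring the zero-field EmptyInitialStateStruct added by the patch.
  case class EmptyInitialStateStruct()

  def main(args: Array[String]): Unit = {
    val emptyStateEncoder = ScalaReflection.encoderFor[EmptyInitialStateStruct]
    // Expected to print an empty struct schema, i.e. a StructType with zero fields.
    println(emptyStateEncoder.schema)
  }
}
```

Because the sentinel has no fields, the derived encoder should describe an empty struct, so no initial-state columns are implied when the caller never supplied an initial state.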

Comments (0)