Skip to content

Commit 7c2da35

Browse files
lw-lincmonkey
authored andcommitted
[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error
## What changes were proposed in this pull request? We should call `StateStore.abort()` when there should be any error before the store is committed. ## How was this patch tested? Manually. Author: Liwei Lin <lwlin7@gmail.com> Closes apache#16547 from lw-lin/append-filter.
1 parent 58ea392 commit 7c2da35

File tree

3 files changed

+10
-2
lines changed

3 files changed

+10
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._
3131
import org.apache.spark.sql.execution.SparkPlan
3232
import org.apache.spark.sql.streaming.OutputMode
3333
import org.apache.spark.sql.types.StructType
34+
import org.apache.spark.TaskContext
3435

3536

3637
/** Used to identify the state store for a given operator. */
@@ -150,6 +151,13 @@ case class StateStoreSaveExec(
150151
val numTotalStateRows = longMetric("numTotalStateRows")
151152
val numUpdatedStateRows = longMetric("numUpdatedStateRows")
152153

154+
// Abort the state store in case of error
155+
TaskContext.get().addTaskCompletionListener(_ => {
156+
if (!store.hasCommitted) {
157+
store.abort()
158+
}
159+
})
160+
153161
outputMode match {
154162
// Update and output all rows in the StateStore.
155163
case Some(Complete) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider(
203203
/**
204204
* Whether all updates have been committed
205205
*/
206-
override private[state] def hasCommitted: Boolean = {
206+
override private[streaming] def hasCommitted: Boolean = {
207207
state == COMMITTED
208208
}
209209

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ trait StateStore {
8383
/**
8484
* Whether all updates have been committed
8585
*/
86-
private[state] def hasCommitted: Boolean
86+
private[streaming] def hasCommitted: Boolean
8787
}
8888

8989

0 commit comments

Comments
 (0)