Skip to content

Commit ef05009

Browse files
committed
Fixed synchronization bug
1 parent 12177b1 commit ef05009

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
129129
verify(state == UPDATING, "Cannot commit after already committed or aborted")
130130

131131
try {
132-
finalizeDeltaFile(compressedStream)
133-
loadedMaps.put(newVersion, mapToUpdate)
132+
commitUpdates(newVersion, mapToUpdate, compressedStream)
134133
state = COMMITTED
135134
logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile")
136135
newVersion
@@ -250,6 +249,13 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
250249

251250
private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)
252251

252+
private def commitUpdates(newVersion: Long, map: MapType, output: DataOutputStream): Unit = {
253+
synchronized {
254+
finalizeDeltaFile(output)
255+
loadedMaps.put(newVersion, map)
256+
}
257+
}
258+
253259
/**
254260
* Get iterator of all the data of the latest version of the store.
255261
* Note that this will look up the files to determined the latest known version.

0 commit comments

Comments
 (0)