Skip to content

Commit b4c3c55

Browse files
committed
Fixed bug
1 parent 601d653 commit b4c3c55

File tree

4 files changed

+284
-271
lines changed

4 files changed

+284
-271
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,15 @@ abstract class StreamExecution(
543543
Option(name).map(_ + "<br/>").getOrElse("") +
544544
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
545545
}
546+
547+
private[sql] def withProgressLocked(f: => Unit): Unit = {
548+
awaitProgressLock.lock()
549+
try {
550+
f
551+
} finally {
552+
awaitProgressLock.unlock()
553+
}
554+
}
546555
}
547556

548557
object StreamExecution {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -282,19 +282,18 @@ class ContinuousExecution(
282282
epoch: Long, reader: ContinuousReader, partitionOffsets: Seq[PartitionOffset]): Unit = {
283283
assert(continuousSources.length == 1, "only one continuous source supported currently")
284284

285-
val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
286-
val oldOffset = synchronized {
287-
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
288-
offsetLog.get(epoch - 1)
289-
}
290-
291-
// If offset hasn't changed since last epoch, there's been no new data.
292-
if (oldOffset.contains(OffsetSeq.fill(globalOffset))) {
293-
noNewData = true
294-
}
295-
296285
awaitProgressLock.lock()
297286
try {
287+
val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
288+
val oldOffset = synchronized {
289+
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
290+
offsetLog.get(epoch - 1)
291+
}
292+
293+
// If offset hasn't changed since last epoch, there's been no new data.
294+
if (oldOffset.contains(OffsetSeq.fill(globalOffset))) {
295+
noNewData = true
296+
}
298297
awaitProgressLockCondition.signalAll()
299298
} finally {
300299
awaitProgressLock.unlock()
@@ -308,23 +307,22 @@ class ContinuousExecution(
308307
def commit(epoch: Long): Unit = {
309308
assert(continuousSources.length == 1, "only one continuous source supported currently")
310309
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
311-
synchronized {
310+
311+
awaitProgressLock.lock()
312+
try {
312313
if (queryExecutionThread.isAlive) {
313314
commitLog.add(epoch)
314315
val offset = offsetLog.get(epoch).get.offsets(0).get
315316
committedOffsets ++= Seq(continuousSources(0) -> offset)
316317
} else {
317318
return
318319
}
319-
}
320320

321-
if (minLogEntriesToMaintain < currentBatchId) {
322-
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
323-
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
324-
}
321+
if (minLogEntriesToMaintain < currentBatchId) {
322+
offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
323+
commitLog.purge(currentBatchId - minLogEntriesToMaintain)
324+
}
325325

326-
awaitProgressLock.lock()
327-
try {
328326
awaitProgressLockCondition.signalAll()
329327
} finally {
330328
awaitProgressLock.unlock()

0 commit comments

Comments
 (0)