Skip to content

Commit 434d74e

Browse files
Efim Poberezkin authored and tdas committed
[SPARK-23503][SS] Enforce sequencing of committed epochs for Continuous Execution
## What changes were proposed in this pull request?

Made changes to EpochCoordinator so that it enforces a commit order. If a message for epoch n is lost and epoch (n + 1) is ready to be committed before epoch n is, epoch (n + 1) will wait for epoch n to be committed first.

## How was this patch tested?

Existing tests in ContinuousSuite and EpochCoordinatorSuite.

Author: Efim Poberezkin <efim@poberezkin.ru>

Closes #20936 from efimpoberezkin/pr/sequence-commited-epochs.
1 parent 710e4e8 commit 434d74e

File tree

2 files changed

+58
-17
lines changed

2 files changed

+58
-17
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -137,30 +137,71 @@ private[continuous] class EpochCoordinator(
137137
private val partitionOffsets =
138138
mutable.Map[(Long, Int), PartitionOffset]()
139139

140+
private var lastCommittedEpoch = startEpoch - 1
141+
// Remembers epochs that have to wait for previous epochs to be committed first.
142+
private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
143+
140144
private def resolveCommitsAtEpoch(epoch: Long) = {
141-
val thisEpochCommits =
142-
partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
145+
val thisEpochCommits = findPartitionCommitsForEpoch(epoch)
143146
val nextEpochOffsets =
144147
partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
145148

146149
if (thisEpochCommits.size == numWriterPartitions &&
147150
nextEpochOffsets.size == numReaderPartitions) {
148-
logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
149-
// Sequencing is important here. We must commit to the writer before recording the commit
150-
// in the query, or we will end up dropping the commit if we restart in the middle.
151-
writer.commit(epoch, thisEpochCommits.toArray)
152-
query.commit(epoch)
153-
154-
// Cleanup state from before this epoch, now that we know all partitions are forever past it.
155-
for (k <- partitionCommits.keys.filter { case (e, _) => e < epoch }) {
156-
partitionCommits.remove(k)
157-
}
158-
for (k <- partitionOffsets.keys.filter { case (e, _) => e < epoch }) {
159-
partitionOffsets.remove(k)
151+
152+
// Check that last committed epoch is the previous one for sequencing of committed epochs.
153+
// If not, add the epoch being currently processed to epochs waiting to be committed,
154+
// otherwise commit it.
155+
if (lastCommittedEpoch != epoch - 1) {
156+
logDebug(s"Epoch $epoch has received commits from all partitions " +
157+
s"and is waiting for epoch ${epoch - 1} to be committed first.")
158+
epochsWaitingToBeCommitted.add(epoch)
159+
} else {
160+
commitEpoch(epoch, thisEpochCommits)
161+
lastCommittedEpoch = epoch
162+
163+
// Commit subsequent epochs that are waiting to be committed.
164+
var nextEpoch = lastCommittedEpoch + 1
165+
while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
166+
val nextEpochCommits = findPartitionCommitsForEpoch(nextEpoch)
167+
commitEpoch(nextEpoch, nextEpochCommits)
168+
169+
epochsWaitingToBeCommitted.remove(nextEpoch)
170+
lastCommittedEpoch = nextEpoch
171+
nextEpoch += 1
172+
}
173+
174+
// Cleanup state from before last committed epoch,
175+
// now that we know all partitions are forever past it.
176+
for (k <- partitionCommits.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
177+
partitionCommits.remove(k)
178+
}
179+
for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
180+
partitionOffsets.remove(k)
181+
}
160182
}
161183
}
162184
}
163185

186+
/**
187+
* Collect per-partition commits for an epoch.
188+
*/
189+
private def findPartitionCommitsForEpoch(epoch: Long): Iterable[WriterCommitMessage] = {
190+
partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
191+
}
192+
193+
/**
194+
* Commit epoch to the offset log.
195+
*/
196+
private def commitEpoch(epoch: Long, messages: Iterable[WriterCommitMessage]): Unit = {
197+
logDebug(s"Epoch $epoch has received commits from all partitions " +
198+
s"and is ready to be committed. Committing epoch $epoch.")
199+
// Sequencing is important here. We must commit to the writer before recording the commit
200+
// in the query, or we will end up dropping the commit if we restart in the middle.
201+
writer.commit(epoch, messages.toArray)
202+
query.commit(epoch)
203+
}
204+
164205
override def receive: PartialFunction[Any, Unit] = {
165206
// If we just drop these messages, we won't do any writes to the query. The lame duck tasks
166207
// won't shed errors or anything.

sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class EpochCoordinatorSuite
120120
verifyCommitsInOrderOf(List(1, 2))
121121
}
122122

123-
ignore("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
123+
test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
124124
setWriterPartitions(2)
125125
setReaderPartitions(2)
126126

@@ -141,7 +141,7 @@ class EpochCoordinatorSuite
141141
verifyCommitsInOrderOf(List(1, 2))
142142
}
143143

144-
ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
144+
test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
145145
setWriterPartitions(1)
146146
setReaderPartitions(1)
147147

@@ -162,7 +162,7 @@ class EpochCoordinatorSuite
162162
verifyCommitsInOrderOf(List(1, 2, 3, 4))
163163
}
164164

165-
ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
165+
test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
166166
setWriterPartitions(1)
167167
setReaderPartitions(1)
168168

0 commit comments

Comments
 (0)