@@ -142,31 +142,29 @@ private[continuous] class EpochCoordinator(
142142 private val epochsWaitingToBeCommitted = mutable.HashSet .empty[Long ]
143143
144144 private def resolveCommitsAtEpoch (epoch : Long ) = {
145- val thisEpochCommits =
146- partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
145+ val thisEpochCommits = findCommitsForEpoch(epoch)
147146 val nextEpochOffsets =
148147 partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
149148
150149 if (thisEpochCommits.size == numWriterPartitions &&
151150 nextEpochOffsets.size == numReaderPartitions) {
152151
153152 // Check that last committed epoch is the previous one for sequencing of committed epochs.
154- if (lastCommittedEpoch == epoch - 1 ) {
155- logDebug(s " Epoch $epoch has received commits from all partitions. Committing globally. " )
156- // Sequencing is important here. We must commit to the writer before recording the commit
157- // in the query, or we will end up dropping the commit if we restart in the middle.
158- writer.commit(epoch, thisEpochCommits.toArray)
159- query.commit(epoch)
153+ // If not, add the epoch being currently processed to epochs waiting to be committed,
154+ // otherwise commit it.
155+ if (lastCommittedEpoch != epoch - 1 ) {
156+ logDebug(s " Epoch $epoch has received commits from all partitions " +
157+ s " and is waiting for epoch ${epoch - 1 } to be committed first. " )
158+ epochsWaitingToBeCommitted.add(epoch)
159+ } else {
160+ commitEpoch(epoch, thisEpochCommits)
160161 lastCommittedEpoch = epoch
161162
162163 // Commit subsequent epochs that are waiting to be committed.
163164 var nextEpoch = lastCommittedEpoch + 1
164165 while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
165- val nextEpochCommits =
166- partitionCommits.collect { case ((e, _), msg) if e == nextEpoch => msg }
167- logDebug(s " Committing epoch $nextEpoch. " )
168- writer.commit(nextEpoch, nextEpochCommits.toArray)
169- query.commit(nextEpoch)
166+ val nextEpochCommits = findCommitsForEpoch(nextEpoch)
167+ commitEpoch(nextEpoch, nextEpochCommits)
170168
171169 epochsWaitingToBeCommitted.remove(nextEpoch)
172170 lastCommittedEpoch = nextEpoch
@@ -181,14 +179,23 @@ private[continuous] class EpochCoordinator(
181179 for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) {
182180 partitionOffsets.remove(k)
183181 }
184- } else {
185- logDebug(s " Epoch $epoch has received commits from all partitions " +
186- s " and is waiting for epoch ${epoch - 1 } to be committed first. " )
187- epochsWaitingToBeCommitted.add(epoch)
188182 }
189183 }
190184 }
191185
186+ private def findCommitsForEpoch (epoch : Long ): Iterable [WriterCommitMessage ] = {
187+ partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
188+ }
189+
190+ private def commitEpoch (epoch : Long , messages : Iterable [WriterCommitMessage ]): Unit = {
191+ logDebug(s " Epoch $epoch has received commits from all partitions " +
192+ s " and is ready to be committed. Committing epoch $epoch. " )
193+ // Sequencing is important here. We must commit to the writer before recording the commit
194+ // in the query, or we will end up dropping the commit if we restart in the middle.
195+ writer.commit(epoch, messages.toArray)
196+ query.commit(epoch)
197+ }
198+
192199 override def receive : PartialFunction [Any , Unit ] = {
193200 // If we just drop these messages, we won't do any writes to the query. The lame duck tasks
194201 // won't shed errors or anything.
0 commit comments