@@ -142,7 +142,7 @@ private[continuous] class EpochCoordinator(
142142 private val epochsWaitingToBeCommitted = mutable.HashSet .empty[Long ]
143143
144144 private def resolveCommitsAtEpoch (epoch : Long ) = {
145- val thisEpochCommits = findCommitsForEpoch (epoch)
145+ val thisEpochCommits = findPartitionCommitsForEpoch (epoch)
146146 val nextEpochOffsets =
147147 partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
148148
@@ -163,7 +163,7 @@ private[continuous] class EpochCoordinator(
163163 // Commit subsequent epochs that are waiting to be committed.
164164 var nextEpoch = lastCommittedEpoch + 1
165165 while (epochsWaitingToBeCommitted.contains(nextEpoch)) {
166- val nextEpochCommits = findCommitsForEpoch (nextEpoch)
166+ val nextEpochCommits = findPartitionCommitsForEpoch (nextEpoch)
167167 commitEpoch(nextEpoch, nextEpochCommits)
168168
169169 epochsWaitingToBeCommitted.remove(nextEpoch)
@@ -183,10 +183,16 @@ private[continuous] class EpochCoordinator(
183183 }
184184 }
185185
186- private def findCommitsForEpoch (epoch : Long ): Iterable [WriterCommitMessage ] = {
186+ /**
187+ * Collect per-partition commits for an epoch.
188+ */
189+ private def findPartitionCommitsForEpoch (epoch : Long ): Iterable [WriterCommitMessage ] = {
187190 partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
188191 }
189192
193+ /**
194+ * Commit epoch to the offset log.
195+ */
190196 private def commitEpoch (epoch : Long , messages : Iterable [WriterCommitMessage ]): Unit = {
191197 logDebug(s " Epoch $epoch has received commits from all partitions " +
192198 s " and is ready to be committed. Committing epoch $epoch. " )
0 commit comments