@@ -277,7 +277,7 @@ class PubsubReceiver(
277277
278278 var buffer : ArrayBuffer [ReceivedMessage ] = createBufferArray()
279279
280- var latestStorePushTime : Long = - 1
280+ var latestAttemptToPushInStoreTime : Long = - 1
281281
282282 lazy val rateLimiter : RateLimiter = RateLimiter .create(getInitialRateLimit.toDouble)
283283
@@ -325,7 +325,7 @@ class PubsubReceiver(
325325 var backoff = INIT_BACKOFF
326326
327327 // To avoid the edge case when buffer is not full and no message pushed to store
328- latestStorePushTime = System .currentTimeMillis()
328+ latestAttemptToPushInStoreTime = System .currentTimeMillis()
329329
330330 while (! isStopped()) {
331331 try {
@@ -393,7 +393,7 @@ class PubsubReceiver(
393393 */
394394 def push (): Unit = {
395395
396- val diff = System .currentTimeMillis() - latestStorePushTime
396+ val diff = System .currentTimeMillis() - latestAttemptToPushInStoreTime
397397 if (buffer.length >= blockSize || (buffer.length < blockSize && diff >= blockIntervalMs)) {
398398
399399 // grouping messages into complete and partial blocks (if any)
@@ -404,18 +404,29 @@ class PubsubReceiver(
404404 // messages in buffer is less than blockSize. So will push partial block
405405 val iterator = if (completeBlocks.nonEmpty) completeBlocks else partialBlock
406406
407- // Creating new buffer
408- buffer = createBufferArray()
409-
410- // Pushing partial block messages back to buffer if complete blocks formed
411- if (completeBlocks.nonEmpty && partialBlock.hasNext) {
412- buffer.appendAll(partialBlock.next())
413- }
407+ // Will push partial block messages back to buffer if complete blocks formed
408+ val partial = if (completeBlocks.nonEmpty && partialBlock.nonEmpty) {
409+ partialBlock.next()
410+ } else null
414411
415412 while (iterator.hasNext) {
416- pushToStoreAndAck(iterator.next().toList)
417- latestStorePushTime = System .currentTimeMillis()
413+ try {
414+ pushToStoreAndAck(iterator.next().toList)
415+ } catch {
416+ case e : SparkException => reportError(
417+ " Failed to write messages into reliable store" , e)
418+ case NonFatal (e) => reportError(
419+ " Failed to write messages in reliable store" , e)
420+ } finally {
421+ latestAttemptToPushInStoreTime = System .currentTimeMillis()
422+ }
418423 }
424+
425+ // clear existing buffer messages
426+ buffer.clear()
427+
428+ // Pushing partial block messages back to buffer if complete blocks formed
429+ if (partial != null ) buffer.appendAll(partial)
419430 }
420431 }
421432
0 commit comments