Skip to content

Commit

Permalink
SAMZA-1112; BrokerProxy does not log fatal errors
Browse files Browse the repository at this point in the history
Add an UncaughtExceptionHandler to the broker proxy thread so
failures there get logged.

Author: Tommy Becker <tobecker@tivo.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes apache#80 from twbecker/SAMZA-1112
  • Loading branch information
Tommy Becker authored and jagadish-northguard committed Mar 11, 2017
1 parent e3f8587 commit 5a88b9e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.samza.job.local

import java.lang.Thread.UncaughtExceptionHandler

import org.apache.samza.util.Logging
import org.apache.samza.job.StreamJob
import org.apache.samza.job.ApplicationStatus
Expand All @@ -42,7 +44,7 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
runnable.run
jobStatus = Some(SuccessfulFinish)
} catch {
case e: Exception => {
case e: Throwable => {
error("Failing job with exception.", e)
jobStatus = Some(UnsuccessfulFinish)
throw e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@

package org.apache.samza.system.kafka

import java.lang.Thread.UncaughtExceptionHandler
import java.nio.channels.ClosedByInterruptException
import java.util.Map.Entry
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}

import kafka.api._
import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException}
import kafka.consumer.ConsumerConfig
import kafka.message.MessageSet
import org.apache.samza.SamzaException
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.Logging
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX

import scala.collection.JavaConversions._
import scala.collection.concurrent
import scala.collection.mutable
Expand Down Expand Up @@ -198,8 +201,8 @@ class BrokerProxy(
}

/**
* Releases ownership for a single TopicAndPartition. The
* KafkaSystemConsumer will try and find a new broker for the
* Releases ownership for a single TopicAndPartition. The
* KafkaSystemConsumer will try and find a new broker for the
* TopicAndPartition.
*/
def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
Expand All @@ -209,8 +212,8 @@ class BrokerProxy(
}

/**
* Releases all TopicAndPartition ownership for this BrokerProxy thread. The
* KafkaSystemConsumer will try and find a new broker for the
* Releases all TopicAndPartition ownership for this BrokerProxy thread. The
* KafkaSystemConsumer will try and find a new broker for the
* TopicAndPartition.
*/
def abdicateAll {
Expand Down Expand Up @@ -295,6 +298,9 @@ class BrokerProxy(
info("Starting " + toString)
thread.setDaemon(true)
thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
})
thread.start
} else {
debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
Expand Down Expand Up @@ -330,4 +336,4 @@ class BrokerProxy(
}
}
}
}
}

0 comments on commit 5a88b9e

Please sign in to comment.