diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 2e176bcd2..33924303a 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -554,7 +554,8 @@ )) 0)) :kill-fn (:report-error-and-die executor-data) - :factory? true)])) + :factory? true + :thread-name component-id)])) (defn- tuple-time-delta! [^TupleImpl tuple] (let [ms (.getProcessSampleStartTime tuple)] @@ -715,7 +716,8 @@ (disruptor/consume-batch-when-available receive-queue event-handler) 0))) :kill-fn (:report-error-and-die executor-data) - :factory? true)])) + :factory? true + :thread-name component-id)])) (defmethod close-component :spout [executor-data spout] (.close spout)) diff --git a/src/clj/backtype/storm/disruptor.clj b/src/clj/backtype/storm/disruptor.clj index c69ae745f..5dc175716 100644 --- a/src/clj/backtype/storm/disruptor.clj +++ b/src/clj/backtype/storm/disruptor.clj @@ -67,12 +67,14 @@ (defn halt-with-interrupt! [^DisruptorQueue queue] (.haltWithInterrupt queue)) -(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))] +(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!")) + :thread-name nil] (let [ret (async-loop (fn [] (consume-batch-when-available queue handler) 0 ) :kill-fn kill-fn + :thread-name thread-name )] (consumer-started! queue) ret diff --git a/src/clj/backtype/storm/util.clj b/src/clj/backtype/storm/util.clj index af1113732..f2a73e927 100644 --- a/src/clj/backtype/storm/util.clj +++ b/src/clj/backtype/storm/util.clj @@ -368,7 +368,8 @@ :kill-fn (fn [error] (halt-process! 1 "Async loop died!")) :priority Thread/NORM_PRIORITY :factory? false - :start true] + :start true + :thread-name nil] (let [thread (Thread. (fn [] (try-cause @@ -389,6 +390,8 @@ ))] (.setDaemon thread daemon) (.setPriority thread priority) + (when-not (nil? thread-name) + (.setName thread (str (.getName thread) "-" thread-name))) (when start (.start thread)) ;; should return object that supports stop, interrupt, join, and waiting? diff --git a/test/clj/backtype/storm/util_test.clj b/test/clj/backtype/storm/util_test.clj new file mode 100644 index 000000000..8377bb906 --- /dev/null +++ b/test/clj/backtype/storm/util_test.clj @@ -0,0 +1,24 @@ +(ns backtype.storm.util-test + (:import [java.util.regex Pattern]) + (:use [clojure test]) + (:use [backtype.storm util])) + +(deftest async-loop-test + (testing "thread name provided" + (let [thread (async-loop + (fn [] + (is (= true (.startsWith (.getName (Thread/currentThread)) "Thread-"))) + (is (= true (.endsWith (.getName (Thread/currentThread)) "-mythreadname"))) + 1) + :thread-name "mythreadname")] + (sleep-secs 2) + (.interrupt thread) + (.join thread))) + (testing "thread name not provided" + (let [thread (async-loop + (fn [] + (is (= true (Pattern/matches "Thread-\\d+" (.getName (Thread/currentThread))))) + 1))] + (sleep-secs 2) + (.interrupt thread) + (.join thread)))) \ No newline at end of file