Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rx support to hystrix-clj. #170

Merged
merged 1 commit into from
Aug 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@
[com.netflix.hystrix.strategy.concurrency
HystrixRequestContext]))

(set! *warn-on-reflection* true)

(defmacro ^:private key-fn
"Make a function that creates keys of the given class given one of:

Expand Down Expand Up @@ -603,6 +605,64 @@
(let [^HystrixExecutable instance (apply instantiate definition args)]
(queued-command instance (.queue instance))))

(defn observe
"Asynchronously execute the command or collapser specified by the given normalized definition
with the given arguments. Returns an rx.Observable which can be subscribed to.

Note that this will eagerly begin execution of the command, even if there are no subscribers.
Use observe-later for lazy semantics.

If definition is already a HystrixExecutable and no args are given, observes it and returns
an Observable as described above. NEVER OBSERVE A HystrixExecutable MORE THAN ONCE.

See:
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#observe()
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCollapser.html#observe()
http://netflix.github.io/RxJava/javadoc/rx/Observable.html
"
[definition & args]
(let [^HystrixExecutable instance (apply instantiate definition args)]
(.observe instance)))

(defprotocol ^:private ObserveLater
"A protocol solely to eliminate reflection warnings because .toObservable
can be found on both HystrixCommand and HystrixCollapser, but not in their
common base class HystrixExecutable."
(^:private observe-later* [this])
(^:private observe-later-on* [this scheduler]))

(extend-protocol ObserveLater
HystrixCommand
(observe-later* [this] (.toObservable this))
(observe-later-on* [this scheduler] (.toObservable this scheduler))
HystrixCollapser
(observe-later* [this] (.toObservable this))
(observe-later-on* [this scheduler] (.toObservable this scheduler)))

(defn observe-later
"Same as #'com.netflix.hystrix.core/observe, but command execution does not begin until the
returned Observable is subscribed to.

See:
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#toObservable())
http://netflix.github.io/RxJava/javadoc/rx/Observable.html
"
[definition & args]
(observe-later* (apply instantiate definition args)))

(defn observe-later-on
"Same as #'com.netflix.hystrix.core/observe-later but an explicit scheduler can be provided
for the callback.

See:
com.netflix.hystrix.core/observe-later
com.netflix.hystrix.core/observe
http://netflix.github.io/Hystrix/javadoc/com/netflix/hystrix/HystrixCommand.html#toObservable(Scheduler)
http://netflix.github.io/RxJava/javadoc/rx/Observable.html
"
[definition scheduler & args]
(observe-later-on* (apply instantiate definition args) scheduler))

;################################################################################
; :command impl

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,55 @@
(is (= "hello-world" (.get qc) @qc))
(is (.isDone qc))))))

(defn ^:private wait-for-observable
[^rx.Observable o]
(rx.observables.BlockingObservable/single o))

(deftest test-observe
(let [base-def {:type :command
:group-key :my-group
:command-key :my-command
:run-fn + }]
(testing "observes a HystrixCommand"
(is (= 99
(-> (instantiate (normalize base-def) 11 88)
observe
wait-for-observable))))
(testing "throws if there are trailing args"
(is (thrown? IllegalArgumentException
(observe (instantiate (normalize base-def)) 10 23))))
(testing "instantiates and observes a command"
(let [o (observe (normalize base-def) 75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
(wait-for-observable o)))))))

(deftest test-observe-later
(let [base-def {:type :command
:group-key :my-group
:command-key :my-command
:run-fn + }]
(testing "observes a HystrixCommand"
(is (= 99
(-> (instantiate (normalize base-def) 11 88)
observe-later
wait-for-observable))))
(testing "throws if there are trailing args"
(is (thrown? IllegalArgumentException
(observe-later (instantiate (normalize base-def)) 10 23))))
(testing "instantiates and observes a command"
(let [o (observe-later (normalize base-def) 75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
(wait-for-observable o)))))
(testing "observes command with a Scheduler"
(let [o (observe-later-on (normalize base-def)
(rx.concurrency.Schedulers/newThread)
75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
(wait-for-observable o)))))))

(deftest test-this-command-binding
(let [base-def {:type :command
:group-key :test-this-command-binding-group
Expand Down Expand Up @@ -264,8 +313,12 @@
(testing "defines a functioning command"
(is (= 99 (my-fn-command 88 11)))
(is (= 100 (execute #'my-fn-command 89 11)))
(is (= 101 (deref (queue #'my-fn-command 89 12)))))))

(is (= 101 (deref (queue #'my-fn-command 89 12))))
(is (= 103 (wait-for-observable (observe #'my-fn-command 90 13))))
(is (= 105 (wait-for-observable (observe-later #'my-fn-command 91 14))))
(is (= 107 (wait-for-observable (observe-later-on #'my-fn-command
(rx.concurrency.Schedulers/newThread)
92 15)))))))

(defcollapser my-collapser
"a doc string"
Expand Down