From 56f7ba4a802e727dc915d34bc3b2e420f011ff9d Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Fri, 4 Aug 2023 21:10:50 +0100 Subject: [PATCH 1/8] introduce http2 --- .gitignore | 3 +- docker-compose.yml | 17 + project.clj | 7 +- src/clojure/langohr/http2.clj | 645 ++++++++++++++++++++++++++++ test/langohr/test/http_api_test.clj | 18 +- 5 files changed, 681 insertions(+), 9 deletions(-) create mode 100644 docker-compose.yml create mode 100644 src/clojure/langohr/http2.clj diff --git a/.gitignore b/.gitignore index 129b795..faca364 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ checkouts/* .nrepl-* .clj-kondo/* -.lsp/* \ No newline at end of file +.lsp/* +.calva/output-window/output.calva-repl diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6081887 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +version: "3.8" +services: + rabbitmq: + image: rabbitmq:3-management-alpine + container_name: 'rabbitmq' + ports: + - 5672:5672 + - 15672:15672 + volumes: + - ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/ + - ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq + networks: + - rabbitmq_go_net + +networks: + rabbitmq_go_net: + driver: bridge \ No newline at end of file diff --git a/project.clj b/project.clj index 763146f..4cdf35c 100644 --- a/project.clj +++ b/project.clj @@ -6,7 +6,8 @@ [com.rabbitmq/amqp-client "5.18.0"] [clojurewerkz/support "1.1.0" :exclusions [com.google.guava/guava]] [clj-http "3.12.3"] - [cheshire "5.10.1"]] + [hato "0.9.0"] + [cheshire "5.11.0"]] :profiles {:1.10 {:dependencies [[org.clojure/clojure "1.10.2"]]} :1.9 {:dependencies [[org.clojure/clojure "1.9.0"]]} :master {:dependencies [[org.clojure/clojure "1.12.0-master-SNAPSHOT"]]} @@ -18,7 +19,7 @@ :source-uri "https://github.com/michaelklishin/langohr/blob/v{version}/{filepath}#L{line}"}}} :source-paths ["src/clojure"] :java-source-paths ["src/java"] - :javac-options ["-target" "1.8" "-source" "1.8"] + :javac-options ["-target" "11" "-source" "1.8"] :url "https://clojurerabbitmq.info" :repositories {"sonatype" {:url "https://oss.sonatype.org/content/repositories/releases" :snapshots false @@ -42,7 +43,7 @@ :time-consuming :time-consuming :performance :performance :tls :tls - :ci (fn [m] (not (:tls m)))} + :ci (complement :tls)} :mailing-list {:name "clojure-rabbitmq" :archive "https://groups.google.com/group/clojure-rabbitmq" :post "clojure-rabbitmq@googlegroups.com"}) diff --git a/src/clojure/langohr/http2.clj b/src/clojure/langohr/http2.clj new file mode 100644 index 0000000..68b8dff --- /dev/null +++ b/src/clojure/langohr/http2.clj @@ -0,0 +1,645 @@ +(ns langohr.http2 + (:require [cheshire.core :as json] + [hato.client :as hato.client] + [clojure.string :as str]) + (:import [java.util.concurrent Executors ThreadFactory] + [java.util.concurrent.atomic AtomicLong] + [java.net URLEncoder])) + +(defonce DEFAULT-CLIENT + (delay + (hato.client/build-http-client + {:connect-timeout 5000 + :redirect-policy :never + :version :http-2 + :executor (try + (eval ;; test Virtual thread availability java 19+ + `(-> (Thread/ofVirtual) + (.name "langohr.http-" 0) + (.factory) + (java.util.concurrent.Executors/newThreadPerTaskExecutor))) + (catch Exception _ + ;; fallback to standard utilities + (let [counter (AtomicLong. 0)] + (Executors/newCachedThreadPool + (reify ThreadFactory + (newThread [_ runnable] + (doto (Thread. runnable) + (.setName (str "langohr.http-" (.getAndIncrement counter))))))))))}))) + + +(defonce GLOBAL-SERVER + (atom + {:endpoint "http://localhost:15672" + :username "guest" + :password "guest"})) + +(defn- set-server! + [host-opts] + (swap! GLOBAL-SERVER merge host-opts)) + +(defmacro with-tmp-global-host + [host-opts & body] + `(let [[previous# current#] (swap-vals! GLOBAL-SERVER merge ~host-opts)] + (try ~@body + (finally + (reset! GLOBAL-SERVER previous#))))) + +(defonce default-http-opts + {:accept :json + :content-type "application/json" + :throw-exceptions? false}) + +(defn- basic-auth* [user password] {:basic-auth {:user user :pass password}}) +(defn- with-client* [c] (or c @DEFAULT-CLIENT)) + +(defn GET + ([uri] + (GET uri {})) + ([uri options] + (io! + (->> (update options :http-client with-client*) + (merge default-http-opts) + (hato.client/get uri))))) + +(defn HEAD + ([uri] + (HEAD uri {})) + ([uri options] + (io! + (->> (update options :http-client with-client*) + (merge default-http-opts) + (hato.client/head uri))))) + +(defn POST + ([uri] + (POST uri {})) + ([uri options] + (io! + (->> (-> options + (update :body json/generate-string) + (update :http-client with-client*)) + (merge default-http-opts) + (hato.client/post uri)) + true))) + +(defn PUT + ([uri] + (PUT uri {})) + ([uri options] + (io! + (->> (-> options + (update :body json/generate-string) + (update :http-client with-client*)) + (merge default-http-opts) + (hato.client/put uri)) + true))) + +(defn DELETE + ([uri] + (DELETE uri {})) + ([uri options] + (io! + (->> (-> options + (update :body json/generate-string) + (update :http-client with-client*)) + (merge default-http-opts) + (hato.client/delete uri)) + true))) + +(defn- not-found? + [status] + (= status 404)) + +(defn- normalize-protocol + [proto] + (if (= proto "amqp/ssl") + "amqps" + (str/lower-case proto))) + +(defn- maybe-parse-json + "Try to parse json response. If the content-type is not json, just return the body (string)." + [{body :body {content-type "content-type"} :headers}] + (when (some-> body seq) + (if (re-find #"(?i)json" content-type) + (json/parse-string body true) + body))) + +(defn- get-json* + ([uri] + (get-json* uri {})) + ([uri options] + (maybe-parse-json (GET uri options)))) + +(defn- url-encode + ^String [^String fragment] + (URLEncoder/encode fragment)) + +(defn- optional-url-fragment + [fragment] + (some->> fragment url-encode (str \/))) + +;; ===== +;; API +;; ===== + +(defn connect! + "Mutates the global server details. + Allows for passing less arguments (often 0) + in the functions below, but is generally not recommended. + For full visibility, prefer the arities taking a + as the first arg, and as the last arg. + This way you never have to worry about shared mutation." + [endpoint username password] + (set-server! + {:endpoint endpoint + :username username + :password password})) + +(defn get-overview + ([] + (get-overview @GLOBAL-SERVER)) + ([server] + (get-overview server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/overview") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-enabled-protocols + ([] + (list-enabled-protocols @GLOBAL-SERVER)) + ([server] + (list-enabled-protocols server {})) + ([server opts] + (->> (get-overview server opts) + :listeners + (into #{} (map :protocol))))) + +(defn list-nodes + ([] + (list-nodes @GLOBAL-SERVER)) + ([server] + (list-nodes server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/nodes") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn protocol-ports + ([] + (protocol-ports @GLOBAL-SERVER)) + ([server] + (protocol-ports server {})) + ([server opts] + (->> (get-overview server opts) + :listeners + (into {} (map (juxt (comp normalize-protocol :protocol) :port)))))) + +(defn get-node + ([node] + (get-node @GLOBAL-SERVER node)) + ([server node] + (get-node server node {})) + ([{:keys [endpoint username password]} node opts] + (-> (str endpoint "/api/nodes/" node) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-extensions + ([] + (list-extensions @GLOBAL-SERVER)) + ([server] + (list-extensions server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/extensions") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-definitions + ([] + (list-definitions @GLOBAL-SERVER)) + ([server] + (list-definitions server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/definitions") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-connections + ([] + (list-connections @GLOBAL-SERVER)) + ([server] + (list-connections server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/connections") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-connections-from + ([user] + (list-connections-from @GLOBAL-SERVER user)) + ([server user] + (list-connections-from server user {})) + ([server user opts] + (->> (list-connections server opts) + (filter (comp (partial = user) :user))))) + +(defn get-connection + ([id] + (get-connection @GLOBAL-SERVER id)) + ([server id] + (get-connection server id {})) + ([{:keys [endpoint username password]} id opts] + (-> (str endpoint "/api/connections/" id) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn close-connection + ([^String id] + (close-connection @GLOBAL-SERVER id)) + ([server id] + (close-connection server id {})) + ([{:keys [endpoint username password]} id opts] + (-> (str endpoint "/api/connections/" id) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn close-all-connections + ([] + (close-all-connections @GLOBAL-SERVER)) + ([server] + (->> (list-connections server) + (map :name) + (run! (partial close-connection server))))) + +(defn close-connections-from + ([user] + (close-connections-from @GLOBAL-SERVER user)) + ([server user] + (close-connections-from server user {})) + ([server user opts] + (->> (list-connections-from server user opts) + (map :name) + (run! #(close-connection server % opts))))) + +(defn list-channels + ([] + (list-channels @GLOBAL-SERVER)) + ([server] + (list-channels server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/channels") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-exchanges + ([] + (list-exchanges nil)) + ([vhost] + (list-exchanges @GLOBAL-SERVER vhost)) + ([server vhost] + (list-exchanges (or server @GLOBAL-SERVER) vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/exchanges" (optional-url-fragment vhost)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn get-exchange + ([vhost exchange] + (get-exchange @GLOBAL-SERVER vhost exchange)) + ([server vhost exchange] + (get-exchange server vhost exchange {})) + ([{:keys [endpoint username password]} vhost exchange opts] + (-> (str endpoint "/api/exchanges/" (url-encode vhost) \/ (url-encode exchange)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn declare-exchange + ([vhost exchange properties] + (declare-exchange @GLOBAL-SERVER vhost exchange properties)) + ([server vhost exchange properties] + (declare-exchange server vhost exchange properties {})) + ([{:keys [endpoint username password]} ^String vhost ^String exchange properties opts] + (-> (str endpoint "/api/exchanges/" (url-encode vhost) \/ (url-encode exchange)) + (PUT (-> opts + (assoc :body properties) + (merge (basic-auth* username password))))))) + +(defn delete-exchange + ([vhost exchange] + (delete-exchange @GLOBAL-SERVER vhost exchange)) + ([host vhost exchange] + (delete-exchange host vhost exchange {})) + ([{:keys [endpoint username password]} vhost exchange opts] + (-> (str endpoint "/api/exchanges/" (url-encode vhost) \/ (url-encode exchange)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn list-bindings-for-which-exchange-is-the-source + ([vhost exchange] + (list-bindings-for-which-exchange-is-the-source @GLOBAL-SERVER vhost exchange)) + ([server vhost exchange] + (list-bindings-for-which-exchange-is-the-source server vhost exchange {})) + ([{:keys [endpoint username password]} vhost exchange opts] + (-> "%s/api/exchanges/%s/%s/bindings/source" + (format endpoint (url-encode vhost) (url-encode exchange)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn list-bindings-for-which-exchange-is-the-destination + ([vhost exchange] + (list-bindings-for-which-exchange-is-the-destination @GLOBAL-SERVER vhost exchange)) + ([server vhost exchange] + (list-bindings-for-which-exchange-is-the-destination server vhost exchange {})) + ([{:keys [endpoint username password]} vhost exchange opts] + (-> "%s/api/exchanges/%s/%s/bindings/destination" + (format endpoint (url-encode vhost) (url-encode exchange)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn publish + [vhost exchange] + ;; FIXME: ??? + ) + +(defn list-vhosts + ([] + (list-vhosts @GLOBAL-SERVER)) + ([server] + (list-vhosts server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/vhosts") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn vhost-exists? + ([^String vhost] + (vhost-exists? @GLOBAL-SERVER vhost)) + ([server vhost] + (vhost-exists? server vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/vhosts/" (url-encode vhost)) + (HEAD (merge opts (basic-auth* username password))) + :status + not-found? + false?))) + +(defn get-vhost + ([vhost] + (get-vhost @GLOBAL-SERVER vhost)) + ([server vhost] + (get-vhost server vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/vhosts/" (url-encode vhost)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn add-vhost + ([vhost] + (add-vhost @GLOBAL-SERVER vhost)) + ([server vhost] + (add-vhost server vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/vhosts/" (url-encode vhost)) + (PUT (-> opts + (assoc-in [:body :name] vhost) + (merge (basic-auth* username password))))))) + +(defn delete-vhost + ([vhost] + (delete-vhost @GLOBAL-SERVER vhost)) + ([server vhost] + (delete-vhost server vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/vhosts/" (url-encode vhost)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn list-permissions + ([] + (list-permissions @GLOBAL-SERVER)) + ([server] + (list-permissions server nil)) + ([server vhost] + (list-permissions server vhost {})) + ([server vhost opts] + (let [{:keys [endpoint username password]} (or server @GLOBAL-SERVER) + url (if (nil? vhost) + (str endpoint "/api/permissions") + (format "%s/api/vhosts/%s/permissions" endpoint (url-encode vhost)))] + (get-json* url (merge opts (basic-auth* username password)))))) + +(defn get-permissions + ([vhost user] + (get-permissions @GLOBAL-SERVER vhost user)) + ([server vhost user] + (get-permissions server vhost user {})) + ([{:keys [endpoint username password]} vhost user opts] + (-> (str endpoint "/api/permissions/" (url-encode vhost) \/ (url-encode user)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn set-permissions + ([vhost user perms] + (set-permissions @GLOBAL-SERVER vhost user perms)) + ([server vhost user perms] + (set-permissions server vhost user perms {})) + ([{:keys [endpoint username password]} vhost user {:keys [configure write read] :as perms} opts] + {:pre [(every? string? [configure write read])]} + (-> (str endpoint "/api/permissions/" (url-encode vhost) \/ (url-encode user)) + (PUT (-> opts + (assoc :body perms) + (merge (basic-auth* username password))))))) + +(defn delete-permissions + ([vhost user] + (delete-permissions @GLOBAL-SERVER vhost user)) + ([server vhost user] + (delete-permissions server vhost user {})) + ([{:keys [endpoint username password]} vhost user opts] + (-> (str endpoint "/api/permissions/" (url-encode vhost) \/ (url-encode user)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn list-queues + ([] + (list-queues nil)) + ([vhost] + (list-queues @GLOBAL-SERVER vhost)) + ([server vhost] + (list-queues (or server @GLOBAL-SERVER) vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/queues" (optional-url-fragment vhost)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn get-queue + ([vhost queue] + (get-queue @GLOBAL-SERVER vhost queue)) + ([server vhost queue] + (get-queue server vhost queue {})) + ([{:keys [endpoint username password]} vhost queue opts] + (-> (str endpoint "/api/queues/" (url-encode vhost) \/ (url-encode queue)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn declare-queue + ([vhost queue properties] + (declare-queue @GLOBAL-SERVER vhost queue properties)) + ([server vhost queue properties] + (declare-queue server vhost queue properties {})) + ([{:keys [endpoint username password]} vhost queue properties opts] + (-> (str endpoint "/api/queues/" (url-encode vhost) \/ (url-encode queue)) + (PUT (-> opts + (assoc :body properties) + (merge (basic-auth* username password))))))) + +(defn delete-queue + ([vhost queue] + (delete-queue @GLOBAL-SERVER vhost queue)) + ([server vhost queue] + (delete-queue server vhost queue {})) + ([{:keys [endpoint username password]} vhost queue opts] + (-> (str endpoint "/api/queues/" (url-encode vhost) \/ (url-encode queue)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn purge-queue + ([vhost queue] + (purge-queue @GLOBAL-SERVER vhost queue)) + ([server vhost queue] + (purge-queue server vhost queue {})) + ([{:keys [endpoint username password]} vhost queue opts] + (-> "%s/api/queues/%s/%s/contents" + (format endpoint (url-encode vhost) (url-encode queue)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn get-message + [^String vhost ^String queue] + ;; FIXME: ??? + ) + +;; first two arities call "/api/bindings" +;; next two call "/api/bindings/:vhost" +;; last one calls "/api/queues/:vhost/:queue/bindings" +(defn list-bindings + ([] + (list-bindings {})) + ([opts] + (list-bindings nil opts)) + ([vhost opts] + (list-bindings @GLOBAL-SERVER vhost opts)) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/bindings" (optional-url-fragment vhost)) + (get-json* (merge opts (basic-auth* username password))))) + ([server vhost queue opts] + ;; this last arity is kind of special, in that there are no global defaults + ;; nil(s) must be explicitely passed in for defaults to fire + (let [{:keys [endpoint username password]} (or server @GLOBAL-SERVER)] + (-> "%s/api/queues/%s/%s/bindings" + (format endpoint (url-encode vhost) (url-encode queue)) + (get-json* (merge opts (basic-auth* username password))))))) + +(defn bind + ([vhost exchange queue] + (bind @GLOBAL-SERVER vhost exchange queue)) + ([server vhost exchange queue] + (bind server vhost exchange queue {})) + ([server vhost exchange queue properties] + (bind server vhost exchange queue properties {})) + ([{:keys [endpoint username password]} vhost exchange queue properties opts] + (-> "%s/api/bindings/%s/e/%s/q/%s" + (format endpoint (url-encode vhost) (url-encode exchange) (url-encode queue)) + (POST (-> opts + (assoc :body properties) + (merge (basic-auth* username password))))))) + +(defn list-users + ([] + (list-users @GLOBAL-SERVER)) + ([server] + (list-users server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/users") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn get-user + ([user] + (get-user @GLOBAL-SERVER user)) + ([server user] + (get-user server user {})) + ([{:keys [endpoint username password]} user opts] + (-> (str endpoint "/api/users/" (url-encode user)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn user-exists? + ([user] + (user-exists? @GLOBAL-SERVER user)) + ([server user] + (user-exists? server user {})) + ([{:keys [endpoint username password]} user opts] + (-> (str endpoint "/api/users/" (url-encode user)) + (HEAD (merge opts (basic-auth* username password))) + :status + not-found? + false?))) + +(defn add-user + ([user pass tags] + (add-user @GLOBAL-SERVER user pass tags)) + ([server user pass tags] + (add-user server user pass tags {})) + ([{:keys [endpoint username password]} user pass tags opts] + (-> (str endpoint "/api/users/" (url-encode user)) + (PUT (-> opts + (assoc :body {:username user + :password pass + :tags tags + :has-password true}) + (merge (basic-auth* username password))))))) + +(defn delete-user + ([user] + (delete-user @GLOBAL-SERVER user)) + ([server user] + (delete-user server user {})) + ([{:keys [endpoint username password]} user opts] + (-> (str endpoint "/api/users/" (url-encode user)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn list-policies + ([] + (list-policies @GLOBAL-SERVER)) + ([server] + (list-policies server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/policies") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn get-policies + ([vhost] + (get-policies @GLOBAL-SERVER vhost)) + ([server vhost] + (get-policies server vhost nil)) + ([server vhost policy-name] + (get-policies server vhost policy-name {})) + ([{:keys [endpoint username password]} vhost policy-name opts] + (-> (str endpoint "/api/policies/" (url-encode vhost) (optional-url-fragment policy-name)) + (get-json* (merge opts (basic-auth* username password)))))) + +(defn set-policy + ([vhost policy-name policy] + (set-policy @GLOBAL-SERVER vhost policy-name policy)) + ([server vhost policy-name policy] + (set-policy server vhost policy-name policy {})) + ([{:keys [endpoint username password]} vhost policy-name policy opts] + (-> (str endpoint "/api/policies/" (url-encode vhost) \/ (url-encode policy-name)) + (PUT (-> opts + (assoc :body policy) + (merge (basic-auth* username password))))))) + +(defn delete-policy + ([vhost] + (delete-policy @GLOBAL-SERVER vhost)) + ([server vhost] + (delete-policy server vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/policies/" (url-encode vhost)) + (DELETE (merge opts (basic-auth* username password)))))) + +(defn whoami + ([] + (whoami @GLOBAL-SERVER)) + ([server] + (whoami server {})) + ([{:keys [endpoint username password]} opts] + (-> (str endpoint "/api/whoami") + (get-json* (merge opts (basic-auth* username password)))))) + +(defn aliveness-test + ([vhost] + (aliveness-test @GLOBAL-SERVER vhost)) + ([server vhost] + (aliveness-test server vhost {})) + ([{:keys [endpoint username password]} vhost opts] + (-> (str endpoint "/api/aliveness-test/" (url-encode vhost)) + (get-json* (merge opts (basic-auth* username password)))))) diff --git a/test/langohr/test/http_api_test.clj b/test/langohr/test/http_api_test.clj index c0ea177..2bbff2f 100644 --- a/test/langohr/test/http_api_test.clj +++ b/test/langohr/test/http_api_test.clj @@ -8,24 +8,32 @@ ;; You must not remove this notice, or any other, from this software. (ns langohr.test.http-api-test - (:require [langohr.http :as hc] - [clojure.test :refer [deftest is]] + (:require [langohr.http2 :as hc] + [clojure.test :refer [deftest is use-fixtures]] [clojure.set :refer [subset?]] [langohr.core :as rmq] [langohr.channel :as lch] [langohr.queue :as lq])) -(hc/connect! "http://127.0.0.1:15672" "guest" "guest") +(defn- with-local-conn + [run-all-tests!] + (hc/with-tmp-global-host + {:endpoint "http://localhost:15672" + :username "guest" + :password "guest"} + (run-all-tests!))) + +(use-fixtures :once with-local-conn) ;; ;; Implementation ;; -(defn await-event-propagation +(defn- await-event-propagation "Gives management plugin stats database a chance to update (updates happen asynchronously)" [] - (Thread/sleep 1150)) + (Thread/sleep 1500)) ;; ;; Tests From 9c1ce2fa111f13fb26b8827521cd8a1bdd2cc793 Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Sun, 6 Aug 2023 13:39:27 +0100 Subject: [PATCH 2/8] remove redundant `do` inside `when` --- src/clojure/langohr/core.clj | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/clojure/langohr/core.clj b/src/clojure/langohr/core.clj index 32663e6..98659a7 100644 --- a/src/clojure/langohr/core.clj +++ b/src/clojure/langohr/core.clj @@ -353,9 +353,8 @@ (when sasl-config (.setSaslConfig cf sasl-config)) (when ssl-context - (do - (.useSslProtocol cf ^javax.net.ssl.SSLContext ssl-context) - (.setPort cf final-port))) + (.useSslProtocol cf ^javax.net.ssl.SSLContext ssl-context) + (.setPort cf final-port)) (when verify-hostname (.enableHostnameVerification cf)) (when thread-factory From 5eadb8f44da34c210c3771b0f9f389a4ac1881b3 Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Sun, 6 Aug 2023 13:59:53 +0100 Subject: [PATCH 3/8] prefer `reify` in `exception-handler` --- src/clojure/langohr/core.clj | 43 ++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/src/clojure/langohr/core.clj b/src/clojure/langohr/core.clj index 98659a7..f6505bd 100644 --- a/src/clojure/langohr/core.clj +++ b/src/clojure/langohr/core.clj @@ -251,37 +251,36 @@ handle-connection-recovery-exception-fn handle-channel-recovery-exception-fn handle-topology-recovery-exception-fn]}] - (proxy [ForgivingExceptionHandler] [] - (handleUnexpectedConnectionDriverException [^Connection conn ^Throwable t] + (reify ExceptionHandler + (handleUnexpectedConnectionDriverException [_ conn throwable] (when handle-connection-exception-fn - (handle-connection-exception-fn conn t))) - (handleReturnListenerException [^Channel ch ^Throwable t] + (handle-connection-exception-fn conn throwable))) + (handleReturnListenerException [_ channel throwable] (when handle-return-listener-exception-fn - (handle-return-listener-exception-fn ch t))) - (handleFlowListenerException [^Channel ch ^Throwable t] + (handle-return-listener-exception-fn channel throwable))) + (handleFlowListenerException [_ channel throwable] (when handle-flow-listener-exception-fn - (handle-flow-listener-exception-fn ch t))) - (handleConfirmListenerException [^Channel ch ^Throwable t] + (handle-flow-listener-exception-fn channel throwable))) + (handleConfirmListenerException [_ channel throwable] (when handle-confirm-listener-exception-fn - (handle-confirm-listener-exception-fn ch t))) - (handleBlockedListenerException [^Connection conn ^Throwable t] + (handle-confirm-listener-exception-fn channel throwable))) + (handleBlockedListenerException [_ conn throwable] (when handle-blocked-listener-exception-fn - (handle-blocked-listener-exception-fn conn t))) - (handleConsumerException [^Channel ch ^Throwable t - ^Consumer consumer ^String consumer-tag - ^String method-name] + (handle-blocked-listener-exception-fn conn throwable))) + (handleConsumerException [_ channel throwable consumer consumer-tag method-name] (when handle-consumer-exception-fn - (handle-consumer-exception-fn ch t consumer consumer-tag method-name))) - (handleConnectionRecoveryException [^Connection conn ^Throwable t] + (handle-consumer-exception-fn channel throwable consumer consumer-tag method-name))) + (handleConnectionRecoveryException [_ conn throwable] (when handle-connection-recovery-exception-fn - (handle-connection-recovery-exception-fn conn t))) - (handleChannelRecoveryException [^Channel ch ^Throwable t] + (handle-connection-recovery-exception-fn conn throwable))) + (handleChannelRecoveryException [_ channel throwable] (when handle-channel-recovery-exception-fn - (handle-channel-recovery-exception-fn ch t))) - (handleTopologyRecoveryException [^Connection conn ^Channel ch - ^TopologyRecoveryException t] + (handle-channel-recovery-exception-fn channel throwable))) + (handleTopologyRecoveryException [_ conn channel tre] ;; TopologyRecoveryException (when handle-topology-recovery-exception-fn - (handle-topology-recovery-exception-fn conn ch t))))) + (handle-topology-recovery-exception-fn conn channel tre))) + ) + ) ;; ;; Implementation From dfa6f83f0a1868b6e78b3c5369c0999c8f2f2cb7 Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Sun, 6 Aug 2023 14:06:43 +0100 Subject: [PATCH 4/8] remove `handle-flow-listener-exception-fn` as it's not part of `ExceptionHandler` --- src/clojure/langohr/core.clj | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/clojure/langohr/core.clj b/src/clojure/langohr/core.clj index f6505bd..d49e15f 100644 --- a/src/clojure/langohr/core.clj +++ b/src/clojure/langohr/core.clj @@ -244,7 +244,7 @@ (defn exception-handler [{:keys [handle-connection-exception-fn handle-return-listener-exception-fn - handle-flow-listener-exception-fn + ;handle-flow-listener-exception-fn handle-confirm-listener-exception-fn handle-blocked-listener-exception-fn handle-consumer-exception-fn @@ -258,9 +258,6 @@ (handleReturnListenerException [_ channel throwable] (when handle-return-listener-exception-fn (handle-return-listener-exception-fn channel throwable))) - (handleFlowListenerException [_ channel throwable] - (when handle-flow-listener-exception-fn - (handle-flow-listener-exception-fn channel throwable))) (handleConfirmListenerException [_ channel throwable] (when handle-confirm-listener-exception-fn (handle-confirm-listener-exception-fn channel throwable))) From c910165b38ef41f81b4c2643c74e3e72596e42ca Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Sun, 6 Aug 2023 14:09:37 +0100 Subject: [PATCH 5/8] remove redundant `let` --- src/clojure/langohr/core.clj | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/clojure/langohr/core.clj b/src/clojure/langohr/core.clj index d49e15f..1be83fe 100644 --- a/src/clojure/langohr/core.clj +++ b/src/clojure/langohr/core.clj @@ -294,11 +294,10 @@ (defn- platform-string [] - (let [] - (format "Clojure %s on %s %s" - (clojure-version) - (System/getProperty "java.vm.name") - (System/getProperty "java.version")))) + (format "Clojure %s on %s %s" + (clojure-version) + (System/getProperty "java.vm.name") + (System/getProperty "java.version"))) (def ^{:private true} client-properties {"product" "Langohr" From df1d332c114bd542de77b5399d4224b7e12ea097 Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Sun, 6 Aug 2023 14:34:21 +0100 Subject: [PATCH 6/8] make fields private in `FnConsumer` --- src/java/com/novemberain/langohr/FnConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/java/com/novemberain/langohr/FnConsumer.java b/src/java/com/novemberain/langohr/FnConsumer.java index 1d804a3..28cb1ce 100644 --- a/src/java/com/novemberain/langohr/FnConsumer.java +++ b/src/java/com/novemberain/langohr/FnConsumer.java @@ -14,9 +14,9 @@ import clojure.lang.Keyword; import clojure.lang.IFn; -public class FnConsumer extends DefaultConsumer { +public final class FnConsumer extends DefaultConsumer { - final IFn handleConsumeOK, handleCancel, handleCancelOK, handleShutdownSignal, handleRecoverOK, handleDelivery; + private final IFn handleConsumeOK, handleCancel, handleCancelOK, handleShutdownSignal, handleRecoverOK, handleDelivery; public static Channel asNonRecovering(Channel c) { if (c instanceof AutorecoveringChannel) { From 1b57038ccc01ca2f15c72667e050709cd289e8b0 Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Sun, 6 Aug 2023 14:39:02 +0100 Subject: [PATCH 7/8] change `-source` to 11 too --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 4cdf35c..2a4094f 100644 --- a/project.clj +++ b/project.clj @@ -19,7 +19,7 @@ :source-uri "https://github.com/michaelklishin/langohr/blob/v{version}/{filepath}#L{line}"}}} :source-paths ["src/clojure"] :java-source-paths ["src/java"] - :javac-options ["-target" "11" "-source" "1.8"] + :javac-options ["-target" "11" "-source" "11"] :url "https://clojurerabbitmq.info" :repositories {"sonatype" {:url "https://oss.sonatype.org/content/repositories/releases" :snapshots false From ef2a987b10927f0fd79237e24f0a0152ac7ba43c Mon Sep 17 00:00:00 2001 From: Dimitrios Piliouras Date: Mon, 7 Aug 2023 19:54:56 +0100 Subject: [PATCH 8/8] revert changes in gitignore --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index faca364..9a6492d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,3 @@ checkouts/* .clj-kondo/* .lsp/* -.calva/output-window/output.calva-repl