Skip to content

Commit

Permalink
Merge pull request #115 from vincentjames501/fix/114
Browse files Browse the repository at this point in the history
Fix/114
  • Loading branch information
michaelklishin authored Aug 7, 2023
2 parents 9955fe4 + 78a93d3 commit 32c72a7
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 27 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ debug/*
tmp/*
checkouts/*
.nrepl-*

*.iml
.idea/
.clj-kondo/*
.lsp/*
.lsp/*
52 changes: 27 additions & 25 deletions src/clojure/langohr/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@
* http://clojurerabbitmq.info/articles/connecting.html
* http://clojurerabbitmq.info/articles/tls.html"
(:import [com.rabbitmq.client Connection Channel Address
ConnectionFactory ShutdownListener BlockedListener
Consumer TopologyRecoveryException
ExceptionHandler Recoverable RecoveryListener DefaultSaslConfig]
ConnectionFactory ShutdownListener BlockedListener
Consumer TopologyRecoveryException
ExceptionHandler Recoverable RecoveryListener DefaultSaslConfig]
[com.rabbitmq.client.impl ForgivingExceptionHandler AMQConnection]
[com.rabbitmq.client.impl.recovery AutorecoveringConnection QueueRecoveryListener]
[com.rabbitmq.client.impl.recovery AutorecoveringConnection QueueRecoveryListener RetryHandler]
clojure.lang.IFn
java.util.concurrent.ThreadFactory)
java.util.concurrent.ThreadFactory
[javax.net SocketFactory])
(:require langohr.channel
[clojure.string :as s]
[clojure.walk :as walk]))
[clojure.walk :as walk]))

;;
;; Implementation
Expand All @@ -59,8 +59,7 @@
{:username "guest"
:password "guest"
:vhost "/"
:host "localhost"
:port ConnectionFactory/DEFAULT_AMQP_PORT})
:host "localhost"})

;;
;; API
Expand Down Expand Up @@ -111,13 +110,9 @@
;; settings
([settings]
(let [settings' (normalize-settings settings)
tls (get settings' :ssl false)
default-port (if tls
ConnectionFactory/DEFAULT_AMQP_OVER_SSL_PORT
ConnectionFactory/DEFAULT_AMQP_PORT)
^ConnectionFactory cf (create-connection-factory settings')
xs (address-array-from (get settings' :hosts #{})
(get settings' :port default-port))]
(get settings' :port))]
(doto (com.novemberain.langohr.Connection. cf (dissoc settings' :password :username))
(.init xs)))))

Expand Down Expand Up @@ -288,13 +283,19 @@
;;

(defn normalize-settings
"For setting maps that contain keys such as :host, :username, :vhost, returns the argument"
"Normalizes settings by converting a :uri/:host key to (at minimum) a map that contains
:port, :hosts, :username, :password, :vhost. If :port is not supplied, it will be defaulted
to the default AMQP port depending on if :ssl is supplied."
[config]
(let [{:keys [host hosts]} config
hosts' (into #{} (remove nil? (or hosts #{host})))]
(merge (settings-from (:uri config (System/getenv "RABBITMQ_URL")))
{:hosts hosts'}
config)))
hosts' (into #{} (remove nil? (or hosts #{host})))
settings' (merge (settings-from (:uri config (System/getenv "RABBITMQ_URL")))
{:hosts hosts'}
config)
default-port (if (:ssl settings' false)
ConnectionFactory/DEFAULT_AMQP_OVER_SSL_PORT
ConnectionFactory/DEFAULT_AMQP_PORT)]
(update settings' :port #(or % default-port))))

(defn- platform-string
[]
Expand Down Expand Up @@ -325,15 +326,12 @@
(let [{:keys [host port username password vhost
requested-heartbeat connection-timeout ssl ssl-context verify-hostname socket-factory sasl-config
requested-channel-max thread-factory exception-handler
connection-name update-client-properties]
connection-name update-client-properties topology-recovery-retry-handler]
:or {requested-heartbeat ConnectionFactory/DEFAULT_HEARTBEAT
connection-timeout ConnectionFactory/DEFAULT_CONNECTION_TIMEOUT
requested-channel-max ConnectionFactory/DEFAULT_CHANNEL_MAX
sasl-config (auth-mechanism->sasl-config settings)}} (normalize-settings settings)
cf (ConnectionFactory.)
final-port (if (and ssl (= port ConnectionFactory/DEFAULT_AMQP_PORT))
ConnectionFactory/DEFAULT_AMQP_OVER_SSL_PORT
port)
final-properties (cond-> client-properties
connection-name (assoc "connection_name" connection-name)
update-client-properties update-client-properties)]
Expand All @@ -346,7 +344,7 @@
(.setPassword password)
(.setVirtualHost vhost)
(.setHost host)
(.setPort final-port)
(.setPort port)
(.setRequestedHeartbeat requested-heartbeat)
(.setConnectionTimeout connection-timeout)
(.setRequestedChannelMax requested-channel-max))
Expand All @@ -355,11 +353,15 @@
(when ssl-context
(do
(.useSslProtocol cf ^javax.net.ssl.SSLContext ssl-context)
(.setPort cf final-port)))
(.setPort cf port)))
(when verify-hostname
(.enableHostnameVerification cf))
(when thread-factory
(.setThreadFactory cf ^ThreadFactory thread-factory))
(when topology-recovery-retry-handler
(.setTopologyRecoveryRetryHandler cf ^RetryHandler topology-recovery-retry-handler))
(when socket-factory
(.setSocketFactory cf ^SocketFactory socket-factory))
(if exception-handler
(.setExceptionHandler cf ^ExceptionHandler exception-handler)
(.setExceptionHandler cf (ForgivingExceptionHandler.)))
Expand Down

0 comments on commit 32c72a7

Please sign in to comment.