diff --git a/lib/amq/uri.rb b/lib/amq/uri.rb index f8fbb2e..6515f34 100644 --- a/lib/amq/uri.rb +++ b/lib/amq/uri.rb @@ -8,12 +8,23 @@ class URI # @private AMQP_PORTS = {"amqp" => 5672, "amqps" => 5671}.freeze + DEFAULTS = { + heartbeat: nil, + connection_timeout: nil, + channel_max: nil, + auth_mechanism: [], + verify: false, + fail_if_no_peer_cert: false, + cacertfile: nil, + certfile: nil, + keyfile: nil + }.freeze def self.parse(connection_string) uri = ::URI.parse(connection_string) raise ArgumentError.new("Connection URI must use amqp or amqps schema (example: amqp://bus.megacorp.internal:5766), learn more at http://bit.ly/ks8MXK") unless %w{amqp amqps}.include?(uri.scheme) - opts = {} + opts = DEFAULTS.dup opts[:scheme] = uri.scheme opts[:user] = ::CGI::unescape(uri.user) if uri.user @@ -26,8 +37,28 @@ def self.parse(connection_string) opts[:vhost] = ::CGI::unescape($1) end + if uri.query + query_params = CGI::parse(uri.query) + + normalized_query_params = Hash[query_params.map { |param, value| [param, value.one? ? value.first : value] }] + + opts[:heartbeat] = normalized_query_params["heartbeat"].to_i + opts[:connection_timeout] = normalized_query_params["connection_timeout"].to_i + opts[:channel_max] = normalized_query_params["channel_max"].to_i + opts[:auth_mechanism] = normalized_query_params["auth_mechanism"] + + %w(verify fail_if_no_peer_cert cacertfile certfile keyfile).each do |ssl_option| + if normalized_query_params[ssl_option] && uri.scheme == "amqp" + raise ArgumentError.new("Only of use for the amqps scheme") + else + opts[ssl_option.to_sym] = normalized_query_params[ssl_option] + end + end + end + opts end + def self.parse_amqp_url(s) parse(s) end diff --git a/spec/amq/uri_parsing_spec.rb b/spec/amq/uri_parsing_spec.rb index cbb8511..b624c36 100644 --- a/spec/amq/uri_parsing_spec.rb +++ b/spec/amq/uri_parsing_spec.rb @@ -92,4 +92,80 @@ expect(val[:vhost]).to be_nil # in this case, default / will be used end end + + subject { described_class.parse(uri) } + + context "schema 'amqp'" do + context "query parameters" do + context "present" do + let(:uri) { "amqp://rabbitmq?heartbeat=10&connection_timeout=100&channel_max=1000&auth_mechanism=plain&auth_mechanism=amqplain" } + + specify "parses parameters" do + expect(subject[:heartbeat]).to eq(10) + expect(subject[:connection_timeout]).to eq(100) + expect(subject[:channel_max]).to eq(1000) + expect(subject[:auth_mechanism]).to eq(["plain", "amqplain"]) + end + end + + context "absent" do + let(:uri) { "amqp://rabbitmq" } + + it "fallbacks to defaults" do + expect(subject[:heartbeat]).to be_nil + expect(subject[:connection_timeout]).to be_nil + expect(subject[:channel_max]).to be_nil + expect(subject[:auth_mechanism]).to be_empty + end + end + + context "tls parameters" do + %w(verify fail_if_no_peer_cert cacertfile certfile keyfile).each do |tls_param| + describe "'verify'" do + let(:uri) { "amqp://rabbitmq?#{tls_param}=true" } + + it "raises ArgumentError" do + expect { subject }.to raise_error(ArgumentError, /Only of use for the amqps scheme/) + end + end + end + end + end + end + + context "schema 'amqps'" do + context "query parameters" do + context "present" do + let(:uri) { "amqps://rabbitmq?heartbeat=10&connection_timeout=100&channel_max=1000&auth_mechanism=plain&auth_mechanism=amqplain&verify=true&fail_if_no_peer_cert=true&cacertfile=/examples/tls/cacert.pem&certfile=/examples/tls/client_cert.pem&keyfile=/examples/tls/client_key.pem" } + + it "parses parameters" do + expect(subject[:heartbeat]).to eq(10) + expect(subject[:connection_timeout]).to eq(100) + expect(subject[:channel_max]).to eq(1000) + expect(subject[:auth_mechanism]).to eq(["plain", "amqplain"]) + expect(subject[:verify]).to be_truthy + expect(subject[:fail_if_no_peer_cert]).to be_truthy + expect(subject[:cacertfile]).to eq("/examples/tls/cacert.pem") + expect(subject[:certfile]).to eq("/examples/tls/client_cert.pem") + expect(subject[:keyfile]).to eq("/examples/tls/client_key.pem") + end + end + + context "absent" do + let(:uri) { "amqps://rabbitmq" } + + it "fallbacks to defaults" do + expect(subject[:heartbeat]).to be_nil + expect(subject[:connection_timeout]).to be_nil + expect(subject[:channel_max]).to be_nil + expect(subject[:auth_mechanism]).to be_empty + expect(subject[:verify]).to be_falsey + expect(subject[:fail_if_no_peer_cert]).to be_falsey + expect(subject[:cacertfile]).to be_nil + expect(subject[:certfile]).to be_nil + expect(subject[:keyfile]).to be_nil + end + end + end + end end