Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ static/js/lib/chunks/helpers.segment.js: | static/js/lib/chunks
tar -zxOf- package/dist/chunks/helpers.segment.js > $@

static/js/lib/luxon.js: | static/js/lib
curl --retry 5 -sLo $@ https://moment.github.io/luxon/es6/luxon.js
curl --retry 5 -sLo $@ https://moment.github.io/luxon/es6/luxon.mjs

static/js/lib/chartjs-adapter-luxon.esm.js: | static/js/lib
curl --retry 5 -sLo $@ https://cdn.jsdelivr.net/npm/chartjs-adapter-luxon@1.3.1/dist/chartjs-adapter-luxon.esm.js
Expand Down
108 changes: 108 additions & 0 deletions spec/exchange_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,114 @@ describe LavinMQ::Exchange do
end
end
end

describe "cross-protocol bindings" do
it "AMQP queue can bind to MQTT exchange" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue("test-queue")
mqtt_exchange = create_mqtt_exchange(s)
amqp_queue = s.vhosts["/"].queues["test-queue"]
binding_args = LavinMQ::AMQP::Table.new({"x-mqtt-qos": 1_u8})

mqtt_exchange.bind(amqp_queue, "sensor/temperature", binding_args).should be_true
binding = mqtt_exchange.bindings_details.find { |b| b.destination == amqp_queue }
binding.should_not be_nil
if binding
binding.binding_key.routing_key.should eq "sensor/temperature"
binding.binding_key.arguments.should eq binding_args
end
end
end
end

it "AMQP queue receives messages from MQTT publish" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue("mqtt-messages")
mqtt_exchange = create_mqtt_exchange(s)
amqp_queue = s.vhosts["/"].queues["mqtt-messages"]

mqtt_exchange.bind(amqp_queue, "home/lights", nil)
mqtt_exchange.publish(mqtt_publish("home/lights", "ON"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 1
amqp_queue.message_count.should eq 1
end
end
end

it "MQTT exchange routes with topic patterns" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue("sensor-data")
ch.queue("all-data")
mqtt_exchange = create_mqtt_exchange(s)
sensor_queue = s.vhosts["/"].queues["sensor-data"]
all_queue = s.vhosts["/"].queues["all-data"]

mqtt_exchange.bind(sensor_queue, "sensor/+/temperature", nil)
mqtt_exchange.bind(all_queue, "#", nil)
mqtt_exchange.publish(mqtt_publish("sensor/bedroom/temperature", "22.5"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 2
sensor_queue.message_count.should eq 1
all_queue.message_count.should eq 1
end
end
end

it "handles QoS arguments and unbinding" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue("qos-test")
mqtt_exchange = create_mqtt_exchange(s)
amqp_queue = s.vhosts["/"].queues["qos-test"]
qos_args = LavinMQ::AMQP::Table.new({"x-mqtt-qos": 2_u8})

mqtt_exchange.bind(amqp_queue, "device/status", qos_args).should be_true
binding = mqtt_exchange.bindings_details.find { |b| b.destination == amqp_queue }
binding.should_not be_nil
binding.try(&.binding_key.arguments.should eq qos_args)

mqtt_exchange.unbind(amqp_queue, "device/status", qos_args).should be_true
mqtt_exchange.bindings_details.find { |b| b.destination == amqp_queue }.should be_nil
end
end
end

it "multiple queues bind to same topic with different QoS" do
with_amqp_server do |s|
with_channel(s) do |ch|
["mqtt-qos0", "mqtt-qos1", "mqtt-qos2"].each { |name| ch.queue(name) }
mqtt_exchange = create_mqtt_exchange(s)
queues = ["mqtt-qos0", "mqtt-qos1", "mqtt-qos2"].map { |name| s.vhosts["/"].queues[name] }

queues.each_with_index do |queue, i|
mqtt_exchange.bind(queue, "alerts/fire", LavinMQ::AMQP::Table.new({"x-mqtt-qos": i.to_u8}))
end

mqtt_exchange.bindings_details.count { |b| b.binding_key.routing_key == "alerts/fire" }.should eq 3
mqtt_exchange.publish(mqtt_publish("alerts/fire", "EMERGENCY", 1u8), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 3
queues.each(&.message_count.should(eq(1)))
end
end
end

it "handles MQTT wildcard patterns correctly" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue("pattern-test")
mqtt_exchange = create_mqtt_exchange(s)
amqp_queue = s.vhosts["/"].queues["pattern-test"]

mqtt_exchange.bind(amqp_queue, "devices/+/status", nil)
mqtt_exchange.bind(amqp_queue, "logs/#", nil)

mqtt_exchange.publish(mqtt_publish("devices/sensor1/status", "online"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 1
mqtt_exchange.publish(mqtt_publish("logs/app/error/db", "failed"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 1
mqtt_exchange.publish(mqtt_publish("devices/sensor1/data/temp", "20"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 0
amqp_queue.message_count.should eq 2
end
end
end
end
end
describe "Exchange => Exchange binding" do
it "should allow multiple e2e bindings" do
Expand Down
18 changes: 0 additions & 18 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -442,22 +442,4 @@ module MessageRoutingSpec
end
end
end

describe LavinMQ::MQTT::Exchange do
it "should only allow Session to bind" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::AMQP::Queue.new(vhost, "q1")
s1 = LavinMQ::MQTT::Session.new(vhost, "q1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
expect_raises(LavinMQ::Exchange::AccessRefused) do
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
end
store.close
end
end
end
end
13 changes: 13 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,16 @@ module LavinMQ
end
end
end

# MQTT Cross-Protocol Binding Helpers
def create_mqtt_exchange(s)
vhost = s.vhosts["/"]
retain_store = LavinMQ::MQTT::RetainStore.new(File.join(vhost.data_dir, "mqtt_retain"), LavinMQ::Clustering::NoopServer.new)
mqtt_exchange = LavinMQ::MQTT::Exchange.new(vhost, "mqtt.default", retain_store)
vhost.exchanges["mqtt.default"] = mqtt_exchange
mqtt_exchange
end

def mqtt_publish(topic, payload, qos = 0u8)
MQTT::Protocol::Publish.new(topic, payload.to_slice, nil, false, qos, false)
end
14 changes: 7 additions & 7 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ describe LavinMQ::Federation::Upstream do

downstream_ex = vhost2.exchanges["downstream_ex"]
downstream_q = vhost2.queues["downstream_q"]
downstream_ex.bind(downstream_q, "#")
downstream_ex.bind(downstream_q, "#", nil)

url = URI.parse(s.amqp_url)
url.path = vhost1.name
Expand Down Expand Up @@ -719,7 +719,7 @@ describe LavinMQ::Federation::Upstream do
link_ex2 = upstream_ex1_to_ex2.link(ex2)
wait_for { link_ex2.state.running? && link_ex3.state.running? }

downstream_ex.bind(downstream_q, "#")
downstream_ex.bind(downstream_q, "#", nil)
with_channel(s, vhost: "three") do |ch|
ch_q3 = ch.queue("q3")

Expand Down Expand Up @@ -754,7 +754,7 @@ describe LavinMQ::Federation::Upstream do

downstream_ex = vhost2.exchanges["downstream_ex"]
downstream_q = vhost2.queues["downstream_q"]
downstream_ex.bind(downstream_q, "federation.link.rk")
downstream_ex.bind(downstream_q, "federation.link.rk", nil)

upstream_ex = vhost1.exchanges["upstream_ex"]

Expand Down Expand Up @@ -786,7 +786,7 @@ describe LavinMQ::Federation::Upstream do
q = downstream_vhost.queues["q"]
# This binding should be propagated all the way to "fe" in "v1"
# For each hop x-bound-from should be extended with one new entry
downstream_ex.bind(q, "spec.routing.key")
downstream_ex.bind(q, "spec.routing.key", nil)

# Verify bindings on exchange "fe" from vhost v1 to
# vhost v9.
Expand Down Expand Up @@ -827,7 +827,7 @@ describe LavinMQ::Federation::Upstream do
q = downstream_vhost.queues["q"]
# This binding should only be propagated to "fe" in "v4", because
# max_hops on v5 is 1.
downstream_ex.bind(q, "spec.routing.key")
downstream_ex.bind(q, "spec.routing.key", nil)

# Verify bindings on exchange "fe" from vhost v1 to
# vhost v4.
Expand Down Expand Up @@ -874,7 +874,7 @@ describe LavinMQ::Federation::Upstream do
q = downstream_vhost.queues["q"]
# This binding should only be propagated to "fe" in "v4", because
# max_hops on v5 is 1.
downstream_ex.bind(q, "spec.routing.key")
downstream_ex.bind(q, "spec.routing.key", nil)

# Verify bindings on exchange "fe" from vhost v1 to
# vhost v6.
Expand Down Expand Up @@ -912,7 +912,7 @@ describe LavinMQ::Federation::Upstream do
downstream_ex = downstream_vhost.exchanges["fe"]
downstream_vhost.declare_queue("q", durable: true, auto_delete: false)
q = downstream_vhost.queues["q"]
downstream_ex.bind(q, "spec.routing.key")
downstream_ex.bind(q, "spec.routing.key", nil)

with_channel(s, vhost: "v1") do |ch|
# By addign two entries in x-received-from the message should
Expand Down
48 changes: 12 additions & 36 deletions src/lavinmq/amqp/exchange/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -160,34 +160,6 @@ module LavinMQ
notify_observers(ExchangeEvent::Deleted)
end

# This outer macro will add a finished macro hook to all inherited classes
# in LavinMQ::AMQP namespace.
macro inherited
{% if @type.name.starts_with?("LavinMQ::AMQP::") %}
# This macro will find the "bind" method of classes inheriting from this class
# and redefine them to raise AccessRefused exception if the first argument
# isn't a type in LavinMQ::AMQP namespace.
#
# TODO remove this when LavinMQ::MQTT::Session no longer inherit from
# LavinMQ::AMQP::Queue and LavinMQ::MQTT::Exchange no longer inherit from
# lavinMQ::AMQP::Exchange
macro finished
\{% if (m = @type.methods.find(&.name.== "bind")) %}
def bind(\{{ m.args.map(&.id).join(",").id}}) : Bool
unless \{{m.args[0].name.id}}.class.name.starts_with?("LavinMQ::AMQP::")
raise AccessRefused.new(self)
end
\{{ m.body }}
end
\{% end %}
end
{% end %}
end

def bind(destination : LavinMQ::Destination, routing_key, arguments = nil) : Bool
raise AccessRefused.new(self)
end

abstract def type : String
abstract def bind(destination : AMQP::Destination, routing_key : String, arguments : AMQP::Table?)
abstract def unbind(destination : AMQP::Destination, routing_key : String, arguments : AMQP::Table?)
Expand Down Expand Up @@ -246,14 +218,7 @@ module LavinMQ
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil
return unless exchanges.add? self
each_destination(routing_key, headers) do |d|
case d
in LavinMQ::Queue
queues.add(d)
in LavinMQ::Exchange
d.find_queues(routing_key, headers, queues, exchanges)
end
end
find_queues_internal(routing_key, headers, queues, exchanges)

if hdrs = headers
find_cc_queues(hdrs, "CC", queues)
Expand All @@ -268,6 +233,17 @@ module LavinMQ
end
end

protected def find_queues_internal(routing_key, headers, queues, exchanges)
each_destination(routing_key, headers) do |d|
case d
in LavinMQ::Queue
queues.add(d)
in LavinMQ::Exchange
d.find_queues(routing_key, headers, queues, exchanges)
end
end
end

private def find_cc_queues(headers, key, queues)
return unless cc = headers[key]?
cc = cc.as?(Array(AMQP::Field))
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/http/controller/bindings.cr
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ module LavinMQ
access_refused(context, "User doesn't have write permissions to exchange '#{destination.name}'")
elsif source.name.empty? || destination.name.empty?
access_refused(context, "Not allowed to bind to the default exchange")
elsif destination.internal?
elsif destination.internal? && destination.name != "mqtt.default"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is mqtt.default an internal or default exchange? the if above checks for default exchange and based on the name this is a default exchange.

Is there a way to not have the string "mqtt.default" here but rather some method on destination to check wether we can bind/unbind to the change? Less hard coded strings is my goal here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i agree this is unclear. Its been a "good enough" solution up untill now. I think we should have it as a default exchange and let it behave more like an amq.topic exchange. @antondalgren lifts a good issue about bindings not persiting for mqtt exchange bound to amqp exchange/queues. this is because we onyl built it to support the mqtt use case.

tldr; we need to refactor the exchange's place in lavinmq :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but new features like this might be less prioritized next week/2 weeks to prioritize squashing bugs we found in 2.4.0

bad_request(context, "Not allowed to bind to an internal exchange")
end
body = parse_body(context)
Expand Down Expand Up @@ -177,7 +177,7 @@ module LavinMQ
access_refused(context, "User doesn't have write permissions to queue '#{destination.name}'")
elsif source.name.empty? || destination.name.empty?
access_refused(context, "Not allowed to unbind from the default exchange")
elsif destination.internal?
elsif destination.internal? && destination.name != "mqtt.default"
bad_request(context, "Not allowed to unbind from an internal exchange")
end
props = URI.decode_www_form(params["props"])
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ module LavinMQ
@vhost.rm_connection(client)
end

def publish(packet : MQTT::Publish)
@exchange.publish(packet)
def publish(packet : MQTT::Publish, queues : Set(LavinMQ::Queue), exchanges : Set(LavinMQ::Exchange))
@exchange.publish(packet, queues, exchanges)
end

def subscribe(client, topics)
Expand Down
6 changes: 4 additions & 2 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ module LavinMQ
metadata = ::Log::Metadata.new(nil, {vhost: @broker.vhost.name, address: @connection_info.remote_address.to_s, client_id: client_id})
@log = Logger.new(Log, metadata)
@log.info { "Connection established for user=#{@user.name}" }
@queues = Set(LavinMQ::Queue).new
@exchanges = Set(LavinMQ::Exchange).new
spawn read_loop, name: "MQTT read_loop #{@connection_info.remote_address}"
end

Expand Down Expand Up @@ -127,7 +129,7 @@ module LavinMQ
end

def recieve_publish(packet : MQTT::Publish)
@broker.publish(packet)
@broker.publish(packet, @queues, @exchanges)
vhost.event_tick(EventType::ClientPublish)
# Ok to not send anything if qos = 0 (fire and forget)
if packet.qos > 0 && (packet_id = packet.packet_id)
Expand Down Expand Up @@ -186,7 +188,7 @@ module LavinMQ
qos: will.qos,
retain: will.retain?,
dup: false,
)
), @queues, @exchanges
end
rescue ex
@log.warn { "Failed to publish will: #{ex.message}" }
Expand Down
Loading
Loading