Skip to content

Commit 3f3046f

Browse files
authored
suggestions (#1224)
1 parent d380812 commit 3f3046f

File tree

5 files changed

+20
-31
lines changed

5 files changed

+20
-31
lines changed

spec/exchange_spec.cr

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ describe LavinMQ::Exchange do
7272
amqp_queue = s.vhosts["/"].queues["mqtt-messages"]
7373

7474
mqtt_exchange.bind(amqp_queue, "home/lights", nil)
75-
mqtt_exchange.publish(mqtt_publish("home/lights", "ON"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 1
75+
mqtt_exchange.publish(mqtt_publish("home/lights", "ON")).should eq 1
7676
amqp_queue.message_count.should eq 1
7777
end
7878
end
@@ -89,7 +89,7 @@ describe LavinMQ::Exchange do
8989

9090
mqtt_exchange.bind(sensor_queue, "sensor/+/temperature", nil)
9191
mqtt_exchange.bind(all_queue, "#", nil)
92-
mqtt_exchange.publish(mqtt_publish("sensor/bedroom/temperature", "22.5"), Set(LavinMQ::Queue).new, Set(LavinMQ::Exchange).new).should eq 2
92+
mqtt_exchange.publish(mqtt_publish("sensor/bedroom/temperature", "22.5")).should eq 2
9393
sensor_queue.message_count.should eq 1
9494
all_queue.message_count.should eq 1
9595
end

src/lavinmq/amqp/exchange/exchange.cr

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,14 @@ module LavinMQ
218218
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
219219
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil
220220
return unless exchanges.add? self
221-
find_queues_internal(routing_key, headers, queues, exchanges)
221+
each_destination(routing_key, headers) do |d|
222+
case d
223+
in LavinMQ::Queue
224+
queues.add(d)
225+
in LavinMQ::Exchange
226+
d.find_queues(routing_key, headers, queues, exchanges)
227+
end
228+
end
222229

223230
if hdrs = headers
224231
find_cc_queues(hdrs, "CC", queues)
@@ -233,17 +240,6 @@ module LavinMQ
233240
end
234241
end
235242

236-
protected def find_queues_internal(routing_key, headers, queues, exchanges)
237-
each_destination(routing_key, headers) do |d|
238-
case d
239-
in LavinMQ::Queue
240-
queues.add(d)
241-
in LavinMQ::Exchange
242-
d.find_queues(routing_key, headers, queues, exchanges)
243-
end
244-
end
245-
end
246-
247243
private def find_cc_queues(headers, key, queues)
248244
return unless cc = headers[key]?
249245
cc = cc.as?(Array(AMQP::Field))

src/lavinmq/mqtt/broker.cr

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ module LavinMQ
7070
@vhost.rm_connection(client)
7171
end
7272

73-
def publish(packet : MQTT::Publish, queues : Set(LavinMQ::Queue), exchanges : Set(LavinMQ::Exchange))
74-
@exchange.publish(packet, queues, exchanges)
73+
def publish(packet : MQTT::Publish)
74+
@exchange.publish(packet)
7575
end
7676

7777
def subscribe(client, topics)

src/lavinmq/mqtt/client.cr

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ module LavinMQ
4040
metadata = ::Log::Metadata.new(nil, {vhost: @broker.vhost.name, address: @connection_info.remote_address.to_s, client_id: client_id})
4141
@log = Logger.new(Log, metadata)
4242
@log.info { "Connection established for user=#{@user.name}" }
43-
@queues = Set(LavinMQ::Queue).new
44-
@exchanges = Set(LavinMQ::Exchange).new
4543
spawn read_loop, name: "MQTT read_loop #{@connection_info.remote_address}"
4644
end
4745

@@ -129,7 +127,7 @@ module LavinMQ
129127
end
130128

131129
def recieve_publish(packet : MQTT::Publish)
132-
@broker.publish(packet, @queues, @exchanges)
130+
@broker.publish(packet)
133131
vhost.event_tick(EventType::ClientPublish)
134132
# Ok to not send anything if qos = 0 (fire and forget)
135133
if packet.qos > 0 && (packet_id = packet.packet_id)
@@ -188,7 +186,7 @@ module LavinMQ
188186
qos: will.qos,
189187
retain: will.retain?,
190188
dup: false,
191-
), @queues, @exchanges
189+
)
192190
end
193191
rescue ex
194192
@log.warn { "Failed to publish will: #{ex.message}" }

src/lavinmq/mqtt/exchange.cr

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ module LavinMQ
3737

3838
def initialize(vhost : VHost, name : String, @retain_store : MQTT::RetainStore)
3939
super(vhost, name, false, false, true)
40+
@queues = Set(LavinMQ::Queue).new
41+
@exchanges = Set(LavinMQ::Exchange).new
42+
end
43+
44+
def publish(packet : MQTT::Publish) : UInt32
45+
publish(packet, @queues, @exchanges)
4046
end
4147

4248
def publish(packet : MQTT::Publish, queues : Set(LavinMQ::Queue), exchanges : Set(LavinMQ::Exchange)) : UInt32
@@ -93,17 +99,6 @@ module LavinMQ
9399
# Use only the subscription tree for all destinations (MQTT and AMQP)
94100
end
95101

96-
protected def find_queues_internal(routing_key, headers, queues, exchanges)
97-
@tree.each_entry(routing_key) do |destination, _|
98-
case destination
99-
in LavinMQ::Queue
100-
queues.add(destination)
101-
in LavinMQ::Exchange
102-
destination.find_queues(routing_key, headers, queues, exchanges)
103-
end
104-
end
105-
end
106-
107102
def bind(destination : Destination, routing_key : String, arguments = nil) : Bool
108103
qos = arguments.try { |h| h[QOS_HEADER]?.try(&.as(UInt8)) } || 0u8
109104
binding_key = BindingKey.new(routing_key, arguments)

0 commit comments

Comments
 (0)