Skip to content

Commit

Permalink
Add publisher class, it fixes #3
Browse files Browse the repository at this point in the history
  • Loading branch information
descholar-ceo committed Feb 13, 2024
1 parent 90d023b commit 2b3df2b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
Empty file added src/gato/dtos.cr
Empty file.
18 changes: 18 additions & 0 deletions src/gato/publisher.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Gato
class Publisher
def self.publish(data : JSON::Any, queue_name : String)
AMQP::Client.start(Gato.configuration.amqp_url.to_s) do |c|
c.channel do |ch|
unless LuckyEnv.production?
Log.notice { "Publishing to #{queue_name}..." }
end
q = ch.queue queue_name
q.publish_confirm data
unless LuckyEnv.production?
Log.notice { "Successfully published to lavinmq!" }
end
end
end
end
end
end
18 changes: 12 additions & 6 deletions src/gato/runner.cr → src/gato/subscriber.cr
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
module Gato
class Runner
def self.start(param : Array(NamedTuple(queue_name: String, message_handler: JSON::Any ->))) : Nil
Log.notice { "El gato esta maullando..." }
class Subscriber
def self.subscribe(param : Array(NamedTuple(queue_name: String, message_handler: JSON::Any ->))) : Nil
unless LuckyEnv.production?
Log.notice { "El gato esta maullando..." }
end
AMQP::Client.start(Gato.configuration.amqp_url.to_s) do |c|
c.channel do |ch|
param.each do |curr_param|
q = ch.queue curr_param[:queue_name], durable: true
ch.prefetch count: 1

q.subscribe(no_ack: false, block: false) do |msg|
message = JSON.parse msg.body_io.to_s
Log.notice { "Received a new message for #{curr_param[:queue_name]} queue" }
unless LuckyEnv.production?
Log.notice { "Received a new message for #{curr_param[:queue_name]} queue" }
end
curr_param[:message_handler].call message
msg.ack
Log.notice { "Done processing the new message!" }
unless LuckyEnv.production?
Log.notice { "Done processing the new message!" }
end
end
end
sleep
Expand Down

0 comments on commit 2b3df2b

Please sign in to comment.