Skip to content

Commit

Permalink
Cache queue url
Browse files Browse the repository at this point in the history
Cache the url of an Amazon SQS queue to avoid making
slow API requests.

Close active-elastic-job#3
  • Loading branch information
tawan committed Feb 6, 2016
1 parent bce9571 commit f03700f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
36 changes: 22 additions & 14 deletions lib/active_job/queue_adapters/active_elastic_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ def initialize(serialized_job)
# #..
# end
class NonExistentQueue < Error
def initialize(job)
def initialize(queue_name)
msg = <<-MSG
The job is bound to queue at #{job.queue_name}. Unfortunately a queue
The job is bound to queue at #{queue_name}. Unfortunately a queue
with this name does not exist in this region. Either create an Amazon SQS queue
named #{job.queue_name} - you can do this in AWS console, make sure to select
region '#{ENV['AWS_REGION']}' - or you select another queue for
#{job.class.name} jobs.
named #{queue_name} - you can do this in AWS console, make sure to select
region '#{ENV['AWS_REGION']}' - or you select another queue for your jobs.
MSG
super msg
end
Expand All @@ -76,17 +75,10 @@ def enqueue(job) #:nodoc:
end

def enqueue_at(job, timestamp) #:nodoc:
queue_url = nil
begin
resp = aws_sqs_client.get_queue_url(queue_name: job.queue_name.to_s)
queue_url = resp.queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise NonExistentQueue, job
end
serialized_job = JSON.dump(job.serialize)
check_job_size!(serialized_job)
message = {
queue_url: queue_url,
queue_url: queue_url(job.queue_name),
message_body: serialized_job,
delay_seconds: calculate_delay(timestamp),
message_attributes: {
Expand All @@ -98,14 +90,30 @@ def enqueue_at(job, timestamp) #:nodoc:
}
resp = aws_sqs_client.send_message(message)
verify_md5_digests!(
resp,
resp,
message[:message_body],
message[:message_attributes]
)
rescue Aws::SQS::Errors::NonExistentQueue => e
unless @queue_urls[job.queue_name.to_s].nil?
@queue_urls[job.queue_name.to_s] = nil
retry
end
raise NonExistentQueue, job
end

private

def queue_url(queue_name)
cache_key = queue_name.to_s
@queue_urls ||= { }
return @queue_urls[cache_key] if @queue_urls[cache_key]
resp = aws_sqs_client.get_queue_url(queue_name: queue_name.to_s)
@queue_urls[cache_key] = resp.queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise NonExistentQueue, queue_name
end

def calculate_delay(timestamp)
delay = (timestamp - Time.current.to_f).to_i + 1
if delay > 15.minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def initialize; end;
adapter.enqueue job
end

it "caches the queue url" do
expect(aws_sqs_client).to receive(:get_queue_url).exactly(0).times
adapter.enqueue job
end

it "sends the serialized job as a message to an AWS SQS queue" do
expect(aws_sqs_client).to receive(:send_message)

Expand Down Expand Up @@ -94,7 +99,7 @@ def initialize; end;

context "when queue does not exist" do
before do
allow(aws_sqs_client).to receive(:get_queue_url) { raise StubbedError }
allow(adapter).to receive(:queue_url) { raise StubbedError }
end

it "raises NonExistentQueue error" do
Expand Down

0 comments on commit f03700f

Please sign in to comment.