Skip to content

Commit

Permalink
Add support for Elastic Beanstalk's Periodic Tasks
Browse files Browse the repository at this point in the history
Intercept messages which have been enqueued by the EB
worker environment itself due to a present cron.yaml file.
Further information on Elastic Beanstalk's Periodic task
feature is available in Amazon's official documentation:
http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-periodictasks

Close active-elastic-job#40
  • Loading branch information
tawan committed Nov 19, 2016
1 parent 6110d57 commit 0b1d4b8
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 17 deletions.
60 changes: 43 additions & 17 deletions lib/active_elastic_job/rack/sqs_message_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ module Rack
# This middleware intercepts requests which are sent by the SQS daemon
# running in {Amazon Elastic Beanstalk worker environments}[http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html].
# It does this by looking at the +User-Agent+ header.
# Furthermore, it verifies the digest which is sent along with a legit SQS
# Requesets from the SQS daemon are handled in two alternative cases:
#
# (1) the processed SQS message was orignally triggered by a periodic task
# supported by Elastic Beanstalk's Periodic Task feature
#
# (2) the processed SQS message was queued by this gem representing an active job.
# In this case it verifies the digest which is sent along with a legit SQS
# message, and passed as an HTTP header in the resulting request.
# The digest is based on Rails' +secrets.secret_key_base+.
# Therefore, the application running in the web environment, which generates
Expand All @@ -21,33 +27,43 @@ class SqsMessageConsumer
OK_RESPONSE_CODE = '200'.freeze
INSIDE_DOCKER_CONTAINER = `[ -f /proc/1/cgroup ] && cat /proc/1/cgroup` =~ /docker/
DOCKER_HOST_IP = "172.17.0.1".freeze
PERIODIC_TASK_PATH = "/periodic_tasks".freeze

def initialize(app) #:nodoc:
@app = app
end

def call(env) #:nodoc:
request = ActionDispatch::Request.new env
if enabled? && aws_sqsd?(request) && originates_from_gem?(request)
if enabled? && aws_sqsd?(request)
unless request.local? || sent_from_docker_host?(request)
m = "Accepts only requests from localhost for job processing".freeze
return ['403', {CONTENT_TYPE_HEADER_NAME => 'text/plain' }, [ m ]]
end
begin
verify!(request)
job = JSON.load(request.body)
ActiveJob::Base.execute(job)
rescue ActiveElasticJob::MessageVerifier::InvalidDigest => e
return [
'403',
{CONTENT_TYPE_HEADER_NAME => 'text/plain' },
["Incorrect digest! Please, make sure that both environments, worker and web, use the same SECRET_KEY_BASE setting."]]
end
return [
OK_RESPONSE_CODE ,
{CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE },
[ '' ]]
end

if periodic_task?(request)
execute_periodic_task(request)
return [
OK_RESPONSE_CODE ,
{CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE },
[ '' ]]
elsif originates_from_gem?(request)
begin
verify!(request)
job = JSON.load(request.body)
ActiveJob::Base.execute(job)
rescue ActiveElasticJob::MessageVerifier::InvalidDigest => e
return [
'403',
{CONTENT_TYPE_HEADER_NAME => 'text/plain' },
["Incorrect digest! Please, make sure that both environments, worker and web, use the same SECRET_KEY_BASE setting."]]
end
return [
OK_RESPONSE_CODE ,
{CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE },
[ '' ]]
end
end
@app.call(env)
end

Expand Down Expand Up @@ -79,6 +95,16 @@ def aws_sqsd?(request)
current_user_agent[0..(USER_AGENT_PREFIX.size - 1)] == USER_AGENT_PREFIX)
end

def periodic_task?(request)
!request.fullpath.nil? && request.fullpath[0..(PERIODIC_TASK_PATH.size - 1)] == PERIODIC_TASK_PATH
end

def execute_periodic_task(request)
job_name = request.headers['X-Aws-Sqsd-Taskname']
job = job_name.constantize.new
job.perform_now
end

def originates_from_gem?(request)
if request.headers[ORIGIN_HEADER_NAME] == ActiveElasticJob::ACRONYM
return true
Expand Down
13 changes: 13 additions & 0 deletions spec/integration/rails-app-4.2/app/jobs/periodic_task_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require 'net/http'

class PeriodicTaskJob < ActiveJob::Base
WEB_ENV_HOST = ENV['WEB_ENV_HOST']
WEB_ENV_PORT = ENV['WEB_ENV_PORT']

def perform
Net::HTTP.start(WEB_ENV_HOST, WEB_ENV_PORT, use_ssl: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) do |https|
request = Net::HTTP::Delete.new "/random_strings/from_periodic_task"
https.request request
end
end
end
5 changes: 5 additions & 0 deletions spec/integration/rails-app-4.2/cron.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: 1
cron:
- name: "PeriodicTaskJob"
url: "/periodic_tasks"
schedule: "* * * * *"
13 changes: 13 additions & 0 deletions spec/integration/rails-app-5.0/app/jobs/periodic_task_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require 'net/http'

class PeriodicTaskJob < ActiveJob::Base
WEB_ENV_HOST = ENV['WEB_ENV_HOST']
WEB_ENV_PORT = ENV['WEB_ENV_PORT']

def perform
Net::HTTP.start(WEB_ENV_HOST, WEB_ENV_PORT, use_ssl: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) do |https|
request = Net::HTTP::Delete.new "/random_strings/from_periodic_task"
https.request request
end
end
end
5 changes: 5 additions & 0 deletions spec/integration/rails-app-5.0/cron.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version: 1
cron:
- name: "PeriodicTaskJob"
url: "/periodic_tasks"
schedule: "* * * * *"
19 changes: 19 additions & 0 deletions spec/integration/standard_scenarios_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,23 @@
expect(@rails_app).to have_deleted(random_string)
end
end

describe "periodic tasks", slow: true, deployed: true do
let(:random_string) { "from_periodic_task" }

it "processes the job triggered by cron.yaml" do
expect(@rails_app.fetch_random_strings).to_not include(random_string)
@rails_app.create_random_string(random_string)
expect(@rails_app.fetch_random_strings).to include(random_string)
begin
Timeout::timeout(65) do
while(@rails_app.fetch_random_strings.include?(random_string)) do
sleep 1
end
end
rescue Timeout::Error
fail "random string has not been deleted within 65 seconds"
end
end
end
end

0 comments on commit 0b1d4b8

Please sign in to comment.