From 0b1d4b8551b782a3d486627d8f0b8cb09da8d14e Mon Sep 17 00:00:00 2001 From: Tawan Sierek Date: Wed, 16 Nov 2016 20:44:06 +0100 Subject: [PATCH] Add support for Elastic Beanstalk's Periodic Tasks Intercept messages which have been enqueued by the EB worker environment itself due to a present cron.yaml file. Further information on Elastic Beanstalk's Periodic task feature is available in Amazon's official documentation: http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-periodictasks Close #40 --- .../rack/sqs_message_consumer.rb | 60 +++++++++++++------ .../app/jobs/periodic_task_job.rb | 13 ++++ spec/integration/rails-app-4.2/cron.yaml | 5 ++ .../app/jobs/periodic_task_job.rb | 13 ++++ spec/integration/rails-app-5.0/cron.yaml | 5 ++ spec/integration/standard_scenarios_spec.rb | 19 ++++++ 6 files changed, 98 insertions(+), 17 deletions(-) create mode 100644 spec/integration/rails-app-4.2/app/jobs/periodic_task_job.rb create mode 100644 spec/integration/rails-app-4.2/cron.yaml create mode 100644 spec/integration/rails-app-5.0/app/jobs/periodic_task_job.rb create mode 100644 spec/integration/rails-app-5.0/cron.yaml diff --git a/lib/active_elastic_job/rack/sqs_message_consumer.rb b/lib/active_elastic_job/rack/sqs_message_consumer.rb index cf6456b..76b6394 100644 --- a/lib/active_elastic_job/rack/sqs_message_consumer.rb +++ b/lib/active_elastic_job/rack/sqs_message_consumer.rb @@ -5,7 +5,13 @@ module Rack # This middleware intercepts requests which are sent by the SQS daemon # running in {Amazon Elastic Beanstalk worker environments}[http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html]. # It does this by looking at the +User-Agent+ header. - # Furthermore, it verifies the digest which is sent along with a legit SQS + # Requesets from the SQS daemon are handled in two alternative cases: + # + # (1) the processed SQS message was orignally triggered by a periodic task + # supported by Elastic Beanstalk's Periodic Task feature + # + # (2) the processed SQS message was queued by this gem representing an active job. + # In this case it verifies the digest which is sent along with a legit SQS # message, and passed as an HTTP header in the resulting request. # The digest is based on Rails' +secrets.secret_key_base+. # Therefore, the application running in the web environment, which generates @@ -21,6 +27,7 @@ class SqsMessageConsumer OK_RESPONSE_CODE = '200'.freeze INSIDE_DOCKER_CONTAINER = `[ -f /proc/1/cgroup ] && cat /proc/1/cgroup` =~ /docker/ DOCKER_HOST_IP = "172.17.0.1".freeze + PERIODIC_TASK_PATH = "/periodic_tasks".freeze def initialize(app) #:nodoc: @app = app @@ -28,26 +35,35 @@ def initialize(app) #:nodoc: def call(env) #:nodoc: request = ActionDispatch::Request.new env - if enabled? && aws_sqsd?(request) && originates_from_gem?(request) + if enabled? && aws_sqsd?(request) unless request.local? || sent_from_docker_host?(request) m = "Accepts only requests from localhost for job processing".freeze return ['403', {CONTENT_TYPE_HEADER_NAME => 'text/plain' }, [ m ]] end - begin - verify!(request) - job = JSON.load(request.body) - ActiveJob::Base.execute(job) - rescue ActiveElasticJob::MessageVerifier::InvalidDigest => e - return [ - '403', - {CONTENT_TYPE_HEADER_NAME => 'text/plain' }, - ["Incorrect digest! Please, make sure that both environments, worker and web, use the same SECRET_KEY_BASE setting."]] - end - return [ - OK_RESPONSE_CODE , - {CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE }, - [ '' ]] - end + + if periodic_task?(request) + execute_periodic_task(request) + return [ + OK_RESPONSE_CODE , + {CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE }, + [ '' ]] + elsif originates_from_gem?(request) + begin + verify!(request) + job = JSON.load(request.body) + ActiveJob::Base.execute(job) + rescue ActiveElasticJob::MessageVerifier::InvalidDigest => e + return [ + '403', + {CONTENT_TYPE_HEADER_NAME => 'text/plain' }, + ["Incorrect digest! Please, make sure that both environments, worker and web, use the same SECRET_KEY_BASE setting."]] + end + return [ + OK_RESPONSE_CODE , + {CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE }, + [ '' ]] + end + end @app.call(env) end @@ -79,6 +95,16 @@ def aws_sqsd?(request) current_user_agent[0..(USER_AGENT_PREFIX.size - 1)] == USER_AGENT_PREFIX) end + def periodic_task?(request) + !request.fullpath.nil? && request.fullpath[0..(PERIODIC_TASK_PATH.size - 1)] == PERIODIC_TASK_PATH + end + + def execute_periodic_task(request) + job_name = request.headers['X-Aws-Sqsd-Taskname'] + job = job_name.constantize.new + job.perform_now + end + def originates_from_gem?(request) if request.headers[ORIGIN_HEADER_NAME] == ActiveElasticJob::ACRONYM return true diff --git a/spec/integration/rails-app-4.2/app/jobs/periodic_task_job.rb b/spec/integration/rails-app-4.2/app/jobs/periodic_task_job.rb new file mode 100644 index 0000000..4b56f14 --- /dev/null +++ b/spec/integration/rails-app-4.2/app/jobs/periodic_task_job.rb @@ -0,0 +1,13 @@ +require 'net/http' + +class PeriodicTaskJob < ActiveJob::Base + WEB_ENV_HOST = ENV['WEB_ENV_HOST'] + WEB_ENV_PORT = ENV['WEB_ENV_PORT'] + + def perform + Net::HTTP.start(WEB_ENV_HOST, WEB_ENV_PORT, use_ssl: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) do |https| + request = Net::HTTP::Delete.new "/random_strings/from_periodic_task" + https.request request + end + end +end diff --git a/spec/integration/rails-app-4.2/cron.yaml b/spec/integration/rails-app-4.2/cron.yaml new file mode 100644 index 0000000..7f96f4e --- /dev/null +++ b/spec/integration/rails-app-4.2/cron.yaml @@ -0,0 +1,5 @@ +version: 1 +cron: + - name: "PeriodicTaskJob" + url: "/periodic_tasks" + schedule: "* * * * *" diff --git a/spec/integration/rails-app-5.0/app/jobs/periodic_task_job.rb b/spec/integration/rails-app-5.0/app/jobs/periodic_task_job.rb new file mode 100644 index 0000000..4b56f14 --- /dev/null +++ b/spec/integration/rails-app-5.0/app/jobs/periodic_task_job.rb @@ -0,0 +1,13 @@ +require 'net/http' + +class PeriodicTaskJob < ActiveJob::Base + WEB_ENV_HOST = ENV['WEB_ENV_HOST'] + WEB_ENV_PORT = ENV['WEB_ENV_PORT'] + + def perform + Net::HTTP.start(WEB_ENV_HOST, WEB_ENV_PORT, use_ssl: true, verify_mode: OpenSSL::SSL::VERIFY_NONE) do |https| + request = Net::HTTP::Delete.new "/random_strings/from_periodic_task" + https.request request + end + end +end diff --git a/spec/integration/rails-app-5.0/cron.yaml b/spec/integration/rails-app-5.0/cron.yaml new file mode 100644 index 0000000..7f96f4e --- /dev/null +++ b/spec/integration/rails-app-5.0/cron.yaml @@ -0,0 +1,5 @@ +version: 1 +cron: + - name: "PeriodicTaskJob" + url: "/periodic_tasks" + schedule: "* * * * *" diff --git a/spec/integration/standard_scenarios_spec.rb b/spec/integration/standard_scenarios_spec.rb index 0cd033c..89d8d16 100644 --- a/spec/integration/standard_scenarios_spec.rb +++ b/spec/integration/standard_scenarios_spec.rb @@ -34,4 +34,23 @@ expect(@rails_app).to have_deleted(random_string) end end + + describe "periodic tasks", slow: true, deployed: true do + let(:random_string) { "from_periodic_task" } + + it "processes the job triggered by cron.yaml" do + expect(@rails_app.fetch_random_strings).to_not include(random_string) + @rails_app.create_random_string(random_string) + expect(@rails_app.fetch_random_strings).to include(random_string) + begin + Timeout::timeout(65) do + while(@rails_app.fetch_random_strings.include?(random_string)) do + sleep 1 + end + end + rescue Timeout::Error + fail "random string has not been deleted within 65 seconds" + end + end + end end