Skip to content

Commit

Permalink
Refactor middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
tawan committed Nov 19, 2016
1 parent 0d9132d commit 013b32d
Showing 1 changed file with 32 additions and 35 deletions.
67 changes: 32 additions & 35 deletions lib/active_elastic_job/rack/sqs_message_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ module Rack
# environment, which verifies the digest, have to use the *same*
# +secrets.secret_key_base+ setting.
class SqsMessageConsumer
USER_AGENT_PREFIX = 'aws-sqsd'.freeze
DIGEST_HEADER_NAME = 'HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze
ORIGIN_HEADER_NAME = 'HTTP_X_AWS_SQSD_ATTR_ORIGIN'.freeze
CONTENT_TYPE = 'application/json'.freeze
CONTENT_TYPE_HEADER_NAME = 'Content-Type'.freeze
OK_RESPONSE_CODE = '200'.freeze
INSIDE_DOCKER_CONTAINER = `[ -f /proc/1/cgroup ] && cat /proc/1/cgroup` =~ /docker/
DOCKER_HOST_IP = "172.17.0.1".freeze
PERIODIC_TASK_PATH = "/periodic_tasks".freeze
OK_RESPONSE = [ '200'.freeze, { 'Content-Type'.freeze => 'text/plain'.freeze }, [ 'OK'.freeze ] ]
FORBIDDEN_RESPONSE = [
'403'.freeze,
{ 'Content-Type'.freeze => 'text/plain'.freeze },
[ 'Request forbidden!'.freeze ]
]
DOCKER_HOST_IP = '172.17.0.1'.freeze
PERIODIC_TASKS_PATH = '/periodic_tasks'.freeze

def initialize(app) #:nodoc:
@app = app
Expand All @@ -37,31 +36,19 @@ def call(env) #:nodoc:
request = ActionDispatch::Request.new env
if enabled? && aws_sqsd?(request)
unless request.local? || sent_from_docker_host?(request)
m = "Accepts only requests from localhost for job processing".freeze
return ['403', {CONTENT_TYPE_HEADER_NAME => 'text/plain' }, [ m ]]
return FORBIDDEN_RESPONSE
end

if periodic_task?(request)
execute_periodic_task(request)
return [
OK_RESPONSE_CODE ,
{CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE },
[ '' ]]
return OK_RESPONSE
elsif originates_from_gem?(request)
begin
verify!(request)
job = JSON.load(request.body)
ActiveJob::Base.execute(job)
execute_job(request)
rescue ActiveElasticJob::MessageVerifier::InvalidDigest => e
return [
'403',
{CONTENT_TYPE_HEADER_NAME => 'text/plain' },
["Incorrect digest! Please, make sure that both environments, worker and web, use the same SECRET_KEY_BASE setting."]]
return FORBIDDEN_RESPONSE
end
return [
OK_RESPONSE_CODE ,
{CONTENT_TYPE_HEADER_NAME => CONTENT_TYPE },
[ '' ]]
return OK_RESPONSE
end
end
@app.call(env)
Expand All @@ -77,26 +64,32 @@ def enabled?
def verify!(request)
secret_key_base = Rails.application.secrets[:secret_key_base]
@verifier ||= ActiveElasticJob::MessageVerifier.new(secret_key_base)
digest = request.headers[DIGEST_HEADER_NAME]
digest = request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze]
message = request.body_stream.read
request.body_stream.rewind
@verifier.verify!(message, digest)
end

def aws_sqsd?(request)
# we do not match against a Regexp
# Does not match against a Regexp
# in order to avoid performance penalties.
# Instead we make a simple string comparison.
# Instead performs a simple string comparison.
# Benchmark runs showed an performance increase of
# up to 40%
current_user_agent = request.headers['User-Agent'.freeze]
return (current_user_agent.present? &&
current_user_agent.size >= USER_AGENT_PREFIX.size &&
current_user_agent[0..(USER_AGENT_PREFIX.size - 1)] == USER_AGENT_PREFIX)
current_user_agent.size >= 'aws-sqsd'.freeze.size &&
current_user_agent[0..('aws-sqsd'.freeze.size - 1)] == 'aws-sqsd'.freeze)
end

def periodic_task?(request)
!request.fullpath.nil? && request.fullpath[0..(PERIODIC_TASK_PATH.size - 1)] == PERIODIC_TASK_PATH
!request.fullpath.nil? && request.fullpath[0..(PERIODIC_TASKS_PATH.size - 1)] == PERIODIC_TASKS_PATH
end

def execute_job(request)
verify!(request)
job = JSON.load(request.body)
ActiveJob::Base.execute(job)
end

def execute_periodic_task(request)
Expand All @@ -106,17 +99,21 @@ def execute_periodic_task(request)
end

def originates_from_gem?(request)
if request.headers[ORIGIN_HEADER_NAME] == ActiveElasticJob::ACRONYM
if request.headers['HTTP_X_AWS_SQSD_ATTR_ORIGIN'.freeze] == ActiveElasticJob::ACRONYM
return true
elsif request.headers[DIGEST_HEADER_NAME] != nil
elsif request.headers['HTTP_X_AWS_SQSD_ATTR_MESSAGE_DIGEST'.freeze] != nil
return true
else
return false
end
end

def sent_from_docker_host?(request)
INSIDE_DOCKER_CONTAINER && request.remote_ip == DOCKER_HOST_IP
app_runs_in_docker_container? && request.remote_ip == DOCKER_HOST_IP
end

def app_runs_in_docker_container?
@app_in_docker_container ||= `[ -f /proc/1/cgroup ] && cat /proc/1/cgroup` =~ /docker/
end
end
end
Expand Down

0 comments on commit 013b32d

Please sign in to comment.