Skip to content

Commit

Permalink
Verify that message was received correctly
Browse files Browse the repository at this point in the history
Calculate MD5 digests for message body and message attributes
of message that is sent to Amazon SQS. Compare calculated
digests with digests returned by the response object.

Close active-elastic-job#4
  • Loading branch information
tawan committed Feb 6, 2016
1 parent ffeaa28 commit 964b60b
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 9 deletions.
1 change: 1 addition & 0 deletions lib/active_elastic_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'aws-sdk-core'
require 'active_elastic_job/version'
require 'active_elastic_job/md5_message_digest_calculation'
require 'active_job/queue_adapters/active_elastic_job_adapter'
require 'active_elastic_job/rack/sqs_message_consumer'
require 'active_elastic_job/message_verifier'
Expand Down
79 changes: 79 additions & 0 deletions lib/active_elastic_job/md5_message_digest_calculation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
require 'digest'
module ActiveElasticJob
# This module provides methods that calculate the MD5 digest for Amazon
# SQS message bodies and message attributes.
# The digest can be used to verify that Amazon SQS received the message
# correctly.
#
# Example:
#
# extend ActiveElasticJob::MD5MessageDigestCalculation
#
# resp = Aws::SQS::Client.new.send_message(
# queue_url: queue_url,
# message_body: body,
# message_attributes: attributes
# )
#
# if resp.md5_of_message_body != md5_of_message_body(body)
# raise "Returned digest of message body is invalid!"
# end
#
# if resp.md5_of_message_attributes != md5_of_message_attributes(attributes)
# raise "Returned digest of message attributes is invalid!"
# end
module MD5MessageDigestCalculation
TRANSPORT_TYPE_ENCODINGS = {
'String' => 1,
'Binary' => 2,
'Number' => 1
}

CHARSET_ENCODING = Encoding::UTF_8

# Returns MD5 digest of +message_body+.
def md5_of_message_body(message_body)
OpenSSL::Digest::MD5.hexdigest(message_body)
end

# Returns MD5 digest of +message_attributes+.
#
# The calculation follows the official algorithm which
# is specified by Amazon.
def md5_of_message_attributes(message_attributes)
encoded = message_attributes.each.with_object({ }) do |(name, v), hash|
hash[name.to_s] = ""
data_type = v['data_type'] || v[:data_type]

hash[name.to_s] << encode_length_and_bytes(name.to_s) <<
encode_length_and_bytes(data_type) <<
[ TRANSPORT_TYPE_ENCODINGS[data_type] ].pack('C')

if string_value = v['string_value'] || v[:string_value]
hash[name.to_s] << encode_length_and_string(string_value)
elsif binary_value = v['binary_value'] || v[:binary_value]
hash[name.to_s] << encode_length_and_bytes(binary_value)
end
end

buffer = encoded.keys.sort.reduce("") do |b, name|
b << encoded[name]
end
OpenSSL::Digest::MD5.hexdigest(buffer)
end

private

def encode_length_and_string(string)
return '' if string.nil?
string = String.new(string)
string.encode!(CHARSET_ENCODING)
encode_length_and_bytes(string)
end

def encode_length_and_bytes(bytes)
return '' if bytes.nil?
[ bytes.bytesize, bytes ].pack("L>a#{bytes.bytesize}")
end
end
end
34 changes: 33 additions & 1 deletion lib/active_job/queue_adapters/active_elastic_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ module QueueAdapters
class ActiveElasticJobAdapter
MAX_MESSAGE_SIZE = (256 * 1024)

extend ActiveElasticJob::MD5MessageDigestCalculation

class Error < StandardError; end;

# Raised when job exceeds 256 KB in its serialized form. The limit is
Expand Down Expand Up @@ -57,6 +59,17 @@ def initialize(job)
end
end

# Raised when calculated MD5 digest does not match the MD5 Digest
# of the response from Amazon SQS.
class MD5MismatchError < Error
def initialize( message_id)
msg = <<-MSG
MD5 returned by Amazon SQS does not match the calculation on the original request.
The message with Message ID #{message_id} sent to SQS might be corrupted.
MSG
end
end

class << self
def enqueue(job) #:nodoc:
enqueue_at(job, Time.now)
Expand All @@ -72,7 +85,7 @@ def enqueue_at(job, timestamp) #:nodoc:
end
serialized_job = JSON.dump(job.serialize)
check_job_size!(serialized_job)
aws_sqs_client.send_message(
message = {
queue_url: queue_url,
message_body: serialized_job,
delay_seconds: calculate_delay(timestamp),
Expand All @@ -82,6 +95,12 @@ def enqueue_at(job, timestamp) #:nodoc:
data_type: "String"
}
}
}
resp = aws_sqs_client.send_message(message)
verify_md5_digests!(
resp,
message[:message_body],
message[:message_attributes]
)
end

Expand Down Expand Up @@ -119,6 +138,19 @@ def message_digest(messsage_body)
verifier = ActiveElasticJob::MessageVerifier.new(secret_key_base)
verifier.generate_digest(messsage_body)
end

def verify_md5_digests!(response, messsage_body, message_attributes = nil)
if md5_of_message_body(messsage_body) != response.md5_of_message_body
raise MD5MismatchError, response.message_id
end

if message_attributes
if md5_of_message_attributes(message_attributes) !=
response.md5_of_message_attributes
raise MD5MismatchError, response.message_id
end
end
end
end
end
end
Expand Down
60 changes: 60 additions & 0 deletions spec/active_elastic_job/md5_message_digest_calculation_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# encoding: UTF-8

require 'spec_helper'
require 'securerandom'

describe ActiveElasticJob::MD5MessageDigestCalculation do
let(:queue_name) { "ActiveElasticJob-integration-testing" }
let(:queue_url) { aws_sqs_client.create_queue(queue_name: queue_name).queue_url }
let(:base_class) {
Class.new { extend ActiveElasticJob::MD5MessageDigestCalculation }
}

describe "#md5_of_message_body" do
let(:message_body) { JSON.dump(Helpers::TestJob.new.serialize) }
let(:expected_hash) {
aws_sqs_client.send_message(
message_body: message_body,
queue_url: queue_url
).md5_of_message_body
}
subject { base_class.md5_of_message_body(message_body) }

it { is_expected.to eq(expected_hash) }
end

describe "#md5_of_message_attributes" do
let(:message_attributes) {
{
"ccc" => {
string_value: "test",
data_type: "String"
},
aaa: {
binary_value: SecureRandom.random_bytes(12),
data_type: "Binary"
},
zzz: {
data_type: "Number",
string_value: "0230.01"
},
"öther_encodings" => {
data_type: "String",
string_value: "Tüst".encode!("ISO-8859-1")
}
}
}

let(:expected_hash) {
aws_sqs_client.send_message(
message_body: "test",
queue_url: queue_url,
message_attributes: message_attributes
).md5_of_message_attributes
}

subject { base_class.md5_of_message_attributes(message_attributes) }

it { is_expected.to eq(expected_hash) }
end
end
44 changes: 39 additions & 5 deletions spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'spec_helper'
require 'digest'

class StubbedError < Aws::SQS::Errors::NonExistentQueue
def initialize; end;
Expand All @@ -15,14 +16,26 @@ def initialize; end;
let(:queue_url_resp) { double("queue_url_resp") }
let(:secret_key_base) { 's3krit' }
let(:rails_app) { double("rails_app") }
let(:resp) { double("resp") }
let(:md5_body) { "body_digest" }
let(:md5_attributes) { "attributes_digest" }
let(:calculated_md5_body) { md5_body }
let(:calculated_md5_attributes) { md5_attributes }

before do
allow(Aws::SQS::Client).to receive(:new) { aws_sqs_client }
allow(Rails).to receive(:application) { rails_app }
allow(rails_app).to receive(:secrets) { { secret_key_base: secret_key_base } }
allow(aws_sqs_client).to receive(:get_queue_url) { queue_url_resp }
allow(queue_url_resp).to receive(:queue_url) { queue_url }
allow(aws_sqs_client).to receive(:send_message) { }
allow(aws_sqs_client).to receive(:send_message) { resp }
allow(resp).to receive(:md5_of_message_body) { md5_body }
allow(resp).to receive(:md5_of_message_attributes) { md5_attributes }
allow(resp).to receive(:message_id) { "" }
allow(adapter).to receive(:md5_of_message_body) { calculated_md5_body }
allow(adapter).to receive(:md5_of_message_attributes) {
calculated_md5_attributes
}
end

describe ".enqueue" do
Expand All @@ -39,6 +52,27 @@ def initialize; end;
adapter.enqueue job
end

describe "md5 digest verification" do
let(:expected_error) {
ActiveJob::QueueAdapters::ActiveElasticJobAdapter::MD5MismatchError
}
context "when md5 hash of message body does not match" do
let(:calculated_md5_body) { "a different digest" }

it "raises MD5MismatchError " do
expect { adapter.enqueue(job) }.to raise_error(expected_error)
end
end

context "when md5 hash of message attributes does not match" do
let(:calculated_md5_attributes) { "a different digest" }

it "raises MD5MismatchError " do
expect { adapter.enqueue(job) }.to raise_error(expected_error)
end
end
end

context "when serialized job exeeds 256KB" do
let(:exceeds_max_size) { 266 * 1024 }
let(:arg) do
Expand All @@ -51,10 +85,10 @@ def initialize; end;
let(:job) { Helpers::TestJob.new(arg) }

it "raises a SerializedJobTooBig error" do
exptected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::SerializedJobTooBig
expected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::SerializedJobTooBig
expect do
adapter.enqueue(job)
end.to raise_error(exptected_error)
end.to raise_error(expected_error)
end
end

Expand All @@ -64,10 +98,10 @@ def initialize; end;
end

it "raises NonExistentQueue error" do
exptected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::NonExistentQueue
expected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::NonExistentQueue
expect do
adapter.enqueue(job)
end.to raise_error(exptected_error)
end.to raise_error(expected_error)
end
end
end
Expand Down
15 changes: 12 additions & 3 deletions spec/integration/aws_sqs_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,25 @@ def perform(test_arg)
response.queue_url
end
let(:message_content) { JSON.dump(TestJob.new.serialize) }
let(:md5_digest) { Digest::MD5.hexdigest(message_content) }
let(:message_attribute) { "attribute" }
let(:md5_digest_body) { Digest::MD5.hexdigest(message_content) }
let(:md5_digest_attribute) { Digest::MD5.hexdigest(message_attribute) }

describe "#send_message" do
it "is successful" do
response = aws_client.send_message(
message_body: message_content,
queue_url: queue_url
queue_url: queue_url,
message_attributes: {
"attribute" => {
string_value: message_attribute,
data_type: "String"
}
}
)

expect(response.md5_of_message_body).to match(md5_digest)
#expect(response.md5_of_message_body).to match(md5_digest_body)
#expect(response.md5_of_message_attributes).to match(md5_digest_attribute)
end

context "when message size exeeds 256 KB" do
Expand Down

0 comments on commit 964b60b

Please sign in to comment.