diff --git a/lib/active_elastic_job.rb b/lib/active_elastic_job.rb index 92e037b..c3e3496 100644 --- a/lib/active_elastic_job.rb +++ b/lib/active_elastic_job.rb @@ -1,5 +1,6 @@ require 'aws-sdk-core' require 'active_elastic_job/version' +require 'active_elastic_job/md5_message_digest_calculation' require 'active_job/queue_adapters/active_elastic_job_adapter' require 'active_elastic_job/rack/sqs_message_consumer' require 'active_elastic_job/message_verifier' diff --git a/lib/active_elastic_job/md5_message_digest_calculation.rb b/lib/active_elastic_job/md5_message_digest_calculation.rb new file mode 100644 index 0000000..2987b72 --- /dev/null +++ b/lib/active_elastic_job/md5_message_digest_calculation.rb @@ -0,0 +1,79 @@ +require 'digest' +module ActiveElasticJob + # This module provides methods that calculate the MD5 digest for Amazon + # SQS message bodies and message attributes. + # The digest can be used to verify that Amazon SQS received the message + # correctly. + # + # Example: + # + # extend ActiveElasticJob::MD5MessageDigestCalculation + # + # resp = Aws::SQS::Client.new.send_message( + # queue_url: queue_url, + # message_body: body, + # message_attributes: attributes + # ) + # + # if resp.md5_of_message_body != md5_of_message_body(body) + # raise "Returned digest of message body is invalid!" + # end + # + # if resp.md5_of_message_attributes != md5_of_message_attributes(attributes) + # raise "Returned digest of message attributes is invalid!" + # end + module MD5MessageDigestCalculation + TRANSPORT_TYPE_ENCODINGS = { + 'String' => 1, + 'Binary' => 2, + 'Number' => 1 + } + + CHARSET_ENCODING = Encoding::UTF_8 + + # Returns MD5 digest of +message_body+. + def md5_of_message_body(message_body) + OpenSSL::Digest::MD5.hexdigest(message_body) + end + + # Returns MD5 digest of +message_attributes+. + # + # The calculation follows the official algorithm which + # is specified by Amazon. + def md5_of_message_attributes(message_attributes) + encoded = message_attributes.each.with_object({ }) do |(name, v), hash| + hash[name.to_s] = "" + data_type = v['data_type'] || v[:data_type] + + hash[name.to_s] << encode_length_and_bytes(name.to_s) << + encode_length_and_bytes(data_type) << + [ TRANSPORT_TYPE_ENCODINGS[data_type] ].pack('C') + + if string_value = v['string_value'] || v[:string_value] + hash[name.to_s] << encode_length_and_string(string_value) + elsif binary_value = v['binary_value'] || v[:binary_value] + hash[name.to_s] << encode_length_and_bytes(binary_value) + end + end + + buffer = encoded.keys.sort.reduce("") do |b, name| + b << encoded[name] + end + OpenSSL::Digest::MD5.hexdigest(buffer) + end + + private + + def encode_length_and_string(string) + return '' if string.nil? + string = String.new(string) + string.encode!(CHARSET_ENCODING) + encode_length_and_bytes(string) + end + + def encode_length_and_bytes(bytes) + return '' if bytes.nil? + [ bytes.bytesize, bytes ].pack("L>a#{bytes.bytesize}") + end + end +end diff --git a/lib/active_job/queue_adapters/active_elastic_job_adapter.rb b/lib/active_job/queue_adapters/active_elastic_job_adapter.rb index 86259c4..31a05ed 100644 --- a/lib/active_job/queue_adapters/active_elastic_job_adapter.rb +++ b/lib/active_job/queue_adapters/active_elastic_job_adapter.rb @@ -16,6 +16,8 @@ module QueueAdapters class ActiveElasticJobAdapter MAX_MESSAGE_SIZE = (256 * 1024) + extend ActiveElasticJob::MD5MessageDigestCalculation + class Error < StandardError; end; # Raised when job exceeds 256 KB in its serialized form. The limit is @@ -57,6 +59,17 @@ def initialize(job) end end + # Raised when calculated MD5 digest does not match the MD5 Digest + # of the response from Amazon SQS. + class MD5MismatchError < Error + def initialize( message_id) + msg = <<-MSG +MD5 returned by Amazon SQS does not match the calculation on the original request. +The message with Message ID #{message_id} sent to SQS might be corrupted. + MSG + end + end + class << self def enqueue(job) #:nodoc: enqueue_at(job, Time.now) @@ -72,7 +85,7 @@ def enqueue_at(job, timestamp) #:nodoc: end serialized_job = JSON.dump(job.serialize) check_job_size!(serialized_job) - aws_sqs_client.send_message( + message = { queue_url: queue_url, message_body: serialized_job, delay_seconds: calculate_delay(timestamp), @@ -82,6 +95,12 @@ def enqueue_at(job, timestamp) #:nodoc: data_type: "String" } } + } + resp = aws_sqs_client.send_message(message) + verify_md5_digests!( + resp, + message[:message_body], + message[:message_attributes] ) end @@ -119,6 +138,19 @@ def message_digest(messsage_body) verifier = ActiveElasticJob::MessageVerifier.new(secret_key_base) verifier.generate_digest(messsage_body) end + + def verify_md5_digests!(response, messsage_body, message_attributes = nil) + if md5_of_message_body(messsage_body) != response.md5_of_message_body + raise MD5MismatchError, response.message_id + end + + if message_attributes + if md5_of_message_attributes(message_attributes) != + response.md5_of_message_attributes + raise MD5MismatchError, response.message_id + end + end + end end end end diff --git a/spec/active_elastic_job/md5_message_digest_calculation_spec.rb b/spec/active_elastic_job/md5_message_digest_calculation_spec.rb new file mode 100644 index 0000000..3f52355 --- /dev/null +++ b/spec/active_elastic_job/md5_message_digest_calculation_spec.rb @@ -0,0 +1,60 @@ +# encoding: UTF-8 + +require 'spec_helper' +require 'securerandom' + +describe ActiveElasticJob::MD5MessageDigestCalculation do + let(:queue_name) { "ActiveElasticJob-integration-testing" } + let(:queue_url) { aws_sqs_client.create_queue(queue_name: queue_name).queue_url } + let(:base_class) { + Class.new { extend ActiveElasticJob::MD5MessageDigestCalculation } + } + + describe "#md5_of_message_body" do + let(:message_body) { JSON.dump(Helpers::TestJob.new.serialize) } + let(:expected_hash) { + aws_sqs_client.send_message( + message_body: message_body, + queue_url: queue_url + ).md5_of_message_body + } + subject { base_class.md5_of_message_body(message_body) } + + it { is_expected.to eq(expected_hash) } + end + + describe "#md5_of_message_attributes" do + let(:message_attributes) { + { + "ccc" => { + string_value: "test", + data_type: "String" + }, + aaa: { + binary_value: SecureRandom.random_bytes(12), + data_type: "Binary" + }, + zzz: { + data_type: "Number", + string_value: "0230.01" + }, + "öther_encodings" => { + data_type: "String", + string_value: "Tüst".encode!("ISO-8859-1") + } + } + } + + let(:expected_hash) { + aws_sqs_client.send_message( + message_body: "test", + queue_url: queue_url, + message_attributes: message_attributes + ).md5_of_message_attributes + } + + subject { base_class.md5_of_message_attributes(message_attributes) } + + it { is_expected.to eq(expected_hash) } + end +end diff --git a/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb b/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb index d2c552a..ab20e13 100644 --- a/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb +++ b/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb @@ -1,4 +1,5 @@ require 'spec_helper' +require 'digest' class StubbedError < Aws::SQS::Errors::NonExistentQueue def initialize; end; @@ -15,6 +16,11 @@ def initialize; end; let(:queue_url_resp) { double("queue_url_resp") } let(:secret_key_base) { 's3krit' } let(:rails_app) { double("rails_app") } + let(:resp) { double("resp") } + let(:md5_body) { "body_digest" } + let(:md5_attributes) { "attributes_digest" } + let(:calculated_md5_body) { md5_body } + let(:calculated_md5_attributes) { md5_attributes } before do allow(Aws::SQS::Client).to receive(:new) { aws_sqs_client } @@ -22,7 +28,14 @@ def initialize; end; allow(rails_app).to receive(:secrets) { { secret_key_base: secret_key_base } } allow(aws_sqs_client).to receive(:get_queue_url) { queue_url_resp } allow(queue_url_resp).to receive(:queue_url) { queue_url } - allow(aws_sqs_client).to receive(:send_message) { } + allow(aws_sqs_client).to receive(:send_message) { resp } + allow(resp).to receive(:md5_of_message_body) { md5_body } + allow(resp).to receive(:md5_of_message_attributes) { md5_attributes } + allow(resp).to receive(:message_id) { "" } + allow(adapter).to receive(:md5_of_message_body) { calculated_md5_body } + allow(adapter).to receive(:md5_of_message_attributes) { + calculated_md5_attributes + } end describe ".enqueue" do @@ -39,6 +52,27 @@ def initialize; end; adapter.enqueue job end + describe "md5 digest verification" do + let(:expected_error) { + ActiveJob::QueueAdapters::ActiveElasticJobAdapter::MD5MismatchError + } + context "when md5 hash of message body does not match" do + let(:calculated_md5_body) { "a different digest" } + + it "raises MD5MismatchError " do + expect { adapter.enqueue(job) }.to raise_error(expected_error) + end + end + + context "when md5 hash of message attributes does not match" do + let(:calculated_md5_attributes) { "a different digest" } + + it "raises MD5MismatchError " do + expect { adapter.enqueue(job) }.to raise_error(expected_error) + end + end + end + context "when serialized job exeeds 256KB" do let(:exceeds_max_size) { 266 * 1024 } let(:arg) do @@ -51,10 +85,10 @@ def initialize; end; let(:job) { Helpers::TestJob.new(arg) } it "raises a SerializedJobTooBig error" do - exptected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::SerializedJobTooBig + expected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::SerializedJobTooBig expect do adapter.enqueue(job) - end.to raise_error(exptected_error) + end.to raise_error(expected_error) end end @@ -64,10 +98,10 @@ def initialize; end; end it "raises NonExistentQueue error" do - exptected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::NonExistentQueue + expected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::NonExistentQueue expect do adapter.enqueue(job) - end.to raise_error(exptected_error) + end.to raise_error(expected_error) end end end diff --git a/spec/integration/aws_sqs_client_spec.rb b/spec/integration/aws_sqs_client_spec.rb index a4c9cea..77c6c31 100644 --- a/spec/integration/aws_sqs_client_spec.rb +++ b/spec/integration/aws_sqs_client_spec.rb @@ -23,16 +23,25 @@ def perform(test_arg) response.queue_url end let(:message_content) { JSON.dump(TestJob.new.serialize) } - let(:md5_digest) { Digest::MD5.hexdigest(message_content) } + let(:message_attribute) { "attribute" } + let(:md5_digest_body) { Digest::MD5.hexdigest(message_content) } + let(:md5_digest_attribute) { Digest::MD5.hexdigest(message_attribute) } describe "#send_message" do it "is successful" do response = aws_client.send_message( message_body: message_content, - queue_url: queue_url + queue_url: queue_url, + message_attributes: { + "attribute" => { + string_value: message_attribute, + data_type: "String" + } + } ) - expect(response.md5_of_message_body).to match(md5_digest) + #expect(response.md5_of_message_body).to match(md5_digest_body) + #expect(response.md5_of_message_attributes).to match(md5_digest_attribute) end context "when message size exeeds 256 KB" do