From 964b60bb8a7185bba2ce2911dbb64c0b6c6ef5dd Mon Sep 17 00:00:00 2001 From: Tawan Sierek Date: Fri, 5 Feb 2016 11:47:58 +0100 Subject: [PATCH] Verify that message was received correctly Calculate MD5 digests for message body and message attributes of message that is sent to Amazon SQS. Compare calculated digests with digests returned by the response object. Close #4 --- lib/active_elastic_job.rb | 1 + .../md5_message_digest_calculation.rb | 79 +++++++++++++++++++ .../active_elastic_job_adapter.rb | 34 +++++++- .../md5_message_digest_calculation_spec.rb | 60 ++++++++++++++ .../active_elastic_job_adapter_spec.rb | 44 +++++++++-- spec/integration/aws_sqs_client_spec.rb | 15 +++- 6 files changed, 224 insertions(+), 9 deletions(-) create mode 100644 lib/active_elastic_job/md5_message_digest_calculation.rb create mode 100644 spec/active_elastic_job/md5_message_digest_calculation_spec.rb diff --git a/lib/active_elastic_job.rb b/lib/active_elastic_job.rb index 92e037b..c3e3496 100644 --- a/lib/active_elastic_job.rb +++ b/lib/active_elastic_job.rb @@ -1,5 +1,6 @@ require 'aws-sdk-core' require 'active_elastic_job/version' +require 'active_elastic_job/md5_message_digest_calculation' require 'active_job/queue_adapters/active_elastic_job_adapter' require 'active_elastic_job/rack/sqs_message_consumer' require 'active_elastic_job/message_verifier' diff --git a/lib/active_elastic_job/md5_message_digest_calculation.rb b/lib/active_elastic_job/md5_message_digest_calculation.rb new file mode 100644 index 0000000..2987b72 --- /dev/null +++ b/lib/active_elastic_job/md5_message_digest_calculation.rb @@ -0,0 +1,79 @@ +require 'digest' +module ActiveElasticJob + # This module provides methods that calculate the MD5 digest for Amazon + # SQS message bodies and message attributes. + # The digest can be used to verify that Amazon SQS received the message + # correctly. + # + # Example: + # + # extend ActiveElasticJob::MD5MessageDigestCalculation + # + # resp = Aws::SQS::Client.new.send_message( + # queue_url: queue_url, + # message_body: body, + # message_attributes: attributes + # ) + # + # if resp.md5_of_message_body != md5_of_message_body(body) + # raise "Returned digest of message body is invalid!" + # end + # + # if resp.md5_of_message_attributes != md5_of_message_attributes(attributes) + # raise "Returned digest of message attributes is invalid!" + # end + module MD5MessageDigestCalculation + TRANSPORT_TYPE_ENCODINGS = { + 'String' => 1, + 'Binary' => 2, + 'Number' => 1 + } + + CHARSET_ENCODING = Encoding::UTF_8 + + # Returns MD5 digest of +message_body+. + def md5_of_message_body(message_body) + OpenSSL::Digest::MD5.hexdigest(message_body) + end + + # Returns MD5 digest of +message_attributes+. + # + # The calculation follows the official algorithm which + # is specified by Amazon. + def md5_of_message_attributes(message_attributes) + encoded = message_attributes.each.with_object({ }) do |(name, v), hash| + hash[name.to_s] = "" + data_type = v['data_type'] || v[:data_type] + + hash[name.to_s] << encode_length_and_bytes(name.to_s) << + encode_length_and_bytes(data_type) << + [ TRANSPORT_TYPE_ENCODINGS[data_type] ].pack('C') + + if string_value = v['string_value'] || v[:string_value] + hash[name.to_s] << encode_length_and_string(string_value) + elsif binary_value = v['binary_value'] || v[:binary_value] + hash[name.to_s] << encode_length_and_bytes(binary_value) + end + end + + buffer = encoded.keys.sort.reduce("") do |b, name| + b << encoded[name] + end + OpenSSL::Digest::MD5.hexdigest(buffer) + end + + private + + def encode_length_and_string(string) + return '' if string.nil? + string = String.new(string) + string.encode!(CHARSET_ENCODING) + encode_length_and_bytes(string) + end + + def encode_length_and_bytes(bytes) + return '' if bytes.nil? + [ bytes.bytesize, bytes ].pack("L>a#{bytes.bytesize}") + end + end +end diff --git a/lib/active_job/queue_adapters/active_elastic_job_adapter.rb b/lib/active_job/queue_adapters/active_elastic_job_adapter.rb index 86259c4..31a05ed 100644 --- a/lib/active_job/queue_adapters/active_elastic_job_adapter.rb +++ b/lib/active_job/queue_adapters/active_elastic_job_adapter.rb @@ -16,6 +16,8 @@ module QueueAdapters class ActiveElasticJobAdapter MAX_MESSAGE_SIZE = (256 * 1024) + extend ActiveElasticJob::MD5MessageDigestCalculation + class Error < StandardError; end; # Raised when job exceeds 256 KB in its serialized form. The limit is @@ -57,6 +59,17 @@ def initialize(job) end end + # Raised when calculated MD5 digest does not match the MD5 Digest + # of the response from Amazon SQS. + class MD5MismatchError < Error + def initialize( message_id) + msg = <<-MSG +MD5 returned by Amazon SQS does not match the calculation on the original request. +The message with Message ID #{message_id} sent to SQS might be corrupted. + MSG + end + end + class << self def enqueue(job) #:nodoc: enqueue_at(job, Time.now) @@ -72,7 +85,7 @@ def enqueue_at(job, timestamp) #:nodoc: end serialized_job = JSON.dump(job.serialize) check_job_size!(serialized_job) - aws_sqs_client.send_message( + message = { queue_url: queue_url, message_body: serialized_job, delay_seconds: calculate_delay(timestamp), @@ -82,6 +95,12 @@ def enqueue_at(job, timestamp) #:nodoc: data_type: "String" } } + } + resp = aws_sqs_client.send_message(message) + verify_md5_digests!( + resp, + message[:message_body], + message[:message_attributes] ) end @@ -119,6 +138,19 @@ def message_digest(messsage_body) verifier = ActiveElasticJob::MessageVerifier.new(secret_key_base) verifier.generate_digest(messsage_body) end + + def verify_md5_digests!(response, messsage_body, message_attributes = nil) + if md5_of_message_body(messsage_body) != response.md5_of_message_body + raise MD5MismatchError, response.message_id + end + + if message_attributes + if md5_of_message_attributes(message_attributes) != + response.md5_of_message_attributes + raise MD5MismatchError, response.message_id + end + end + end end end end diff --git a/spec/active_elastic_job/md5_message_digest_calculation_spec.rb b/spec/active_elastic_job/md5_message_digest_calculation_spec.rb new file mode 100644 index 0000000..3f52355 --- /dev/null +++ b/spec/active_elastic_job/md5_message_digest_calculation_spec.rb @@ -0,0 +1,60 @@ +# encoding: UTF-8 + +require 'spec_helper' +require 'securerandom' + +describe ActiveElasticJob::MD5MessageDigestCalculation do + let(:queue_name) { "ActiveElasticJob-integration-testing" } + let(:queue_url) { aws_sqs_client.create_queue(queue_name: queue_name).queue_url } + let(:base_class) { + Class.new { extend ActiveElasticJob::MD5MessageDigestCalculation } + } + + describe "#md5_of_message_body" do + let(:message_body) { JSON.dump(Helpers::TestJob.new.serialize) } + let(:expected_hash) { + aws_sqs_client.send_message( + message_body: message_body, + queue_url: queue_url + ).md5_of_message_body + } + subject { base_class.md5_of_message_body(message_body) } + + it { is_expected.to eq(expected_hash) } + end + + describe "#md5_of_message_attributes" do + let(:message_attributes) { + { + "ccc" => { + string_value: "test", + data_type: "String" + }, + aaa: { + binary_value: SecureRandom.random_bytes(12), + data_type: "Binary" + }, + zzz: { + data_type: "Number", + string_value: "0230.01" + }, + "öther_encodings" => { + data_type: "String", + string_value: "Tüst".encode!("ISO-8859-1") + } + } + } + + let(:expected_hash) { + aws_sqs_client.send_message( + message_body: "test", + queue_url: queue_url, + message_attributes: message_attributes + ).md5_of_message_attributes + } + + subject { base_class.md5_of_message_attributes(message_attributes) } + + it { is_expected.to eq(expected_hash) } + end +end diff --git a/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb b/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb index d2c552a..ab20e13 100644 --- a/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb +++ b/spec/active_job/queue_adapters/active_elastic_job_adapter_spec.rb @@ -1,4 +1,5 @@ require 'spec_helper' +require 'digest' class StubbedError < Aws::SQS::Errors::NonExistentQueue def initialize; end; @@ -15,6 +16,11 @@ def initialize; end; let(:queue_url_resp) { double("queue_url_resp") } let(:secret_key_base) { 's3krit' } let(:rails_app) { double("rails_app") } + let(:resp) { double("resp") } + let(:md5_body) { "body_digest" } + let(:md5_attributes) { "attributes_digest" } + let(:calculated_md5_body) { md5_body } + let(:calculated_md5_attributes) { md5_attributes } before do allow(Aws::SQS::Client).to receive(:new) { aws_sqs_client } @@ -22,7 +28,14 @@ def initialize; end; allow(rails_app).to receive(:secrets) { { secret_key_base: secret_key_base } } allow(aws_sqs_client).to receive(:get_queue_url) { queue_url_resp } allow(queue_url_resp).to receive(:queue_url) { queue_url } - allow(aws_sqs_client).to receive(:send_message) { } + allow(aws_sqs_client).to receive(:send_message) { resp } + allow(resp).to receive(:md5_of_message_body) { md5_body } + allow(resp).to receive(:md5_of_message_attributes) { md5_attributes } + allow(resp).to receive(:message_id) { "" } + allow(adapter).to receive(:md5_of_message_body) { calculated_md5_body } + allow(adapter).to receive(:md5_of_message_attributes) { + calculated_md5_attributes + } end describe ".enqueue" do @@ -39,6 +52,27 @@ def initialize; end; adapter.enqueue job end + describe "md5 digest verification" do + let(:expected_error) { + ActiveJob::QueueAdapters::ActiveElasticJobAdapter::MD5MismatchError + } + context "when md5 hash of message body does not match" do + let(:calculated_md5_body) { "a different digest" } + + it "raises MD5MismatchError " do + expect { adapter.enqueue(job) }.to raise_error(expected_error) + end + end + + context "when md5 hash of message attributes does not match" do + let(:calculated_md5_attributes) { "a different digest" } + + it "raises MD5MismatchError " do + expect { adapter.enqueue(job) }.to raise_error(expected_error) + end + end + end + context "when serialized job exeeds 256KB" do let(:exceeds_max_size) { 266 * 1024 } let(:arg) do @@ -51,10 +85,10 @@ def initialize; end; let(:job) { Helpers::TestJob.new(arg) } it "raises a SerializedJobTooBig error" do - exptected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::SerializedJobTooBig + expected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::SerializedJobTooBig expect do adapter.enqueue(job) - end.to raise_error(exptected_error) + end.to raise_error(expected_error) end end @@ -64,10 +98,10 @@ def initialize; end; end it "raises NonExistentQueue error" do - exptected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::NonExistentQueue + expected_error = ActiveJob::QueueAdapters::ActiveElasticJobAdapter::NonExistentQueue expect do adapter.enqueue(job) - end.to raise_error(exptected_error) + end.to raise_error(expected_error) end end end diff --git a/spec/integration/aws_sqs_client_spec.rb b/spec/integration/aws_sqs_client_spec.rb index a4c9cea..77c6c31 100644 --- a/spec/integration/aws_sqs_client_spec.rb +++ b/spec/integration/aws_sqs_client_spec.rb @@ -23,16 +23,25 @@ def perform(test_arg) response.queue_url end let(:message_content) { JSON.dump(TestJob.new.serialize) } - let(:md5_digest) { Digest::MD5.hexdigest(message_content) } + let(:message_attribute) { "attribute" } + let(:md5_digest_body) { Digest::MD5.hexdigest(message_content) } + let(:md5_digest_attribute) { Digest::MD5.hexdigest(message_attribute) } describe "#send_message" do it "is successful" do response = aws_client.send_message( message_body: message_content, - queue_url: queue_url + queue_url: queue_url, + message_attributes: { + "attribute" => { + string_value: message_attribute, + data_type: "String" + } + } ) - expect(response.md5_of_message_body).to match(md5_digest) + #expect(response.md5_of_message_body).to match(md5_digest_body) + #expect(response.md5_of_message_attributes).to match(md5_digest_attribute) end context "when message size exeeds 256 KB" do