Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added feature to template header #56

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions lib/logstash/outputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,24 @@ def back_pressure_provider_for_connection(march_hare_connection)
class MessagePropertiesTemplate
##
# Creates a new `MessagePropertiesTemplate` from the provided `template`
# @param template [Hash{Symbol=>Object}]
# @param template [Hash{Symbol=>Object}]
def initialize(template)
constant_properties = template.reject { |_,v| templated?(v) }
variable_properties = template.select { |_,v| templated?(v) }

@constant_properties = normalize(constant_properties).freeze
constant_properties = template.reject { |_,v| templated?(v) } # ein hash von nicht-templated (reject) 'values' wobei templated heißt strings mit {%...
variable_properties = template.select { |_,v| templated?(v) } # der hash von templated 'values
@variable_properties = variable_properties
@constant_properties = normalize(constant_properties)

@variable_headers = nil

if @constant_properties[:headers]
constant_headers = constant_properties[:headers].reject { |_, v| templated?(v) }
@variable_headers = constant_properties[:headers].select { |_, v| templated?(v) }
@constant_properties[:headers] = constant_headers # overwrite headers with the constant ones
@constant_properties[:headers].freeze
end
@constant_properties.freeze


end

##
Expand All @@ -153,17 +164,31 @@ def initialize(template)
# @param event [LogStash::Event]: the event with which to populated templated values, if any.
# @return [Hash{Symbol=>Object}] a possibly-frozen properties hash for the provided `event`.
def build(event)
return @constant_properties if @variable_properties.empty?
return @constant_properties if only_constant?

properties = @variable_properties.each_with_object(@constant_properties.dup) do |(k,v), memo|
memo.store(k, event.sprintf(v))
end

if !@variable_headers.nil?
variable_headers_transformed = @variable_headers.transform_values {|value| event.sprintf(value)}
properties[:headers] = properties[:headers].merge(variable_headers_transformed)
end

return normalize(properties)
end

private

##
# Check wether template contains variable content that needs to be expanded.
#
# @api private
# @return [boolean]
def only_constant?()
return (@variable_properties.empty? & @variable_headers.nil?)
end

##
# Normalize the provided property mapping with respect to the value types the underlying
# client expects.
Expand Down
36 changes: 34 additions & 2 deletions spec/outputs/rabbitmq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
}
let(:instance) { klass.new(rabbitmq_settings) }
let(:hare_info) { instance.instance_variable_get(:@hare_info) }
let(:headers) { Hash.new }

shared_examples 'recovers from exception gracefully' do
it 'should execute publish twice due to a retry' do
Expand Down Expand Up @@ -104,8 +105,40 @@
context 'with message_properties' do
let(:rabbitmq_settings) { super().merge("message_properties" => message_properties) }
let(:message_properties) { Hash.new }

context 'with headers' do

let(:message_properties) { super().merge("headers" => headers) }
let(:headers) { Hash.new }
let(:headers) { super().merge("myheader" => myheader_value) }

context 'with constant value' do
let(:myheader_value) { "asdf" }
it 'publishes headers with constant-value' do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:headers => hash_including("myheader" => "asdf"))))
end
end

context 'with templated value' do
let(:myheader_value) { "%{[@metadata][priority]}" }
context 'when event expands template value' do
before do
expect(event).to receive(:sprintf).with(myheader_value).and_return("another_value")
end

it 'publishes with the value extracted from the event' do
instance.send(:publish, event, encoded_event)
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:headers => hash_including("myheader" => "another_value"))))
end
end

end

end

context 'priority' do
let(:message_properties) { super().merge("priority" => priority) }
let(:message_properties) { super().merge("priority" => priority) }
context 'as literal Integer value' do
let(:priority) { 3 }
it 'publishes with the constant-value priority' do
Expand All @@ -121,7 +154,6 @@
expect(exchange).to have_received(:publish).with(anything, hash_including(:properties => hash_including(:priority => 7)))
end
end

context 'as template value' do
let(:priority) { "%{[@metadata][priority]}" }
context 'when event expands template value' do
Expand Down