Skip to content

Commit

Permalink
Merge pull request #109 from renegaderyu/enhance-opensearch-data-stream
Browse files Browse the repository at this point in the history
Add logic to write method of out_opensearch_data_stream
  • Loading branch information
cosmo0920 authored Aug 18, 2023
2 parents e11df60 + 9c3ab84 commit 05ec73b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1606,7 +1606,7 @@ There are usually a few feature requests, tagged [Easy](https://github.com/fluen

Pull Requests are welcomed.

Becore send a pull request or report an issue, please read [the contribution guideline](CONTRIBUTING.md).
Before sending a pull request or reporting an issue, please read [the contribution guideline](CONTRIBUTING.md).

## Running tests

Expand Down
6 changes: 6 additions & 0 deletions lib/fluent/plugin/out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ def write(chunk)
dt = Time.at(time).to_datetime
end
record.merge!({"@timestamp" => dt.iso8601(@time_precision)})
if @include_tag_key
record[@tag_key] = tag
end
if @remove_keys
@remove_keys.each { |key| record.delete(key) }
end
bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
rescue => e
emit_error_label_event do
Expand Down
65 changes: 65 additions & 0 deletions test/plugin/test_out_opensearch_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,69 @@ def test_record_no_timestamp
assert(index_cmds[1].has_key? '@timestamp')
end

def test_record_with_include_tag_key
stub_default
stub_bulk_feed
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'include_tag_key' => true,
'tag_key' => 'test_tag'
})
record = {
'message' => 'Sample Record'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(index_cmds[1].has_key?('test_tag'))
end

def test_record_without_include_tag_key
stub_default
stub_bulk_feed
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'include_tag_key' => false
})
record = {
'message' => 'Sample Record'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(!index_cmds[1].has_key?('test'))
end

def test_record_with_remove_keys
stub_default
stub_bulk_feed
stub_default
stub_bulk_feed
conf = config_element(
'ROOT', '', {
'@type' => OPENSEARCH_DATA_STREAM_TYPE,
'data_stream_name' => 'foo',
'data_stream_template_name' => 'foo_tpl',
'remove_keys' => 'remove_me'
})
record = {
'message' => 'Sample Record',
'remove_me' => 'foo'
}
driver(conf).run(default_tag: 'test') do
driver.feed(record)
end
assert(!index_cmds[1].has_key?('remove_me'))
end

end

0 comments on commit 05ec73b

Please sign in to comment.