Skip to content

Commit

Permalink
Merge pull request #78 from fluent/pass-credentials-duration
Browse files Browse the repository at this point in the history
Pass a value of refresh_credentials_interval as duration_seconds

Closes #74
  • Loading branch information
cosmo0920 authored Jun 21, 2023
2 parents 66012d9 + b4a3cff commit 17e2c49
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
25 changes: 21 additions & 4 deletions lib/fluent/plugin/out_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def initialize(retry_stream)
attr_reader :template_names
attr_reader :ssl_version_options
attr_reader :compressable_connection
attr_reader :duration_seconds

helpers :event_emitter, :compat_parameters, :record_accessor, :timer

Expand All @@ -94,6 +95,7 @@ def initialize(retry_stream)
DEFAULT_RELOAD_AFTER = -1
DEFAULT_TARGET_BULK_BYTES = -1
DEFAULT_POLICY_ID = "logstash-policy"
DEFAULT_DURATION = "5h"

config_param :host, :string, :default => 'localhost'
config_param :port, :integer, :default => 9200
Expand Down Expand Up @@ -195,7 +197,7 @@ def initialize(retry_stream)
config_param :assume_role_session_name, :string, :default => "fluentd"
config_param :assume_role_web_identity_token_file, :string, :default => nil
config_param :sts_credentials_region, :string, :default => nil
config_param :refresh_credentials_interval, :time, :default => "5h"
config_param :refresh_credentials_interval, :time, :default => DEFAULT_DURATION
config_param :aws_service_name, :enum, list: [:es, :aoss], :default => :es
end

Expand All @@ -211,6 +213,8 @@ def initialize(retry_stream)

def initialize
super

@duration_seconds = Fluent::Config.time_value(DEFAULT_DURATION)
end

######################################################################################################
Expand Down Expand Up @@ -238,13 +242,15 @@ def aws_credentials(conf)
credentials = Aws::AssumeRoleCredentials.new({
role_arn: conf[:assume_role_arn],
role_session_name: conf[:assume_role_session_name],
region: sts_creds_region(conf)
region: sts_creds_region(conf),
duration_seconds: @duration_seconds
}).credentials
else
credentials = Aws::AssumeRoleWebIdentityCredentials.new({
role_arn: conf[:assume_role_arn],
web_identity_token_file: conf[:assume_role_web_identity_token_file],
region: sts_creds_region(conf)
region: sts_creds_region(conf),
duration_seconds: @duration_seconds
}).credentials
end
end
Expand Down Expand Up @@ -345,7 +351,18 @@ class << self
@_aws_credentials = aws_credentials(@endpoint)

if @endpoint.refresh_credentials_interval
timer_execute(:out_opensearch_expire_credentials, @endpoint.refresh_credentials_interval) do
@duration_seconds = Fluent::Config.time_value(@endpoint.refresh_credentials_interval)
# 60 * 60 * 12 = 12 hours
if @duration_seconds > 43200
raise Fluent::ConfigError, "Maximum duration is 12 hours."
end

# 60 * 15 = 15 minutes
if @duration_seconds < 900
raise Fluent::ConfigError, "Minimum duration is 15 minutes."
end

timer_execute(:out_opensearch_expire_credentials, @duration_seconds) do
log.debug('Recreate the AWS credentials')

@credential_mutex.synchronize do
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/test_out_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def test_configure
'region' => "local",
'access_key_id' => 'YOUR_AWESOME_KEY',
'secret_access_key' => 'YOUR_AWESOME_SECRET',
'refresh_credentials_interval' => '10h'
}, []),
Fluent::Config::Element.new('buffer', 'tag', {}, [])

Expand All @@ -316,6 +317,8 @@ def test_configure
assert_nil instance.endpoint.assume_role_web_identity_token_file
assert_nil instance.endpoint.sts_credentials_region
assert_equal :es, instance.endpoint.aws_service_name
assert_equal 36000, instance.endpoint.refresh_credentials_interval
assert_equal 36000, instance.duration_seconds
end

data("OpenSearch Service" => [:es, 'es'],
Expand Down

0 comments on commit 17e2c49

Please sign in to comment.