Skip to content

Commit e340a29

Browse files
committed
rewritten retry state with the above recommendations in mind.
Signed-off-by: Oleh <trahterber@gmail.com> Signed-off-by: OlehPalanskyi <Trahterber@gmail.com>
1 parent 9708305 commit e340a29

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

lib/fluent/plugin/in_opensearch.rb

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def configure(conf)
110110

111111
@timestamp_parser = create_time_parser
112112
@backend_options = backend_options
113-
@retry = retry_state(@retry_randomize)
113+
@retry = nil
114114

115115
raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?
116116

@@ -337,6 +337,23 @@ def is_existing_connection(host)
337337
return true
338338
end
339339

340+
def update_retry_state(error=nil)
341+
if error
342+
unless @retry
343+
@retry = retry_state(@retry_randomize)
344+
end
345+
@retry.step
346+
#Raise error if the retry limit has been reached
347+
raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}" if @retry.limit?
348+
#Retry if the limit hasn't been reached
349+
log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message)
350+
sleep(@retry.next_time - Time.now)
351+
else
352+
log.debug("retry succeeded.") unless @retry.nil?
353+
@retry = nil unless @retry.nil?
354+
end
355+
end
356+
340357
def run
341358
return run_slice if @num_slices <= 1
342359

@@ -347,13 +364,8 @@ def run
347364
run_slice(slice_id)
348365
end
349366
end
350-
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => e
351-
@retry.step
352-
#Raise error if the retry limit has been reached
353-
raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{e.message}" if @retry.limit?
354-
#Retry if the retry limit hasn't been reached
355-
log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: e.message)
356-
sleep(@retry.next_time - Time.now)
367+
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error
368+
update_retry_state(error)
357369
retry
358370
end
359371

@@ -375,9 +387,7 @@ def run_slice(slice_id=nil)
375387

376388
router.emit_stream(@tag, es)
377389
clear_scroll(scroll_id)
378-
#reset steps and next_time if our function successful ends
379-
@retry.instance_variable_set(:@steps, 0)
380-
@retry.instance_variable_set(:@next_time, nil)
390+
update_retry_state
381391
end
382392

383393
def clear_scroll(scroll_id)

0 commit comments

Comments
 (0)