Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Thread#run to avoid unexpected interruption. #2170

Merged
merged 1 commit into from
Nov 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def multi_workers_ready?
end

# Internal states
FlushThreadState = Struct.new(:thread, :next_clock)
FlushThreadState = Struct.new(:thread, :next_clock, :mutex, :cond_var)
DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do
def expired?
time + timeout < Time.now
Expand Down Expand Up @@ -440,7 +440,7 @@ def start

@buffer_config.flush_thread_count.times do |i|
thread_title = "flush_thread_#{i}".to_sym
thread_state = FlushThreadState.new(nil, nil)
thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
thread = thread_create(thread_title) do
flush_thread_run(thread_state)
end
Expand Down Expand Up @@ -506,9 +506,14 @@ def after_shutdown
@output_flush_threads_running = false
if @output_flush_threads && !@output_flush_threads.empty?
@output_flush_threads.each do |state|
state.thread.run if state.thread.alive? # to wakeup thread and make it to stop by itself
end
@output_flush_threads.each do |state|
# to wakeup thread and make it to stop by itself
state.mutex.synchronize {
if state.thread && state.thread.status
state.next_clock = 0
state.cond_var.signal
end
}
Thread.pass
state.thread.join
end
end
Expand Down Expand Up @@ -1254,12 +1259,15 @@ def submit_flush_once
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
state.next_clock = 0
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.thread.run
else
log.warn "thread is already dead"
end
state.mutex.synchronize {
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.next_clock = 0
state.cond_var.signal
else
log.warn "thread is already dead"
end
}
Thread.pass
end

def force_flush
Expand Down Expand Up @@ -1296,8 +1304,13 @@ def enqueue_thread_wait
# only for tests of output plugin
def flush_thread_wakeup
@output_flush_threads.each do |state|
state.next_clock = 0
state.thread.run
state.mutex.synchronize {
if state.thread && state.thread.status
state.next_clock = 0
state.cond_var.signal
end
}
Thread.pass
end
end

Expand Down Expand Up @@ -1376,6 +1389,7 @@ def flush_thread_run(state)
end
log.debug "flush_thread actually running"

state.mutex.lock
begin
# This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
while @output_flush_threads_running
Expand All @@ -1391,30 +1405,41 @@ def flush_thread_run(state)
elsif next_retry_time && next_retry_time > Time.now
interval = next_retry_time.to_f - Time.now.to_f
else
try_flush

# next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
interval = next_flush_time.to_f - Time.now.to_f
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
# because @retry still exists (#commit_write is not called yet in #try_flush)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_clock = Fluent::Clock.now + interval
state.mutex.unlock
begin
try_flush
# next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
interval = next_flush_time.to_f - Time.now.to_f
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
# because @retry still exists (#commit_write is not called yet in #try_flush)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_clock = Fluent::Clock.now + interval
ensure
state.mutex.lock
end
end

if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
unless @output_flush_interrupted
try_rollback_write
state.mutex.unlock
begin
try_rollback_write
ensure
state.mutex.lock
end
end
end

sleep interval if interval > 0
state.cond_var.wait(state.mutex, interval) if interval > 0
end
rescue => e
# normal errors are rescued by output plugins in #try_flush
# so this rescue section is for critical & unrecoverable errors
log.error "error on output thread", error: e
log.error_backtrace
raise
ensure
state.mutex.unlock
end
end
end
Expand Down