Skip to content

Commit

Permalink
start inputs only when all WorkerLoop are fully initialized (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
colinsurprenant authored Jan 14, 2020
1 parent 5c4d353 commit b5f203c
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,28 @@ def start_workers

filter_queue_client.set_batch_dimensions(batch_size, batch_delay)

pipeline_workers.times do |t|
# First launch WorkerLoop initialization in separate threads which concurrently
# compiles and initializes the worker pipelines

worker_loops = pipeline_workers.times
.map { Thread.new { init_worker_loop } }
.map(&:value)

fail("Some worker(s) were not correctly initialized") if worker_loops.any?{|v| v.nil?}

# Once all WorkerLoop have been initialized run them in separate threads

worker_loops.each_with_index do |worker_loop, t|
thread = Thread.new do
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
org.logstash.execution.WorkerLoop.new(
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
worker_loop.run
end
@worker_threads << thread
end

# inputs should be started last, after all workers
# Finally inputs should be started last, after all workers have been initialized and started

begin
start_inputs
rescue => e
Expand Down Expand Up @@ -466,6 +476,26 @@ def inspect

private

# @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception
def init_worker_loop
begin
org.logstash.execution.WorkerLoop.new(
lir_execution,
filter_queue_client,
@events_filtered,
@events_consumed,
@flushRequested,
@flushing,
@shutdownRequested,
@drain_queue)
rescue => e
@logger.error(
"Worker loop initialization error",
default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n")))
nil
end
end

def maybe_setup_out_plugins
if @outputs_registered.make_true
register_plugins(outputs)
Expand Down

0 comments on commit b5f203c

Please sign in to comment.