Skip to content

Commit

Permalink
remove exclusive lock for Ruby pipeline initialization (elastic#10431)
Browse files Browse the repository at this point in the history
  • Loading branch information
colinsurprenant authored Feb 15, 2019
1 parent 6cd7329 commit 0cdefb9
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 41 deletions.
5 changes: 0 additions & 5 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)

# Mutex to synchonize in the exclusive method
# Initial usage for the Ruby pipeline initialization which is not thread safe
@exclusive_lock = Mutex.new
@webserver_control_lock = Mutex.new

# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
Expand Down Expand Up @@ -86,10 +85,6 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@running = Concurrent::AtomicBoolean.new(false)
end

def exclusive(&block)
@exclusive_lock.synchronize { block.call }
end

def execute
@thread = Thread.current # this var is implicitly used by Stud.stop?
logger.debug("Starting agent")
Expand Down
15 changes: 2 additions & 13 deletions logstash-core/lib/logstash/pipeline_action/create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,11 @@ def execution_priority
# The execute assume that the thread safety access of the pipeline
# is managed by the caller.
def execute(agent, pipelines_registry)
new_pipeline =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
end

pipeline_class = @pipeline_config.settings.get_value("pipeline.java_execution") ? LogStash::JavaPipeline : LogStash::Pipeline
new_pipeline = pipeline_class.new(@pipeline_config, @metric, agent)
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
new_pipeline.start # block until the pipeline is correctly started or crashed
end

LogStash::ConvergeResult::ActionResult.create(self, success)
end

Expand Down
27 changes: 4 additions & 23 deletions logstash-core/lib/logstash/pipeline_action/reload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,10 @@ def execute(agent, pipelines_registry)
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
end

java_exec = @pipeline_config.settings.get_value("pipeline.java_execution")

begin
pipeline_validator =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::BasePipeline.new(@pipeline_config)
end
end
pipeline_validator = java_exec ? LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil) : LogStash::BasePipeline.new(@pipeline_config)
rescue => e
return LogStash::ConvergeResult::FailedAction.from_exception(e)
end
Expand All @@ -62,18 +54,7 @@ def execute(agent, pipelines_registry)
old_pipeline.thread.join

# Then create a new pipeline
new_pipeline =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
end

new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent)
success = new_pipeline.start # block until the pipeline is correctly started or crashed

# return success and new_pipeline to registry reload_pipeline
Expand Down

0 comments on commit 0cdefb9

Please sign in to comment.