Skip to content

Commit

Permalink
fix PipelineRegistry to avoid re-creating a pipeline in the process o…
Browse files Browse the repository at this point in the history
…f being created

A pipeline in the process of being created was not marked as such in the pipeline registry resulting in a situation where a slow to initialize pipeline could be recreated on state convergence resulting in a PQ LockException because that pipeline was already existing and held the PQ lock. Replace native Java concurency with Ruby Mutex for simpler and straighforward implementation.
  • Loading branch information
colinsurprenant authored and elasticsearch-bot committed Jul 13, 2020
1 parent 62519ac commit 2afe60d
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 48 deletions.
158 changes: 111 additions & 47 deletions logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,100 @@

module LogStash
class PipelineState
attr_reader :pipeline_id, :pipeline
attr_reader :pipeline_id

def initialize(pipeline_id, pipeline)
@pipeline_id = pipeline_id
@pipeline = pipeline
@reloading = Concurrent::AtomicBoolean.new(false)
@loading = Concurrent::AtomicBoolean.new(false)

# this class uses a lock to ensure thread safe visibility.
@lock = Mutex.new
end

def terminated?
# a reloading pipeline is never considered terminated
@reloading.false? && @pipeline.finished_execution?
@lock.synchronize do
# a loading pipeline is never considered terminated
@loading.false? && @pipeline.finished_execution?
end
end

def set_reloading(is_reloading)
@reloading.value = is_reloading
def set_loading(is_loading)
@lock.synchronize do
@loading.value = is_loading
end
end

def set_pipeline(pipeline)
raise(ArgumentError, "invalid nil pipeline") if pipeline.nil?
@pipeline = pipeline
@lock.synchronize do
raise(ArgumentError, "invalid nil pipeline") if pipeline.nil?
@pipeline = pipeline
end
end

def pipeline
@lock.synchronize { @pipeline }
end
end

class PipelineStates

def initialize
@states = {}
@locks = {}
@lock = Mutex.new
end

def get(pipeline_id)
@lock.synchronize do
@states[pipeline_id]
end
end

def put(pipeline_id, state)
@lock.synchronize do
@states[pipeline_id] = state
end
end

def remove(pipeline_id)
@lock.synchronize do
@states.delete(pipeline_id)
@locks.delete(pipeline_id)
end
end

def size
@lock.synchronize do
@states.size
end
end

def empty?
@lock.synchronize do
@states.empty?
end
end

def each_with_object(init, &block)
states = @lock.synchronize { @states.dup }
states.each_with_object(init, &block)
end

def get_lock(pipeline_id)
@lock.synchronize do
@locks[pipeline_id] ||= Mutex.new
end
end
end


class PipelinesRegistry
attr_reader :states
include LogStash::Util::Loggable

def initialize
# we leverage the semantic of the Java ConcurrentHashMap for the
# compute() method which is atomic; calling compute() concurrently
# will block until the other compute finishes so no mutex is necessary
# for synchronizing compute calls
@states = java.util.concurrent.ConcurrentHashMap.new
@locks = java.util.concurrent.ConcurrentHashMap.new
@states = PipelineStates.new
end

# Execute the passed creation logic block and create a new state upon success
Expand All @@ -62,23 +122,35 @@ def initialize
#
# @return [Boolean] new pipeline creation success
def create_pipeline(pipeline_id, pipeline, &create_block)
lock = get_lock(pipeline_id)
lock = @states.get_lock(pipeline_id)
lock.lock

success = false

state = @states.get(pipeline_id)
if state
if state.terminated?

if state && !state.terminated?
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
return false
end

if state.nil?
state = PipelineState.new(pipeline_id, pipeline)
state.set_loading(true)
@states.put(pipeline_id, state)
begin
success = yield
state.set_pipeline(pipeline)
else
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
ensure
state.set_loading(false)
@states.remove(pipeline_id) unless success
end
@states.put(pipeline_id, state)
else
success = yield
@states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success
state.set_loading(true)
state.set_pipeline(pipeline)
begin
success = yield
ensure
state.set_loading(false)
end
end

success
Expand All @@ -92,49 +164,47 @@ def create_pipeline(pipeline_id, pipeline, &create_block)
#
# @yieldparam [Pipeline] the pipeline to terminate
def terminate_pipeline(pipeline_id, &stop_block)
lock = get_lock(pipeline_id)
lock = @states.get_lock(pipeline_id)
lock.lock

state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id)
else
yield(state.pipeline)
@states.put(pipeline_id, state)
end
ensure
lock.unlock
end

# Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
# Execute the passed reloading logic block in the context of the loading state and set new pipeline in state
# @param pipeline_id [String, Symbol] the pipeline id
# @param reload_block [Block] the reloading execution logic
#
# @yieldreturn [Array<Boolean, Pipeline>] the new pipeline creation success and new pipeline object
#
# @return [Boolean] new pipeline creation success
def reload_pipeline(pipeline_id, &reload_block)
lock = get_lock(pipeline_id)
lock = @states.get_lock(pipeline_id)
lock.lock
success = false

state = @states.get(pipeline_id)

if state.nil?
logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id)
else
state.set_reloading(true)
begin
success, new_pipeline = yield
state.set_pipeline(new_pipeline)
ensure
state.set_reloading(false)
end
@states.put(pipeline_id, state)
return false
end

success
state.set_loading(true)
begin
success, new_pipeline = yield
state.set_pipeline(new_pipeline)
ensure
state.set_loading(false)
end

success
ensure
lock.unlock
end
Expand All @@ -153,7 +223,7 @@ def size

# @return [Boolean] true if the states collection is empty.
def empty?
@states.isEmpty
@states.empty?
end

# @return [Hash{String=>Pipeline}]
Expand Down Expand Up @@ -189,11 +259,5 @@ def select_pipelines(&optional_state_filter)
end
end
end

def get_lock(pipeline_id)
@locks.compute_if_absent(pipeline_id) do |k|
java.util.concurrent.locks.ReentrantLock.new
end
end
end
end
41 changes: 41 additions & 0 deletions logstash-core/spec/logstash/pipelines_registry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@
end
end
end

context "when pipeline is initializing" do
let (:wait_start_create_block) { Queue.new }
let (:wait_before_exiting_create_block) { Queue.new }
let (:slow_initializing_pipeline) { double("slow_initializing_pipeline") }
let (:pipeline2) { double("pipeline2") }

it "should create a loading state before calling the create block" do

# create a thread which calls create_pipeline and wait in the create
# block so we can controle the pipeline initialization phase
t = Thread.new do
subject.create_pipeline(pipeline_id, slow_initializing_pipeline) do
# signal that we entered the create block
wait_start_create_block << "ping"

# stall here until wait_before_exiting_create_block receives a message
wait_before_exiting_create_block.pop

true
end
end

# stall here until subject.create_pipeline has been called in the above thread
# and it entered the create block
wait_start_create_block.pop

# finished_execution? should not be called in the below tests using terminated?
# because the loading state is true. This is to make sure the state is used and not
# the pipeline termination status
expect(slow_initializing_pipeline).not_to receive(:finished_execution?)

expect(subject.states.get(pipeline_id).terminated?).to be_falsey
expect(subject.get_pipeline(pipeline_id)).to eq(slow_initializing_pipeline)
expect(subject.empty?).to be_falsey

# signal termination of create block
wait_before_exiting_create_block << "ping"
t.join
end
end
end

context "terminating a pipeline" do
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/spec/support/matchers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ def all_instance_methods_implemented?
try(30) do
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
expect(pipeline).to_not be_nil
expect(pipeline.running?).to be_truthy
end
expect(pipeline.config_str).to eq(pipeline_config.config_string)
expect(pipeline.running?).to be_truthy
expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
end

Expand Down

0 comments on commit 2afe60d

Please sign in to comment.