Skip to content

Commit

Permalink
Add volatile cas fields, few classes converted
Browse files Browse the repository at this point in the history
fix usages of #ensure_ivar_visibility!
  • Loading branch information
pitr-ch committed Sep 7, 2015
1 parent 1aeea1e commit 9b8a280
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 90 deletions.
33 changes: 16 additions & 17 deletions lib/concurrent/atom.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ module Concurrent
# value validates.
#
# @see http://clojure.org/atoms Clojure Atoms
class Atom < Synchronization::LockableObject
class Atom < Synchronization::Object
include Concern::Observable

private *attr_volatile_with_cas(:value)
public :value

# Create a new atom with the given initial value.
#
# @param [Object] value The initial value
Expand All @@ -36,20 +39,16 @@ class Atom < Synchronization::LockableObject
#
# @raise [ArgumentError] if the validator is not a `Proc` (when given)
def initialize(value, opts = {})
super()
synchronize do
@validator = opts.fetch(:validator, ->(v){ true })
@value = Concurrent::AtomicReference.new(value)
self.observers = Collection::CopyOnNotifyObserverSet.new
end
@Validator = opts.fetch(:validator, -> (v) { true })
self.observers = Collection::CopyOnNotifyObserverSet.new
super(value) # ensures visibility
end

# The current value of the atom.
# @!method value
# The current value of the atom.
#
# @return [Object] The current value.
def value
@value.value
end
# @return [Object] The current value.

alias_method :deref, :value

# Atomically swaps the value of atom using the given block. The current
Expand Down Expand Up @@ -85,7 +84,7 @@ def swap(*args)
raise ArgumentError.new('no block given') unless block_given?

loop do
old_value = @value.value
old_value = value
begin
new_value = yield(old_value, *args)
break old_value unless valid?(new_value)
Expand All @@ -106,7 +105,7 @@ def swap(*args)
#
# @return [Boolean] True if the value is changed else false.
def compare_and_set(old_value, new_value)
if valid?(new_value) && @value.compare_and_set(old_value, new_value)
if valid?(new_value) && compare_and_set_value(old_value, new_value)
observers.notify_observers(Time.now, new_value, nil)
true
else
Expand All @@ -123,9 +122,9 @@ def compare_and_set(old_value, new_value)
# @return [Object] The final value of the atom after all operations and
# validations are complete.
def reset(new_value)
old_value = @value.value
old_value = value
if valid?(new_value)
@value.set(new_value)
self.value = new_value
observers.notify_observers(Time.now, new_value, nil)
new_value
else
Expand All @@ -141,7 +140,7 @@ def reset(new_value)
# @return [Boolean] false if the validator function returns false or raises
# an exception else true
def valid?(new_value)
@validator.call(new_value)
@Validator.call(new_value)
rescue
false
end
Expand Down
7 changes: 3 additions & 4 deletions lib/concurrent/atomic/read_write_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class ReadWriteLock < Synchronization::Object
# @!visibility private
MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1

# Implementation notes:
# Implementation notes:
# A goal is to make the uncontended path for both readers/writers lock-free
# Only if there is reader-writer or writer-writer contention, should locks be used
# Internal state is represented by a single integer ("counter"), and updated
# Internal state is represented by a single integer ("counter"), and updated
# using atomic compare-and-swap operations
# When the counter is 0, the lock is free
# Each reader increments the counter by 1 when acquiring a read lock
Expand All @@ -57,8 +57,7 @@ def initialize
@Counter = AtomicFixnum.new(0) # single integer which represents lock state
@ReadLock = Synchronization::Lock.new
@WriteLock = Synchronization::Lock.new
ensure_ivar_visibility!
super()
super() # ensures visibility
end

# Execute a block operation within a read lock.
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/atomic/reentrant_read_write_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def initialize
@ReadQueue = Synchronization::Lock.new # used to queue waiting readers
@WriteQueue = Synchronization::Lock.new # used to queue waiting writers
@HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread
ensure_ivar_visibility!
super() # ensures visibility
end

# Execute a block operation within a read lock.
Expand Down
24 changes: 12 additions & 12 deletions lib/concurrent/edge/atomic_markable_reference.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ module Edge
# @api Edge
class AtomicMarkableReference < ::Concurrent::Synchronization::Object

private *attr_volatile_with_cas(:reference)

# @!macro [attach] atomic_markable_reference_method_initialize
def initialize(value = nil, mark = false)
super()
@Reference = AtomicReference.new ImmutableArray[value, mark]
ensure_ivar_visibility!
super(ImmutableArray[value, mark]) # ensures visibility
end

# @!macro [attach] atomic_markable_reference_method_compare_and_set
Expand All @@ -36,7 +36,7 @@ def initialize(value = nil, mark = false)
def compare_and_set(expected_val, new_val, expected_mark, new_mark)
# Memoize a valid reference to the current AtomicReference for
# later comparison.
current = @Reference.get
current = reference
curr_val, curr_mark = current

# Ensure that that the expected marks match.
Expand All @@ -56,7 +56,7 @@ def compare_and_set(expected_val, new_val, expected_mark, new_mark)

prospect = ImmutableArray[new_val, new_mark]

@Reference.compare_and_set current, prospect
compare_and_set_reference current, prospect
end
alias_method :compare_and_swap, :compare_and_set

Expand All @@ -66,7 +66,7 @@ def compare_and_set(expected_val, new_val, expected_mark, new_mark)
#
# @return [ImmutableArray] the current reference and marked values
def get
@Reference.get
reference
end

# @!macro [attach] atomic_markable_reference_method_value
Expand All @@ -75,7 +75,7 @@ def get
#
# @return [Object] the current value of the reference
def value
@Reference.get[0]
reference[0]
end

# @!macro [attach] atomic_markable_reference_method_mark
Expand All @@ -84,7 +84,7 @@ def value
#
# @return [Boolean] the current marked value
def mark
@Reference.get[1]
reference[1]
end
alias_method :marked?, :mark

Expand All @@ -98,7 +98,7 @@ def mark
#
# @return [ImmutableArray] both the new value and the new mark
def set(new_val, new_mark)
@Reference.set ImmutableArray[new_val, new_mark]
self.reference = ImmutableArray[new_val, new_mark]
end

# @!macro [attach] atomic_markable_reference_method_update
Expand All @@ -115,7 +115,7 @@ def set(new_val, new_mark)
# @return [ImmutableArray] the new value and new mark
def update
loop do
old_val, old_mark = @Reference.get
old_val, old_mark = reference
new_val, new_mark = yield old_val, old_mark

if compare_and_set old_val, new_val, old_mark, new_mark
Expand All @@ -139,7 +139,7 @@ def update
#
# @raise [Concurrent::ConcurrentUpdateError] if the update fails
def try_update!
old_val, old_mark = @Reference.get
old_val, old_mark = reference
new_val, new_mark = yield old_val, old_mark

unless compare_and_set old_val, new_val, old_mark, new_mark
Expand All @@ -165,7 +165,7 @@ def try_update!
# @return [ImmutableArray] the new value and marked state, or nil if
# the update failed
def try_update
old_val, old_mark = @Reference.get
old_val, old_mark = reference
new_val, new_mark = yield old_val, old_mark

return unless compare_and_set old_val, new_val, old_mark, new_mark
Expand Down
9 changes: 3 additions & 6 deletions lib/concurrent/edge/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ def initialize(promise, default_executor)
@Callbacks = LockFreeStack.new
@Waiters = LockFreeStack.new # TODO replace with AtomicFixnum, avoid aba problem
@State = AtomicReference.new PENDING
super()
ensure_ivar_visibility!
super() # ensures visibility
end

# @return [:pending, :completed]
Expand Down Expand Up @@ -879,9 +878,8 @@ def hide_completable
# @!visibility private
class AbstractPromise < Synchronization::Object
def initialize(future)
super()
@Future = future
ensure_ivar_visibility!
super()
end

def future
Expand Down Expand Up @@ -1377,9 +1375,8 @@ def initialize(default_executor, intended_time)
class Channel < Synchronization::Object
# TODO make lock free
def initialize
super
@ProbeSet = Concurrent::Channel::WaitableList.new
ensure_ivar_visibility!
super()
end

def probe_set_size
Expand Down
4 changes: 1 addition & 3 deletions lib/concurrent/edge/lock_free_linked_set/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ class Node < Synchronization::Object
attr_reader :Data, :Successor_reference, :Key

def initialize(data = nil, successor = nil)
super()

@Successor_reference = AtomicMarkableReference.new(successor || Tail.new)
@Data = data
@Key = key_for data

ensure_ivar_visibility!
super() # ensures visibility
end

# Check to see if the node is the last in the list.
Expand Down
30 changes: 15 additions & 15 deletions lib/concurrent/edge/lock_free_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,51 @@ def next_node

EMPTY = Empty[nil, nil]

private *attr_volatile_with_cas(:head)

def initialize
super
@Head = AtomicReference.new EMPTY
ensure_ivar_visibility!
super(EMPTY)
end

def empty?
@Head.get.equal? EMPTY
head.equal? EMPTY
end

def compare_and_push(head, value)
@Head.compare_and_set head, Node[value, head]
compare_and_set_head head, Node[value, head]
end

def push(value)
while true
head = @Head.get
return self if @Head.compare_and_set head, Node[value, head]
current_head = head
return self if compare_and_set_head current_head, Node[value, current_head]
end
end

def peek
@Head.get
head
end

def compare_and_pop(head)
@Head.compare_and_set head, head.next_node
compare_and_set_head head, head.next_node
end

def pop
while true
head = @Head.get
return head.value if @Head.compare_and_set head, head.next_node
current_head = head
return current_head.value if compare_and_set_head current_head, current_head.next_node
end
end

def compare_and_clear(head)
@Head.compare_and_set head, EMPTY
compare_and_set_head head, EMPTY
end

def clear
while true
head = @Head.get
return false if head == EMPTY
return true if @Head.compare_and_set head, EMPTY
current_head = head
return false if current_head == EMPTY
return true if compare_and_set_head current_head, EMPTY
end
end

Expand Down
11 changes: 5 additions & 6 deletions lib/concurrent/executor/safe_task_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,22 @@ module Concurrent
class SafeTaskExecutor < Synchronization::LockableObject

def initialize(task, opts = {})
super()
@task = task
@task = task
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
ensure_ivar_visibility!
super() # ensures visibility
end

# @return [Array]
def execute(*args)
synchronize do
success = false
value = reason = nil
value = reason = nil

begin
value = @task.call(*args)
value = @task.call(*args)
success = true
rescue @exception_class => ex
reason = ex
reason = ex
success = false
end

Expand Down
1 change: 0 additions & 1 deletion lib/concurrent/synchronization/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ class Condition < LockableObject

def initialize(lock)
@Lock = lock
ensure_ivar_visibility!
super()
end

Expand Down
5 changes: 4 additions & 1 deletion lib/concurrent/synchronization/lockable_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ module Synchronization
private_constant :LockableObjectImplementation

class LockableObject < LockableObjectImplementation
def self.allow_only_direct_descendants! # FIXME interne dedime docela dost :/

# TODO make private for c-r

def self.allow_only_direct_descendants! # FIXME we inherit too much ourselves :/
this = self
singleton_class.send :define_method, :inherited do |child|
# super child
Expand Down
8 changes: 4 additions & 4 deletions lib/concurrent/synchronization/mri_lockable_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def ns_broadcast
# @!visibility private
# @!macro internal_implementation_note
class MriMutexLockableObject < MriLockableObject
def initialize
super
def initialize(*defaults)
super(*defaults)
@__lock__ = ::Mutex.new
@__condition__ = ::ConditionVariable.new
end
Expand All @@ -46,8 +46,8 @@ def ns_wait(timeout = nil)
# @!visibility private
# @!macro internal_implementation_note
class MriMonitorLockableObject < MriLockableObject
def initialize
super
def initialize(*defaults)
super(*defaults)
@__lock__ = ::Monitor.new
@__condition__ = @__lock__.new_cond
end
Expand Down
Loading

0 comments on commit 9b8a280

Please sign in to comment.