From 562b58b08cc5a6b60bd90c4aadd49b91c18c84e7 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 6 Sep 2015 19:42:14 +0200 Subject: [PATCH 1/5] Break Synchronization::Object into Object and LockableObject * JRuby broken for now --- lib/concurrent/actor/context.rb | 2 +- lib/concurrent/actor/core.rb | 2 +- lib/concurrent/agent.rb | 48 +++--- lib/concurrent/atom.rb | 8 +- lib/concurrent/atomic/cyclic_barrier.rb | 2 +- lib/concurrent/atomic/event.rb | 2 +- lib/concurrent/atomic/mutex_atomic_boolean.rb | 2 +- lib/concurrent/atomic/mutex_atomic_fixnum.rb | 2 +- .../atomic/mutex_count_down_latch.rb | 2 +- lib/concurrent/atomic/mutex_semaphore.rb | 4 +- .../atomic_reference/mutex_atomic.rb | 2 +- lib/concurrent/atomics.rb | 8 +- .../channel/blocking_ring_buffer.rb | 2 +- lib/concurrent/channel/buffered_channel.rb | 2 +- lib/concurrent/channel/waitable_list.rb | 2 +- .../collection/copy_on_notify_observer_set.rb | 4 +- .../collection/copy_on_write_observer_set.rb | 4 +- lib/concurrent/delay.rb | 2 +- lib/concurrent/edge/future.rb | 3 +- lib/concurrent/edge/lock_free_stack.rb | 1 + .../executor/abstract_executor_service.rb | 2 +- lib/concurrent/executor/safe_task_executor.rb | 2 +- .../executor/serialized_execution.rb | 2 +- lib/concurrent/immutable_struct.rb | 2 +- lib/concurrent/ivar.rb | 2 +- lib/concurrent/mutable_struct.rb | 8 +- lib/concurrent/settable_struct.rb | 4 +- lib/concurrent/synchronization.rb | 14 +- .../abstract_lockable_object.rb | 127 ++++++++++++++++ .../synchronization/abstract_object.rb | 138 ++---------------- lib/concurrent/synchronization/condition.rb | 6 +- .../synchronization/jruby_lockable_object.rb | 13 ++ .../{java_object.rb => jruby_object.rb} | 8 +- lib/concurrent/synchronization/lock.rb | 5 +- .../synchronization/lockable_object.rb | 56 +++++++ .../synchronization/monitor_object.rb | 27 ---- .../synchronization/mri_lockable_object.rb | 67 +++++++++ lib/concurrent/synchronization/mri_object.rb | 35 +++++ .../synchronization/mutex_object.rb | 43 ------ lib/concurrent/synchronization/object.rb | 56 +++---- .../synchronization/rbx_lockable_object.rb | 62 ++++++++ lib/concurrent/synchronization/rbx_object.rb | 85 +++-------- lib/concurrent/utility/at_exit.rb | 2 +- lib/concurrent/utility/monotonic_time.rb | 4 +- spec/concurrent/synchronization_spec.rb | 4 +- 45 files changed, 503 insertions(+), 375 deletions(-) create mode 100644 lib/concurrent/synchronization/abstract_lockable_object.rb create mode 100644 lib/concurrent/synchronization/jruby_lockable_object.rb rename lib/concurrent/synchronization/{java_object.rb => jruby_object.rb} (84%) create mode 100644 lib/concurrent/synchronization/lockable_object.rb delete mode 100644 lib/concurrent/synchronization/monitor_object.rb create mode 100644 lib/concurrent/synchronization/mri_lockable_object.rb create mode 100644 lib/concurrent/synchronization/mri_object.rb delete mode 100644 lib/concurrent/synchronization/mutex_object.rb create mode 100644 lib/concurrent/synchronization/rbx_lockable_object.rb diff --git a/lib/concurrent/actor/context.rb b/lib/concurrent/actor/context.rb index 8904dc507..2734f1e76 100644 --- a/lib/concurrent/actor/context.rb +++ b/lib/concurrent/actor/context.rb @@ -15,7 +15,7 @@ module Actor # > {include:Actor::RestartingContext} # # Example of ac actor definition: - # + # # {include:file:doc/actor/define.out.rb} # # See methods of {AbstractContext} what else can be tweaked, e.g {AbstractContext#default_reference_class} diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 24f91aa8c..2423195e2 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -10,7 +10,7 @@ module Actor # @note Whole class should be considered private. An user should use {Context}s and {Reference}s only. # @note devel: core should not block on anything, e.g. it cannot wait on children to terminate # that would eat up all threads in task pool and deadlock - class Core < Synchronization::Object + class Core < Synchronization::LockableObject include TypeCheck include Concern::Logging diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index d415b52d3..cfe95dca1 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -19,13 +19,13 @@ module Concurrent # and no blocking receive. The state of an Agent should be itself immutable # and the `#value` of an Agent is always immediately available for reading by # any thread without any messages, i.e. observation does not require - # cooperation or coordination. + # cooperation or coordination. # # Agent action dispatches are made using the various `#send` methods. These # methods always return immediately. At some point later, in another thread, # the following will happen: # - # 1. The given `action` will be applied to the state of the Agent and the + # 1. The given `action` will be applied to the state of the Agent and the # `args`, if any were supplied. # 2. The return value of `action` will be passed to the validator lambda, # if one has been set on the Agent. @@ -55,7 +55,7 @@ module Concurrent # Unlike in Clojure, `Agent` cannot participate in `Concurrent::TVar` transactions. # # ## Example - # + # # ``` # def next_fibonacci(set = nil) # return [0, 1] if set.nil? @@ -130,7 +130,7 @@ module Concurrent # ``` # # @!macro [new] agent_await_warning - # + # # **NOTE** Never, *under any circumstances*, call any of the "await" methods # ({#await}, {#await_for}, {#await_for!}, and {#wait}) from within an action # block/proc/lambda. The call will block the Agent and will always fail. @@ -141,7 +141,7 @@ module Concurrent # # @see http://clojure.org/Agents Clojure Agents # @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State - class Agent < Synchronization::Object + class Agent < Synchronization::LockableObject include Concern::Observable ERROR_MODES = [:continue, :fail].freeze @@ -150,13 +150,13 @@ class Agent < Synchronization::Object AWAIT_FLAG = Object.new private_constant :AWAIT_FLAG - AWAIT_ACTION = ->(value, latch){ latch.count_down; AWAIT_FLAG } + AWAIT_ACTION = ->(value, latch) { latch.count_down; AWAIT_FLAG } private_constant :AWAIT_ACTION - DEFAULT_ERROR_HANDLER = ->(agent, error){ nil } + DEFAULT_ERROR_HANDLER = ->(agent, error) { nil } private_constant :DEFAULT_ERROR_HANDLER - DEFAULT_VALIDATOR = ->(value){ true } + DEFAULT_VALIDATOR = ->(value) { true } private_constant :DEFAULT_VALIDATOR Job = Struct.new(:action, :args, :executor, :caller) @@ -226,6 +226,7 @@ def initialize(initial, opts = {}) def value @current.value end + alias_method :deref, :value # When {#failed?} and {#error_mode} is `:fail`, returns the error object @@ -236,6 +237,7 @@ def value def error @error.value end + alias_method :reason, :error # @!macro [attach] agent_send @@ -289,6 +291,7 @@ def send!(*args, &action) def send_off(*args, &action) enqueue_action_job(action, args, Concurrent.global_io_executor) end + alias_method :post, :send_off # @!macro agent_send @@ -396,6 +399,7 @@ def wait(timeout = nil) def failed? !@error.value.nil? end + alias_method :stopped?, :failed? # When an Agent is {#failed?}, changes the Agent {#value} to `new_value` @@ -420,7 +424,7 @@ def restart(new_value, opts = {}) raise Error.new('agent is not failed') unless failed? raise ValidationError unless ns_validate(new_value) @current.value = new_value - @error.value = nil + @error.value = nil @queue.clear if clear_actions ns_post_next_job unless @queue.empty? end @@ -440,7 +444,7 @@ class << self # # @!macro agent_await_warning def await(*agents) - agents.each {|agent| agent.await } + agents.each { |agent| agent.await } true end @@ -455,11 +459,11 @@ def await(*agents) # @!macro agent_await_warning def await_for(timeout, *agents) end_at = Concurrent.monotonic_time + timeout.to_f - ok = agents.length.times do |i| + ok = agents.length.times do |i| break false if (delay = end_at - Concurrent.monotonic_time) < 0 break false unless agents[i].await_for(delay) end - !! ok + !!ok end # Blocks the current thread until all actions dispatched thus far to all @@ -481,7 +485,7 @@ def await_for!(timeout, *agents) private def ns_initialize(initial, opts) - @error_mode = opts[:error_mode] + @error_mode = opts[:error_mode] @error_handler = opts[:error_handler] if @error_mode && !ERROR_MODES.include?(@error_mode) @@ -491,11 +495,11 @@ def ns_initialize(initial, opts) end @error_handler ||= DEFAULT_ERROR_HANDLER - @validator = opts.fetch(:validator, DEFAULT_VALIDATOR) - @current = Concurrent::AtomicReference.new(initial) - @error = Concurrent::AtomicReference.new(nil) - @caller = Concurrent::ThreadLocalVar.new(nil) - @queue = [] + @validator = opts.fetch(:validator, DEFAULT_VALIDATOR) + @current = Concurrent::AtomicReference.new(initial) + @error = Concurrent::AtomicReference.new(nil) + @caller = Concurrent::ThreadLocalVar.new(nil) + @queue = [] self.observers = Collection::CopyOnNotifyObserverSet.new end @@ -530,15 +534,15 @@ def ns_enqueue_job(job, index = nil) end def ns_post_next_job - @queue.first.executor.post{ execute_next_job } + @queue.first.executor.post { execute_next_job } end def execute_next_job - job = synchronize { @queue.first } + job = synchronize { @queue.first } old_value = @current.value @caller.value = job.caller # for nested actions - new_value = job.action.call(old_value, *job.args) + new_value = job.action.call(old_value, *job.args) @caller.value = nil if new_value != AWAIT_FLAG && ns_validate(new_value) @@ -573,7 +577,7 @@ def handle_error(error) end def ns_find_last_job_for_thread - @queue.rindex {|job| job.caller == Thread.current.object_id } + @queue.rindex { |job| job.caller == Thread.current.object_id } end end end diff --git a/lib/concurrent/atom.rb b/lib/concurrent/atom.rb index b45133e9c..0870454cd 100644 --- a/lib/concurrent/atom.rb +++ b/lib/concurrent/atom.rb @@ -20,7 +20,7 @@ module Concurrent # value validates. # # ## Example - # + # # ``` # def next_fibonacci(set = nil) # return [0, 1] if set.nil? @@ -57,7 +57,7 @@ module Concurrent # # @see http://clojure.org/atoms Clojure Atoms # @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State - class Atom < Synchronization::Object + class Atom < Synchronization::LockableObject include Concern::Observable # Create a new atom with the given initial value. @@ -70,7 +70,7 @@ class Atom < Synchronization::Object # is acceptable else return false (preferrably) or raise an exception. # # @!macro deref_options - # + # # @raise [ArgumentError] if the validator is not a `Proc` (when given) def initialize(value, opts = {}) super() @@ -102,7 +102,7 @@ def value # the application of the supplied block to a current value, atomically. # However, because the block might be called multiple times, it must be free # of side effects. - # + # # @note The given block may be called multiple times, and thus should be free # of side effects. # diff --git a/lib/concurrent/atomic/cyclic_barrier.rb b/lib/concurrent/atomic/cyclic_barrier.rb index dfcde8ce9..852b08bd2 100644 --- a/lib/concurrent/atomic/cyclic_barrier.rb +++ b/lib/concurrent/atomic/cyclic_barrier.rb @@ -4,7 +4,7 @@ module Concurrent # A synchronization aid that allows a set of threads to all wait for each # other to reach a common barrier point. - class CyclicBarrier < Synchronization::Object + class CyclicBarrier < Synchronization::LockableObject # @!visibility private Generation = Struct.new(:status) diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index 8b4c87a3e..8079343fa 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -13,7 +13,7 @@ module Concurrent # `#reset` at any time once it has been set. # # @see http://msdn.microsoft.com/en-us/library/windows/desktop/ms682655.aspx - class Event < Synchronization::Object + class Event < Synchronization::LockableObject # Creates a new `Event` in the unset state. Threads calling `#wait` on the # `Event` will block. diff --git a/lib/concurrent/atomic/mutex_atomic_boolean.rb b/lib/concurrent/atomic/mutex_atomic_boolean.rb index c78c858ac..ffa3e365c 100644 --- a/lib/concurrent/atomic/mutex_atomic_boolean.rb +++ b/lib/concurrent/atomic/mutex_atomic_boolean.rb @@ -5,7 +5,7 @@ module Concurrent # @!macro atomic_boolean # @!visibility private # @!macro internal_implementation_note - class MutexAtomicBoolean < Synchronization::Object + class MutexAtomicBoolean < Synchronization::LockableObject # @!macro atomic_boolean_method_initialize def initialize(initial = false) diff --git a/lib/concurrent/atomic/mutex_atomic_fixnum.rb b/lib/concurrent/atomic/mutex_atomic_fixnum.rb index 43f29d768..15ea7b205 100644 --- a/lib/concurrent/atomic/mutex_atomic_fixnum.rb +++ b/lib/concurrent/atomic/mutex_atomic_fixnum.rb @@ -5,7 +5,7 @@ module Concurrent # @!macro atomic_fixnum # @!visibility private # @!macro internal_implementation_note - class MutexAtomicFixnum < Synchronization::Object + class MutexAtomicFixnum < Synchronization::LockableObject # http://stackoverflow.com/questions/535721/ruby-max-integer MIN_VALUE = -(2**(0.size * 8 - 2)) diff --git a/lib/concurrent/atomic/mutex_count_down_latch.rb b/lib/concurrent/atomic/mutex_count_down_latch.rb index 0975b9437..15758eb0c 100644 --- a/lib/concurrent/atomic/mutex_count_down_latch.rb +++ b/lib/concurrent/atomic/mutex_count_down_latch.rb @@ -5,7 +5,7 @@ module Concurrent # @!macro count_down_latch # @!visibility private # @!macro internal_implementation_note - class MutexCountDownLatch < Synchronization::Object + class MutexCountDownLatch < Synchronization::LockableObject # @!macro count_down_latch_method_initialize def initialize(count = 1) diff --git a/lib/concurrent/atomic/mutex_semaphore.rb b/lib/concurrent/atomic/mutex_semaphore.rb index fd66108d9..b12a3ce46 100644 --- a/lib/concurrent/atomic/mutex_semaphore.rb +++ b/lib/concurrent/atomic/mutex_semaphore.rb @@ -5,7 +5,7 @@ module Concurrent # @!macro semaphore # @!visibility private # @!macro internal_implementation_note - class MutexSemaphore < Synchronization::Object + class MutexSemaphore < Synchronization::LockableObject # @!macro semaphore_method_initialize def initialize(count) @@ -78,7 +78,7 @@ def release(permits = 1) # @raise [ArgumentError] if `@free` - `@reduction` is less than zero # # @return [nil] - # + # # @!visibility private def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 diff --git a/lib/concurrent/atomic_reference/mutex_atomic.rb b/lib/concurrent/atomic_reference/mutex_atomic.rb index 4d8047e44..a2becc075 100644 --- a/lib/concurrent/atomic_reference/mutex_atomic.rb +++ b/lib/concurrent/atomic_reference/mutex_atomic.rb @@ -8,7 +8,7 @@ module Concurrent # # @!visibility private # @!macro internal_implementation_note - class MutexAtomicReference < Synchronization::Object + class MutexAtomicReference < Synchronization::LockableObject include Concurrent::AtomicDirectUpdate include Concurrent::AtomicNumericCompareAndSetWrapper diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 26c14c1b5..04852c883 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -10,24 +10,24 @@ # @!method initialize # @!macro [new] atomic_reference_method_initialize # @param [Object] value The initial value. -# +# # @!method get # @!macro [new] atomic_reference_method_get # Gets the current value. # @return [Object] the current value -# +# # @!method set # @!macro [new] atomic_reference_method_set # Sets to the given value. # @param [Object] new_value the new value # @return [Object] the new value -# +# # @!method get_and_set # @!macro [new] atomic_reference_method_get_and_set # Atomically sets to the given value and returns the old value. # @param [Object] new_value the new value # @return [Object] the old value -# +# # @!method compare_and_set # @!macro [new] atomic_reference_method_compare_and_set # diff --git a/lib/concurrent/channel/blocking_ring_buffer.rb b/lib/concurrent/channel/blocking_ring_buffer.rb index c68c1b2a1..4e3efc88e 100644 --- a/lib/concurrent/channel/blocking_ring_buffer.rb +++ b/lib/concurrent/channel/blocking_ring_buffer.rb @@ -5,7 +5,7 @@ module Channel # @api Channel # @!macro edge_warning - class BlockingRingBuffer < Synchronization::Object + class BlockingRingBuffer < Synchronization::LockableObject def initialize(capacity) super() diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index d7ca55751..19a4324a0 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -6,7 +6,7 @@ module Channel # @api Channel # @!macro edge_warning - class BufferedChannel < Synchronization::Object + class BufferedChannel < Synchronization::LockableObject def initialize(size) super() diff --git a/lib/concurrent/channel/waitable_list.rb b/lib/concurrent/channel/waitable_list.rb index 2bd64fde5..f8c638d45 100644 --- a/lib/concurrent/channel/waitable_list.rb +++ b/lib/concurrent/channel/waitable_list.rb @@ -5,7 +5,7 @@ module Channel # @api Channel # @!macro edge_warning - class WaitableList < Synchronization::Object + class WaitableList < Synchronization::LockableObject def initialize super() diff --git a/lib/concurrent/collection/copy_on_notify_observer_set.rb b/lib/concurrent/collection/copy_on_notify_observer_set.rb index 6e0022950..50d52a623 100644 --- a/lib/concurrent/collection/copy_on_notify_observer_set.rb +++ b/lib/concurrent/collection/copy_on_notify_observer_set.rb @@ -7,9 +7,9 @@ module Collection # observers are added and removed from a thread safe collection; every time # a notification is required the internal data structure is copied to # prevent concurrency issues - # + # # @api private - class CopyOnNotifyObserverSet < Synchronization::Object + class CopyOnNotifyObserverSet < Synchronization::LockableObject def initialize super() diff --git a/lib/concurrent/collection/copy_on_write_observer_set.rb b/lib/concurrent/collection/copy_on_write_observer_set.rb index 37a7423df..3f3f7cccd 100644 --- a/lib/concurrent/collection/copy_on_write_observer_set.rb +++ b/lib/concurrent/collection/copy_on_write_observer_set.rb @@ -6,9 +6,9 @@ module Collection # A thread safe observer set implemented using copy-on-write approach: # every time an observer is added or removed the whole internal data structure is # duplicated and replaced with a new one. - # + # # @api private - class CopyOnWriteObserverSet < Synchronization::Object + class CopyOnWriteObserverSet < Synchronization::LockableObject def initialize super() diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index b93b283ae..d5a3f6d39 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -40,7 +40,7 @@ module Concurrent # execute on the given executor, allowing the call to timeout. # # @see Concurrent::Concern::Dereferenceable - class Delay < Synchronization::Object + class Delay < Synchronization::LockableObject include Concern::Obligation # NOTE: Because the global thread pools are lazy-loaded with these objects diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index db676f978..e2aa19449 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -125,7 +125,7 @@ def post_on(executor, *args, &job) include FutureShortcuts # Represents an event which will happen in future (will be completed). It has to always happen. - class Event < Synchronization::Object + class Event < Synchronization::LockableObject include Concern::Deprecation # @!visibility private @@ -879,6 +879,7 @@ def hide_completable # @!visibility private class AbstractPromise < Synchronization::Object def initialize(future) + super() @Future = future ensure_ivar_visibility! end diff --git a/lib/concurrent/edge/lock_free_stack.rb b/lib/concurrent/edge/lock_free_stack.rb index 534218e08..448a3c5dd 100644 --- a/lib/concurrent/edge/lock_free_stack.rb +++ b/lib/concurrent/edge/lock_free_stack.rb @@ -22,6 +22,7 @@ def next_node EMPTY = Empty[nil, nil] def initialize + super @Head = AtomicReference.new EMPTY ensure_ivar_visibility! end diff --git a/lib/concurrent/executor/abstract_executor_service.rb b/lib/concurrent/executor/abstract_executor_service.rb index a6cb8a462..ef013d919 100644 --- a/lib/concurrent/executor/abstract_executor_service.rb +++ b/lib/concurrent/executor/abstract_executor_service.rb @@ -7,7 +7,7 @@ module Concurrent # @!macro abstract_executor_service_public_api # @!visibility private - class AbstractExecutorService < Synchronization::Object + class AbstractExecutorService < Synchronization::LockableObject include ExecutorService # The set of possible fallback policies that may be set at thread pool creation. diff --git a/lib/concurrent/executor/safe_task_executor.rb b/lib/concurrent/executor/safe_task_executor.rb index fe054a9d9..94e104e8f 100644 --- a/lib/concurrent/executor/safe_task_executor.rb +++ b/lib/concurrent/executor/safe_task_executor.rb @@ -6,7 +6,7 @@ module Concurrent # success - indicating if the callable has been executed without errors # value - filled by the callable result if it has been executed without errors, nil otherwise # reason - the error risen by the callable if it has been executed with errors, nil otherwise - class SafeTaskExecutor < Synchronization::Object + class SafeTaskExecutor < Synchronization::LockableObject def initialize(task, opts = {}) super() diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index 8ced8b94f..1fd9b8855 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -5,7 +5,7 @@ module Concurrent # Ensures passed jobs in a serialized order never running at the same time. - class SerializedExecution < Synchronization::Object + class SerializedExecution < Synchronization::LockableObject include Concern::Logging def initialize() diff --git a/lib/concurrent/immutable_struct.rb b/lib/concurrent/immutable_struct.rb index ee0591a45..e8e0111d1 100644 --- a/lib/concurrent/immutable_struct.rb +++ b/lib/concurrent/immutable_struct.rb @@ -77,7 +77,7 @@ def self.new(*args, &block) FACTORY.define_struct(clazz_name, args, &block) end - FACTORY = Class.new(Synchronization::Object) do + FACTORY = Class.new(Synchronization::LockableObject) do def define_struct(name, members, &block) synchronize do Synchronization::AbstractStruct.define_struct_class(ImmutableStruct, Synchronization::Object, name, members, &block) diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 16b3a28d7..0b866072f 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -43,7 +43,7 @@ module Concurrent # In Proceedings of Workshop on Graph Reduction, 1986. # 2. For recent application: # [DataDrivenFuture in Habanero Java from Rice](http://www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html). - class IVar < Synchronization::Object + class IVar < Synchronization::LockableObject include Concern::Obligation include Concern::Observable diff --git a/lib/concurrent/mutable_struct.rb b/lib/concurrent/mutable_struct.rb index 68b0040d4..3252356bf 100644 --- a/lib/concurrent/mutable_struct.rb +++ b/lib/concurrent/mutable_struct.rb @@ -75,7 +75,7 @@ def inspect alias_method :to_s, :inspect # @!macro [attach] struct_merge - # + # # Returns a new struct containing the contents of `other` and the contents # of `self`. If no block is specified, the value for entries with duplicate # keys will be that of `other`. Otherwise the value for each duplicate key @@ -98,7 +98,7 @@ def merge(other, &block) # @!macro [attach] struct_to_h # # Returns a hash containing the names and values for the struct’s members. - # + # # @return [Hash] the names and values for the struct’s members def to_h synchronize { ns_to_h } @@ -206,10 +206,10 @@ def self.new(*args, &block) FACTORY.define_struct(clazz_name, args, &block) end - FACTORY = Class.new(Synchronization::Object) do + FACTORY = Class.new(Synchronization::LockableObject) do def define_struct(name, members, &block) synchronize do - clazz = Synchronization::AbstractStruct.define_struct_class(MutableStruct, Synchronization::Object, name, members, &block) + clazz = Synchronization::AbstractStruct.define_struct_class(MutableStruct, Synchronization::LockableObject, name, members, &block) members.each_with_index do |member, index| clazz.send(:define_method, member) do synchronize { @values[index] } diff --git a/lib/concurrent/settable_struct.rb b/lib/concurrent/settable_struct.rb index f0b397a72..b4365d34c 100644 --- a/lib/concurrent/settable_struct.rb +++ b/lib/concurrent/settable_struct.rb @@ -101,10 +101,10 @@ def self.new(*args, &block) FACTORY.define_struct(clazz_name, args, &block) end - FACTORY = Class.new(Synchronization::Object) do + FACTORY = Class.new(Synchronization::LockableObject) do def define_struct(name, members, &block) synchronize do - clazz = Synchronization::AbstractStruct.define_struct_class(SettableStruct, Synchronization::Object, name, members, &block) + clazz = Synchronization::AbstractStruct.define_struct_class(SettableStruct, Synchronization::LockableObject, name, members, &block) members.each_with_index do |member, index| clazz.send(:define_method, member) do synchronize { @values[index] } diff --git a/lib/concurrent/synchronization.rb b/lib/concurrent/synchronization.rb index d11e53846..9c086ffff 100644 --- a/lib/concurrent/synchronization.rb +++ b/lib/concurrent/synchronization.rb @@ -1,11 +1,19 @@ require 'concurrent/utility/engine' + require 'concurrent/synchronization/abstract_object' -require 'concurrent/synchronization/java_object' -require 'concurrent/synchronization/mutex_object' -require 'concurrent/synchronization/monitor_object' +require 'concurrent/synchronization/mri_object' +require 'concurrent/synchronization/jruby_object' require 'concurrent/synchronization/rbx_object' require 'concurrent/synchronization/object' +require 'concurrent/synchronization/abstract_lockable_object' +require 'concurrent/synchronization/mri_lockable_object' +require 'concurrent/synchronization/jruby_lockable_object' +require 'concurrent/synchronization/rbx_lockable_object' + +require 'concurrent/utility/native_extension_loader' # load native part first +require 'concurrent/synchronization/lockable_object' + require 'concurrent/synchronization/condition' require 'concurrent/synchronization/lock' diff --git a/lib/concurrent/synchronization/abstract_lockable_object.rb b/lib/concurrent/synchronization/abstract_lockable_object.rb new file mode 100644 index 000000000..2da28843b --- /dev/null +++ b/lib/concurrent/synchronization/abstract_lockable_object.rb @@ -0,0 +1,127 @@ +module Concurrent + module Synchronization + # @!macro synchronization_object + # @!visibility private + class AbstractLockableObject < Object + + # # @!macro [attach] synchronization_object_method_initialize + # # + # # @abstract for helper ivar initialization if needed, + # # otherwise it can be left empty. It has to call ns_initialize. # FIXME no longer true + # def initialize + # super + # end + + protected + + # @!macro [attach] synchronization_object_method_synchronize + # + # @yield runs the block synchronized against this object, + # equivalent of java's `synchronize(this) {}` + # @note can by made public in descendants if required by `public :synchronize` + def synchronize + raise NotImplementedError + end + + # @!macro [attach] synchronization_object_method_ns_initialize + # + # initialization of the object called inside synchronize block + # @note has to be called manually when required in children of this class + # @example + # class Child < Concurrent::Synchornization::Object + # def initialize(*args, &block) + # super(&nil) + # synchronize { ns_initialize(*args, &block) } + # end + # + # def ns_initialize(*args, &block) + # @args = args + # end + # end + def ns_initialize(*args, &block) + end + + # @!macro [attach] synchronization_object_method_ns_wait_until + # + # Wait until condition is met or timeout passes, + # protects against spurious wake-ups. + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + # @yield condition to be met + # @yieldreturn [true, false] + # @return [true, false] if condition met + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def wait_until(timeout = nil, &condition) + # synchronize { ns_wait_until(timeout, &condition) } + # end + # ``` + def ns_wait_until(timeout = nil, &condition) + if timeout + wait_until = Concurrent.monotonic_time + timeout + loop do + now = Concurrent.monotonic_time + condition_result = condition.call + # TODO recheck and fix properly + # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 + # when passed to java #wait(long timeout) + return condition_result if (now + 0.001) >= wait_until || condition_result + ns_wait wait_until - now + end + else + ns_wait timeout until condition.call + true + end + end + + # @!macro [attach] synchronization_object_method_ns_wait + # + # Wait until another thread calls #signal or #broadcast, + # spurious wake-ups can happen. + # + # @param [Numeric, nil] timeout in seconds, `nil` means no timeout + # @return [self] + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def wait(timeout = nil) + # synchronize { ns_wait(timeout) } + # end + # ``` + def ns_wait(timeout = nil) + raise NotImplementedError + end + + # @!macro [attach] synchronization_object_method_ns_signal + # + # Signal one waiting thread. + # @return [self] + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def signal + # synchronize { ns_signal } + # end + # ``` + def ns_signal + raise NotImplementedError + end + + # @!macro [attach] synchronization_object_method_ns_broadcast + # + # Broadcast to all waiting threads. + # @return [self] + # @note only to be used inside synchronized block + # @note to provide direct access to this method in a descendant add method + # ``` + # def broadcast + # synchronize { ns_broadcast } + # end + # ``` + def ns_broadcast + raise NotImplementedError + end + + end + end +end \ No newline at end of file diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index b222e3b71..c4a11e058 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -6,124 +6,16 @@ module Synchronization class AbstractObject # @!macro [attach] synchronization_object_method_initialize - # - # @abstract for helper ivar initialization if needed, - # otherwise it can be left empty. It has to call ns_initialize. - def initialize(*args, &block) + # + # @abstract has to be called by children + def initialize raise NotImplementedError end protected - # @!macro [attach] synchronization_object_method_synchronize - # - # @yield runs the block synchronized against this object, - # equivalent of java's `synchronize(this) {}` - # @note can by made public in descendants if required by `public :synchronize` - def synchronize - raise NotImplementedError - end - - # @!macro [attach] synchronization_object_method_ns_initialize - # - # initialization of the object called inside synchronize block - # @note has to be called manually when required in children of this class - # @example - # class Child < Concurrent::Synchornization::Object - # def initialize(*args, &block) - # super(&nil) - # synchronize { ns_initialize(*args, &block) } - # end - # - # def ns_initialize(*args, &block) - # @args = args - # end - # end - def ns_initialize(*args, &block) - end - - # @!macro [attach] synchronization_object_method_ns_wait_until - # - # Wait until condition is met or timeout passes, - # protects against spurious wake-ups. - # @param [Numeric, nil] timeout in seconds, `nil` means no timeout - # @yield condition to be met - # @yieldreturn [true, false] - # @return [true, false] if condition met - # @note only to be used inside synchronized block - # @note to provide direct access to this method in a descendant add method - # ``` - # def wait_until(timeout = nil, &condition) - # synchronize { ns_wait_until(timeout, &condition) } - # end - # ``` - def ns_wait_until(timeout = nil, &condition) - if timeout - wait_until = Concurrent.monotonic_time + timeout - loop do - now = Concurrent.monotonic_time - condition_result = condition.call - # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 - # when passed to java #wait(long timeout) - return condition_result if (now + 0.001) >= wait_until || condition_result - ns_wait wait_until - now - end - else - ns_wait timeout until condition.call - true - end - end - - # @!macro [attach] synchronization_object_method_ns_wait - # - # Wait until another thread calls #signal or #broadcast, - # spurious wake-ups can happen. - # - # @param [Numeric, nil] timeout in seconds, `nil` means no timeout - # @return [self] - # @note only to be used inside synchronized block - # @note to provide direct access to this method in a descendant add method - # ``` - # def wait(timeout = nil) - # synchronize { ns_wait(timeout) } - # end - # ``` - def ns_wait(timeout = nil) - raise NotImplementedError - end - - # @!macro [attach] synchronization_object_method_ns_signal - # - # Signal one waiting thread. - # @return [self] - # @note only to be used inside synchronized block - # @note to provide direct access to this method in a descendant add method - # ``` - # def signal - # synchronize { ns_signal } - # end - # ``` - def ns_signal - raise NotImplementedError - end - - # @!macro [attach] synchronization_object_method_ns_broadcast - # - # Broadcast to all waiting threads. - # @return [self] - # @note only to be used inside synchronized block - # @note to provide direct access to this method in a descendant add method - # ``` - # def broadcast - # synchronize { ns_broadcast } - # end - # ``` - def ns_broadcast - raise NotImplementedError - end - # @!macro [attach] synchronization_object_method_ensure_ivar_visibility - # + # # Allows to construct immutable objects where all fields are visible after initialization, not requiring # further synchronization on access. # @example @@ -136,27 +28,21 @@ def ns_broadcast # end # end def ensure_ivar_visibility! + # We have to prevent ivar writes to reordered with storing of the final instance reference + # Therefore wee need a fullFence to prevent reordering in both directions. + full_memory_barrier + end + + def full_memory_barrier raise NotImplementedError end # @!macro [attach] synchronization_object_method_self_attr_volatile - # + # # creates methods for reading and writing to a instance variable with volatile (Java semantic) instance variable # return [Array] names of defined method names def self.attr_volatile(*names) - names.each do |name| - ivar = :"@volatile_#{name}" - class_eval <<-RUBY, __FILE__, __LINE__ + 1 - def #{name} - #{ivar} - end - - def #{name}=(value) - #{ivar} = value - end - RUBY - end - names.map { |n| [n, :"#{n}="] }.flatten + raise NotImplementedError end end end diff --git a/lib/concurrent/synchronization/condition.rb b/lib/concurrent/synchronization/condition.rb index eeb6996cd..03f3fbe7d 100644 --- a/lib/concurrent/synchronization/condition.rb +++ b/lib/concurrent/synchronization/condition.rb @@ -1,6 +1,8 @@ module Concurrent module Synchronization - class Condition < Object + class Condition < LockableObject + + # TODO locks two objects, improve singleton_class.send :alias_method, :private_new, :new private_class_method :new @@ -44,7 +46,7 @@ def ns_broadcast end end - class Object < Implementation + class LockableObject < LockableObjectImplementation def new_condition Condition.private_new(self) end diff --git a/lib/concurrent/synchronization/jruby_lockable_object.rb b/lib/concurrent/synchronization/jruby_lockable_object.rb new file mode 100644 index 000000000..670037f8c --- /dev/null +++ b/lib/concurrent/synchronization/jruby_lockable_object.rb @@ -0,0 +1,13 @@ +module Concurrent + module Synchronization + + if Concurrent.on_jruby? + + # @!visibility private + # @!macro internal_implementation_note + class JRubyLockableObject < AbstractLockableObject + + end + end + end +end diff --git a/lib/concurrent/synchronization/java_object.rb b/lib/concurrent/synchronization/jruby_object.rb similarity index 84% rename from lib/concurrent/synchronization/java_object.rb rename to lib/concurrent/synchronization/jruby_object.rb index 8ed79ce62..38c538c38 100644 --- a/lib/concurrent/synchronization/java_object.rb +++ b/lib/concurrent/synchronization/jruby_object.rb @@ -1,5 +1,3 @@ -require 'concurrent/utility/native_extension_loader' # load native part first - module Concurrent module Synchronization @@ -7,7 +5,11 @@ module Synchronization # @!visibility private # @!macro internal_implementation_note - class JavaObject < AbstractObject + class JRubyObject < AbstractObject + + def initialize + # nothing to do + end def self.attr_volatile(*names) names.each do |name| diff --git a/lib/concurrent/synchronization/lock.rb b/lib/concurrent/synchronization/lock.rb index 5a7c0192c..74c9f5459 100644 --- a/lib/concurrent/synchronization/lock.rb +++ b/lib/concurrent/synchronization/lock.rb @@ -1,6 +1,7 @@ module Concurrent module Synchronization - class Lock < Object + class Lock < LockableObject + # TODO use JavaReentrantLock on JRuby public :synchronize @@ -21,7 +22,7 @@ def signal end public :ns_signal - + def broadcast synchronize { ns_broadcast } end diff --git a/lib/concurrent/synchronization/lockable_object.rb b/lib/concurrent/synchronization/lockable_object.rb new file mode 100644 index 000000000..5ce0ea845 --- /dev/null +++ b/lib/concurrent/synchronization/lockable_object.rb @@ -0,0 +1,56 @@ +module Concurrent + module Synchronization + + # @!visibility private + # @!macro internal_implementation_note + LockableObjectImplementation = case + when Concurrent.on_cruby? && Concurrent.ruby_version(:<=, 1, 9, 3) + MriMonitorLockableObject + when Concurrent.on_cruby? && Concurrent.ruby_version(:>, 1, 9, 3) + MriMutexLockableObject + when Concurrent.on_jruby? + JRubyLockableObject + when Concurrent.on_rbx? + RbxLockableObject + else + warn 'Possibly unsupported Ruby implementation' + MriMonitorLockableObject + end + private_constant :LockableObjectImplementation + + class LockableObject < LockableObjectImplementation + def self.allow_only_direct_descendants! # FIXME interne dedime docela dost :/ + this = self + singleton_class.send :define_method, :inherited do |child| + # super child + + if child.superclass != this + raise "all children of #{this} are final, subclassing is not supported use composition." + end + end + end + + # @!method initialize(*args, &block) + # @!macro synchronization_object_method_initialize + + # @!method synchronize + # @!macro synchronization_object_method_synchronize + + # @!method initialize(*args, &block) + # @!macro synchronization_object_method_ns_initialize + + # @!method wait_until(timeout = nil, &condition) + # @!macro synchronization_object_method_ns_wait_until + + # @!method wait(timeout = nil) + # @!macro synchronization_object_method_ns_wait + + # @!method signal + # @!macro synchronization_object_method_ns_signal + + # @!method broadcast + # @!macro synchronization_object_method_ns_broadcast + + end + end +end \ No newline at end of file diff --git a/lib/concurrent/synchronization/monitor_object.rb b/lib/concurrent/synchronization/monitor_object.rb deleted file mode 100644 index f4d6e3597..000000000 --- a/lib/concurrent/synchronization/monitor_object.rb +++ /dev/null @@ -1,27 +0,0 @@ -require 'monitor' -require 'concurrent/synchronization/mutex_object' - -module Concurrent - module Synchronization - - # @!visibility private - # @!macro internal_implementation_note - class MonitorObject < MutexObject - def initialize - @__lock__ = ::Monitor.new - @__condition__ = @__lock__.new_cond - end - - protected - - def synchronize - @__lock__.synchronize { yield } - end - - def ns_wait(timeout = nil) - @__condition__.wait timeout - self - end - end - end -end diff --git a/lib/concurrent/synchronization/mri_lockable_object.rb b/lib/concurrent/synchronization/mri_lockable_object.rb new file mode 100644 index 000000000..38c4e4253 --- /dev/null +++ b/lib/concurrent/synchronization/mri_lockable_object.rb @@ -0,0 +1,67 @@ +module Concurrent + module Synchronization + + # @!visibility private + # @!macro internal_implementation_note + class MriLockableObject < AbstractLockableObject + protected + + def ns_signal + @__condition__.signal + self + end + + def ns_broadcast + @__condition__.broadcast + self + end + end + + + # @!visibility private + # @!macro internal_implementation_note + class MriMutexLockableObject < MriLockableObject + def initialize + super + @__lock__ = ::Mutex.new + @__condition__ = ::ConditionVariable.new + end + + protected + + def synchronize + if @__lock__.owned? + yield + else + @__lock__.synchronize { yield } + end + end + + def ns_wait(timeout = nil) + @__condition__.wait @__lock__, timeout + self + end + end + + # @!visibility private + # @!macro internal_implementation_note + class MriMonitorLockableObject < MriLockableObject + def initialize + super + @__lock__ = ::Monitor.new + @__condition__ = @__lock__.new_cond + end + + protected + + def synchronize # TODO may be a problem with lock.synchronize { lock.wait } + @__lock__.synchronize { yield } + end + + def ns_wait(timeout = nil) + @__condition__.wait timeout + self + end + end + end +end diff --git a/lib/concurrent/synchronization/mri_object.rb b/lib/concurrent/synchronization/mri_object.rb new file mode 100644 index 000000000..289334034 --- /dev/null +++ b/lib/concurrent/synchronization/mri_object.rb @@ -0,0 +1,35 @@ +module Concurrent + module Synchronization + + # @!visibility private + # @!macro internal_implementation_note + class MriObject < AbstractObject + + def initialize + # nothing to do + end + + def full_memory_barrier + # relying on undocumented behavior of CRuby, GVL acquire has lock which ensures visibility of ivars + # https://github.com/ruby/ruby/blob/ruby_2_2/thread_pthread.c#L204-L211 + end + + def self.attr_volatile(*names) + names.each do |name| + ivar = :"@volatile_#{name}" + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{name} + #{ivar} + end + + def #{name}=(value) + #{ivar} = value + end + RUBY + end + names.map { |n| [n, :"#{n}="] }.flatten + end + end + + end +end diff --git a/lib/concurrent/synchronization/mutex_object.rb b/lib/concurrent/synchronization/mutex_object.rb deleted file mode 100644 index 0ec26c974..000000000 --- a/lib/concurrent/synchronization/mutex_object.rb +++ /dev/null @@ -1,43 +0,0 @@ -module Concurrent - module Synchronization - - # @!visibility private - # @!macro internal_implementation_note - class MutexObject < AbstractObject - def initialize - @__lock__ = ::Mutex.new - @__condition__ = ::ConditionVariable.new - end - - protected - - def synchronize - if @__lock__.owned? - yield - else - @__lock__.synchronize { yield } - end - end - - def ns_signal - @__condition__.signal - self - end - - def ns_broadcast - @__condition__.broadcast - self - end - - def ns_wait(timeout = nil) - @__condition__.wait @__lock__, timeout - self - end - - def ensure_ivar_visibility! - # relying on undocumented behavior of CRuby, GVL acquire has lock which ensures visibility of ivars - # https://github.com/ruby/ruby/blob/ruby_2_2/thread_pthread.c#L204-L211 - end - end - end -end diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb index d3c2d47e7..4745b56e6 100644 --- a/lib/concurrent/synchronization/object.rb +++ b/lib/concurrent/synchronization/object.rb @@ -1,77 +1,61 @@ -require 'concurrent/synchronization/java_object' -require 'concurrent/synchronization/monitor_object' -require 'concurrent/synchronization/mutex_object' -require 'concurrent/synchronization/rbx_object' module Concurrent module Synchronization # @!visibility private # @!macro internal_implementation_note - Implementation = case + ObjectImplementation = case + when Concurrent.on_cruby? + MriObject when Concurrent.on_jruby? - JavaObject - when Concurrent.on_cruby? && Concurrent.ruby_version(:<=, 1, 9, 3) - MonitorObject - when Concurrent.on_cruby? && Concurrent.ruby_version(:>, 1, 9, 3) - MutexObject + JRubyObject when Concurrent.on_rbx? RbxObject else warn 'Possibly unsupported Ruby implementation' - MutexObject + MriObject end - private_constant :Implementation + private_constant :ObjectImplementation + # TODO fix documentation # @!macro [attach] synchronization_object - # + # # Safe synchronization under any Ruby implementation. # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. # Provides a single layer which can improve its implementation over time without changes needed to # the classes using it. Use {Synchronization::Object} not this abstract class. - # + # # @note this object does not support usage together with # [`Thread#wakeup`](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-wakeup) # and [`Thread#raise`](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-raise). # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `Synchronization::Object#wait` and # `Thread#wakeup` will not work on all platforms. - # + # # @see {Event} implementation as an example of this class use - # + # # @example simple # class AnClass < Synchronization::Object # def initialize # super # synchronize { @value = 'asd' } # end - # + # # def value # synchronize { @value } # end # end # - class Object < Implementation + class Object < ObjectImplementation - # @!method initialize(*args, &block) - # @!macro synchronization_object_method_initialize + # TODO split to be able to use just final fields + # - object has mfence, volatile fields, and cas fields - # @!method synchronize - # @!macro synchronization_object_method_synchronize + # TODO lock should be the public api, Object with private synchronize, signal, .. should be + # private class just for concurrent-ruby, forbid inheritance of classes using it, like CountDownLatch + # TODO in place CAS - # @!method initialize(*args, &block) - # @!macro synchronization_object_method_ns_initialize - - # @!method wait_until(timeout = nil, &condition) - # @!macro synchronization_object_method_ns_wait_until - - # @!method wait(timeout = nil) - # @!macro synchronization_object_method_ns_wait - - # @!method signal - # @!macro synchronization_object_method_ns_signal - - # @!method broadcast - # @!macro synchronization_object_method_ns_broadcast + # @!method initialize + # @!macro synchronization_object_method_initialize # @!method ensure_ivar_visibility! # @!macro synchronization_object_method_ensure_ivar_visibility diff --git a/lib/concurrent/synchronization/rbx_lockable_object.rb b/lib/concurrent/synchronization/rbx_lockable_object.rb new file mode 100644 index 000000000..6fff96e28 --- /dev/null +++ b/lib/concurrent/synchronization/rbx_lockable_object.rb @@ -0,0 +1,62 @@ +module Concurrent + module Synchronization + + # @!visibility private + # @!macro internal_implementation_note + class RbxLockableObject < AbstractLockableObject + def initialize + @__Waiters__ = [] + @__owner__ = nil + ensure_ivar_visibility! + end + + protected + + def synchronize(&block) + if @__owner__ == Thread.current + yield + else + Rubinius.lock(self) + begin + @__owner__ = Thread.current + result = yield + ensure + @__owner__ = nil + Rubinius.unlock(self) + result + end + end + end + + def ns_wait(timeout = nil) + wchan = Rubinius::Channel.new + + begin + @__Waiters__.push wchan + Rubinius.unlock(self) + signaled = wchan.receive_timeout timeout + ensure + Rubinius.lock(self) + + if !signaled && !@__Waiters__.delete(wchan) + # we timed out, but got signaled afterwards, + # so pass that signal on to the next waiter + @__Waiters__.shift << true unless @__Waiters__.empty? + end + end + + self + end + + def ns_signal + @__Waiters__.shift << true unless @__Waiters__.empty? + self + end + + def ns_broadcast + @__Waiters__.shift << true until @__Waiters__.empty? + self + end + end + end +end diff --git a/lib/concurrent/synchronization/rbx_object.rb b/lib/concurrent/synchronization/rbx_object.rb index d59f4e04c..b6d324d21 100644 --- a/lib/concurrent/synchronization/rbx_object.rb +++ b/lib/concurrent/synchronization/rbx_object.rb @@ -1,74 +1,23 @@ module Concurrent module Synchronization - if Concurrent.on_rbx? - # @!visibility private - # @!macro internal_implementation_note - class RbxObject < AbstractObject - def initialize - @__Waiters__ = [] - @__owner__ = nil - ensure_ivar_visibility! - end - - protected - - def synchronize(&block) - if @__owner__ == Thread.current - yield - else - Rubinius.lock(self) - begin - @__owner__ = Thread.current - result = yield - ensure - @__owner__ = nil - Rubinius.unlock(self) - result - end - end - end - - def ns_wait(timeout = nil) - wchan = Rubinius::Channel.new - - begin - @__Waiters__.push wchan - Rubinius.unlock(self) - signaled = wchan.receive_timeout timeout - ensure - Rubinius.lock(self) - - if !signaled && !@__Waiters__.delete(wchan) - # we timed out, but got signaled afterwards, - # so pass that signal on to the next waiter - @__Waiters__.shift << true unless @__Waiters__.empty? - end - end - - self - end - - def ns_signal - @__Waiters__.shift << true unless @__Waiters__.empty? - self - end - - def ns_broadcast - @__Waiters__.shift << true until @__Waiters__.empty? - self - end + # @!visibility private + # @!macro internal_implementation_note + class RbxObject < AbstractObject + def initialize + # nothing to do + end - def ensure_ivar_visibility! - # Rubinius instance variables are not volatile so we need to insert barrier - Rubinius.memory_barrier - end + def full_memory_barrier + # Rubinius instance variables are not volatile so we need to insert barrier + Rubinius.memory_barrier + end - def self.attr_volatile *names - names.each do |name| - ivar = :"@volatile_#{name}" - class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def self.attr_volatile *names + names.each do |name| + ivar = :"@volatile_#{name}" + class_eval <<-RUBY, __FILE__, __LINE__ + 1 def #{name} Rubinius.memory_barrier #{ivar} @@ -78,11 +27,11 @@ def #{name}=(value) #{ivar} = value Rubinius.memory_barrier end - RUBY - end - names.map { |n| [n, :"#{n}="] }.flatten + RUBY end + names.map { |n| [n, :"#{n}="] }.flatten end end + end end diff --git a/lib/concurrent/utility/at_exit.rb b/lib/concurrent/utility/at_exit.rb index 0405c0c60..a4cc508bb 100644 --- a/lib/concurrent/utility/at_exit.rb +++ b/lib/concurrent/utility/at_exit.rb @@ -7,7 +7,7 @@ module Concurrent # Each handler is executed at most once. # # @!visibility private - class AtExitImplementation < Synchronization::Object + class AtExitImplementation < Synchronization::LockableObject include Logger::Severity def initialize(*args) diff --git a/lib/concurrent/utility/monotonic_time.rb b/lib/concurrent/utility/monotonic_time.rb index 98aaeb674..08e2e29e8 100644 --- a/lib/concurrent/utility/monotonic_time.rb +++ b/lib/concurrent/utility/monotonic_time.rb @@ -2,7 +2,7 @@ module Concurrent - class_definition = Class.new(Synchronization::Object) do + class_definition = Class.new(Synchronization::LockableObject) do def initialize super() @last_time = Time.now.to_f @@ -44,7 +44,7 @@ def get_time private_constant :GLOBAL_MONOTONIC_CLOCK # @!macro [attach] monotonic_get_time - # + # # Returns the current time a tracked by the application monotonic clock. # # @return [Float] The current monotonic time when `since` not given else diff --git a/spec/concurrent/synchronization_spec.rb b/spec/concurrent/synchronization_spec.rb index e41f2524d..7119e44b2 100644 --- a/spec/concurrent/synchronization_spec.rb +++ b/spec/concurrent/synchronization_spec.rb @@ -3,9 +3,9 @@ module Concurrent describe Synchronization do - describe Synchronization::Object do + describe Synchronization::LockableObject do - class AClass < Synchronization::Object + class AClass < Synchronization::LockableObject attr_volatile :volatile attr_accessor :not_volatile From bfee21080399c9ec2a8d00b4a332dd375b5c47e7 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 7 Sep 2015 10:42:51 +0200 Subject: [PATCH 2/5] Add volatile cas fields, few classes converted fix usages of #ensure_ivar_visibility! --- lib/concurrent/atom.rb | 33 ++++---- lib/concurrent/atomic/read_write_lock.rb | 7 +- .../atomic/reentrant_read_write_lock.rb | 2 +- .../edge/atomic_markable_reference.rb | 24 +++--- lib/concurrent/edge/future.rb | 9 +-- .../edge/lock_free_linked_set/node.rb | 4 +- lib/concurrent/edge/lock_free_stack.rb | 30 ++++---- lib/concurrent/executor/safe_task_executor.rb | 11 ++- lib/concurrent/synchronization/condition.rb | 1 - .../synchronization/lockable_object.rb | 5 +- .../synchronization/mri_lockable_object.rb | 8 +- lib/concurrent/synchronization/object.rb | 76 +++++++++++++++---- .../synchronization/rbx_lockable_object.rb | 4 +- lib/concurrent/utility/monotonic_time.rb | 3 +- 14 files changed, 127 insertions(+), 90 deletions(-) diff --git a/lib/concurrent/atom.rb b/lib/concurrent/atom.rb index 0870454cd..de4dabe8a 100644 --- a/lib/concurrent/atom.rb +++ b/lib/concurrent/atom.rb @@ -57,9 +57,12 @@ module Concurrent # # @see http://clojure.org/atoms Clojure Atoms # @see http://clojure.org/state Values and Change - Clojure's approach to Identity and State - class Atom < Synchronization::LockableObject + class Atom < Synchronization::Object include Concern::Observable + private *attr_volatile_with_cas(:value) + public :value + # Create a new atom with the given initial value. # # @param [Object] value The initial value @@ -73,20 +76,16 @@ class Atom < Synchronization::LockableObject # # @raise [ArgumentError] if the validator is not a `Proc` (when given) def initialize(value, opts = {}) - super() - synchronize do - @validator = opts.fetch(:validator, ->(v){ true }) - @value = Concurrent::AtomicReference.new(value) - self.observers = Collection::CopyOnNotifyObserverSet.new - end + @Validator = opts.fetch(:validator, -> (v) { true }) + self.observers = Collection::CopyOnNotifyObserverSet.new + super(value) # ensures visibility end - # The current value of the atom. + # @!method value + # The current value of the atom. # - # @return [Object] The current value. - def value - @value.value - end + # @return [Object] The current value. + alias_method :deref, :value # Atomically swaps the value of atom using the given block. The current @@ -122,7 +121,7 @@ def swap(*args) raise ArgumentError.new('no block given') unless block_given? loop do - old_value = @value.value + old_value = value begin new_value = yield(old_value, *args) break old_value unless valid?(new_value) @@ -143,7 +142,7 @@ def swap(*args) # # @return [Boolean] True if the value is changed else false. def compare_and_set(old_value, new_value) - if valid?(new_value) && @value.compare_and_set(old_value, new_value) + if valid?(new_value) && compare_and_set_value(old_value, new_value) observers.notify_observers(Time.now, old_value, new_value) true else @@ -160,9 +159,9 @@ def compare_and_set(old_value, new_value) # @return [Object] The final value of the atom after all operations and # validations are complete. def reset(new_value) - old_value = @value.value + old_value = value if valid?(new_value) - @value.set(new_value) + self.value = new_value observers.notify_observers(Time.now, old_value, new_value) new_value else @@ -178,7 +177,7 @@ def reset(new_value) # @return [Boolean] false if the validator function returns false or raises # an exception else true def valid?(new_value) - @validator.call(new_value) + @Validator.call(new_value) rescue false end diff --git a/lib/concurrent/atomic/read_write_lock.rb b/lib/concurrent/atomic/read_write_lock.rb index a42e19dc8..dac6b1c4d 100644 --- a/lib/concurrent/atomic/read_write_lock.rb +++ b/lib/concurrent/atomic/read_write_lock.rb @@ -41,10 +41,10 @@ class ReadWriteLock < Synchronization::Object # @!visibility private MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 - # Implementation notes: + # Implementation notes: # A goal is to make the uncontended path for both readers/writers lock-free # Only if there is reader-writer or writer-writer contention, should locks be used - # Internal state is represented by a single integer ("counter"), and updated + # Internal state is represented by a single integer ("counter"), and updated # using atomic compare-and-swap operations # When the counter is 0, the lock is free # Each reader increments the counter by 1 when acquiring a read lock @@ -57,8 +57,7 @@ def initialize @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadLock = Synchronization::Lock.new @WriteLock = Synchronization::Lock.new - ensure_ivar_visibility! - super() + super() # ensures visibility end # Execute a block operation within a read lock. diff --git a/lib/concurrent/atomic/reentrant_read_write_lock.rb b/lib/concurrent/atomic/reentrant_read_write_lock.rb index d7b8de8e9..5c21f904a 100644 --- a/lib/concurrent/atomic/reentrant_read_write_lock.rb +++ b/lib/concurrent/atomic/reentrant_read_write_lock.rb @@ -105,7 +105,7 @@ def initialize @ReadQueue = Synchronization::Lock.new # used to queue waiting readers @WriteQueue = Synchronization::Lock.new # used to queue waiting writers @HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread - ensure_ivar_visibility! + super() # ensures visibility end # Execute a block operation within a read lock. diff --git a/lib/concurrent/edge/atomic_markable_reference.rb b/lib/concurrent/edge/atomic_markable_reference.rb index 94e0d87e9..423351191 100644 --- a/lib/concurrent/edge/atomic_markable_reference.rb +++ b/lib/concurrent/edge/atomic_markable_reference.rb @@ -11,11 +11,11 @@ module Edge # @api Edge class AtomicMarkableReference < ::Concurrent::Synchronization::Object + private *attr_volatile_with_cas(:reference) + # @!macro [attach] atomic_markable_reference_method_initialize def initialize(value = nil, mark = false) - super() - @Reference = AtomicReference.new ImmutableArray[value, mark] - ensure_ivar_visibility! + super(ImmutableArray[value, mark]) # ensures visibility end # @!macro [attach] atomic_markable_reference_method_compare_and_set @@ -36,7 +36,7 @@ def initialize(value = nil, mark = false) def compare_and_set(expected_val, new_val, expected_mark, new_mark) # Memoize a valid reference to the current AtomicReference for # later comparison. - current = @Reference.get + current = reference curr_val, curr_mark = current # Ensure that that the expected marks match. @@ -56,7 +56,7 @@ def compare_and_set(expected_val, new_val, expected_mark, new_mark) prospect = ImmutableArray[new_val, new_mark] - @Reference.compare_and_set current, prospect + compare_and_set_reference current, prospect end alias_method :compare_and_swap, :compare_and_set @@ -66,7 +66,7 @@ def compare_and_set(expected_val, new_val, expected_mark, new_mark) # # @return [ImmutableArray] the current reference and marked values def get - @Reference.get + reference end # @!macro [attach] atomic_markable_reference_method_value @@ -75,7 +75,7 @@ def get # # @return [Object] the current value of the reference def value - @Reference.get[0] + reference[0] end # @!macro [attach] atomic_markable_reference_method_mark @@ -84,7 +84,7 @@ def value # # @return [Boolean] the current marked value def mark - @Reference.get[1] + reference[1] end alias_method :marked?, :mark @@ -98,7 +98,7 @@ def mark # # @return [ImmutableArray] both the new value and the new mark def set(new_val, new_mark) - @Reference.set ImmutableArray[new_val, new_mark] + self.reference = ImmutableArray[new_val, new_mark] end # @!macro [attach] atomic_markable_reference_method_update @@ -115,7 +115,7 @@ def set(new_val, new_mark) # @return [ImmutableArray] the new value and new mark def update loop do - old_val, old_mark = @Reference.get + old_val, old_mark = reference new_val, new_mark = yield old_val, old_mark if compare_and_set old_val, new_val, old_mark, new_mark @@ -139,7 +139,7 @@ def update # # @raise [Concurrent::ConcurrentUpdateError] if the update fails def try_update! - old_val, old_mark = @Reference.get + old_val, old_mark = reference new_val, new_mark = yield old_val, old_mark unless compare_and_set old_val, new_val, old_mark, new_mark @@ -165,7 +165,7 @@ def try_update! # @return [ImmutableArray] the new value and marked state, or nil if # the update failed def try_update - old_val, old_mark = @Reference.get + old_val, old_mark = reference new_val, new_mark = yield old_val, old_mark return unless compare_and_set old_val, new_val, old_mark, new_mark diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index e2aa19449..4a8d59194 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -173,8 +173,7 @@ def initialize(promise, default_executor) @Callbacks = LockFreeStack.new @Waiters = LockFreeStack.new # TODO replace with AtomicFixnum, avoid aba problem @State = AtomicReference.new PENDING - super() - ensure_ivar_visibility! + super() # ensures visibility end # @return [:pending, :completed] @@ -879,9 +878,8 @@ def hide_completable # @!visibility private class AbstractPromise < Synchronization::Object def initialize(future) - super() @Future = future - ensure_ivar_visibility! + super() end def future @@ -1377,9 +1375,8 @@ def initialize(default_executor, intended_time) class Channel < Synchronization::Object # TODO make lock free def initialize - super @ProbeSet = Concurrent::Channel::WaitableList.new - ensure_ivar_visibility! + super() end def probe_set_size diff --git a/lib/concurrent/edge/lock_free_linked_set/node.rb b/lib/concurrent/edge/lock_free_linked_set/node.rb index dec61e76a..82f1ff383 100644 --- a/lib/concurrent/edge/lock_free_linked_set/node.rb +++ b/lib/concurrent/edge/lock_free_linked_set/node.rb @@ -9,13 +9,11 @@ class Node < Synchronization::Object attr_reader :Data, :Successor_reference, :Key def initialize(data = nil, successor = nil) - super() - @Successor_reference = AtomicMarkableReference.new(successor || Tail.new) @Data = data @Key = key_for data - ensure_ivar_visibility! + super() # ensures visibility end # Check to see if the node is the last in the list. diff --git a/lib/concurrent/edge/lock_free_stack.rb b/lib/concurrent/edge/lock_free_stack.rb index 448a3c5dd..0decb0de2 100644 --- a/lib/concurrent/edge/lock_free_stack.rb +++ b/lib/concurrent/edge/lock_free_stack.rb @@ -21,51 +21,51 @@ def next_node EMPTY = Empty[nil, nil] + private *attr_volatile_with_cas(:head) + def initialize - super - @Head = AtomicReference.new EMPTY - ensure_ivar_visibility! + super(EMPTY) end def empty? - @Head.get.equal? EMPTY + head.equal? EMPTY end def compare_and_push(head, value) - @Head.compare_and_set head, Node[value, head] + compare_and_set_head head, Node[value, head] end def push(value) while true - head = @Head.get - return self if @Head.compare_and_set head, Node[value, head] + current_head = head + return self if compare_and_set_head current_head, Node[value, current_head] end end def peek - @Head.get + head end def compare_and_pop(head) - @Head.compare_and_set head, head.next_node + compare_and_set_head head, head.next_node end def pop while true - head = @Head.get - return head.value if @Head.compare_and_set head, head.next_node + current_head = head + return current_head.value if compare_and_set_head current_head, current_head.next_node end end def compare_and_clear(head) - @Head.compare_and_set head, EMPTY + compare_and_set_head head, EMPTY end def clear while true - head = @Head.get - return false if head == EMPTY - return true if @Head.compare_and_set head, EMPTY + current_head = head + return false if current_head == EMPTY + return true if compare_and_set_head current_head, EMPTY end end diff --git a/lib/concurrent/executor/safe_task_executor.rb b/lib/concurrent/executor/safe_task_executor.rb index 94e104e8f..c8d9b9d4b 100644 --- a/lib/concurrent/executor/safe_task_executor.rb +++ b/lib/concurrent/executor/safe_task_executor.rb @@ -9,23 +9,22 @@ module Concurrent class SafeTaskExecutor < Synchronization::LockableObject def initialize(task, opts = {}) - super() - @task = task + @task = task @exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError - ensure_ivar_visibility! + super() # ensures visibility end # @return [Array] def execute(*args) synchronize do success = false - value = reason = nil + value = reason = nil begin - value = @task.call(*args) + value = @task.call(*args) success = true rescue @exception_class => ex - reason = ex + reason = ex success = false end diff --git a/lib/concurrent/synchronization/condition.rb b/lib/concurrent/synchronization/condition.rb index 03f3fbe7d..7b100be83 100644 --- a/lib/concurrent/synchronization/condition.rb +++ b/lib/concurrent/synchronization/condition.rb @@ -9,7 +9,6 @@ class Condition < LockableObject def initialize(lock) @Lock = lock - ensure_ivar_visibility! super() end diff --git a/lib/concurrent/synchronization/lockable_object.rb b/lib/concurrent/synchronization/lockable_object.rb index 5ce0ea845..fabfe3251 100644 --- a/lib/concurrent/synchronization/lockable_object.rb +++ b/lib/concurrent/synchronization/lockable_object.rb @@ -19,7 +19,10 @@ module Synchronization private_constant :LockableObjectImplementation class LockableObject < LockableObjectImplementation - def self.allow_only_direct_descendants! # FIXME interne dedime docela dost :/ + + # TODO make private for c-r + + def self.allow_only_direct_descendants! # FIXME we inherit too much ourselves :/ this = self singleton_class.send :define_method, :inherited do |child| # super child diff --git a/lib/concurrent/synchronization/mri_lockable_object.rb b/lib/concurrent/synchronization/mri_lockable_object.rb index 38c4e4253..c79942c72 100644 --- a/lib/concurrent/synchronization/mri_lockable_object.rb +++ b/lib/concurrent/synchronization/mri_lockable_object.rb @@ -21,8 +21,8 @@ def ns_broadcast # @!visibility private # @!macro internal_implementation_note class MriMutexLockableObject < MriLockableObject - def initialize - super + def initialize(*defaults) + super(*defaults) @__lock__ = ::Mutex.new @__condition__ = ::ConditionVariable.new end @@ -46,8 +46,8 @@ def ns_wait(timeout = nil) # @!visibility private # @!macro internal_implementation_note class MriMonitorLockableObject < MriLockableObject - def initialize - super + def initialize(*defaults) + super(*defaults) @__lock__ = ::Monitor.new @__condition__ = @__lock__.new_cond end diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb index 4745b56e6..e219dca32 100644 --- a/lib/concurrent/synchronization/object.rb +++ b/lib/concurrent/synchronization/object.rb @@ -1,20 +1,19 @@ - module Concurrent module Synchronization # @!visibility private # @!macro internal_implementation_note ObjectImplementation = case - when Concurrent.on_cruby? - MriObject - when Concurrent.on_jruby? - JRubyObject - when Concurrent.on_rbx? - RbxObject - else - warn 'Possibly unsupported Ruby implementation' - MriObject - end + when Concurrent.on_cruby? + MriObject + when Concurrent.on_jruby? + JRubyObject + when Concurrent.on_rbx? + RbxObject + else + warn 'Possibly unsupported Ruby implementation' + MriObject + end private_constant :ObjectImplementation # TODO fix documentation @@ -47,12 +46,57 @@ module Synchronization # class Object < ObjectImplementation - # TODO split to be able to use just final fields - # - object has mfence, volatile fields, and cas fields + def initialize(*defaults) + super() + initialize_volatile_cas_fields(defaults) + ensure_ivar_visibility! + end + + def self.attr_volatile_with_cas(*names) + @volatile_cas_fields ||= [] + @volatile_cas_fields += names + + names.each do |name| + ivar = :"@VolatileCas_#{name}" + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{name} + #{ivar}.get + end + + def #{name}=(value) + #{ivar}.set value + end + + def swap_#{name}(value) + #{ivar}.swap value + end + + def compare_and_set_#{name}(expected, value) + #{ivar}.compare_and_set expected, value + end + + def update_#{name}(&block) + #{ivar}.update &block + end + RUBY + end + names.map { |n| [n, :"#{n}=", :"swap_#{n}", :"compare_and_set_#{n}"] }.flatten + end + + def self.volatile_cas_fields(inherited = true) + @volatile_cas_fields ||= [] + ((superclass.volatile_cas_fields if superclass.respond_to?(:volatile_cas_fields) && inherited) || []) + + @volatile_cas_fields + end + + private - # TODO lock should be the public api, Object with private synchronize, signal, .. should be - # private class just for concurrent-ruby, forbid inheritance of classes using it, like CountDownLatch - # TODO in place CAS + def initialize_volatile_cas_fields(defaults) + self.class.volatile_cas_fields.zip(defaults) do |name, default| + instance_variable_set :"@VolatileCas_#{name}", AtomicReference.new(default) + end + nil + end # @!method initialize # @!macro synchronization_object_method_initialize diff --git a/lib/concurrent/synchronization/rbx_lockable_object.rb b/lib/concurrent/synchronization/rbx_lockable_object.rb index 6fff96e28..d7d81a873 100644 --- a/lib/concurrent/synchronization/rbx_lockable_object.rb +++ b/lib/concurrent/synchronization/rbx_lockable_object.rb @@ -4,10 +4,10 @@ module Synchronization # @!visibility private # @!macro internal_implementation_note class RbxLockableObject < AbstractLockableObject - def initialize + def initialize(*defaults) @__Waiters__ = [] @__owner__ = nil - ensure_ivar_visibility! + super(*defaults) # ensures visibility end protected diff --git a/lib/concurrent/utility/monotonic_time.rb b/lib/concurrent/utility/monotonic_time.rb index 08e2e29e8..8e40d690b 100644 --- a/lib/concurrent/utility/monotonic_time.rb +++ b/lib/concurrent/utility/monotonic_time.rb @@ -4,9 +4,8 @@ module Concurrent class_definition = Class.new(Synchronization::LockableObject) do def initialize - super() @last_time = Time.now.to_f - ensure_ivar_visibility! + super() end if defined?(Process::CLOCK_MONOTONIC) From 0bd741c9e9daa6e335adeaf2d38ac5352eb77c8c Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 8 Sep 2015 21:15:27 +0200 Subject: [PATCH 3/5] Fix Synchronization on JRuby --- .../ext/SynchronizationLibrary.java | 185 +++++++++++------- lib/concurrent/agent.rb | 2 +- lib/concurrent/atom.rb | 4 +- .../executor/abstract_executor_service.rb | 2 +- lib/concurrent/executor/safe_task_executor.rb | 2 +- .../executor/serialized_execution.rb | 2 +- .../utility/native_extension_loader.rb | 2 +- 7 files changed, 122 insertions(+), 77 deletions(-) diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index 11d2ff30a..9ef239f00 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -1,7 +1,6 @@ package com.concurrent_ruby.ext; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; import org.jruby.Ruby; import org.jruby.RubyClass; @@ -14,16 +13,32 @@ import org.jruby.runtime.load.Library; import org.jruby.runtime.Block; import org.jruby.runtime.Visibility; -import org.jruby.RubyBoolean; -import org.jruby.RubyNil; import org.jruby.runtime.ThreadContext; import org.jruby.util.unsafe.UnsafeHolder; public class SynchronizationLibrary implements Library { - private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() { + private static final ObjectAllocator JRUBY_OBJECT_ALLOCATOR = new ObjectAllocator() { public IRubyObject allocate(Ruby runtime, RubyClass klazz) { - return new JavaObject(runtime, klazz); + return new JRubyObject(runtime, klazz); + } + }; + + private static final ObjectAllocator OBJECT_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new Object(runtime, klazz); + } + }; + + private static final ObjectAllocator ABSTRACT_LOCKABLE_OBJECT_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new AbstractLockableObject(runtime, klazz); + } + }; + + private static final ObjectAllocator JRUBY_LOCKABLE_OBJECT_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new JRubyLockableObject(runtime, klazz); } }; @@ -31,25 +46,37 @@ public void load(Ruby runtime, boolean wrap) throws IOException { RubyModule synchronizationModule = runtime. defineModule("Concurrent"). defineModuleUnder("Synchronization"); - RubyClass parentClass = synchronizationModule.getClass("AbstractObject"); - if (parentClass == null) - throw runtime.newRuntimeError("Concurrent::Synchronization::AbstractObject is missing"); + defineClass(runtime, synchronizationModule, "AbstractObject", "JRubyObject", + JRubyObject.class, JRUBY_OBJECT_ALLOCATOR); - RubyClass synchronizedObjectJavaClass = - synchronizationModule.defineClassUnder("JavaObject", parentClass, JRUBYREFERENCE_ALLOCATOR); + defineClass(runtime, synchronizationModule, "JRubyObject", "Object", + Object.class, OBJECT_ALLOCATOR); - synchronizedObjectJavaClass.defineAnnotatedMethods(JavaObject.class); + defineClass(runtime, synchronizationModule, "Object", "AbstractLockableObject", + AbstractLockableObject.class, ABSTRACT_LOCKABLE_OBJECT_ALLOCATOR); + + defineClass(runtime, synchronizationModule, "AbstractLockableObject", "JRubyLockableObject", + JRubyLockableObject.class, JRUBY_LOCKABLE_OBJECT_ALLOCATOR); } - @JRubyClass(name = "JavaObject", parent = "AbstractObject") - public static class JavaObject extends RubyObject { + private RubyClass defineClass(Ruby runtime, RubyModule namespace, String parentName, String name, + Class javaImplementation, ObjectAllocator allocator) { + final RubyClass parentClass = namespace.getClass(parentName); - public static final long AN_VOLATILE_FIELD_OFFSET = - UnsafeHolder.fieldOffset(JavaObject.class, "anVolatileField"); - private volatile int anVolatileField = 0; + if (parentClass == null) { + System.out.println("not found " + parentName); + throw runtime.newRuntimeError(namespace.toString() + "::" + parentName + " is missing"); + } - public JavaObject(Ruby runtime, RubyClass metaClass) { + final RubyClass newClass = namespace.defineClassUnder(name, parentClass, allocator); + newClass.defineAnnotatedMethods(javaImplementation); + return newClass; + } + + @JRubyClass(name = "JRubyObject", parent = "AbstractObject") + public static class JRubyObject extends RubyObject { + public JRubyObject(Ruby runtime, RubyClass metaClass) { super(runtime, metaClass); } @@ -58,6 +85,77 @@ public IRubyObject initialize(ThreadContext context) { return this; } + @JRubyMethod(name = "full_memory_barrier", visibility = Visibility.PRIVATE) + public IRubyObject fullMemoryBarrier(ThreadContext context) { + if (UnsafeHolder.U == null) { + // We are screwed + throw new UnsupportedOperationException(); + } else if (UnsafeHolder.SUPPORTS_FENCES) + UnsafeHolder.fullFence(); + else { + // TODO (pitr 06-Sep-2015): enforce Java 8 + throw new UnsupportedOperationException(); + } + return context.nil; + } + + @JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED) + public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) { + if (UnsafeHolder.U == null) { + synchronized (this) { + // TODO (pitr 06-Sep-2015): Possibly dangerous, there may be a deadlock here + // TODO (pitr 08-Sep-2015): maybe remove the branch since full_memory_barrier is not supported anyway + return instance_variable_get(context, name); + } + } else if (UnsafeHolder.SUPPORTS_FENCES) { + // ensure we see latest value + UnsafeHolder.loadFence(); + return instance_variable_get(context, name); + } else { + throw new UnsupportedOperationException(); + } + } + + @JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED) + public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) { + if (UnsafeHolder.U == null) { + synchronized (this) { + return instance_variable_set(name, value); + } + } else if (UnsafeHolder.SUPPORTS_FENCES) { + final IRubyObject result = instance_variable_set(name, value); + // ensure we make latest value visible + UnsafeHolder.storeFence(); + return result; + } else { + throw new UnsupportedOperationException(); + } + } + } + + @JRubyClass(name = "Object", parent = "JRubyObject") + public static class Object extends JRubyObject { + + public Object(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + } + + @JRubyClass(name = "AbstractLockableObject", parent = "Object") + public static class AbstractLockableObject extends Object { + + public AbstractLockableObject(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + } + + @JRubyClass(name = "JRubyLockableObject", parent = "AbstractLockableObject") + public static class JRubyLockableObject extends JRubyObject { + + public JRubyLockableObject(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + @JRubyMethod(name = "synchronize", visibility = Visibility.PROTECTED) public IRubyObject rubySynchronize(ThreadContext context, Block block) { synchronized (this) { @@ -108,58 +206,5 @@ public IRubyObject nsBroadcast(ThreadContext context) { notifyAll(); return this; } - - @JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PROTECTED) - public IRubyObject ensureIvarVisibilityBang(ThreadContext context) { - if (UnsafeHolder.U == null) { - // We are screwed - throw new UnsupportedOperationException(); - } else if (UnsafeHolder.SUPPORTS_FENCES) - // We have to prevent ivar writes to reordered with storing of the final instance reference - // Therefore wee need a fullFence to prevent reordering in both directions. - UnsafeHolder.fullFence(); - else { - // Assumption that this is not eliminated, if false it will break non x86 platforms. - UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1); - UnsafeHolder.U.getIntVolatile(this, AN_VOLATILE_FIELD_OFFSET); - } - return context.nil; - } - - @JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED) - public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) { - if (UnsafeHolder.U == null) { - // TODO: Possibly dangerous, there may be a deadlock on the this - synchronized (this) { - return instance_variable_get(context, name); - } - } else if (UnsafeHolder.SUPPORTS_FENCES) { - // ensure we see latest value - UnsafeHolder.loadFence(); - return instance_variable_get(context, name); - } else { - UnsafeHolder.U.getIntVolatile(this, AN_VOLATILE_FIELD_OFFSET); - return instance_variable_get(context, name); - } - } - - @JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED) - public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) { - if (UnsafeHolder.U == null) { - // TODO: Possibly dangerous, there may be a deadlock on the this - synchronized (this) { - return instance_variable_set(name, value); - } - } else if (UnsafeHolder.SUPPORTS_FENCES) { - final IRubyObject result = instance_variable_set(name, value); - // ensure we make latest value visible - UnsafeHolder.storeFence(); - return result; - } else { - final IRubyObject result = instance_variable_set(name, value); - UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1); - return result; - } - } } } diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index cfe95dca1..58f30c9f9 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -3,7 +3,7 @@ require 'concurrent/atomic/thread_local_var' require 'concurrent/collection/copy_on_write_observer_set' require 'concurrent/concern/observable' -require 'concurrent/synchronization/object' +require 'concurrent/synchronization' module Concurrent diff --git a/lib/concurrent/atom.rb b/lib/concurrent/atom.rb index de4dabe8a..25e1ed76f 100644 --- a/lib/concurrent/atom.rb +++ b/lib/concurrent/atom.rb @@ -1,7 +1,7 @@ require 'concurrent/atomic/atomic_reference' require 'concurrent/collection/copy_on_notify_observer_set' require 'concurrent/concern/observable' -require 'concurrent/synchronization/object' +require 'concurrent/synchronization' module Concurrent @@ -76,7 +76,7 @@ class Atom < Synchronization::Object # # @raise [ArgumentError] if the validator is not a `Proc` (when given) def initialize(value, opts = {}) - @Validator = opts.fetch(:validator, -> (v) { true }) + @Validator = opts.fetch(:validator, -> v { true }) self.observers = Collection::CopyOnNotifyObserverSet.new super(value) # ensures visibility end diff --git a/lib/concurrent/executor/abstract_executor_service.rb b/lib/concurrent/executor/abstract_executor_service.rb index ef013d919..35212d321 100644 --- a/lib/concurrent/executor/abstract_executor_service.rb +++ b/lib/concurrent/executor/abstract_executor_service.rb @@ -1,6 +1,6 @@ require 'concurrent/errors' require 'concurrent/executor/executor_service' -require 'concurrent/synchronization/object' +require 'concurrent/synchronization' require 'concurrent/utility/at_exit' module Concurrent diff --git a/lib/concurrent/executor/safe_task_executor.rb b/lib/concurrent/executor/safe_task_executor.rb index c8d9b9d4b..414aa641f 100644 --- a/lib/concurrent/executor/safe_task_executor.rb +++ b/lib/concurrent/executor/safe_task_executor.rb @@ -1,4 +1,4 @@ -require 'concurrent/synchronization/object' +require 'concurrent/synchronization' module Concurrent diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index 1fd9b8855..d314e9061 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -1,6 +1,6 @@ require 'concurrent/errors' require 'concurrent/concern/logging' -require 'concurrent/synchronization/object' +require 'concurrent/synchronization' module Concurrent diff --git a/lib/concurrent/utility/native_extension_loader.rb b/lib/concurrent/utility/native_extension_loader.rb index 3387befd1..8fd8e5a59 100644 --- a/lib/concurrent/utility/native_extension_loader.rb +++ b/lib/concurrent/utility/native_extension_loader.rb @@ -1,4 +1,4 @@ -require 'concurrent/synchronization/abstract_object' # for JRuby +require 'concurrent/synchronization' # has to be loaded before JRuby extensions require 'concurrent/utility/engine' module Concurrent From a9da852b163a947c4a782b9c33beb7a99cc850a1 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 12 Sep 2015 21:30:02 +0200 Subject: [PATCH 4/5] Introduce safe_initialization! class marker instead of ensure_ivar_visibility! * ensures full memory barier is inserted only when needed * ensures that it's inserted only once * testable correctness with @FinalIvar convention --- .../ext/SynchronizationLibrary.java | 12 ++--- lib/concurrent/agent.rb | 2 +- lib/concurrent/atom.rb | 3 +- lib/concurrent/atomic/read_write_lock.rb | 4 +- .../atomic/reentrant_read_write_lock.rb | 4 +- lib/concurrent/channel/buffered_channel.rb | 2 +- lib/concurrent/delay.rb | 2 +- lib/concurrent/edge/future.rb | 15 ++++-- lib/concurrent/edge/lock_free_linked_set.rb | 12 ++--- .../edge/lock_free_linked_set/node.rb | 27 +++++++--- .../edge/lock_free_linked_set/window.rb | 6 +-- lib/concurrent/edge/lock_free_stack.rb | 2 + .../executor/ruby_executor_service.rb | 10 ++-- lib/concurrent/immutable_struct.rb | 4 ++ lib/concurrent/mutable_struct.rb | 5 +- lib/concurrent/settable_struct.rb | 5 +- .../abstract_lockable_object.rb | 16 ++---- .../synchronization/abstract_object.rb | 11 ++-- .../synchronization/abstract_struct.rb | 5 +- lib/concurrent/synchronization/condition.rb | 5 +- .../synchronization/lockable_object.rb | 10 ++-- .../synchronization/mri_lockable_object.rb | 4 ++ lib/concurrent/synchronization/object.rb | 52 ++++++++++++++++-- .../synchronization/rbx_lockable_object.rb | 4 +- spec/concurrent/synchronization_spec.rb | 54 ++++++++++++++++--- 25 files changed, 193 insertions(+), 83 deletions(-) diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index 9ef239f00..20cc7e652 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -87,14 +87,12 @@ public IRubyObject initialize(ThreadContext context) { @JRubyMethod(name = "full_memory_barrier", visibility = Visibility.PRIVATE) public IRubyObject fullMemoryBarrier(ThreadContext context) { - if (UnsafeHolder.U == null) { - // We are screwed - throw new UnsupportedOperationException(); - } else if (UnsafeHolder.SUPPORTS_FENCES) + if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) { + throw new UnsupportedOperationException( + "concurrent-ruby requires java with sun.mics.Unsafe fences support, such as Java 8. " + + "Current version is: " + System.getProperty("java.version")); + } else { UnsafeHolder.fullFence(); - else { - // TODO (pitr 06-Sep-2015): enforce Java 8 - throw new UnsupportedOperationException(); } return context.nil; } diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 58f30c9f9..8af58aed6 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -224,7 +224,7 @@ def initialize(initial, opts = {}) # # @return [Object] the current value def value - @current.value + @current.value # TODO (pitr 12-Sep-2015): broken unsafe read? end alias_method :deref, :value diff --git a/lib/concurrent/atom.rb b/lib/concurrent/atom.rb index 25e1ed76f..0c1fbf8f6 100644 --- a/lib/concurrent/atom.rb +++ b/lib/concurrent/atom.rb @@ -60,6 +60,7 @@ module Concurrent class Atom < Synchronization::Object include Concern::Observable + safe_initialization! private *attr_volatile_with_cas(:value) public :value @@ -78,7 +79,7 @@ class Atom < Synchronization::Object def initialize(value, opts = {}) @Validator = opts.fetch(:validator, -> v { true }) self.observers = Collection::CopyOnNotifyObserverSet.new - super(value) # ensures visibility + super(value) end # @!method value diff --git a/lib/concurrent/atomic/read_write_lock.rb b/lib/concurrent/atomic/read_write_lock.rb index dac6b1c4d..435d8f0d6 100644 --- a/lib/concurrent/atomic/read_write_lock.rb +++ b/lib/concurrent/atomic/read_write_lock.rb @@ -41,6 +41,8 @@ class ReadWriteLock < Synchronization::Object # @!visibility private MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 + safe_initialization! + # Implementation notes: # A goal is to make the uncontended path for both readers/writers lock-free # Only if there is reader-writer or writer-writer contention, should locks be used @@ -54,10 +56,10 @@ class ReadWriteLock < Synchronization::Object # Create a new `ReadWriteLock` in the unlocked state. def initialize + super() @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadLock = Synchronization::Lock.new @WriteLock = Synchronization::Lock.new - super() # ensures visibility end # Execute a block operation within a read lock. diff --git a/lib/concurrent/atomic/reentrant_read_write_lock.rb b/lib/concurrent/atomic/reentrant_read_write_lock.rb index 5c21f904a..3f29169bd 100644 --- a/lib/concurrent/atomic/reentrant_read_write_lock.rb +++ b/lib/concurrent/atomic/reentrant_read_write_lock.rb @@ -99,13 +99,15 @@ class ReentrantReadWriteLock < Synchronization::Object # @!visibility private WRITE_LOCK_MASK = MAX_WRITERS + safe_initialization! + # Create a new `ReentrantReadWriteLock` in the unlocked state. def initialize + super() @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadQueue = Synchronization::Lock.new # used to queue waiting readers @WriteQueue = Synchronization::Lock.new # used to queue waiting writers @HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread - super() # ensures visibility end # Execute a block operation within a read lock. diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index 19a4324a0..056e8064f 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -14,7 +14,7 @@ def initialize(size) end def probe_set_size - @probe_set.size + @probe_set.size # TODO (pitr 12-Sep-2015): unsafe? end def buffer_queue_size diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index d5a3f6d39..492d2ed75 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -74,7 +74,7 @@ def initialize(opts = {}, &block) # # @!macro delay_note_regarding_blocking def value(timeout = nil) - if @executor + if @executor # TODO (pitr 12-Sep-2015): broken unsafe read? super else # this function has been optimized for performance and diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index 4a8d59194..155e6defc 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -126,6 +126,7 @@ def post_on(executor, *args, &job) # Represents an event which will happen in future (will be completed). It has to always happen. class Event < Synchronization::LockableObject + safe_initialization! include Concern::Deprecation # @!visibility private @@ -167,13 +168,15 @@ def to_sym COMPLETED = Completed.new def initialize(promise, default_executor) + super() @Promise = promise @DefaultExecutor = default_executor @Touched = AtomicBoolean.new(false) @Callbacks = LockFreeStack.new - @Waiters = LockFreeStack.new # TODO replace with AtomicFixnum, avoid aba problem + # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem + # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution + @Waiters = LockFreeStack.new @State = AtomicReference.new PENDING - super() # ensures visibility end # @return [:pending, :completed] @@ -877,9 +880,11 @@ def hide_completable # @abstract # @!visibility private class AbstractPromise < Synchronization::Object + safe_initialization! + def initialize(future) - @Future = future super() + @Future = future end def future @@ -1373,10 +1378,12 @@ def initialize(default_executor, intended_time) # @note proof of concept class Channel < Synchronization::Object + safe_initialization! + # TODO make lock free def initialize - @ProbeSet = Concurrent::Channel::WaitableList.new super() + @ProbeSet = Concurrent::Channel::WaitableList.new end def probe_set_size diff --git a/lib/concurrent/edge/lock_free_linked_set.rb b/lib/concurrent/edge/lock_free_linked_set.rb index 2c294923f..beae38eca 100644 --- a/lib/concurrent/edge/lock_free_linked_set.rb +++ b/lib/concurrent/edge/lock_free_linked_set.rb @@ -55,7 +55,7 @@ def add(item) node = Node.new item, curr - if pred.Successor_reference.compare_and_set curr, node, false, false + if pred.successor_reference.compare_and_set curr, node, false, false return true end end @@ -88,7 +88,7 @@ def contains?(item) while curr < item curr = curr.next_node - marked = curr.Successor_reference.marked? + marked = curr.successor_reference.marked? end curr == item && !marked @@ -109,11 +109,11 @@ def remove(item) return false if curr != item succ = curr.next_node - removed = curr.Successor_reference.compare_and_set succ, succ, false, true + removed = curr.successor_reference.compare_and_set succ, succ, false, true next_node unless removed - pred.Successor_reference.compare_and_set curr, succ, false, false + pred.successor_reference.compare_and_set curr, succ, false, false return true end @@ -134,9 +134,9 @@ def each until curr.last? curr = curr.next_node - marked = curr.Successor_reference.marked? + marked = curr.successor_reference.marked? - yield curr.Data unless marked + yield curr.data unless marked end self diff --git a/lib/concurrent/edge/lock_free_linked_set/node.rb b/lib/concurrent/edge/lock_free_linked_set/node.rb index 82f1ff383..1a7c31a7c 100644 --- a/lib/concurrent/edge/lock_free_linked_set/node.rb +++ b/lib/concurrent/edge/lock_free_linked_set/node.rb @@ -6,25 +6,36 @@ class LockFreeLinkedSet class Node < Synchronization::Object include Comparable - attr_reader :Data, :Successor_reference, :Key + safe_initialization! def initialize(data = nil, successor = nil) - @Successor_reference = AtomicMarkableReference.new(successor || Tail.new) - @Data = data - @Key = key_for data + super() + @SuccessorReference = AtomicMarkableReference.new(successor || Tail.new) + @Data = data + @Key = key_for data + end + + def data + @Data + end + + def successor_reference + @SuccessorReference + end - super() # ensures visibility + def key + @Key end # Check to see if the node is the last in the list. def last? - @Successor_reference.value.is_a? Tail + @SuccessorReference.value.is_a? Tail end # Next node in the list. Note: this is not the AtomicMarkableReference # of the next node, this is the actual Node itself. def next_node - @Successor_reference.value + @SuccessorReference.value end # This method provides a unqiue key for the data which will be used for @@ -47,7 +58,7 @@ def <=>(other) # a self-loop. class Tail < Node def initialize(_data = nil, _succ = nil) - @Successor_reference = AtomicMarkableReference.new self + @SuccessorReference = AtomicMarkableReference.new self end # Always greater than other nodes. This means that traversal will end diff --git a/lib/concurrent/edge/lock_free_linked_set/window.rb b/lib/concurrent/edge/lock_free_linked_set/window.rb index d96c8b753..fe7edf8a3 100644 --- a/lib/concurrent/edge/lock_free_linked_set/window.rb +++ b/lib/concurrent/edge/lock_free_linked_set/window.rb @@ -20,17 +20,17 @@ def self.find(head, item) curr = pred.next_node loop do - succ, marked = curr.Successor_reference.get + succ, marked = curr.successor_reference.get # Remove sequence of marked nodes while marked - removed = pred.Successor_reference.compare_and_set curr, succ, false, false + removed = pred.successor_reference.compare_and_set curr, succ, false, false # If could not remove node, try again break_inner_loops = true && break unless removed curr = succ - succ, marked = curr.Successor_reference.get + succ, marked = curr.successor_reference.get end break if break_inner_loops diff --git a/lib/concurrent/edge/lock_free_stack.rb b/lib/concurrent/edge/lock_free_stack.rb index 0decb0de2..df1874057 100644 --- a/lib/concurrent/edge/lock_free_stack.rb +++ b/lib/concurrent/edge/lock_free_stack.rb @@ -2,6 +2,8 @@ module Concurrent module Edge class LockFreeStack < Synchronization::Object + safe_initialization! + class Node attr_reader :value, :next_node diff --git a/lib/concurrent/executor/ruby_executor_service.rb b/lib/concurrent/executor/ruby_executor_service.rb index 96a9e0b2c..7b2ee7377 100644 --- a/lib/concurrent/executor/ruby_executor_service.rb +++ b/lib/concurrent/executor/ruby_executor_service.rb @@ -6,12 +6,12 @@ module Concurrent # @!macro abstract_executor_service_public_api # @!visibility private class RubyExecutorService < AbstractExecutorService + safe_initialization! def initialize(*args, &block) super - @stop_event = Event.new - @stopped_event = Event.new - ensure_ivar_visibility! + @StopEvent = Event.new + @StoppedEvent = Event.new end def post(*args, &task) @@ -52,11 +52,11 @@ def wait_for_termination(timeout = nil) private def stop_event - @stop_event + @StopEvent end def stopped_event - @stopped_event + @StoppedEvent end def ns_shutdown_execution diff --git a/lib/concurrent/immutable_struct.rb b/lib/concurrent/immutable_struct.rb index e8e0111d1..05b8035c0 100644 --- a/lib/concurrent/immutable_struct.rb +++ b/lib/concurrent/immutable_struct.rb @@ -9,6 +9,10 @@ module Concurrent module ImmutableStruct include Synchronization::AbstractStruct + def self.included(base) + base.safe_initialization! + end + # @!macro struct_values def values ns_values diff --git a/lib/concurrent/mutable_struct.rb b/lib/concurrent/mutable_struct.rb index 3252356bf..a1587012c 100644 --- a/lib/concurrent/mutable_struct.rb +++ b/lib/concurrent/mutable_struct.rb @@ -184,8 +184,9 @@ def select(&block) # @raise [IndexError] if the index is out of range. def []=(member, value) if member.is_a? Integer - if member >= @values.length - raise IndexError.new("offset #{member} too large for struct(size:#{@values.length})") + length = synchronize { @values.length } + if member >= length + raise IndexError.new("offset #{member} too large for struct(size:#{length})") end synchronize { @values[member] = value } else diff --git a/lib/concurrent/settable_struct.rb b/lib/concurrent/settable_struct.rb index b4365d34c..f6feb4559 100644 --- a/lib/concurrent/settable_struct.rb +++ b/lib/concurrent/settable_struct.rb @@ -74,8 +74,9 @@ def select(&block) # @raise [Concurrent::ImmutabilityError] if the given member has already been set def []=(member, value) if member.is_a? Integer - if member >= @values.length - raise IndexError.new("offset #{member} too large for struct(size:#{@values.length})") + length = synchronize { @values.length } + if member >= length + raise IndexError.new("offset #{member} too large for struct(size:#{length})") end synchronize do unless @values[member].nil? diff --git a/lib/concurrent/synchronization/abstract_lockable_object.rb b/lib/concurrent/synchronization/abstract_lockable_object.rb index 2da28843b..f47b4ea82 100644 --- a/lib/concurrent/synchronization/abstract_lockable_object.rb +++ b/lib/concurrent/synchronization/abstract_lockable_object.rb @@ -4,14 +4,6 @@ module Synchronization # @!visibility private class AbstractLockableObject < Object - # # @!macro [attach] synchronization_object_method_initialize - # # - # # @abstract for helper ivar initialization if needed, - # # otherwise it can be left empty. It has to call ns_initialize. # FIXME no longer true - # def initialize - # super - # end - protected # @!macro [attach] synchronization_object_method_synchronize @@ -38,6 +30,7 @@ def synchronize # @args = args # end # end + # TODO (pitr 12-Sep-2015): remove def ns_initialize(*args, &block) end @@ -62,10 +55,7 @@ def ns_wait_until(timeout = nil, &condition) loop do now = Concurrent.monotonic_time condition_result = condition.call - # TODO recheck and fix properly - # 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0 - # when passed to java #wait(long timeout) - return condition_result if (now + 0.001) >= wait_until || condition_result + return condition_result if now >= wait_until || condition_result ns_wait wait_until - now end else @@ -124,4 +114,4 @@ def ns_broadcast end end -end \ No newline at end of file +end diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index c4a11e058..1accf2b8c 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -5,15 +5,11 @@ module Synchronization # @!visibility private class AbstractObject - # @!macro [attach] synchronization_object_method_initialize - # - # @abstract has to be called by children + # @abstract has to be implemented based on Ruby runtime def initialize raise NotImplementedError end - protected - # @!macro [attach] synchronization_object_method_ensure_ivar_visibility # # Allows to construct immutable objects where all fields are visible after initialization, not requiring @@ -27,12 +23,17 @@ def initialize # # now it can be shared as Java's final field # end # end + # @!visibility private def ensure_ivar_visibility! # We have to prevent ivar writes to reordered with storing of the final instance reference # Therefore wee need a fullFence to prevent reordering in both directions. full_memory_barrier end + protected + + # @!visibility private + # @abstract def full_memory_barrier raise NotImplementedError end diff --git a/lib/concurrent/synchronization/abstract_struct.rb b/lib/concurrent/synchronization/abstract_struct.rb index 1b6043e42..1fd66ef9e 100644 --- a/lib/concurrent/synchronization/abstract_struct.rb +++ b/lib/concurrent/synchronization/abstract_struct.rb @@ -9,7 +9,6 @@ module AbstractStruct def initialize(*values) super() ns_initialize(*values) - ensure_ivar_visibility! end # @!macro [attach] struct_length @@ -131,7 +130,7 @@ def pr_underscore(clazz) def self.define_struct_class(parent, base, name, members, &block) clazz = Class.new(base || Object) do include parent - self.const_set(:MEMBERS, members.collect{|member| member.to_s.to_sym}.freeze) + self.const_set(:MEMBERS, members.collect{|member| member.to_s.to_sym}.freeze) def ns_initialize(*values) raise ArgumentError.new('struct size differs') if values.length > length @values = values.fill(nil, values.length..length-1) @@ -142,7 +141,7 @@ def ns_initialize(*values) parent.const_set(name, clazz) parent.const_get(name) rescue NameError - raise NameError.new("identifier #{name} needs to be constant") + raise NameError.new("identifier #{name} needs to be constant") end end members.each_with_index do |member, index| diff --git a/lib/concurrent/synchronization/condition.rb b/lib/concurrent/synchronization/condition.rb index 7b100be83..f5e03ed47 100644 --- a/lib/concurrent/synchronization/condition.rb +++ b/lib/concurrent/synchronization/condition.rb @@ -1,15 +1,16 @@ module Concurrent module Synchronization class Condition < LockableObject + safe_initialization! - # TODO locks two objects, improve + # TODO (pitr 12-Sep-2015): locks two objects, improve singleton_class.send :alias_method, :private_new, :new private_class_method :new def initialize(lock) - @Lock = lock super() + @Lock = lock end def wait(timeout = nil) diff --git a/lib/concurrent/synchronization/lockable_object.rb b/lib/concurrent/synchronization/lockable_object.rb index fabfe3251..fbab1d6d8 100644 --- a/lib/concurrent/synchronization/lockable_object.rb +++ b/lib/concurrent/synchronization/lockable_object.rb @@ -18,17 +18,17 @@ module Synchronization end private_constant :LockableObjectImplementation + # TODO (pitr 12-Sep-2015): make private for c-r, prohibit subclassing class LockableObject < LockableObjectImplementation - # TODO make private for c-r - - def self.allow_only_direct_descendants! # FIXME we inherit too much ourselves :/ + # TODO (pitr 12-Sep-2015): we inherit too much ourselves :/ + def self.allow_only_direct_descendants! this = self singleton_class.send :define_method, :inherited do |child| # super child if child.superclass != this - raise "all children of #{this} are final, subclassing is not supported use composition." + warn "all children of #{this} are final, subclassing is not supported use composition." end end end @@ -56,4 +56,4 @@ def self.allow_only_direct_descendants! # FIXME we inherit too much ourselves :/ end end -end \ No newline at end of file +end diff --git a/lib/concurrent/synchronization/mri_lockable_object.rb b/lib/concurrent/synchronization/mri_lockable_object.rb index c79942c72..22120280b 100644 --- a/lib/concurrent/synchronization/mri_lockable_object.rb +++ b/lib/concurrent/synchronization/mri_lockable_object.rb @@ -21,6 +21,8 @@ def ns_broadcast # @!visibility private # @!macro internal_implementation_note class MriMutexLockableObject < MriLockableObject + safe_initialization! + def initialize(*defaults) super(*defaults) @__lock__ = ::Mutex.new @@ -46,6 +48,8 @@ def ns_wait(timeout = nil) # @!visibility private # @!macro internal_implementation_note class MriMonitorLockableObject < MriLockableObject + safe_initialization! + def initialize(*defaults) super(*defaults) @__lock__ = ::Monitor.new diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb index e219dca32..56820ef4d 100644 --- a/lib/concurrent/synchronization/object.rb +++ b/lib/concurrent/synchronization/object.rb @@ -46,15 +46,61 @@ module Synchronization # class Object < ObjectImplementation + # Has to be called by children. + # Initializes default volatile fields with cas if any. + # @param [Array] defaults values for fields, in same order as they are defined def initialize(*defaults) super() initialize_volatile_cas_fields(defaults) - ensure_ivar_visibility! end + # By calling this method on a class, it and all its children are marked to be constructed safely. Meaning that + # all writes (ivar initializations) are made visible to all readers of newly constructed object. It ensures + # same behaviour as Java's final fields. + # @example + # class AClass < Concurrent::Synchronization::Object + # safe_initialization! + # + # def initialize + # @AFinalValue = 'value' # published safly, does not have to be synchronized + # end + # end + def self.safe_initialization! + # define only once, and not again in children + return if safe_initialization? + + def self.new(*) + object = super + ensure + object.ensure_ivar_visibility! if object + end + + @safe_initialization = true + end + + def self.safe_initialization? + @safe_initialization || (superclass.respond_to?(:safe_initialization?) && superclass.safe_initialization?) + end + + # For testing purposes, quite slow. + def self.ensure_safe_initialization_when_final_fields_are_present + Object.class_eval do + def self.new(*) + object = super + ensure + has_final_field = object.instance_variables.any? { |v| v.to_s =~ /^@[A-Z]/ } + if has_final_field && !safe_initialization? + raise "there was an instance of #{object.class} with final field but not marked with safe_initialization!" + end + end + end + end + + # TODO documentation def self.attr_volatile_with_cas(*names) @volatile_cas_fields ||= [] @volatile_cas_fields += names + safe_initialization! names.each do |name| ivar = :"@VolatileCas_#{name}" @@ -84,6 +130,7 @@ def update_#{name}(&block) end def self.volatile_cas_fields(inherited = true) + # TODO (pitr 11-Sep-2015): maybe use constant for better optimisation on Truffle since it will not speculate on ivar being final @volatile_cas_fields ||= [] ((superclass.volatile_cas_fields if superclass.respond_to?(:volatile_cas_fields) && inherited) || []) + @volatile_cas_fields @@ -98,9 +145,6 @@ def initialize_volatile_cas_fields(defaults) nil end - # @!method initialize - # @!macro synchronization_object_method_initialize - # @!method ensure_ivar_visibility! # @!macro synchronization_object_method_ensure_ivar_visibility diff --git a/lib/concurrent/synchronization/rbx_lockable_object.rb b/lib/concurrent/synchronization/rbx_lockable_object.rb index d7d81a873..e6bec7c2b 100644 --- a/lib/concurrent/synchronization/rbx_lockable_object.rb +++ b/lib/concurrent/synchronization/rbx_lockable_object.rb @@ -4,10 +4,12 @@ module Synchronization # @!visibility private # @!macro internal_implementation_note class RbxLockableObject < AbstractLockableObject + safe_initialization! + def initialize(*defaults) + super(*defaults) @__Waiters__ = [] @__owner__ = nil - super(*defaults) # ensures visibility end protected diff --git a/spec/concurrent/synchronization_spec.rb b/spec/concurrent/synchronization_spec.rb index 7119e44b2..2520f192f 100644 --- a/spec/concurrent/synchronization_spec.rb +++ b/spec/concurrent/synchronization_spec.rb @@ -3,9 +3,50 @@ module Concurrent describe Synchronization do + describe Synchronization::Object do + class AAClass < Synchronization::Object + end + + class ABClass < AAClass + safe_initialization! + end + + class ACClass < ABClass + end + + class ADClass < ACClass + safe_initialization! + end + + it 'does not ensure visibility when not needed' do + expect_any_instance_of(AAClass).not_to receive(:ensure_ivar_visibility!) + AAClass.new + end + + it "does ensure visibility when specified" do + expect_any_instance_of(ABClass).to receive(:ensure_ivar_visibility!) + ABClass.new + end + + it "does ensure visibility when specified in a parent" do + expect_any_instance_of(ACClass).to receive(:ensure_ivar_visibility!) + ACClass.new + end + + it "does ensure visibility once when specified in child again" do + expect_any_instance_of(ADClass).to receive(:ensure_ivar_visibility!) + ADClass.new + end + + # TODO (pitr 12-Sep-2015): give a whole gem a pass to find classes with final fields without using the convention and migrate + Synchronization::Object.ensure_safe_initialization_when_final_fields_are_present + end + describe Synchronization::LockableObject do - class AClass < Synchronization::LockableObject + class BClass < Synchronization::LockableObject + safe_initialization! + attr_volatile :volatile attr_accessor :not_volatile @@ -13,7 +54,6 @@ def initialize(value = nil) super() @Final = value ns_initialize - ensure_ivar_visibility! end def final @@ -37,7 +77,7 @@ def ns_initialize end end - subject { AClass.new } + subject { BClass.new } describe '#wait' do @@ -74,7 +114,7 @@ def ns_initialize it 'can be called from within a #synchronize block' do expect { Timeout.timeout(3) do # #wait should release lock, even if it was already held on entry - t = Thread.new { subject.synchronize { subject.wait }} + t = Thread.new { subject.synchronize { subject.wait } } sleep 0.1 expect(t.status).to eq 'sleep' subject.synchronize {} # we will deadlock here if lock wasn't released @@ -95,8 +135,8 @@ def ns_initialize end specify 'final field always visible' do - store = AClass.new 'asd' - t1 = Thread.new { 1000000000.times { |i| store = AClass.new i.to_s } } + store = BClass.new 'asd' + t1 = Thread.new { 1000000000.times { |i| store = BClass.new i.to_s } } t2 = Thread.new { 10.times { expect(store.final).not_to be_nil; Thread.pass } } t2.join t1.kill @@ -104,7 +144,7 @@ def ns_initialize describe 'attr volatile' do specify 'older writes are always visible' do - store = AClass.new + store = BClass.new store.not_volatile = 0 store.volatile = 0 From 7b03abb17f490f4b89da2f9382deafe96ff96541 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 15 Sep 2015 18:05:30 +0200 Subject: [PATCH 5/5] Synchronisation support for Java 7 --- .../ext/SynchronizationLibrary.java | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java index 20cc7e652..ff11da183 100644 --- a/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java +++ b/ext/com/concurrent_ruby/ext/SynchronizationLibrary.java @@ -74,8 +74,17 @@ private RubyClass defineClass(Ruby runtime, RubyModule namespace, String parentN return newClass; } + // Facts: + // - all ivar reads are without any synchronisation of fences see + // https://github.com/jruby/jruby/blob/master/core/src/main/java/org/jruby/runtime/ivars/VariableAccessor.java#L110-110 + // - writes depend on UnsafeHolder.U, null -> SynchronizedVariableAccessor, !null -> StampedVariableAccessor + // SynchronizedVariableAccessor wraps with synchronized block, StampedVariableAccessor uses fullFence or + // volatilePut + @JRubyClass(name = "JRubyObject", parent = "AbstractObject") public static class JRubyObject extends RubyObject { + private static volatile ThreadContext threadContext = null; + public JRubyObject(Ruby runtime, RubyClass metaClass) { super(runtime, metaClass); } @@ -87,10 +96,12 @@ public IRubyObject initialize(ThreadContext context) { @JRubyMethod(name = "full_memory_barrier", visibility = Visibility.PRIVATE) public IRubyObject fullMemoryBarrier(ThreadContext context) { + // Prevent reordering of ivar writes with publication of this instance if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) { - throw new UnsupportedOperationException( - "concurrent-ruby requires java with sun.mics.Unsafe fences support, such as Java 8. " + - "Current version is: " + System.getProperty("java.version")); + // Assuming that following volatile read and write is not eliminated it simulates fullFence. + // If it's eliminated it'll cause problems only on non-x86 platforms. + final ThreadContext oldContext = threadContext; + threadContext = context; } else { UnsafeHolder.fullFence(); } @@ -99,34 +110,30 @@ public IRubyObject fullMemoryBarrier(ThreadContext context) { @JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED) public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) { - if (UnsafeHolder.U == null) { - synchronized (this) { - // TODO (pitr 06-Sep-2015): Possibly dangerous, there may be a deadlock here - // TODO (pitr 08-Sep-2015): maybe remove the branch since full_memory_barrier is not supported anyway - return instance_variable_get(context, name); - } - } else if (UnsafeHolder.SUPPORTS_FENCES) { - // ensure we see latest value - UnsafeHolder.loadFence(); + // Ensure we ses latest value with loadFence + if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) { + // piggybacking on volatile read, simulating loadFence + final ThreadContext oldContext = threadContext; return instance_variable_get(context, name); } else { - throw new UnsupportedOperationException(); + UnsafeHolder.loadFence(); + return instance_variable_get(context, name); } } @JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED) public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) { - if (UnsafeHolder.U == null) { - synchronized (this) { - return instance_variable_set(name, value); - } - } else if (UnsafeHolder.SUPPORTS_FENCES) { + // Ensure we make last update visible + if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) { + // piggybacking on volatile write, simulating storeFence final IRubyObject result = instance_variable_set(name, value); - // ensure we make latest value visible - UnsafeHolder.storeFence(); + threadContext = context; return result; } else { - throw new UnsupportedOperationException(); + // JRuby uses StampedVariableAccessor which calls fullFence + // so no additional steps needed. + // See https://github.com/jruby/jruby/blob/master/core/src/main/java/org/jruby/runtime/ivars/StampedVariableAccessor.java#L151-L159 + return instance_variable_set(name, value); } } }