Module: Concurrent + + + +
+-
+
- Defined in: +
- lib/concurrent-ruby/concurrent.rb,
+ lib/concurrent-ruby/concurrent/map.rb,
lib/concurrent-ruby/concurrent/set.rb,
lib/concurrent-ruby/concurrent/atom.rb,
lib/concurrent-ruby/concurrent/hash.rb,
lib/concurrent-ruby/concurrent/ivar.rb,
lib/concurrent-ruby/concurrent/mvar.rb,
lib/concurrent-ruby/concurrent/tvar.rb,
lib/concurrent-ruby/concurrent/agent.rb,
lib/concurrent-ruby/concurrent/array.rb,
lib/concurrent-ruby/concurrent/async.rb,
lib/concurrent-ruby/concurrent/delay.rb,
lib/concurrent-ruby/concurrent/maybe.rb,
lib/concurrent-ruby/concurrent/tuple.rb,
lib/concurrent-ruby/concurrent/errors.rb,
lib/concurrent-ruby/concurrent/future.rb,
lib/concurrent-ruby/concurrent/options.rb,
lib/concurrent-ruby/concurrent/promise.rb,
lib/concurrent-ruby/concurrent/version.rb,
lib/concurrent-ruby/concurrent/dataflow.rb,
lib/concurrent-ruby/concurrent/promises.rb,
lib/concurrent-ruby/concurrent/constants.rb,
lib/concurrent-ruby/concurrent/exchanger.rb,
lib/concurrent-ruby/concurrent/re_include.rb,
lib/concurrent-ruby/concurrent/timer_task.rb,
lib/concurrent-ruby/concurrent/atomic/event.rb,
lib/concurrent-ruby/concurrent/configuration.rb,
lib/concurrent-ruby/concurrent/mutable_struct.rb,
lib/concurrent-ruby/concurrent/scheduled_task.rb,
lib/concurrent-ruby/concurrent/utility/engine.rb,
lib/concurrent-ruby/concurrent/concern/logging.rb,
lib/concurrent-ruby/concurrent/settable_struct.rb,
lib/concurrent-ruby/concurrent/synchronization.rb,
lib/concurrent-ruby/concurrent/atomic/semaphore.rb,
lib/concurrent-ruby/concurrent/immutable_struct.rb,
lib/concurrent-ruby/concurrent/thread_safe/util.rb,
lib/concurrent-ruby/concurrent/concern/obligation.rb,
lib/concurrent-ruby/concurrent/concern/observable.rb,
lib/concurrent-ruby/concurrent/executor/timer_set.rb,
lib/concurrent-ruby/concurrent/concern/deprecation.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/synchronization/lock.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_boolean.rb,
lib/concurrent-ruby/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_semaphore.rb,
lib/concurrent-ruby/concurrent/atomic/read_write_lock.rb,
lib/concurrent-ruby/concurrent/synchronization/object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb,
lib/concurrent-ruby/concurrent/utility/monotonic_time.rb,
lib/concurrent-ruby/concurrent/utility/native_integer.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_reference.rb,
lib/concurrent-ruby/concurrent/atomic/count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb,
lib/concurrent-ruby/concurrent/concern/dereferenceable.rb,
lib/concurrent-ruby/concurrent/synchronization/volatile.rb,
lib/concurrent-ruby/concurrent/executor/executor_service.rb,
lib/concurrent-ruby/concurrent/synchronization/condition.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/volatile.rb,
lib/concurrent-ruby/concurrent/utility/processor_counter.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_fixnum.rb,
lib/concurrent-ruby/concurrent/collection/lock_free_stack.rb,
lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent-ruby/concurrent/synchronization/mri_object.rb,
lib/concurrent-ruby/concurrent/synchronization/rbx_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/striped64.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_atomic_boolean.rb,
lib/concurrent-ruby/concurrent/executor/cached_thread_pool.rb,
lib/concurrent-ruby/concurrent/executor/immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/safe_task_executor.rb,
lib/concurrent-ruby/concurrent/atomic/java_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic/java_thread_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_object.rb,
lib/concurrent-ruby/concurrent/atomic/mutex_count_down_latch.rb,
lib/concurrent-ruby/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution.rb,
lib/concurrent-ruby/concurrent/executor/thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/collection/map/mri_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/java_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_object.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb,
lib/concurrent-ruby/concurrent/synchronization/lockable_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/cheap_lockable.rb,
lib/concurrent-ruby/concurrent/utility/native_extension_loader.rb,
lib/concurrent-ruby/concurrent/atomic/abstract_thread_local_var.rb,
lib/concurrent-ruby/concurrent/atomic/atomic_markable_reference.rb,
lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb,
lib/concurrent-ruby/concurrent/executor/serial_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/simple_executor_service.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb,
lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb,
lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent-ruby/concurrent/synchronization/truffleruby_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/synchronized_delegator.rb,
lib/concurrent-ruby/concurrent/synchronization/rbx_lockable_object.rb,
lib/concurrent-ruby/concurrent/thread_safe/util/power_of_two_tuple.rb,
lib/concurrent-ruby/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent-ruby/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent-ruby/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_write_observer_set.rb,
lib/concurrent-ruby/concurrent/synchronization/jruby_lockable_object.rb,
lib/concurrent-ruby/concurrent/synchronization/mutex_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/copy_on_notify_observer_set.rb,
lib/concurrent-ruby/concurrent/collection/map/truffleruby_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb,
lib/concurrent-ruby/concurrent/executor/serialized_execution_delegator.rb,
lib/concurrent-ruby/concurrent/collection/non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/synchronization/abstract_lockable_object.rb,
lib/concurrent-ruby/concurrent/collection/map/non_concurrent_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/map/atomic_reference_map_backend.rb,
lib/concurrent-ruby/concurrent/collection/java_non_concurrent_priority_queue.rb,
lib/concurrent-ruby/concurrent/collection/ruby_non_concurrent_priority_queue.rb,
lib/concurrent-ruby-edge/concurrent/edge.rb,
lib/concurrent-ruby-edge/concurrent/actor.rb,
lib/concurrent-ruby-edge/concurrent/channel.rb,
lib/concurrent-ruby-edge/concurrent/actor/core.rb,
lib/concurrent-ruby-edge/concurrent/actor/root.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils.rb,
lib/concurrent-ruby-edge/concurrent/actor/errors.rb,
lib/concurrent-ruby-edge/concurrent/channel/tick.rb,
lib/concurrent-ruby-edge/concurrent/edge/channel.rb,
lib/concurrent-ruby-edge/concurrent/edge/version.rb,
lib/concurrent-ruby-edge/concurrent/actor/context.rb,
lib/concurrent-ruby-edge/concurrent/edge/promises.rb,
lib/concurrent-ruby-edge/concurrent/edge/throttle.rb,
lib/concurrent-ruby-edge/concurrent/lazy_register.rb,
lib/concurrent-ruby-edge/concurrent/actor/envelope.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour.rb,
lib/concurrent-ruby-edge/concurrent/actor/reference.rb,
lib/concurrent-ruby-edge/concurrent/actor/type_check.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/pool.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector.rb,
lib/concurrent-ruby-edge/concurrent/edge/cancellation.rb,
lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/base.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/balancer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/timer.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_queue.rb,
lib/concurrent-ruby-edge/concurrent/actor/utils/broadcast.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/ticker.rb,
lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/awaits.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/buffer.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/sliding.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/linking.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/pausing.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/buffered.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/dropping.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/abstract.rb,
lib/concurrent-ruby-edge/concurrent/actor/public_delegations.rb,
lib/concurrent-ruby-edge/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set.rb,
lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb,
lib/concurrent-ruby-edge/concurrent/executor/wrapping_executor.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/supervising.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/termination.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/put_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/take_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/after_clause.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/error_clause.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb,
lib/concurrent-ruby-edge/concurrent/channel/selector/default_clause.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/window.rb,
lib/concurrent-ruby-edge/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent-ruby-edge/concurrent/actor/behaviour/errors_on_unknown_message.rb +
+
Overview
Edge Features are under active development and may change frequently.
+ +-
+
- Deprecations are not added before incompatible changes. +
- Edge version: major is always 0, minor bump means incompatible change, +patch bump means compatible change. +
- Edge features may also lack tests and documentation. +
- Features developed in
concurrent-ruby-edge
are expected to move +toconcurrent-ruby
when finalised.
+
Defined Under Namespace
++ + + Modules: Actor, Async, Concern, Edge, ErlangActor, ImmutableStruct, MutableStruct, Promises, ReInclude, SettableStruct, Synchronization, Utility + + + + Classes: Agent, Array, Atom, AtomicBoolean, AtomicFixnum, AtomicMarkableReference, AtomicReference, CachedThreadPool, Cancellation, Channel, ConcurrentUpdateError, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, Hash, IVar, ImmediateExecutor, IndirectImmediateExecutor, LazyRegister, LockFreeStack, MVar, Map, Maybe, MultipleAssignmentError, MultipleErrors, ProcessingActor, Promise, ReadWriteLock, ReentrantReadWriteLock, SafeTaskExecutor, ScheduledTask, Semaphore, SerializedExecution, SerializedExecutionDelegator, Set, SimpleExecutorService, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, Throttle, TimerSet, TimerTask, Transaction, Tuple, WrappingExecutor + + +
+ + ++ Constant Summary + collapse +
+ +-
+
+
- Error = + + +
Class.new(StandardError)
+
+ - ConfigurationError =
+ ++ +++
Raised when errors occur during configuration.
+ + +
+ Class.new(Error)
+
+ - CancelledOperationError =
+ ++ +++
Raised when an asynchronous operation is cancelled before execution.
+ + +
+ Class.new(Error)
+
+ - LifecycleError =
+ ++ +++
Raised when a lifecycle method (such as
+ + +stop
) is called in an improper +sequence or when the object is in an inappropriate state.
+ Class.new(Error)
+
+ - ImmutabilityError =
+ ++ +++
Raised when an attempt is made to violate an immutability guarantee.
+ + +
+ Class.new(Error)
+
+ - IllegalOperationError =
+ ++ +++
Raised when an operation is attempted which is not legal given the +receiver's current state
+ + +
+ Class.new(Error)
+
+ - InitializationError =
+ ++ +++
Raised when an object's methods are called when it has not been +properly initialized.
+ + +
+ Class.new(Error)
+
+ - MaxRestartFrequencyError =
+ ++ +++
Raised when an object with a start/stop lifecycle has been started an +excessive number of times. Often used in conjunction with a restart +policy or strategy.
+ + +
+ Class.new(Error)
+
+ - RejectedExecutionError =
+ ++ +++
Raised by an
+ + +Executor
when it is unable to process a given task, +possibly because of a reject policy or other internal error.
+ Class.new(Error)
+
+ - ResourceLimitError =
+ ++ +++
Raised when any finite resource, such as a lock counter, exceeds its +maximum limit/threshold.
+ + +
+ Class.new(Error)
+
+ - TimeoutError =
+ ++ +++
Raised when an operation times out.
+ + +
+ Class.new(Error)
+
+ - PromiseExecutionError = + + +
Class.new(StandardError)
+
+ - VERSION = + + +
'1.1.10'
+
+ - NULL_LOGGER =
+ ++ +++
Suppresses all output when used for logging.
+ + +
+ lambda { |level, progname, = nil, &block| }
+
+ - EDGE_VERSION =
+ ++ ++ + ++
+
'0.6.0'
+
+
+ Class Method Summary + collapse +
+ +-
+
+
-
+
+
+ .abort_transaction ⇒ undocumented
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Abort a currently running transaction - see
+Concurrent::atomically
.
+
+
+ -
+
+
+ .atomically ⇒ undocumented
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Run a block that reads and writes
+TVar
s as a single atomic transaction.
+
+
+ - + + + .call_dataflow(method, executor, *inputs, &block) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
-
+
+
+ .create_simple_logger(level = Logger::FATAL, output = $stderr) ⇒ Logger
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Logger with provided level and output.
+
+
+
+ - + + + .create_stdlib_logger(level = Logger::FATAL, output = $stderr) ⇒ Logger + + + + + + + + + + deprecated + + + + Deprecated. + + + + +
-
+
+
+ .dataflow(*inputs) {|inputs| ... } ⇒ Object
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
+
+
+
+ - + + + .dataflow!(*inputs, &block) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .dataflow_with(executor, *inputs, &block) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .dataflow_with!(executor, *inputs, &block) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
-
+
+
+ .disable_at_exit_handlers! ⇒ undocumented
+
+
+
+
+
+
+
+
+
+ deprecated
+
+
+
+ Deprecated. + +
Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.
+
+
+
+ -
+
+
+ .executor(executor_identifier) ⇒ Executor
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
General access point to global executors.
+
+
+
+ -
+
+
+ .global_fast_executor ⇒ ThreadPoolExecutor
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Global thread pool optimized for short, fast operations.
+
+
+
+ - + + + .global_immediate_executor ⇒ undocumented + + + + + + + + + + + + + + + + + + +
-
+
+
+ .global_io_executor ⇒ ThreadPoolExecutor
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Global thread pool optimized for long, blocking (IO) tasks.
+
+
+
+ - + + + .global_logger ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .global_logger=(value) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
-
+
+
+ .global_timer_set ⇒ Concurrent::TimerSet
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Global thread pool user for global timers.
+
+
+
+ -
+
+
+ .leave_transaction ⇒ undocumented
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Leave a transaction without committing or aborting - see
+Concurrent::atomically
.
+
+
+ - + + + .monotonic_time(unit = :float_second) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .new_fast_executor(opts = {}) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .new_io_executor(opts = {}) ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .physical_processor_count ⇒ undocumented + + + + + + + + + + + + + + + + + + +
- + + + .processor_count ⇒ undocumented + + + + + + + + + + + + + + + + + + +
-
+
+
+ .use_simple_logger(level = Logger::FATAL, output = $stderr) ⇒ undocumented
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Use logger created by #create_simple_logger to log concurrent-ruby messages.
+
+
+
+ - + + + .use_stdlib_logger(level = Logger::FATAL, output = $stderr) ⇒ undocumented + + + + + + + + + + deprecated + + + + Deprecated. + + + + +
+ Instance Method Summary + collapse +
+ +-
+
+
-
+
+
+ #exchange(value, timeout = nil) ⇒ Object
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
+
+
+
+ -
+
+
+ #exchange!(value, timeout = nil) ⇒ Object
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
+
+
+
+ -
+
+
+ #initialize(opts = {}) ⇒ undocumented
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Create a new thread pool.
+
+
+
+ -
+
+
+ #try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + +
Waits for another thread to arrive at this exchange point (unless the current thread is interrupted), and then transfers the given object to it, receiving its object in return.
+
+
+
+
Class Method Details
+ + ++ + .abort_transaction ⇒ undocumented + + + + + +
Abort a currently running transaction - see Concurrent::atomically
.
+ + + +139 +140 +141+ |
+
+ # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 139 + +def abort_transaction + raise Transaction::AbortError.new +end+ |
+
+ + .atomically ⇒ undocumented + + + + + +
Run a block that reads and writes TVar
s as a single atomic transaction.
+With respect to the value of TVar
objects, the transaction is atomic, in
+that it either happens or it does not, consistent, in that the TVar
+objects involved will never enter an illegal state, and isolated, in that
+transactions never interfere with each other. You may recognise these
+properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
+ +-
+
Most importantly, the block that you pass to atomically may be executed +more than once. In most cases your code should be free of +side-effects, except for via TVar.
+If an exception escapes an atomically block it will abort the transaction.
+It is undefined behaviour to use callcc or Fiber with atomically.
+If you create a new thread within an atomically, it will not be part of +the transaction. Creating a thread counts as a side-effect.
+
Transactions within transactions are flattened to a single transaction.
+ + +
+ + + +82 +83 +84 +85 +86 +87 +88 +89 +90 +91 +92 +93 +94 +95 +96 +97 +98 +99 +100 +101 +102 +103 +104 +105 +106 +107 +108 +109 +110 +111 +112 +113 +114 +115 +116 +117 +118 +119 +120 +121 +122 +123 +124 +125 +126 +127 +128 +129 +130 +131 +132 +133 +134 +135 +136+ |
+
+ # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 82 + +def atomically + raise ArgumentError.new('no block given') unless block_given? + + # Get the current transaction + + transaction = Transaction::current + + # Are we not already in a transaction (not nested)? + + if transaction.nil? + # New transaction + + begin + # Retry loop + + loop do + + # Create a new transaction + + transaction = Transaction.new + Transaction::current = transaction + + # Run the block, aborting on exceptions + + begin + result = yield + rescue Transaction::AbortError => e + transaction.abort + result = Transaction::ABORTED + rescue Transaction::LeaveError => e + transaction.abort + break result + rescue => e + transaction.abort + raise e + end + # If we can commit, break out of the loop + + if result != Transaction::ABORTED + if transaction.commit + break result + end + end + end + ensure + # Clear the current transaction + + Transaction::current = nil + end + else + # Nested transaction - flatten it and just run the block + + yield + end +end+ |
+
+ + .call_dataflow(method, executor, *inputs, &block) ⇒ undocumented + + + + + +
+ + + +56 +57 +58 +59 +60 +61 +62 +63 +64 +65 +66 +67 +68 +69 +70 +71 +72 +73 +74 +75 +76 +77 +78 +79+ |
+
+ # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 56 + +def call_dataflow(method, executor, *inputs, &block) + raise ArgumentError.new('an executor must be provided') if executor.nil? + raise ArgumentError.new('no block given') unless block_given? + unless inputs.all? { |input| input.is_a? IVar } + raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }") + end + + result = Future.new(executor: executor) do + values = inputs.map { |input| input.send(method) } + block.call(*values) + end + + if inputs.empty? + result.execute + else + counter = DependencyCounter.new(inputs.size) { result.execute } + + inputs.each do |input| + input.add_observer counter + end + end + + result +end+ |
+
+ + .create_simple_logger(level = Logger::FATAL, output = $stderr) ⇒ Logger + + + + + +
Returns Logger with provided level and output.
+ + +
+ + + +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +30 +31 +32 +33 +34 +35 +36 +37 +38 +39 +40 +41 +42 +43+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 20 + +def self.create_simple_logger(level = Logger::FATAL, output = $stderr) + # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking + lambda do |severity, progname, = nil, &block| + return false if severity < level + + = block ? block.call : + = case + when String + + when Exception + format "%s (%s)\n%s", + ., .class, (.backtrace || []).join("\n") + else + .inspect + end + + output.print format "[%s] %5s -- %s: %s\n", + Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'), + Logger::SEV_LABEL[severity], + progname, + + true + end +end+ |
+
+ + .create_stdlib_logger(level = Logger::FATAL, output = $stderr) ⇒ Logger + + + + + +
Returns Logger with provided level and output.
+ + +
+ + + +52 +53 +54 +55 +56 +57 +58 +59 +60 +61 +62 +63 +64 +65 +66 +67 +68 +69 +70 +71 +72 +73 +74 +75+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 52 + +def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr) + logger = Logger.new(output) + logger.level = level + logger.formatter = lambda do |severity, datetime, progname, msg| + = case msg + when String + msg + when Exception + format "%s (%s)\n%s", + msg., msg.class, (msg.backtrace || []).join("\n") + else + msg.inspect + end + format "[%s] %5s -- %s: %s\n", + datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), + severity, + progname, + + end + + lambda do |loglevel, progname, = nil, &block| + logger.add loglevel, , progname, &block + end +end+ |
+
+ + .dataflow(*inputs) {|inputs| ... } ⇒ Object + + + + + +
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available.
+Data dependencies are Future
values. The dataflow task itself is also a Future
value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.
Our syntax is somewhat related to that of Akka's flow
and Habanero Java's DataDrivenFuture
. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.
The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.
+ +Example
+ +A dataflow task is created with the dataflow
method, passing in a block.
task = Concurrent::dataflow { 14 }
+
+
+This produces a simple Future
value. The task will run immediately, as it has no dependencies. We can also specify Future
values that must be available before a task will run. When we do this we get the value of those futures passed to our block.
a = Concurrent::dataflow { 1 }
+b = Concurrent::dataflow { 2 }
+c = Concurrent::dataflow(a, b) { |av, bv| av + bv }
+
+
+Using the dataflow
method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.
Derivation
+ +This section describes how we could derive dataflow from other primitives in this library.
+ +Consider a naive fibonacci calculator.
+ +def fib(n)
+ if n < 2
+ n
+ else
+ fib(n - 1) + fib(n - 2)
+ end
+end
+
+puts fib(14) #=> 377
+
+
+We could modify this to use futures.
+ +def fib(n)
+ if n < 2
+ Concurrent::Future.new { n }
+ else
+ n1 = fib(n - 1).execute
+ n2 = fib(n - 2).execute
+ Concurrent::Future.new { n1.value + n2.value }
+ end
+end
+
+f = fib(14) #=> #f.execute #=> #
+sleep(0.5)
+
+puts f.value #=> 377
+
+
+One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.
+ +To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.
+ +class CountingObserver
+
+ def initialize(count, &block)
+ @count = count
+ @block = block
+ end
+
+ def update(time, value, reason)
+ @count -= 1
+
+ if @count <= 0
+ @block.call()
+ end
+ end
+
+end
+
+def fib(n)
+ if n < 2
+ Concurrent::Future.new { n }.execute
+ else
+ n1 = fib(n - 1)
+ n2 = fib(n - 2)
+
+ result = Concurrent::Future.new { n1.value + n2.value }
+
+ = CountingObserver.new(2) { result.execute }
+ n1.add_observer
+ n2.add_observer
+
+ n1.execute
+ n2.execute
+
+ result
+ end
+end
+
+
+We can wrap this up in a dataflow utility.
+ +f = fib(14) #=> #sleep(0.5)
+
+puts f.value #=> 377
+
+def dataflow(*inputs, &block)
+ result = Concurrent::Future.new(&block)
+
+ if inputs.empty?
+ result.execute
+ else
+ = CountingObserver.new(inputs.size) { result.execute }
+
+ inputs.each do |input|
+ input.add_observer
+ end
+ end
+
+ result
+end
+
+def fib(n)
+ if n < 2
+ dataflow { n }
+ else
+ n1 = fib(n - 1)
+ n2 = fib(n - 2)
+ dataflow(n1, n2) { n1.value + n2.value }
+ end
+end
+
+f = fib(14) #=> #sleep(0.5)
+
+puts f.value #=> 377
+
+
+Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.
+ +def dataflow(*inputs, &block)
+ result = Concurrent::Future.new do
+ values = inputs.map { |input| input.value }
+ block.call(*values)
+ end
+
+ if inputs.empty?
+ result.execute
+ else
+ = CountingObserver.new(inputs.size) { result.execute }
+
+ inputs.each do |input|
+ input.add_observer
+ end
+ end
+
+ result
+end
+
+def fib(n)
+ if n < 2
+ Concurrent::dataflow { n }
+ else
+ n1 = fib(n - 1)
+ n2 = fib(n - 2)
+ Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
+ end
+end
+
+f = fib(14) #=> #sleep(0.5)
+
+puts f.value #=> 377
+
+
+
+
+
+ + + +34 +35 +36+ |
+
+ # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 34 + +def dataflow(*inputs, &block) + dataflow_with(Concurrent.global_io_executor, *inputs, &block) +end+ |
+
+ + .dataflow!(*inputs, &block) ⇒ undocumented + + + + + +
+ + + +44 +45 +46+ |
+
+ # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 44 + +def dataflow!(*inputs, &block) + dataflow_with!(Concurrent.global_io_executor, *inputs, &block) +end+ |
+
+ + .dataflow_with(executor, *inputs, &block) ⇒ undocumented + + + + + +
+ + + +39 +40 +41+ |
+
+ # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 39 + +def dataflow_with(executor, *inputs, &block) + call_dataflow(:value, executor, *inputs, &block) +end+ |
+
+ + .dataflow_with!(executor, *inputs, &block) ⇒ undocumented + + + + + +
+ + + +49 +50 +51+ |
+
+ # File 'lib/concurrent-ruby/concurrent/dataflow.rb', line 49 + +def dataflow_with!(executor, *inputs, &block) + call_dataflow(:value!, executor, *inputs, &block) +end+ |
+
+ + .disable_at_exit_handlers! ⇒ undocumented + + + + + +
Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.
+this option should be needed only because of at_exit
ordering
+issues which may arise when running some of the testing frameworks.
+E.g. Minitest's test-suite runs itself in at_exit
callback which
+executes after the pools are already terminated. Then auto termination
+needs to be disabled and called manually after test-suite ends.
This method should never be called +from within a gem. It should only be used from within the main +application and even then it should be used only when necessary.
+Disables AtExit handlers including pool auto-termination handlers.
+When disabled it will be the application programmer's responsibility
+to ensure that the handlers are shutdown properly prior to application
+exit by calling AtExit.run
method.
+ + + +131 +132 +133+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 131 + +def self.disable_at_exit_handlers! + deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841." +end+ |
+
+ + .executor(executor_identifier) ⇒ Executor + + + + + +
General access point to global executors.
+ + +
+ + + +166 +167 +168+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 166 + +def self.executor(executor_identifier) + Options.executor(executor_identifier) +end+ |
+
+ + .global_fast_executor ⇒ ThreadPoolExecutor + + + + + +
Global thread pool optimized for short, fast operations.
+ + +
+ + + +138 +139 +140+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 138 + +def self.global_fast_executor + GLOBAL_FAST_EXECUTOR.value +end+ |
+
+ + .global_immediate_executor ⇒ undocumented + + + + + +
+ + + +149 +150 +151+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 149 + +def self.global_immediate_executor + GLOBAL_IMMEDIATE_EXECUTOR +end+ |
+
+ + .global_io_executor ⇒ ThreadPoolExecutor + + + + + +
Global thread pool optimized for long, blocking (IO) tasks.
+ + +
+ + + +145 +146 +147+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 145 + +def self.global_io_executor + GLOBAL_IO_EXECUTOR.value +end+ |
+
+ + .global_logger ⇒ undocumented + + + + + +
+ + + +92 +93 +94+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 92 + +def self.global_logger + GLOBAL_LOGGER.value +end+ |
+
+ + .global_logger=(value) ⇒ undocumented + + + + + +
+ + + +96 +97 +98+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 96 + +def self.global_logger=(value) + GLOBAL_LOGGER.value = value +end+ |
+
+ + .global_timer_set ⇒ Concurrent::TimerSet + + + + + +
Global thread pool user for global timers.
+ + +
+ + + +156 +157 +158+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 156 + +def self.global_timer_set + GLOBAL_TIMER_SET.value +end+ |
+
+ + .leave_transaction ⇒ undocumented + + + + + +
Leave a transaction without committing or aborting - see Concurrent::atomically
.
+ + + +144 +145 +146+ |
+
+ # File 'lib/concurrent-ruby/concurrent/tvar.rb', line 144 + +def leave_transaction + raise Transaction::LeaveError.new +end+ |
+
+ + .monotonic_time(unit = :float_second) ⇒ undocumented + + + + + +
+ + + +19 +20 +21+ |
+
+ # File 'lib/concurrent-ruby/concurrent/utility/monotonic_time.rb', line 19 + +def monotonic_time(unit = :float_second) + Process.clock_gettime(Process::CLOCK_MONOTONIC, unit) +end+ |
+
+ + .new_fast_executor(opts = {}) ⇒ undocumented + + + + + +
+ + + +170 +171 +172 +173 +174 +175 +176 +177 +178 +179+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 170 + +def self.new_fast_executor(opts = {}) + FixedThreadPool.new( + [2, Concurrent.processor_count].max, + auto_terminate: opts.fetch(:auto_terminate, true), + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :abort, # shouldn't matter -- 0 max queue + name: "fast" + ) +end+ |
+
+ + .new_io_executor(opts = {}) ⇒ undocumented + + + + + +
+ + + +181 +182 +183 +184 +185 +186 +187+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 181 + +def self.new_io_executor(opts = {}) + CachedThreadPool.new( + auto_terminate: opts.fetch(:auto_terminate, true), + fallback_policy: :abort, # shouldn't matter -- 0 max queue + name: "io" + ) +end+ |
+
+ + .physical_processor_count ⇒ undocumented + + + + + +
+ + + +127 +128 +129+ |
+
+ # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 127 + +def self.physical_processor_count + processor_counter.physical_processor_count +end+ |
+
+ + .processor_count ⇒ undocumented + + + + + +
+ + + +123 +124 +125+ |
+
+ # File 'lib/concurrent-ruby/concurrent/utility/processor_counter.rb', line 123 + +def self.processor_count + processor_counter.processor_count +end+ |
+
+ + .use_simple_logger(level = Logger::FATAL, output = $stderr) ⇒ undocumented + + + + + +
Use logger created by #create_simple_logger to log concurrent-ruby messages.
+ + +
+ + + +46 +47 +48+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 46 + +def self.use_simple_logger(level = Logger::FATAL, output = $stderr) + Concurrent.global_logger = create_simple_logger level, output +end+ |
+
+ + .use_stdlib_logger(level = Logger::FATAL, output = $stderr) ⇒ undocumented + + + + + +
Use logger created by #create_stdlib_logger to log concurrent-ruby messages.
+ + +
+ + + +79 +80 +81+ |
+
+ # File 'lib/concurrent-ruby/concurrent/configuration.rb', line 79 + +def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr) + Concurrent.global_logger = create_stdlib_logger level, output +end+ |
+
Instance Method Details
+ + ++ + #exchange(value, timeout = nil) ⇒ Object + + + + + +
Waits for another thread to arrive at this exchange point (unless the
+current thread is interrupted), and then transfers the given object to
+it, receiving its object in return. The timeout value indicates the
+approximate number of seconds the method should block while waiting
+for the exchange. When the timeout value is nil
the method will
+block indefinitely.
In some edge cases when a timeout
is given a return value of nil
may be
+ambiguous. Specifically, if nil
is a valid value in the exchange it will
+be impossible to tell whether nil
is the actual return value or if it
+signifies timeout. When nil
is a valid value in the exchange consider
+using #exchange! or #try_exchange instead.
+ + + ++ |
+
+ # File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 340
+
+
+ |
+
+ + #exchange!(value, timeout = nil) ⇒ Object + + + + + +
Waits for another thread to arrive at this exchange point (unless the
+current thread is interrupted), and then transfers the given object to
+it, receiving its object in return. The timeout value indicates the
+approximate number of seconds the method should block while waiting
+for the exchange. When the timeout value is nil
the method will
+block indefinitely.
On timeout a TimeoutError exception will be raised.
+ + +
+ + + ++ |
+
+ # File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 344
+
+
+ |
+
+ + #initialize(opts = {}) ⇒ undocumented + + + + + +
Create a new thread pool.
+ + +
+ + + ++ |
+
+ # File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 337
+
+
+ |
+
+ + #try_exchange(value, timeout = nil) ⇒ Concurrent::Maybe + + + + + +
Waits for another thread to arrive at this exchange point (unless the
+current thread is interrupted), and then transfers the given object to
+it, receiving its object in return. The timeout value indicates the
+approximate number of seconds the method should block while waiting
+for the exchange. When the timeout value is nil
the method will
+block indefinitely.
The return value will be a Maybe set to Just
on success or
+Nothing
on timeout.
+ + + ++ |
+
+ # File 'lib/concurrent-ruby/concurrent/exchanger.rb', line 348
+
+
+ |
+