Skip to content

Commit

Permalink
Merge pull request #416 from ruby-concurrency/synchronization
Browse files Browse the repository at this point in the history
Synchronization updates
  • Loading branch information
Petr Chalupa committed Sep 16, 2015
2 parents 09a0eff + 7b03abb commit cd6a117
Show file tree
Hide file tree
Showing 55 changed files with 883 additions and 556 deletions.
190 changes: 120 additions & 70 deletions ext/com/concurrent_ruby/ext/SynchronizationLibrary.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.concurrent_ruby.ext;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jruby.Ruby;
import org.jruby.RubyClass;
Expand All @@ -14,42 +13,79 @@
import org.jruby.runtime.load.Library;
import org.jruby.runtime.Block;
import org.jruby.runtime.Visibility;
import org.jruby.RubyBoolean;
import org.jruby.RubyNil;
import org.jruby.runtime.ThreadContext;
import org.jruby.util.unsafe.UnsafeHolder;

public class SynchronizationLibrary implements Library {

private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
private static final ObjectAllocator JRUBY_OBJECT_ALLOCATOR = new ObjectAllocator() {
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new JavaObject(runtime, klazz);
return new JRubyObject(runtime, klazz);
}
};

private static final ObjectAllocator OBJECT_ALLOCATOR = new ObjectAllocator() {
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new Object(runtime, klazz);
}
};

private static final ObjectAllocator ABSTRACT_LOCKABLE_OBJECT_ALLOCATOR = new ObjectAllocator() {
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new AbstractLockableObject(runtime, klazz);
}
};

private static final ObjectAllocator JRUBY_LOCKABLE_OBJECT_ALLOCATOR = new ObjectAllocator() {
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new JRubyLockableObject(runtime, klazz);
}
};

public void load(Ruby runtime, boolean wrap) throws IOException {
RubyModule synchronizationModule = runtime.
defineModule("Concurrent").
defineModuleUnder("Synchronization");
RubyClass parentClass = synchronizationModule.getClass("AbstractObject");

if (parentClass == null)
throw runtime.newRuntimeError("Concurrent::Synchronization::AbstractObject is missing");
defineClass(runtime, synchronizationModule, "AbstractObject", "JRubyObject",
JRubyObject.class, JRUBY_OBJECT_ALLOCATOR);

defineClass(runtime, synchronizationModule, "JRubyObject", "Object",
Object.class, OBJECT_ALLOCATOR);

defineClass(runtime, synchronizationModule, "Object", "AbstractLockableObject",
AbstractLockableObject.class, ABSTRACT_LOCKABLE_OBJECT_ALLOCATOR);

defineClass(runtime, synchronizationModule, "AbstractLockableObject", "JRubyLockableObject",
JRubyLockableObject.class, JRUBY_LOCKABLE_OBJECT_ALLOCATOR);
}

private RubyClass defineClass(Ruby runtime, RubyModule namespace, String parentName, String name,
Class javaImplementation, ObjectAllocator allocator) {
final RubyClass parentClass = namespace.getClass(parentName);

RubyClass synchronizedObjectJavaClass =
synchronizationModule.defineClassUnder("JavaObject", parentClass, JRUBYREFERENCE_ALLOCATOR);
if (parentClass == null) {
System.out.println("not found " + parentName);
throw runtime.newRuntimeError(namespace.toString() + "::" + parentName + " is missing");
}

synchronizedObjectJavaClass.defineAnnotatedMethods(JavaObject.class);
final RubyClass newClass = namespace.defineClassUnder(name, parentClass, allocator);
newClass.defineAnnotatedMethods(javaImplementation);
return newClass;
}

@JRubyClass(name = "JavaObject", parent = "AbstractObject")
public static class JavaObject extends RubyObject {
// Facts:
// - all ivar reads are without any synchronisation of fences see
// https://github.com/jruby/jruby/blob/master/core/src/main/java/org/jruby/runtime/ivars/VariableAccessor.java#L110-110
// - writes depend on UnsafeHolder.U, null -> SynchronizedVariableAccessor, !null -> StampedVariableAccessor
// SynchronizedVariableAccessor wraps with synchronized block, StampedVariableAccessor uses fullFence or
// volatilePut

public static final long AN_VOLATILE_FIELD_OFFSET =
UnsafeHolder.fieldOffset(JavaObject.class, "anVolatileField");
private volatile int anVolatileField = 0;
@JRubyClass(name = "JRubyObject", parent = "AbstractObject")
public static class JRubyObject extends RubyObject {
private static volatile ThreadContext threadContext = null;

public JavaObject(Ruby runtime, RubyClass metaClass) {
public JRubyObject(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

Expand All @@ -58,6 +94,73 @@ public IRubyObject initialize(ThreadContext context) {
return this;
}

@JRubyMethod(name = "full_memory_barrier", visibility = Visibility.PRIVATE)
public IRubyObject fullMemoryBarrier(ThreadContext context) {
// Prevent reordering of ivar writes with publication of this instance
if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) {
// Assuming that following volatile read and write is not eliminated it simulates fullFence.
// If it's eliminated it'll cause problems only on non-x86 platforms.
final ThreadContext oldContext = threadContext;
threadContext = context;
} else {
UnsafeHolder.fullFence();
}
return context.nil;
}

@JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED)
public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) {
// Ensure we ses latest value with loadFence
if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) {
// piggybacking on volatile read, simulating loadFence
final ThreadContext oldContext = threadContext;
return instance_variable_get(context, name);
} else {
UnsafeHolder.loadFence();
return instance_variable_get(context, name);
}
}

@JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED)
public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) {
// Ensure we make last update visible
if (UnsafeHolder.U == null || !UnsafeHolder.SUPPORTS_FENCES) {
// piggybacking on volatile write, simulating storeFence
final IRubyObject result = instance_variable_set(name, value);
threadContext = context;
return result;
} else {
// JRuby uses StampedVariableAccessor which calls fullFence
// so no additional steps needed.
// See https://github.com/jruby/jruby/blob/master/core/src/main/java/org/jruby/runtime/ivars/StampedVariableAccessor.java#L151-L159
return instance_variable_set(name, value);
}
}
}

@JRubyClass(name = "Object", parent = "JRubyObject")
public static class Object extends JRubyObject {

public Object(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}
}

@JRubyClass(name = "AbstractLockableObject", parent = "Object")
public static class AbstractLockableObject extends Object {

public AbstractLockableObject(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}
}

@JRubyClass(name = "JRubyLockableObject", parent = "AbstractLockableObject")
public static class JRubyLockableObject extends JRubyObject {

public JRubyLockableObject(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(name = "synchronize", visibility = Visibility.PROTECTED)
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
synchronized (this) {
Expand Down Expand Up @@ -108,58 +211,5 @@ public IRubyObject nsBroadcast(ThreadContext context) {
notifyAll();
return this;
}

@JRubyMethod(name = "ensure_ivar_visibility!", visibility = Visibility.PROTECTED)
public IRubyObject ensureIvarVisibilityBang(ThreadContext context) {
if (UnsafeHolder.U == null) {
// We are screwed
throw new UnsupportedOperationException();
} else if (UnsafeHolder.SUPPORTS_FENCES)
// We have to prevent ivar writes to reordered with storing of the final instance reference
// Therefore wee need a fullFence to prevent reordering in both directions.
UnsafeHolder.fullFence();
else {
// Assumption that this is not eliminated, if false it will break non x86 platforms.
UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1);
UnsafeHolder.U.getIntVolatile(this, AN_VOLATILE_FIELD_OFFSET);
}
return context.nil;
}

@JRubyMethod(name = "instance_variable_get_volatile", visibility = Visibility.PROTECTED)
public IRubyObject instanceVariableGetVolatile(ThreadContext context, IRubyObject name) {
if (UnsafeHolder.U == null) {
// TODO: Possibly dangerous, there may be a deadlock on the this
synchronized (this) {
return instance_variable_get(context, name);
}
} else if (UnsafeHolder.SUPPORTS_FENCES) {
// ensure we see latest value
UnsafeHolder.loadFence();
return instance_variable_get(context, name);
} else {
UnsafeHolder.U.getIntVolatile(this, AN_VOLATILE_FIELD_OFFSET);
return instance_variable_get(context, name);
}
}

@JRubyMethod(name = "instance_variable_set_volatile", visibility = Visibility.PROTECTED)
public IRubyObject InstanceVariableSetVolatile(ThreadContext context, IRubyObject name, IRubyObject value) {
if (UnsafeHolder.U == null) {
// TODO: Possibly dangerous, there may be a deadlock on the this
synchronized (this) {
return instance_variable_set(name, value);
}
} else if (UnsafeHolder.SUPPORTS_FENCES) {
final IRubyObject result = instance_variable_set(name, value);
// ensure we make latest value visible
UnsafeHolder.storeFence();
return result;
} else {
final IRubyObject result = instance_variable_set(name, value);
UnsafeHolder.U.putIntVolatile(this, AN_VOLATILE_FIELD_OFFSET, 1);
return result;
}
}
}
}
2 changes: 1 addition & 1 deletion lib/concurrent/actor/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module Actor
# > {include:Actor::RestartingContext}
#
# Example of ac actor definition:
#
#
# {include:file:doc/actor/define.out.rb}
#
# See methods of {AbstractContext} what else can be tweaked, e.g {AbstractContext#default_reference_class}
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/actor/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Actor
# @note Whole class should be considered private. An user should use {Context}s and {Reference}s only.
# @note devel: core should not block on anything, e.g. it cannot wait on children to terminate
# that would eat up all threads in task pool and deadlock
class Core < Synchronization::Object
class Core < Synchronization::LockableObject
include TypeCheck
include Concern::Logging

Expand Down
Loading

0 comments on commit cd6a117

Please sign in to comment.