Skip to content

Commit

Permalink
Concurrent API update. (JetBrains#1949)
Browse files Browse the repository at this point in the history
  • Loading branch information
olonho authored Aug 29, 2018
1 parent 63cf677 commit f47fe79
Show file tree
Hide file tree
Showing 42 changed files with 586 additions and 416 deletions.
17 changes: 8 additions & 9 deletions CONCURRENCY.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
each mutable object is owned by the single worker, but ownership could be transferred.
See section [Object transfer and freezing](#transfer).

Once worker is started with `startWorker` function call, it can be uniquely addressed with an integer
Once worker is started with `Worker.start` function call, it can be uniquely addressed with an integer
worker id. Other workers, or non-worker concurrency primitives, such as OS threads, could send a message
to the worker with `schedule` call.
to the worker with `execute` call.
```kotlin
val future = schedule(TransferMode.CHECKED, { SomeDataForWorker() }) {
val future = execute(TransferMode.SAFE, { SomeDataForWorker() }) {
// data returned by the second function argument comes to the
// worker routine as 'input' parameter.
input ->
Expand All @@ -36,19 +36,18 @@
// Here we see result returned from routine above. Note that future object or
// id could be transferred to another worker, so we don't have to consume future
// in same execution context it was obtained.
result ->
println("result is $result")
result -> println("result is $result")
}
```
The call to `schedule` uses function passed as its second parameter to produce an object subgraph
The call to `execute` uses function passed as its second parameter to produce an object subgraph
(i.e. set of mutually referring objects) which is passed as the whole to that worker, and no longer
available to the thread that initiated the request. This property is checked if the first parameter
is `TransferMode.CHECKED` by graph traversal and just assumed to be true, if it is `TransferMode.UNCHECKED`.
Last parameter to schedule is a special Kotlin lambda, which is not allowed to capture any state,
is `TransferMode.SAFE` by graph traversal and just assumed to be true, if it is `TransferMode.UNCHECKED`.
Last parameter to `execute` is a special Kotlin lambda, which is not allowed to capture any state,
and is actually invoked in target worker's context. Once processed, result is transferred to whoever consumes
the future, and is attached to object graph of that worker/thread.

If an object is transferred in `UNCHECKED` mode and is still accessible from multiple concurrent executors,
If an object is transferred in `UNSAFE` mode and is still accessible from multiple concurrent executors,
program will likely crash unexpectedly, so consider that last resort in optimizing, not a general purpose
mechanism.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ internal class InteropBuiltIns(builtIns: KonanBuiltIns, vararg konanPrimitives:

val concurrentPackageScope = builtIns.builtInsModule.getPackage(FqName("kotlin.native.concurrent")).memberScope

val scheduleFunction = concurrentPackageScope.getContributedClass("Worker")
.unsubstitutedMemberScope.getContributedFunctions("schedule").single()
val executeFunction = concurrentPackageScope.getContributedClass("Worker")
.unsubstitutedMemberScope.getContributedFunctions("execute").single()

val scheduleImplFunction = concurrentPackageScope.getContributedFunctions("scheduleImpl").single()
val executeImplFunction = concurrentPackageScope.getContributedFunctions("executeImpl").single()

val signExtend = packageScope.getContributedFunctions("signExtend").single()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ internal class KonanSymbols(context: Context, val symbolTable: SymbolTable, val
) as ClassDescriptor
)

val scheduleImpl = symbolTable.referenceSimpleFunction(context.interopBuiltIns.scheduleImplFunction)
val executeImpl = symbolTable.referenceSimpleFunction(context.interopBuiltIns.executeImplFunction)

val areEqualByValue = context.getInternalFunctions("areEqualByValue").map {
symbolTable.referenceSimpleFunction(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.jetbrains.kotlin.ir.symbols.IrFunctionSymbol
import org.jetbrains.kotlin.ir.symbols.IrSimpleFunctionSymbol
import org.jetbrains.kotlin.ir.types.IrType
import org.jetbrains.kotlin.ir.types.isNullableAny
import org.jetbrains.kotlin.ir.types.isUnit
import org.jetbrains.kotlin.ir.util.simpleFunctions
import org.jetbrains.kotlin.ir.util.transformFlat
import org.jetbrains.kotlin.ir.visitors.IrElementTransformerVoid
Expand Down Expand Up @@ -59,7 +58,7 @@ internal class WorkersBridgesBuilding(val context: Context) : DeclarationContain
expression.transformChildrenVoid(this)

val descriptor = expression.descriptor.original
if (descriptor != interop.scheduleImplFunction)
if (descriptor != interop.executeImplFunction)
return expression

val job = expression.getValueArgument(3) as IrFunctionReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ private class InteropTransformer(val context: Context, val irFile: IrFile) : IrB
typeArgumentsCount = 0)
}

interop.scheduleFunction -> {
interop.executeFunction -> {
val irCallableReference = unwrapStaticFunctionArgument(expression.getValueArgument(2)!!)

if (irCallableReference == null || irCallableReference.getArguments().isNotEmpty()) {
Expand All @@ -824,11 +824,11 @@ private class InteropTransformer(val context: Context, val irFile: IrFile) : IrB
val target = targetSymbol.descriptor
val jobPointer = IrFunctionReferenceImpl(
builder.startOffset, builder.endOffset,
symbols.scheduleImpl.owner.valueParameters[3].type,
symbols.executeImpl.owner.valueParameters[3].type,
targetSymbol, target,
typeArgumentsCount = 0)

builder.irCall(symbols.scheduleImpl).apply {
builder.irCall(symbols.executeImpl).apply {
putValueArgument(0, expression.dispatchReceiver)
putValueArgument(1, expression.getValueArgument(0))
putValueArgument(2, expression.getValueArgument(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ internal class ModuleDFGBuilder(val context: Context, val irModule: IrModuleFrag
expressions += expression
}

if (expression is IrCall && expression.symbol == scheduleImplSymbol) {
// Producer and job of scheduleImpl are called externally, we need to reflect this somehow.
if (expression is IrCall && expression.symbol == executeImplSymbol) {
// Producer and job of executeImpl are called externally, we need to reflect this somehow.
val producerInvocation = IrCallImpl(expression.startOffset, expression.endOffset,
scheduleImplProducerInvoke.returnType,
scheduleImplProducerInvoke.symbol, scheduleImplProducerInvoke.descriptor)
executeImplProducerInvoke.returnType,
executeImplProducerInvoke.symbol, executeImplProducerInvoke.descriptor)
producerInvocation.dispatchReceiver = expression.getValueArgument(2)
val jobFunctionReference = expression.getValueArgument(3) as? IrFunctionReference
?: error("A function reference expected")
Expand Down Expand Up @@ -403,13 +403,9 @@ internal class ModuleDFGBuilder(val context: Context, val irModule: IrModuleFrag
private val arraySetSymbol = context.ir.symbols.arraySet
private val createUninitializedInstanceSymbol = context.ir.symbols.createUninitializedInstance
private val initInstanceSymbol = context.ir.symbols.initInstance
private val scheduleImplSymbol = context.ir.symbols.scheduleImpl
private val scheduleImplProducerClassSymbol = context.ir.symbols.functions[0]
private val scheduleImplProducerParam = scheduleImplSymbol.descriptor.valueParameters[2].also {
assert(it.name.asString() == "producer")
assert(it.type.constructor.declarationDescriptor == scheduleImplProducerClassSymbol.descriptor)
}
private val scheduleImplProducerInvoke = scheduleImplProducerClassSymbol.owner.simpleFunctions()
private val executeImplSymbol = context.ir.symbols.executeImpl
private val executeImplProducerClassSymbol = context.ir.symbols.functions[0]
private val executeImplProducerInvoke = executeImplProducerClassSymbol.owner.simpleFunctions()
.single { it.name == OperatorNameConventions.INVOKE }

private inner class FunctionDFGBuilder(val expressionValuesExtractor: ExpressionValuesExtractor,
Expand Down
9 changes: 5 additions & 4 deletions backend.native/tests/runtime/basic/worker_random.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,28 @@ import kotlin.test.*
@Test
fun testRandomWorkers() {
val seed = getTimeMillis()
val workers = Array(5, { _ -> startWorker()})
val workers = Array(5, { _ -> Worker.start() })

val attempts = 3
val results = Array(attempts, { ArrayList<Int>() } )
for (attempt in 0 until attempts) {
Random.seed = seed
// Produce a list of random numbers in each worker
val futures = Array(workers.size, { workerIndex ->
workers[workerIndex].schedule(TransferMode.CHECKED, { workerIndex }) { input ->
workers[workerIndex].execute(TransferMode.SAFE, { workerIndex }) {
input ->
Array(10, { Random.nextInt() }).toList()
}
})
// Now collect all results into current attempt's list
val futureSet = futures.toSet()
for (i in 0 until futureSet.size) {
val ready = futureSet.waitForMultipleFutures(10000)
ready.forEach { results[attempt].addAll(it.result()) }
ready.forEach { results[attempt].addAll(it.result) }
}
}

workers.forEach {
it.requestTermination().consume { _ -> }
it.requestTermination().result
}
}
54 changes: 27 additions & 27 deletions backend.native/tests/runtime/workers/atomic0.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,54 @@ import kotlin.native.concurrent.*
fun test1(workers: Array<Worker>) {
val atomic = AtomicInt(15)
val futures = Array(workers.size, { workerIndex ->
workers[workerIndex].schedule(TransferMode.CHECKED, { atomic }) {
input -> input.increment()
workers[workerIndex].execute(TransferMode.SAFE, { atomic }) {
input -> input.addAndGet(1)
}
})
futures.forEach {
it.result()
it.result
}
println(atomic.get())
println(atomic.value)
}

fun test2(workers: Array<Worker>) {
val atomic = AtomicInt(1)
val counter = AtomicInt(0)
val futures = Array(workers.size, { workerIndex ->
workers[workerIndex].schedule(TransferMode.CHECKED, { Triple(atomic, workerIndex, counter) }) {
workers[workerIndex].execute(TransferMode.SAFE, { Triple(atomic, workerIndex, counter) }) {
(place, index, result) ->
// Here we simulate mutex using [place] location to store tag of the current worker.
// When it is negative - worker executes exclusively.
val tag = index + 1
while (place.compareAndSwap(tag, -tag) != tag) {}
val ok1 = result.increment() == index + 1
val ok1 = result.addAndGet(1) == index + 1
// Now, let the next worker run.
val ok2 = place.compareAndSwap(-tag, tag + 1) == -tag
ok1 && ok2
}
})
futures.forEach {
assertEquals(it.result(), true)
assertEquals(it.result, true)
}
println(counter.get())
println(counter.value)
}

data class Data(val value: Int)

fun test3(workers: Array<Worker>) {
val common = AtomicReference<Data>()
val common = AtomicReference<Data?>(null)
val futures = Array(workers.size, { workerIndex ->
workers[workerIndex].schedule(TransferMode.CHECKED, { Pair(common, workerIndex) }) {
workers[workerIndex].execute(TransferMode.SAFE, { Pair(common, workerIndex) }) {
(place, index) ->
val mine = Data(index).freeze()
// Try to publish our own data, until successful, in a tight loop.
while (place.compareAndSwap(null, mine) != null) {}
while (!place.compareAndSet(null, mine)) {}
}
})
val seen = mutableSetOf<Data>()
for (i in 0 until workers.size) {
do {
val current = common.get()
val current = common.value
if (current != null && !seen.contains(current)) {
seen += current
// Let others publish.
Expand All @@ -69,7 +69,7 @@ fun test3(workers: Array<Worker>) {
} while (true)
}
futures.forEach {
it.result()
it.result
}
assertEquals(seen.size, workers.size)
}
Expand All @@ -79,33 +79,33 @@ fun test4() {
AtomicReference(Data(1))
}
assertFailsWith<InvalidMutabilityException> {
AtomicReference<Data>().compareAndSwap(null, Data(2))
AtomicReference<Data?>(null).compareAndSwap(null, Data(2))
}
}

fun test5() {
assertFailsWith<InvalidMutabilityException> {
AtomicReference<Data>().set(Data(2))
AtomicReference<Data?>(null).value = Data(2)
}
val ref = AtomicReference<Data>()
val ref = AtomicReference<Data?>(null)
val value = Data(3).freeze()
assertEquals(null, ref.get())
ref.set(value)
assertEquals(3, ref.get()!!.value)
assertEquals(null, ref.value)
ref.value = value
assertEquals(3, ref.value!!.value)
}

fun test6() {
val int = AtomicInt()
int.set(239)
assertEquals(239, int.get())
val long = AtomicLong()
long.set(239L)
assertEquals(239L, long.get())
val int = AtomicInt(0)
int.value = 239
assertEquals(239, int.value)
val long = AtomicLong(0)
long.value = 239L
assertEquals(239L, long.value)
}

@Test fun runTest() {
val COUNT = 20
val workers = Array(COUNT, { _ -> startWorker()})
val workers = Array(COUNT, { _ -> Worker.start()})

test1(workers)
test2(workers)
Expand All @@ -115,7 +115,7 @@ fun test6() {
test6()

workers.forEach {
it.requestTermination().consume { _ -> }
it.requestTermination().result
}
println("OK")
}
4 changes: 2 additions & 2 deletions backend.native/tests/runtime/workers/enum_identity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ data class Foo(val kind: A)
// Enums are shared between threads so identity should be kept.
@Test
fun runTest() {
val result = startWorker().schedule(TransferMode.CHECKED, { Foo(A.B) }, { input ->
val result = Worker.start().execute(TransferMode.SAFE, { Foo(A.B) }, { input ->
input.kind == A.B
}).result()
}).result
println(result)
}
6 changes: 3 additions & 3 deletions backend.native/tests/runtime/workers/freeze0.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ data class SharedDataMember(val double: Double)
data class SharedData(val string: String, val int: Int, val member: SharedDataMember)

@Test fun runTest() {
val worker = startWorker()
val worker = Worker.start()
// Create immutable shared data.
val immutable = SharedData("Hello", 10, SharedDataMember(0.1)).freeze()
println("frozen bit is ${immutable.isFrozen}")

val future = worker.schedule(TransferMode.CHECKED, { immutable } ) {
val future = worker.execute(TransferMode.SAFE, { immutable } ) {
input ->
println("Worker: $input")
input
}
future.consume {
result -> println("Main: $result")
}
worker.requestTermination().result()
worker.requestTermination().result
println("OK")
}
20 changes: 10 additions & 10 deletions backend.native/tests/runtime/workers/freeze2.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,32 @@ data class Data(var int: Int)
assertFailsWith<InvalidMutabilityException> { a8[1] = 2.0 }

// Ensure that String and integral boxes are frozen by default, by passing local to the worker.
val worker = startWorker()
val worker = Worker.start()
var data: Any = "Hello" + " " + "world"
assert(data.isFrozen)
worker.schedule(TransferMode.CHECKED, { data } ) {
worker.execute(TransferMode.SAFE, { data } ) {
input -> println("Worker 1: $input")
}.result()
}.result

data = 42
assert(data.isFrozen)
worker.schedule(TransferMode.CHECKED, { data } ) {
worker.execute(TransferMode.SAFE, { data } ) {
input -> println("Worker2: $input")
}.result()
}.result

data = 239.0
assert(data.isFrozen)
worker.schedule(TransferMode.CHECKED, { data } ) {
worker.execute(TransferMode.SAFE, { data } ) {
input -> println("Worker3: $input")
}.result()
}.result

data = 'a'
assert(data.isFrozen)
worker.schedule(TransferMode.CHECKED, { data } ) {
worker.execute(TransferMode.SAFE, { data } ) {
input -> println("Worker4: $input")
}.result()
}.result

worker.requestTermination().result()
worker.requestTermination().result

println("OK")
}
Loading

0 comments on commit f47fe79

Please sign in to comment.