diff --git a/CONCURRENCY.md b/CONCURRENCY.md index 4007237dbae..27ef68e7ab8 100644 --- a/CONCURRENCY.md +++ b/CONCURRENCY.md @@ -20,11 +20,11 @@ each mutable object is owned by the single worker, but ownership could be transferred. See section [Object transfer and freezing](#transfer). - Once worker is started with `startWorker` function call, it can be uniquely addressed with an integer + Once worker is started with `Worker.start` function call, it can be uniquely addressed with an integer worker id. Other workers, or non-worker concurrency primitives, such as OS threads, could send a message - to the worker with `schedule` call. + to the worker with `execute` call. ```kotlin - val future = schedule(TransferMode.CHECKED, { SomeDataForWorker() }) { + val future = execute(TransferMode.SAFE, { SomeDataForWorker() }) { // data returned by the second function argument comes to the // worker routine as 'input' parameter. input -> @@ -36,19 +36,18 @@ // Here we see result returned from routine above. Note that future object or // id could be transferred to another worker, so we don't have to consume future // in same execution context it was obtained. - result -> - println("result is $result") + result -> println("result is $result") } ``` - The call to `schedule` uses function passed as its second parameter to produce an object subgraph + The call to `execute` uses function passed as its second parameter to produce an object subgraph (i.e. set of mutually referring objects) which is passed as the whole to that worker, and no longer available to the thread that initiated the request. This property is checked if the first parameter - is `TransferMode.CHECKED` by graph traversal and just assumed to be true, if it is `TransferMode.UNCHECKED`. - Last parameter to schedule is a special Kotlin lambda, which is not allowed to capture any state, + is `TransferMode.SAFE` by graph traversal and just assumed to be true, if it is `TransferMode.UNCHECKED`. + Last parameter to `execute` is a special Kotlin lambda, which is not allowed to capture any state, and is actually invoked in target worker's context. Once processed, result is transferred to whoever consumes the future, and is attached to object graph of that worker/thread. - If an object is transferred in `UNCHECKED` mode and is still accessible from multiple concurrent executors, + If an object is transferred in `UNSAFE` mode and is still accessible from multiple concurrent executors, program will likely crash unexpectedly, so consider that last resort in optimizing, not a general purpose mechanism. diff --git a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/InteropUtils.kt b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/InteropUtils.kt index ff0c65dcfbb..bfbae3e1115 100644 --- a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/InteropUtils.kt +++ b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/InteropUtils.kt @@ -82,10 +82,10 @@ internal class InteropBuiltIns(builtIns: KonanBuiltIns, vararg konanPrimitives: val concurrentPackageScope = builtIns.builtInsModule.getPackage(FqName("kotlin.native.concurrent")).memberScope - val scheduleFunction = concurrentPackageScope.getContributedClass("Worker") - .unsubstitutedMemberScope.getContributedFunctions("schedule").single() + val executeFunction = concurrentPackageScope.getContributedClass("Worker") + .unsubstitutedMemberScope.getContributedFunctions("execute").single() - val scheduleImplFunction = concurrentPackageScope.getContributedFunctions("scheduleImpl").single() + val executeImplFunction = concurrentPackageScope.getContributedFunctions("executeImpl").single() val signExtend = packageScope.getContributedFunctions("signExtend").single() diff --git a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/ir/Ir.kt b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/ir/Ir.kt index e3805fed0ba..9022a1cd396 100644 --- a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/ir/Ir.kt +++ b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/ir/Ir.kt @@ -234,7 +234,7 @@ internal class KonanSymbols(context: Context, val symbolTable: SymbolTable, val ) as ClassDescriptor ) - val scheduleImpl = symbolTable.referenceSimpleFunction(context.interopBuiltIns.scheduleImplFunction) + val executeImpl = symbolTable.referenceSimpleFunction(context.interopBuiltIns.executeImplFunction) val areEqualByValue = context.getInternalFunctions("areEqualByValue").map { symbolTable.referenceSimpleFunction(it) diff --git a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/BridgesBuilding.kt b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/BridgesBuilding.kt index 4d1958ebf38..d6d41602563 100644 --- a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/BridgesBuilding.kt +++ b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/BridgesBuilding.kt @@ -27,7 +27,6 @@ import org.jetbrains.kotlin.ir.symbols.IrFunctionSymbol import org.jetbrains.kotlin.ir.symbols.IrSimpleFunctionSymbol import org.jetbrains.kotlin.ir.types.IrType import org.jetbrains.kotlin.ir.types.isNullableAny -import org.jetbrains.kotlin.ir.types.isUnit import org.jetbrains.kotlin.ir.util.simpleFunctions import org.jetbrains.kotlin.ir.util.transformFlat import org.jetbrains.kotlin.ir.visitors.IrElementTransformerVoid @@ -59,7 +58,7 @@ internal class WorkersBridgesBuilding(val context: Context) : DeclarationContain expression.transformChildrenVoid(this) val descriptor = expression.descriptor.original - if (descriptor != interop.scheduleImplFunction) + if (descriptor != interop.executeImplFunction) return expression val job = expression.getValueArgument(3) as IrFunctionReference diff --git a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/InteropLowering.kt b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/InteropLowering.kt index 3a418643874..d9b8f5c4953 100644 --- a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/InteropLowering.kt +++ b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/lower/InteropLowering.kt @@ -810,7 +810,7 @@ private class InteropTransformer(val context: Context, val irFile: IrFile) : IrB typeArgumentsCount = 0) } - interop.scheduleFunction -> { + interop.executeFunction -> { val irCallableReference = unwrapStaticFunctionArgument(expression.getValueArgument(2)!!) if (irCallableReference == null || irCallableReference.getArguments().isNotEmpty()) { @@ -824,11 +824,11 @@ private class InteropTransformer(val context: Context, val irFile: IrFile) : IrB val target = targetSymbol.descriptor val jobPointer = IrFunctionReferenceImpl( builder.startOffset, builder.endOffset, - symbols.scheduleImpl.owner.valueParameters[3].type, + symbols.executeImpl.owner.valueParameters[3].type, targetSymbol, target, typeArgumentsCount = 0) - builder.irCall(symbols.scheduleImpl).apply { + builder.irCall(symbols.executeImpl).apply { putValueArgument(0, expression.dispatchReceiver) putValueArgument(1, expression.getValueArgument(0)) putValueArgument(2, expression.getValueArgument(1)) diff --git a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/optimizations/DFGBuilder.kt b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/optimizations/DFGBuilder.kt index 5ddb441acb8..04ebf48316e 100644 --- a/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/optimizations/DFGBuilder.kt +++ b/backend.native/compiler/ir/backend.native/src/org/jetbrains/kotlin/backend/konan/optimizations/DFGBuilder.kt @@ -324,11 +324,11 @@ internal class ModuleDFGBuilder(val context: Context, val irModule: IrModuleFrag expressions += expression } - if (expression is IrCall && expression.symbol == scheduleImplSymbol) { - // Producer and job of scheduleImpl are called externally, we need to reflect this somehow. + if (expression is IrCall && expression.symbol == executeImplSymbol) { + // Producer and job of executeImpl are called externally, we need to reflect this somehow. val producerInvocation = IrCallImpl(expression.startOffset, expression.endOffset, - scheduleImplProducerInvoke.returnType, - scheduleImplProducerInvoke.symbol, scheduleImplProducerInvoke.descriptor) + executeImplProducerInvoke.returnType, + executeImplProducerInvoke.symbol, executeImplProducerInvoke.descriptor) producerInvocation.dispatchReceiver = expression.getValueArgument(2) val jobFunctionReference = expression.getValueArgument(3) as? IrFunctionReference ?: error("A function reference expected") @@ -403,13 +403,9 @@ internal class ModuleDFGBuilder(val context: Context, val irModule: IrModuleFrag private val arraySetSymbol = context.ir.symbols.arraySet private val createUninitializedInstanceSymbol = context.ir.symbols.createUninitializedInstance private val initInstanceSymbol = context.ir.symbols.initInstance - private val scheduleImplSymbol = context.ir.symbols.scheduleImpl - private val scheduleImplProducerClassSymbol = context.ir.symbols.functions[0] - private val scheduleImplProducerParam = scheduleImplSymbol.descriptor.valueParameters[2].also { - assert(it.name.asString() == "producer") - assert(it.type.constructor.declarationDescriptor == scheduleImplProducerClassSymbol.descriptor) - } - private val scheduleImplProducerInvoke = scheduleImplProducerClassSymbol.owner.simpleFunctions() + private val executeImplSymbol = context.ir.symbols.executeImpl + private val executeImplProducerClassSymbol = context.ir.symbols.functions[0] + private val executeImplProducerInvoke = executeImplProducerClassSymbol.owner.simpleFunctions() .single { it.name == OperatorNameConventions.INVOKE } private inner class FunctionDFGBuilder(val expressionValuesExtractor: ExpressionValuesExtractor, diff --git a/backend.native/tests/runtime/basic/worker_random.kt b/backend.native/tests/runtime/basic/worker_random.kt index ab1d8721f70..f941d397c3e 100644 --- a/backend.native/tests/runtime/basic/worker_random.kt +++ b/backend.native/tests/runtime/basic/worker_random.kt @@ -14,7 +14,7 @@ import kotlin.test.* @Test fun testRandomWorkers() { val seed = getTimeMillis() - val workers = Array(5, { _ -> startWorker()}) + val workers = Array(5, { _ -> Worker.start() }) val attempts = 3 val results = Array(attempts, { ArrayList() } ) @@ -22,7 +22,8 @@ fun testRandomWorkers() { Random.seed = seed // Produce a list of random numbers in each worker val futures = Array(workers.size, { workerIndex -> - workers[workerIndex].schedule(TransferMode.CHECKED, { workerIndex }) { input -> + workers[workerIndex].execute(TransferMode.SAFE, { workerIndex }) { + input -> Array(10, { Random.nextInt() }).toList() } }) @@ -30,11 +31,11 @@ fun testRandomWorkers() { val futureSet = futures.toSet() for (i in 0 until futureSet.size) { val ready = futureSet.waitForMultipleFutures(10000) - ready.forEach { results[attempt].addAll(it.result()) } + ready.forEach { results[attempt].addAll(it.result) } } } workers.forEach { - it.requestTermination().consume { _ -> } + it.requestTermination().result } } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/atomic0.kt b/backend.native/tests/runtime/workers/atomic0.kt index 2b589b5ae22..17d55d23d5c 100644 --- a/backend.native/tests/runtime/workers/atomic0.kt +++ b/backend.native/tests/runtime/workers/atomic0.kt @@ -12,54 +12,54 @@ import kotlin.native.concurrent.* fun test1(workers: Array) { val atomic = AtomicInt(15) val futures = Array(workers.size, { workerIndex -> - workers[workerIndex].schedule(TransferMode.CHECKED, { atomic }) { - input -> input.increment() + workers[workerIndex].execute(TransferMode.SAFE, { atomic }) { + input -> input.addAndGet(1) } }) futures.forEach { - it.result() + it.result } - println(atomic.get()) + println(atomic.value) } fun test2(workers: Array) { val atomic = AtomicInt(1) val counter = AtomicInt(0) val futures = Array(workers.size, { workerIndex -> - workers[workerIndex].schedule(TransferMode.CHECKED, { Triple(atomic, workerIndex, counter) }) { + workers[workerIndex].execute(TransferMode.SAFE, { Triple(atomic, workerIndex, counter) }) { (place, index, result) -> // Here we simulate mutex using [place] location to store tag of the current worker. // When it is negative - worker executes exclusively. val tag = index + 1 while (place.compareAndSwap(tag, -tag) != tag) {} - val ok1 = result.increment() == index + 1 + val ok1 = result.addAndGet(1) == index + 1 // Now, let the next worker run. val ok2 = place.compareAndSwap(-tag, tag + 1) == -tag ok1 && ok2 } }) futures.forEach { - assertEquals(it.result(), true) + assertEquals(it.result, true) } - println(counter.get()) + println(counter.value) } data class Data(val value: Int) fun test3(workers: Array) { - val common = AtomicReference() + val common = AtomicReference(null) val futures = Array(workers.size, { workerIndex -> - workers[workerIndex].schedule(TransferMode.CHECKED, { Pair(common, workerIndex) }) { + workers[workerIndex].execute(TransferMode.SAFE, { Pair(common, workerIndex) }) { (place, index) -> val mine = Data(index).freeze() // Try to publish our own data, until successful, in a tight loop. - while (place.compareAndSwap(null, mine) != null) {} + while (!place.compareAndSet(null, mine)) {} } }) val seen = mutableSetOf() for (i in 0 until workers.size) { do { - val current = common.get() + val current = common.value if (current != null && !seen.contains(current)) { seen += current // Let others publish. @@ -69,7 +69,7 @@ fun test3(workers: Array) { } while (true) } futures.forEach { - it.result() + it.result } assertEquals(seen.size, workers.size) } @@ -79,33 +79,33 @@ fun test4() { AtomicReference(Data(1)) } assertFailsWith { - AtomicReference().compareAndSwap(null, Data(2)) + AtomicReference(null).compareAndSwap(null, Data(2)) } } fun test5() { assertFailsWith { - AtomicReference().set(Data(2)) + AtomicReference(null).value = Data(2) } - val ref = AtomicReference() + val ref = AtomicReference(null) val value = Data(3).freeze() - assertEquals(null, ref.get()) - ref.set(value) - assertEquals(3, ref.get()!!.value) + assertEquals(null, ref.value) + ref.value = value + assertEquals(3, ref.value!!.value) } fun test6() { - val int = AtomicInt() - int.set(239) - assertEquals(239, int.get()) - val long = AtomicLong() - long.set(239L) - assertEquals(239L, long.get()) + val int = AtomicInt(0) + int.value = 239 + assertEquals(239, int.value) + val long = AtomicLong(0) + long.value = 239L + assertEquals(239L, long.value) } @Test fun runTest() { val COUNT = 20 - val workers = Array(COUNT, { _ -> startWorker()}) + val workers = Array(COUNT, { _ -> Worker.start()}) test1(workers) test2(workers) @@ -115,7 +115,7 @@ fun test6() { test6() workers.forEach { - it.requestTermination().consume { _ -> } + it.requestTermination().result } println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/enum_identity.kt b/backend.native/tests/runtime/workers/enum_identity.kt index 3f3b0ff99db..eb8d4010988 100644 --- a/backend.native/tests/runtime/workers/enum_identity.kt +++ b/backend.native/tests/runtime/workers/enum_identity.kt @@ -17,8 +17,8 @@ data class Foo(val kind: A) // Enums are shared between threads so identity should be kept. @Test fun runTest() { - val result = startWorker().schedule(TransferMode.CHECKED, { Foo(A.B) }, { input -> + val result = Worker.start().execute(TransferMode.SAFE, { Foo(A.B) }, { input -> input.kind == A.B - }).result() + }).result println(result) } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/freeze0.kt b/backend.native/tests/runtime/workers/freeze0.kt index 86838324473..0eff9602588 100644 --- a/backend.native/tests/runtime/workers/freeze0.kt +++ b/backend.native/tests/runtime/workers/freeze0.kt @@ -14,12 +14,12 @@ data class SharedDataMember(val double: Double) data class SharedData(val string: String, val int: Int, val member: SharedDataMember) @Test fun runTest() { - val worker = startWorker() + val worker = Worker.start() // Create immutable shared data. val immutable = SharedData("Hello", 10, SharedDataMember(0.1)).freeze() println("frozen bit is ${immutable.isFrozen}") - val future = worker.schedule(TransferMode.CHECKED, { immutable } ) { + val future = worker.execute(TransferMode.SAFE, { immutable } ) { input -> println("Worker: $input") input @@ -27,6 +27,6 @@ data class SharedData(val string: String, val int: Int, val member: SharedDataMe future.consume { result -> println("Main: $result") } - worker.requestTermination().result() + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/freeze2.kt b/backend.native/tests/runtime/workers/freeze2.kt index efbe1735da8..c0ede937cf9 100644 --- a/backend.native/tests/runtime/workers/freeze2.kt +++ b/backend.native/tests/runtime/workers/freeze2.kt @@ -59,32 +59,32 @@ data class Data(var int: Int) assertFailsWith { a8[1] = 2.0 } // Ensure that String and integral boxes are frozen by default, by passing local to the worker. - val worker = startWorker() + val worker = Worker.start() var data: Any = "Hello" + " " + "world" assert(data.isFrozen) - worker.schedule(TransferMode.CHECKED, { data } ) { + worker.execute(TransferMode.SAFE, { data } ) { input -> println("Worker 1: $input") - }.result() + }.result data = 42 assert(data.isFrozen) - worker.schedule(TransferMode.CHECKED, { data } ) { + worker.execute(TransferMode.SAFE, { data } ) { input -> println("Worker2: $input") - }.result() + }.result data = 239.0 assert(data.isFrozen) - worker.schedule(TransferMode.CHECKED, { data } ) { + worker.execute(TransferMode.SAFE, { data } ) { input -> println("Worker3: $input") - }.result() + }.result data = 'a' assert(data.isFrozen) - worker.schedule(TransferMode.CHECKED, { data } ) { + worker.execute(TransferMode.SAFE, { data } ) { input -> println("Worker4: $input") - }.result() + }.result - worker.requestTermination().result() + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/lazy0.kt b/backend.native/tests/runtime/workers/lazy0.kt index be032dc29ba..cf900d3e5d0 100644 --- a/backend.native/tests/runtime/workers/lazy0.kt +++ b/backend.native/tests/runtime/workers/lazy0.kt @@ -36,9 +36,9 @@ fun testSingleData(workers: Array) { val set = mutableSetOf() for (attempt in 1 .. 3) { val futures = Array(workers.size, { workerIndex -> - workers[workerIndex].schedule(TransferMode.CHECKED, { "" }) { _ -> Immutable2.y } + workers[workerIndex].execute(TransferMode.SAFE, { "" }) { _ -> Immutable2.y } }) - futures.forEach { set += it.result() } + futures.forEach { set += it.result } } assertEquals(set.size, 1) assertEquals(set.single(), Immutable2.y) @@ -48,9 +48,9 @@ fun testFrozenLazy(workers: Array) { val set = mutableSetOf() for (attempt in 1 .. 3) { val futures = Array(workers.size, { workerIndex -> - workers[workerIndex].schedule(TransferMode.CHECKED, { "" }) { _ -> Immutable3.x } + workers[workerIndex].execute(TransferMode.SAFE, { "" }) { _ -> Immutable3.x } }) - futures.forEach { set += it.result() } + futures.forEach { set += it.result } } assertEquals(1, set.size) assertEquals(Immutable3.x, set.single()) @@ -77,7 +77,7 @@ fun testLiquidLazy() { assertEquals(42, Immutable.x) val COUNT = 5 - val workers = Array(COUNT, { _ -> startWorker()}) + val workers = Array(COUNT, { _ -> Worker.start()}) testSingleData(workers) testFrozenLazy(workers) testLiquidLazy() diff --git a/backend.native/tests/runtime/workers/worker0.kt b/backend.native/tests/runtime/workers/worker0.kt index 736987ba2cf..fc7009d1381 100644 --- a/backend.native/tests/runtime/workers/worker0.kt +++ b/backend.native/tests/runtime/workers/worker0.kt @@ -10,13 +10,13 @@ import kotlin.test.* import kotlin.native.concurrent.* @Test fun runTest() { - val worker = startWorker() - val future = worker.schedule(TransferMode.CHECKED, { "Input" }) { + val worker = Worker.start() + val future = worker.execute(TransferMode.SAFE, { "Input" }) { input -> input + " processed" } future.consume { result -> println("Got $result") } - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker1.kt b/backend.native/tests/runtime/workers/worker1.kt index a9c434d9d08..3f86cd876f3 100644 --- a/backend.native/tests/runtime/workers/worker1.kt +++ b/backend.native/tests/runtime/workers/worker1.kt @@ -11,11 +11,11 @@ import kotlin.native.concurrent.* @Test fun runTest() { val COUNT = 5 - val workers = Array(COUNT, { _ -> startWorker()}) + val workers = Array(COUNT, { _ -> Worker.start()}) for (attempt in 1 .. 3) { val futures = Array(workers.size, - { i -> workers[i].schedule(TransferMode.CHECKED, { "$attempt: Input $i" }) + { i -> workers[i].execute(TransferMode.SAFE, { "$attempt: Input $i" }) { input -> input + " processed" } }) futures.forEachIndexed { index, future -> @@ -29,7 +29,7 @@ import kotlin.native.concurrent.* } } workers.forEach { - it.requestTermination().consume { _ -> } + it.requestTermination().result } println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker2.kt b/backend.native/tests/runtime/workers/worker2.kt index 730fe73363c..c831b103dfc 100644 --- a/backend.native/tests/runtime/workers/worker2.kt +++ b/backend.native/tests/runtime/workers/worker2.kt @@ -14,10 +14,10 @@ data class WorkerResult(val intResult: Int, val stringResult: String) @Test fun runTest() { val COUNT = 5 - val workers = Array(COUNT, { _ -> startWorker()}) + val workers = Array(COUNT, { _ -> Worker.start()}) for (attempt in 1 .. 3) { - val futures = Array(workers.size, { workerIndex -> workers[workerIndex].schedule(TransferMode.CHECKED, { + val futures = Array(workers.size, { workerIndex -> workers[workerIndex].execute(TransferMode.SAFE, { WorkerArgument(workerIndex, "attempt $attempt") }) { input -> var sum = 0 for (i in 0..input.intParam * 1000) { @@ -33,12 +33,13 @@ data class WorkerResult(val intResult: Int, val stringResult: String) ready.forEach { it.consume { result -> if (result.stringResult != "attempt $attempt result") throw Error("Unexpected $result") - consumed++ } + consumed++ + } } } } workers.forEach { - it.requestTermination().consume { _ -> } + it.requestTermination().result } println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker3.kt b/backend.native/tests/runtime/workers/worker3.kt index 9e4415b07ae..57f22eecf6a 100644 --- a/backend.native/tests/runtime/workers/worker3.kt +++ b/backend.native/tests/runtime/workers/worker3.kt @@ -18,19 +18,19 @@ data class WorkerResult(val intResult: Int, val stringResult: String) } fun main(args: Array) { - val worker = startWorker() + val worker = Worker.start() val dataParam = DataParam(17) val future = try { - worker.schedule(TransferMode.CHECKED, - { WorkerArgument(42, dataParam) }, - { input -> WorkerResult(input.intParam, input.dataParam.toString() + " result") } - ) + worker.execute(TransferMode.SAFE, + { WorkerArgument(42, dataParam) }) { + input -> WorkerResult(input.intParam, input.dataParam.toString() + " result") + } } catch (e: IllegalStateException) { null } if (future != null) println("Fail 1") if (dataParam.int != 17) println("Fail 2") - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker4.kt b/backend.native/tests/runtime/workers/worker4.kt index 254b79af312..44c30b86a30 100644 --- a/backend.native/tests/runtime/workers/worker4.kt +++ b/backend.native/tests/runtime/workers/worker4.kt @@ -10,13 +10,13 @@ import kotlin.test.* import kotlin.native.concurrent.* @Test fun runTest() { - val worker = startWorker() - val future = worker.schedule(TransferMode.CHECKED, { 41 }) { + val worker = Worker.start() + val future = worker.execute(TransferMode.SAFE, { 41 }) { input -> input + 1 } future.consume { result -> println("Got $result") } - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker5.kt b/backend.native/tests/runtime/workers/worker5.kt index 05f7d5f5c0e..16e825c1b45 100644 --- a/backend.native/tests/runtime/workers/worker5.kt +++ b/backend.native/tests/runtime/workers/worker5.kt @@ -10,13 +10,13 @@ import kotlin.test.* import kotlin.native.concurrent.* @Test fun runTest() { - val worker = startWorker() - val future = worker.schedule(TransferMode.CHECKED, { "zzz" }) { + val worker = Worker.start() + val future = worker.execute(TransferMode.SAFE, { "zzz" }) { input -> input.length } future.consume { result -> println("Got $result") } - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker6.kt b/backend.native/tests/runtime/workers/worker6.kt index 41548b9ada6..276cd66e1ea 100644 --- a/backend.native/tests/runtime/workers/worker6.kt +++ b/backend.native/tests/runtime/workers/worker6.kt @@ -10,13 +10,13 @@ import kotlin.test.* import kotlin.native.concurrent.* @Test fun runTest() { - val worker = startWorker() - val future = worker.schedule(TransferMode.CHECKED, { 42 }) { + val worker = Worker.start() + val future = worker.execute(TransferMode.SAFE, { 42 }) { input -> input.toString() } future.consume { result -> println("Got $result") } - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker7.kt b/backend.native/tests/runtime/workers/worker7.kt index 4cc17c337a0..b6c1f379a1a 100644 --- a/backend.native/tests/runtime/workers/worker7.kt +++ b/backend.native/tests/runtime/workers/worker7.kt @@ -10,13 +10,13 @@ import kotlin.test.* import kotlin.native.concurrent.* @Test fun runTest() { - val worker = startWorker() - val future = worker.schedule(TransferMode.CHECKED, { "Input" }) { + val worker = Worker.start() + val future = worker.execute(TransferMode.SAFE, { "Input" }) { input -> println(input) } future.consume { result -> println("Got $result") } - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker8.kt b/backend.native/tests/runtime/workers/worker8.kt index 3992ac276f4..72ebb86b2fd 100644 --- a/backend.native/tests/runtime/workers/worker8.kt +++ b/backend.native/tests/runtime/workers/worker8.kt @@ -14,19 +14,19 @@ data class SharedDataMember(val double: Double) data class SharedData(val string: String, val int: Int, val member: SharedDataMember) @Test fun runTest() { - val worker = startWorker() + val worker = Worker.start() // Here we do rather strange thing. To test object detach API we detach object graph, - // pass C pointer as a value to worker, where we manually reattached passed value. - val future = worker.schedule(TransferMode.CHECKED, { - detachObjectGraph { SharedData("Hello", 10, SharedDataMember(0.1)) } - } ) { - inputC -> - val input = attachObjectGraph(inputC) + // pass detached graph to a worker, where we manually reattached passed value. + val future = worker.execute(TransferMode.SAFE, { + DetachedObjectGraph { SharedData("Hello", 10, SharedDataMember(0.1)) }.asCPointer() + }) { + inputDetached -> + val input = DetachedObjectGraph(inputDetached).attach() println(input) } future.consume { result -> println("Got $result") } - worker.requestTermination().consume { _ -> } + worker.requestTermination().result println("OK") } \ No newline at end of file diff --git a/backend.native/tests/runtime/workers/worker9.kt b/backend.native/tests/runtime/workers/worker9.kt index d6e7afdc752..45c97533543 100644 --- a/backend.native/tests/runtime/workers/worker9.kt +++ b/backend.native/tests/runtime/workers/worker9.kt @@ -11,14 +11,14 @@ import kotlin.native.concurrent.* @Test fun runTest() { withLock { println("zzz") } - val worker = startWorker() - val future = worker.schedule(TransferMode.CHECKED, {}) { + val worker = Worker.start() + val future = worker.execute(TransferMode.SAFE, {}) { withLock { println("42") } } - future.result() - worker.requestTermination().result() + future.result + worker.requestTermination().result println("OK") } diff --git a/runtime/src/main/cpp/Atomic.cpp b/runtime/src/main/cpp/Atomic.cpp index cee9b3c1094..41dee6ec815 100644 --- a/runtime/src/main/cpp/Atomic.cpp +++ b/runtime/src/main/cpp/Atomic.cpp @@ -32,6 +32,11 @@ template void setImpl(KRef thiz, T value) { atomicSet(location, value); } +template T getImpl(KRef thiz) { + volatile T* location = reinterpret_cast(thiz + 1); + return atomicGet(location); +} + template T addAndGetImpl(KRef thiz, T delta) { volatile T* location = reinterpret_cast(thiz + 1); return atomicAdd(location, delta); @@ -42,6 +47,11 @@ template T compareAndSwapImpl(KRef thiz, T expectedValue, T newValu return compareAndSwap(location, expectedValue, newValue); } +template KBoolean compareAndSetImpl(KRef thiz, T expectedValue, T newValue) { + volatile T* location = reinterpret_cast(thiz + 1); + return compareAndSet(location, expectedValue, newValue); +} + inline AtomicReferenceLayout* asAtomicReference(KRef thiz) { return reinterpret_cast(thiz + 1); } @@ -58,54 +68,105 @@ KInt Kotlin_AtomicInt_compareAndSwap(KRef thiz, KInt expectedValue, KInt newValu return compareAndSwapImpl(thiz, expectedValue, newValue); } +KBoolean Kotlin_AtomicInt_compareAndSet(KRef thiz, KInt expectedValue, KInt newValue) { + return compareAndSetImpl(thiz, expectedValue, newValue); +} + void Kotlin_AtomicInt_set(KRef thiz, KInt newValue) { setImpl(thiz, newValue); } +KInt Kotlin_AtomicInt_get(KRef thiz) { + return getImpl(thiz); +} + KLong Kotlin_AtomicLong_addAndGet(KRef thiz, KLong delta) { return addAndGetImpl(thiz, delta); } +#ifdef __mips +static int lock64 = 0; +#endif + KLong Kotlin_AtomicLong_compareAndSwap(KRef thiz, KLong expectedValue, KLong newValue) { #ifdef __mips // Potentially huge performance penalty, but correct. // TODO: reconsider, once target MIPS can do proper 64-bit CAS. - static int lock = 0; - while (compareAndSwap(&lock, 0, 1) != 0); + while (compareAndSwap(&lock64, 0, 1) != 0); KLong* address = reinterpret_cast(thiz + 1); KLong old = *address; if (old == expectedValue) { *address = newValue; } - compareAndSwap(&lock, 1, 0); + compareAndSwap(&lock64, 1, 0); return old; #else return compareAndSwapImpl(thiz, expectedValue, newValue); #endif } +KBoolean Kotlin_AtomicLong_compareAndSet(KRef thiz, KLong expectedValue, KLong newValue) { +#ifdef __mips + // Potentially huge performance penalty, but correct. + // TODO: reconsider, once target MIPS can do proper 64-bit CAS. + KBoolean result = false; + while (compareAndSwap(&lock64, 0, 1) != 0); + KLong* address = reinterpret_cast(thiz + 1); + KLong old = *address; + if (old == expectedValue) { + result = true; + *address = newValue; + } + compareAndSwap(&lock64, 1, 0); + return result; +#else + return compareAndSetImpl(thiz, expectedValue, newValue); +#endif +} + void Kotlin_AtomicLong_set(KRef thiz, KLong newValue) { #ifdef __mips // Potentially huge performance penalty, but correct. // TODO: reconsider, once target MIPS can do proper 64-bit atomic store. - static int lock = 0; - while (compareAndSwap(&lock, 0, 1) != 0); + while (compareAndSwap(&lock64, 0, 1) != 0); KLong* address = reinterpret_cast(thiz + 1); *address = newValue; - compareAndSwap(&lock, 1, 0); + compareAndSwap(&lock64, 1, 0); #else setImpl(thiz, newValue); #endif } +KLong Kotlin_AtomicLong_get(KRef thiz) { +#ifdef __mips + // Potentially huge performance penalty, but correct. + // TODO: reconsider, once target MIPS can do proper 64-bit atomic store. + while (compareAndSwap(&lock64, 0, 1) != 0); + KLong* address = reinterpret_cast(thiz + 1); + KLong value = *address; + compareAndSwap(&lock64, 1, 0); + return value; +#else + return getImpl(thiz); +#endif +} + KNativePtr Kotlin_AtomicNativePtr_compareAndSwap(KRef thiz, KNativePtr expectedValue, KNativePtr newValue) { return compareAndSwapImpl(thiz, expectedValue, newValue); } +KBoolean Kotlin_AtomicNativePtr_compareAndSet(KRef thiz, KNativePtr expectedValue, KNativePtr newValue) { + return compareAndSetImpl(thiz, expectedValue, newValue); +} + void Kotlin_AtomicNativePtr_set(KRef thiz, KNativePtr newValue) { setImpl(thiz, newValue); } +KNativePtr Kotlin_AtomicNativePtr_get(KRef thiz) { + return getImpl(thiz); +} + void Kotlin_AtomicReference_checkIfFrozen(KRef value) { if (value != nullptr && !value->container()->permanentOrFrozen()) { ThrowInvalidMutabilityException(value); @@ -119,6 +180,15 @@ OBJ_GETTER(Kotlin_AtomicReference_compareAndSwap, KRef thiz, KRef expectedValue, RETURN_RESULT_OF(SwapRefLocked, &ref->value_, expectedValue, newValue, &ref->lock_); } +KBoolean Kotlin_AtomicReference_compareAndSet(KRef thiz, KRef expectedValue, KRef newValue) { + Kotlin_AtomicReference_checkIfFrozen(newValue); + // See Kotlin_AtomicReference_get() for explanations, why locking is needed. + AtomicReferenceLayout* ref = asAtomicReference(thiz); + ObjHolder holder; + auto old = SwapRefLocked(&ref->value_, expectedValue, newValue, &ref->lock_, holder.slot()); + return old == expectedValue; +} + void Kotlin_AtomicReference_set(KRef thiz, KRef newValue) { Kotlin_AtomicReference_checkIfFrozen(newValue); AtomicReferenceLayout* ref = asAtomicReference(thiz); diff --git a/runtime/src/main/cpp/Atomic.h b/runtime/src/main/cpp/Atomic.h index f957b6bd4fa..26e25289a6d 100644 --- a/runtime/src/main/cpp/Atomic.h +++ b/runtime/src/main/cpp/Atomic.h @@ -25,6 +25,21 @@ ALWAYS_INLINE inline T compareAndSwap(volatile T* where, T expectedValue, T newV #endif } +template +ALWAYS_INLINE inline bool compareAndSet(volatile T* where, T expectedValue, T newValue) { +#ifndef KONAN_NO_THREADS + return __sync_bool_compare_and_swap(where, expectedValue, newValue); +#else + T oldValue = *where; + if (oldValue == expectedValue) { + *where = newValue; + return true; + } + return false; +#endif +} + + template ALWAYS_INLINE inline void atomicSet(volatile T* where, T what) { #ifndef KONAN_NO_THREADS @@ -34,4 +49,15 @@ ALWAYS_INLINE inline void atomicSet(volatile T* where, T what) { #endif } +template +ALWAYS_INLINE inline T atomicGet(volatile T* where) { +#ifndef KONAN_NO_THREADS + T what; + __atomic_load(where, &what, __ATOMIC_SEQ_CST); + return what; +#else + return *where; +#endif +} + #endif // RUNTIME_ATOMIC_H \ No newline at end of file diff --git a/runtime/src/main/cpp/Worker.cpp b/runtime/src/main/cpp/Worker.cpp index c5a04ba15b6..e4801e2276b 100644 --- a/runtime/src/main/cpp/Worker.cpp +++ b/runtime/src/main/cpp/Worker.cpp @@ -512,7 +512,7 @@ KInt Kotlin_Worker_requestTerminationWorkerInternal(KInt id, KBoolean processSch return requestTermination(id, processScheduledJobs); } -KInt Kotlin_Worker_scheduleInternal(KInt id, KInt transferMode, KRef producer, KNativePtr job) { +KInt Kotlin_Worker_executeInternal(KInt id, KInt transferMode, KRef producer, KNativePtr job) { return schedule(id, transferMode, producer, job); } diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/Atomics.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/Atomics.kt index 1cb40df8839..aa71b58316e 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/Atomics.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/Atomics.kt @@ -10,124 +10,166 @@ import kotlin.native.internal.NoReorderFields import kotlin.native.SymbolName import kotlinx.cinterop.NativePtr +/** + * Atomic values and freezing: atomics [AtomicInt], [AtomicLong], [AtomicNativePtr] and [AtomicReference] + * are unique types with regard to freezing. Namely, they provide mutating operations, while can participate + * in frozen subgraphs. So shared frozen objects can have fields of atomic types. + */ @Frozen -class AtomicInt(private var value: Int = 0) { +public class AtomicInt(private var value_: Int) { + + public var value: Int + get() = getImpl() + set(new) = setImpl(new) /** * Increments the value by [delta] and returns the new value. */ @SymbolName("Kotlin_AtomicInt_addAndGet") - external fun addAndGet(delta: Int): Int + external public fun addAndGet(delta: Int): Int /** * Compares value with [expected] and replaces it with [new] value if values matches. * Returns the old value. */ @SymbolName("Kotlin_AtomicInt_compareAndSwap") - external fun compareAndSwap(expected: Int, new: Int): Int + external public fun compareAndSwap(expected: Int, new: Int): Int /** - * Sets the new atomic value. + * Compares value with [expected] and replaces it with [new] value if values matches. + * Returns true if successful. */ - @SymbolName("Kotlin_AtomicInt_set") - external fun set(new: Int): Unit + @SymbolName("Kotlin_AtomicInt_compareAndSet") + external public fun compareAndSet(expected: Int, new: Int): Boolean /** * Increments value by one. */ - fun increment(): Int = addAndGet(1) + public fun increment(): Unit { + addAndGet(1) + } /** * Decrements value by one. */ - fun decrement(): Int = addAndGet(-1) - - /** - * Returns the current value. - */ - fun get(): Int = value + public fun decrement(): Unit { + addAndGet(-1) + } /** * Returns the string representation of this object. */ - public override fun toString(): String = "AtomicInt $value" + public override fun toString(): String = value.toString() + + // Implementation details. + @SymbolName("Kotlin_AtomicInt_set") + private external fun setImpl(new: Int): Unit + + @SymbolName("Kotlin_AtomicInt_get") + private external fun getImpl(): Int } @Frozen -class AtomicLong(private var value: Long = 0) { +public class AtomicLong(private var value_: Long = 0) { + + public var value: Long + get() = getImpl() + set(new) = setImpl(new) /** * Increments the value by [delta] and returns the new value. */ @SymbolName("Kotlin_AtomicLong_addAndGet") - external fun addAndGet(delta: Long): Long + external public fun addAndGet(delta: Long): Long /** * Increments the value by [delta] and returns the new value. */ - fun addAndGet(delta: Int): Long = addAndGet(delta.toLong()) + public fun addAndGet(delta: Int): Long = addAndGet(delta.toLong()) /** * Compares value with [expected] and replaces it with [new] value if values matches. * Returns the old value. */ @SymbolName("Kotlin_AtomicLong_compareAndSwap") - external fun compareAndSwap(expected: Long, new: Long): Long + external public fun compareAndSwap(expected: Long, new: Long): Long /** - * Sets the new atomic value. + * Compares value with [expected] and replaces it with [new] value if values matches. + * Returns true if successful. */ - @SymbolName("Kotlin_AtomicLong_set") - external fun set(new: Long): Unit + @SymbolName("Kotlin_AtomicLong_compareAndSet") + external public fun compareAndSet(expected: Long, new: Long): Boolean /** * Increments value by one. */ - fun increment(): Long = addAndGet(1L) + public fun increment(): Unit { + addAndGet(1L) + } /** * Decrements value by one. */ - fun decrement(): Long = addAndGet(-1L) - - /** - * Returns the current value. - */ - fun get(): Long = value + fun decrement(): Unit { + addAndGet(-1L) + } /** * Returns the string representation of this object. */ - public override fun toString(): String = "AtomicLong $value" + public override fun toString(): String = value.toString() + + // Implementation details. + @SymbolName("Kotlin_AtomicLong_set") + private external fun setImpl(new: Long): Unit + + @SymbolName("Kotlin_AtomicLong_get") + private external fun getImpl(): Long } @Frozen -class AtomicNativePtr(private var value: NativePtr) { +public class AtomicNativePtr(private var value_: NativePtr) { + + public var value: NativePtr + get() = getImpl() + set(new) = setImpl(new) + /** * Compares value with [expected] and replaces it with [new] value if values matches. * Returns the old value. */ @SymbolName("Kotlin_AtomicNativePtr_compareAndSwap") - external fun compareAndSwap(expected: NativePtr, new: NativePtr): NativePtr + external public fun compareAndSwap(expected: NativePtr, new: NativePtr): NativePtr /** - * Returns the current value. + * Compares value with [expected] and replaces it with [new] value if values matches. */ - fun get(): NativePtr = value -} + @SymbolName("Kotlin_AtomicNativePtr_compareAndSet") + external public fun compareAndSet(expected: NativePtr, new: NativePtr): Boolean + + /** + * Returns the string representation of this object. + */ + public override fun toString(): String = value.toString() -@SymbolName("Kotlin_AtomicReference_checkIfFrozen") -external private fun checkIfFrozen(ref: Any?) + // Implementation details. + @SymbolName("Kotlin_AtomicNativePtr_set") + private external fun setImpl(new: NativePtr): Unit + + @SymbolName("Kotlin_AtomicNativePtr_get") + private external fun getImpl(): NativePtr +} /** * An atomic reference to a frozen Kotlin object. Can be used in concurrent scenarious - * and must be zeroed out (with `compareAndSwap(get(), null)`) once no longer needed. - * Otherwise memory leak could happen. + * but frequently shall be of nullable type and be zeroed out (with `compareAndSwap(get(), null)`) + * once no longer needed. Otherwise memory leak could happen. */ @Frozen @NoReorderFields -class AtomicReference(private var value: T? = null) { - // A spinlock to fix potential ARC race. Not an AtomicInt just for the effeciency sake. +public class AtomicReference(private var value_: T) { + // A spinlock to fix potential ARC race. private var lock: Int = 0 /** @@ -139,81 +181,36 @@ class AtomicReference(private var value: T? = null) { } /** - * Compares value with [expected] and replaces it with [new] value if values matches. + * Sets the value to [new] value * If [new] value is not null, it must be frozen or permanent object, otherwise an * @InvalidMutabilityException is thrown. - * Returns the old value. */ - @SymbolName("Kotlin_AtomicReference_compareAndSwap") - external public fun compareAndSwap(expected: T?, new: T?): T? + public var value: T + get() = @Suppress("UNCHECKED_CAST")(getImpl() as T) + set(new) = setImpl(new) /** - * Sets the value to [new] value + * Compares value with [expected] and replaces it with [new] value if values matches. * If [new] value is not null, it must be frozen or permanent object, otherwise an * @InvalidMutabilityException is thrown. + * Returns the old value. */ - @SymbolName("Kotlin_AtomicReference_set") - external public fun set(new: T?): Unit + @SymbolName("Kotlin_AtomicReference_compareAndSwap") + external public fun compareAndSwap(expected: T, new: T): T + + @SymbolName("Kotlin_AtomicReference_compareAndSet") + external public fun compareAndSet(expected: T, new: T): Boolean /** - * Returns the current value. + * Returns the string representation of this object. */ - @SymbolName("Kotlin_AtomicReference_get") - external public fun get(): T? -} - -internal object UNINITIALIZED { - // So that single-threaded configs can use those as well. - init { - freeze() - } -} + public override fun toString(): String = "Atomic reference to $value" -internal object INITIALIZING { - // So that single-threaded configs can use those as well. - init { - freeze() - } -} + // Implementation details. + @SymbolName("Kotlin_AtomicReference_set") + private external fun setImpl(new: Any?): Unit -@Frozen -internal class AtomicLazyImpl(initializer: () -> T) : Lazy { - private val initializer_ = AtomicReference?>(initializer.freeze()) - private val value_ = AtomicReference(UNINITIALIZED) - - override val value: T - get() { - if (value_.compareAndSwap(UNINITIALIZED, INITIALIZING) === UNINITIALIZED) { - // We execute exclusively here. - val ctor = initializer_.get() - if (ctor != null && initializer_.compareAndSwap(ctor, null) === ctor) { - value_.compareAndSwap(INITIALIZING, ctor().freeze()) - } else { - // Something wrong. - assert(false) - } - } - var result: Any? - do { - result = value_.get() - } while (result === INITIALIZING) - - assert(result !== UNINITIALIZED && result !== INITIALIZING) - @Suppress("UNCHECKED_CAST") - return result as T - } - - // Racy! - override fun isInitialized(): Boolean = value_.get() !== UNINITIALIZED - - override fun toString(): String = if (isInitialized()) - value_.get().toString() else "Lazy value not initialized yet." -} + @SymbolName("Kotlin_AtomicReference_get") + private external fun getImpl(): Any? -/** - * Atomic lazy initializer, could be used in frozen objects, freezes initializing lambda, - * so use very carefully. Also, as with other uses of an @AtomicReference may potentially - * leak memory, so it is recommended to use `atomicLazy` in cases of objects living forever, - * such as object signletons, or in cases where it's guaranteed not to have cyclical garbage. - */ -public fun atomicLazy(initializer: () -> T): Lazy = AtomicLazyImpl(initializer) \ No newline at end of file +} \ No newline at end of file diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/Freezing.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/Freezing.kt index 38a1120b797..3e882e91321 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/Freezing.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/Freezing.kt @@ -5,10 +5,9 @@ package kotlin.native.concurrent -import kotlin.native.internal.ExportForCppRuntime - /** * Exception thrown whenever freezing is not possible. + * [blocker] is an object preventing freezing, usually one marked with [ensureNeverFrozen] earlier. */ public class FreezingException(toFreeze: Any, blocker: Any) : RuntimeException("freezing of $toFreeze has failed, first blocker is $blocker") @@ -23,31 +22,20 @@ public class InvalidMutabilityException(where: Any) : * Freezes object subgraph reachable from this object. Frozen objects can be freely * shared between threads/workers. */ -fun T.freeze(): T { +public fun T.freeze(): T { freezeInternal(this) return this } -val Any?.isFrozen +/** + * Checks if given object is null or frozen or permanent (i.e. instantiated at compile-time). + */ +public val Any?.isFrozen get() = isFrozenInternal(this) - /** - * This function ensures that if we see such an object during freezing attempt - freeze fails and FreezingException - * is thrown. Is object is already frozen - FreezingException is thrown immediately. + * This function ensures that if we see such an object during freezing attempt - freeze fails and + * [FreezingException] is thrown. Is object is already frozen - [FreezingException] is thrown immediately. */ @SymbolName("Kotlin_Worker_ensureNeverFrozen") -external fun Any.ensureNeverFrozen() - -@SymbolName("Kotlin_Worker_freezeInternal") -internal external fun freezeInternal(it: Any?) - -@SymbolName("Kotlin_Worker_isFrozenInternal") -internal external fun isFrozenInternal(it: Any?): Boolean - -@ExportForCppRuntime -internal fun ThrowFreezingException(toFreeze: Any, blocker: Any): Nothing = - throw FreezingException(toFreeze, blocker) - -@ExportForCppRuntime -internal fun ThrowInvalidMutabilityException(where: Any): Nothing = throw InvalidMutabilityException(where) +public external fun Any.ensureNeverFrozen() \ No newline at end of file diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/Future.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/Future.kt index 30726382995..984bbce2634 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/Future.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/Future.kt @@ -5,13 +5,7 @@ package kotlin.native.concurrent -import kotlin.native.SymbolName -import kotlin.native.internal.ExportForCppRuntime - -/** - * Unique identifier of the future. Futures can be used from other workers. - */ -typealias FutureId = Int +import kotlin.native.internal.Frozen /** * State of the future object. @@ -29,7 +23,8 @@ enum class FutureState(val value: Int) { /** * Class representing abstract computation, whose result may become available in the future. */ -public inline class Future(val id: FutureId) { +@Frozen +public class Future internal constructor(val id: Int) { /** * Blocks execution until the future is ready. */ @@ -45,7 +40,11 @@ public inline class Future(val id: FutureId) { throw IllegalStateException("Future is cancelled") } - public fun result(): T = consume { it -> it } + /** + * Blocks execution until the future is ready. Second attempt to get will result in an error. + */ + public val result: T + get() = consume { it -> it } public val state: FutureState get() = FutureState.values()[stateOfFuture(id)] @@ -53,6 +52,8 @@ public inline class Future(val id: FutureId) { public override fun equals(other: Any?): Boolean = (other is Future<*>) && (id == other.id) public override fun hashCode(): Int = id + + override public fun toString(): String = "future $id" } /** @@ -81,19 +82,4 @@ public fun Collection>.waitForMultipleFutures(millis: Int): Set Any?, + job: CPointer>): Future = + Future(executeInternal(worker.id, mode.value, producer, job)) + +@SymbolName("Kotlin_Worker_startInternal") +external internal fun startInternal(): Int + +@SymbolName("Kotlin_Worker_requestTerminationWorkerInternal") +external internal fun requestTerminationInternal(id: Int, processScheduledJobs: Boolean): Int + +@SymbolName("Kotlin_Worker_executeInternal") +external internal fun executeInternal( + id: Int, mode: Int, producer: () -> Any?, job: CPointer>): Int + +@ExportForCppRuntime +internal fun ThrowWorkerUnsupported(): Unit = + throw UnsupportedOperationException("Workers are not supported") + +@ExportForCppRuntime +internal fun ThrowWorkerInvalidState(): Unit = + throw IllegalStateException("Illegal transfer state") + +@ExportForCppRuntime +internal fun WorkerLaunchpad(function: () -> Any?) = function() + +@PublishedApi +@SymbolName("Kotlin_Worker_detachObjectGraphInternal") +external internal fun detachObjectGraphInternal(mode: Int, producer: () -> Any?): NativePtr + +@PublishedApi +@SymbolName("Kotlin_Worker_attachObjectGraphInternal") +external internal fun attachObjectGraphInternal(stable: NativePtr): Any? + +@SymbolName("Kotlin_Worker_freezeInternal") +internal external fun freezeInternal(it: Any?) + +@SymbolName("Kotlin_Worker_isFrozenInternal") +internal external fun isFrozenInternal(it: Any?): Boolean + +@ExportForCppRuntime +internal fun ThrowFreezingException(toFreeze: Any, blocker: Any): Nothing = + throw FreezingException(toFreeze, blocker) + +@ExportForCppRuntime +internal fun ThrowInvalidMutabilityException(where: Any): Nothing = throw InvalidMutabilityException(where) + +@SymbolName("Kotlin_AtomicReference_checkIfFrozen") +external internal fun checkIfFrozen(ref: Any?) diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/Lazy.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/Lazy.kt index bb688bbd4db..112d6f4cc60 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/Lazy.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/Lazy.kt @@ -5,6 +5,7 @@ package kotlin.native.concurrent +import kotlin.native.internal.Frozen import kotlin.native.internal.NoReorderFields @SymbolName("Konan_ensureAcyclicAndSet") @@ -61,4 +62,59 @@ internal class FreezeAwareLazyImpl(initializer: () -> T) : Lazy { override fun toString(): String = if (isInitialized()) value.toString() else "Lazy value not initialized yet." -} \ No newline at end of file +} + +internal object UNINITIALIZED { + // So that single-threaded configs can use those as well. + init { + freeze() + } +} + +internal object INITIALIZING { + // So that single-threaded configs can use those as well. + init { + freeze() + } +} + +@Frozen +internal class AtomicLazyImpl(initializer: () -> T) : Lazy { + private val initializer_ = AtomicReference?>(initializer.freeze()) + private val value_ = AtomicReference(UNINITIALIZED) + + override val value: T + get() { + if (value_.compareAndSwap(UNINITIALIZED, INITIALIZING) === UNINITIALIZED) { + // We execute exclusively here. + val ctor = initializer_.value + if (ctor != null && initializer_.compareAndSet(ctor, null)) { + value_.compareAndSet(INITIALIZING, ctor().freeze()) + } else { + // Something wrong. + assert(false) + } + } + var result: Any? + do { + result = value_.value + } while (result === INITIALIZING) + + assert(result !== UNINITIALIZED && result !== INITIALIZING) + @Suppress("UNCHECKED_CAST") + return result as T + } + + override fun isInitialized(): Boolean = value_.value !== UNINITIALIZED + + override fun toString(): String = if (isInitialized()) + value_.value.toString() else "Lazy value not initialized yet." +} + +/** + * Atomic lazy initializer, could be used in frozen objects, freezes initializing lambda, + * so use very carefully. Also, as with other uses of an @AtomicReference may potentially + * leak memory, so it is recommended to use `atomicLazy` in cases of objects living forever, + * such as object signletons, or in cases where it's guaranteed not to have cyclical garbage. + */ +public fun atomicLazy(initializer: () -> T): Lazy = AtomicLazyImpl(initializer) \ No newline at end of file diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/Lock.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/Lock.kt index 3cce083b10c..84d2c5bf083 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/Lock.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/Lock.kt @@ -30,7 +30,7 @@ internal class Lock { } 0 -> { // We just got the lock. - assert(reenterCount_.get() == 0) + assert(reenterCount_.value == 0) break@loop } } @@ -38,7 +38,7 @@ internal class Lock { } fun unlock() { - if (reenterCount_.get() > 0) { + if (reenterCount_.value > 0) { reenterCount_.decrement() } else { val lockData = CurrentThread.id.hashCode() diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/ObjectTransfer.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/ObjectTransfer.kt index bb9569bfb65..c574b635382 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/ObjectTransfer.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/ObjectTransfer.kt @@ -6,57 +6,70 @@ package kotlin.native.concurrent import kotlinx.cinterop.* +import kotlin.native.internal.Frozen /** * Object Transfer Basics. * * Objects can be passed between threads in one of two possible modes. * - * - CHECKED - object subgraph is checked to be not reachable by other globals or locals, and passed + * - SAFE - object subgraph is checked to be not reachable by other globals or locals, and passed * if so, otherwise an exception is thrown - * - UNCHECKED - object is blindly passed to another worker, if there are references + * - UNSAFE - object is blindly passed to another worker, if there are references * left in the passing worker - it may lead to crash or program malfunction * - * Checked mode checks if object is no longer used in passing worker, using memory-management + * Safe mode checks if object is no longer used in passing worker, using memory-management * specific algorithm (ARC implementation relies on trial deletion on object graph rooted in - * passed object), and throws IllegalStateException if object graph rooted in transferred object + * passed object), and throws an [IllegalStateException] if object graph rooted in transferred object * is reachable by some other means, * - * Unchecked mode, intended for most performance crititcal operations, where object graph ownership - * is expected to be correct (such as application debugged earlier in CHECKED mode), just transfers + * Unsafe mode is intended for most performance critical operations, where object graph ownership + * is expected to be correct (such as application debugged earlier in [SAFE] mode), just transfers * ownership without further checks. * * Note, that for some cases cycle collection need to be done to ensure that dead cycles do not affect - * reachability of passed object graph. See `konan.internal.GC.collect()`. + * reachability of passed object graph. See `[kotlin.native.internal.GC.collect]`. * */ -enum class TransferMode(val value: Int) { - CHECKED(0), - UNCHECKED(1) // USE UNCHECKED MODE ONLY IF ABSOLUTELY SURE WHAT YOU'RE DOING!!! +public enum class TransferMode(val value: Int) { + SAFE(0), + UNSAFE(1) // USE UNSAFE MODE ONLY IF ABSOLUTELY SURE WHAT YOU'RE DOING!!! } -/** - * Creates stable pointer to object, ensuring associated object subgraph is disjoint in specified mode - * ([TransferMode.CHECKED] by default). - * It could be stored to C variable or passed to another thread, where it could be retrieved with [attachObjectGraph]. - */ -inline fun detachObjectGraph(mode: TransferMode = TransferMode.CHECKED, noinline producer: () -> T): COpaquePointer? = - detachObjectGraphInternal(mode.value, producer as () -> Any?) +@Frozen +public class DetachedObjectGraph internal constructor(pointer: NativePtr) { + @PublishedApi + internal val stable = AtomicNativePtr(pointer) + + /** + * Creates stable pointer to object, ensuring associated object subgraph is disjoint in specified mode + * ([TransferMode.SAFE] by default). + * Raw value returned by [asCPointer] could be stored to a C variable or passed to another Kotlin machine. + */ + public constructor(mode: TransferMode = TransferMode.SAFE, producer: () -> T) + : this(detachObjectGraphInternal(mode.value, producer as () -> Any?)) + + /** + * Creates detached object graph from value stored earlier in a C raw pointer. + */ + public constructor(pointer: COpaquePointer?) : this(pointer?.rawValue ?: NativePtr.NULL) + + /** + * Returns raw C pointer value. + */ + public fun asCPointer(): COpaquePointer? = interpretCPointer(stable.value) +} /** * Attaches previously detached with [detachObjectGraph] object subgraph. * Please note, that once object graph is attached, the stable pointer does not have sense anymore, * and shall be discarded. */ -inline fun attachObjectGraph(stable: COpaquePointer?): T = - attachObjectGraphInternal(stable) as T - -// Private APIs. -@PublishedApi -@SymbolName("Kotlin_Worker_detachObjectGraphInternal") -external internal fun detachObjectGraphInternal(mode: Int, producer: () -> Any?): COpaquePointer? - -@PublishedApi -@SymbolName("Kotlin_Worker_attachObjectGraphInternal") -external internal fun attachObjectGraphInternal(stable: COpaquePointer?): Any? - +public inline fun DetachedObjectGraph.attach(): T { + var rawStable: NativePtr + do { + rawStable = stable.value + } while (!stable.compareAndSet(rawStable, NativePtr.NULL)) + val result = attachObjectGraphInternal(rawStable) as T + return result +} diff --git a/runtime/src/main/kotlin/kotlin/native/concurrent/Worker.kt b/runtime/src/main/kotlin/kotlin/native/concurrent/Worker.kt index d10f4a91dbd..934f92c701e 100644 --- a/runtime/src/main/kotlin/kotlin/native/concurrent/Worker.kt +++ b/runtime/src/main/kotlin/kotlin/native/concurrent/Worker.kt @@ -5,8 +5,8 @@ package kotlin.native.concurrent -import kotlin.native.SymbolName import kotlin.native.internal.ExportForCppRuntime +import kotlin.native.internal.Frozen import kotlin.native.internal.VolatileLambda import kotlinx.cinterop.* @@ -21,26 +21,29 @@ import kotlinx.cinterop.* * workers as needed. */ -/** - * Unique identifier of the worker. Workers can be used from other workers. - */ -typealias WorkerId = Int - - /** * Class representing worker. */ -// TODO: make me value class! -public class Worker(val id: WorkerId) { +@Frozen +public class Worker private constructor(val id: Int) { + companion object { + /** + * Start new scheduling primitive, such as thread, to accept new tasks via `execute` interface. + * Typically new worker may be needed for computations offload to another core, for IO it may be + * better to use non-blocking IO combined with more lightweight coroutines. + */ + public fun start(): Worker = Worker(startInternal()) + } + /** * Requests termination of the worker. `processScheduledJobs` controls is we shall wait * until all scheduled jobs processed, or terminate immediately. */ public fun requestTermination(processScheduledJobs: Boolean = true) = - Future(requestTerminationInternal(id, processScheduledJobs)) + Future(requestTerminationInternal(id, processScheduledJobs)) /** - * Schedule a job for further execution in the worker. Schedule is a two-phase operation, + * Plan job for further execution in the worker. Execute is a two-phase operation, * first `producer` function is executed, and resulting object and whatever it refers to * is analyzed for being an isolated object subgraph, if in checked mode. * Afterwards, this disconnected object graph and `job` function pointer is being added to jobs queue @@ -50,10 +53,10 @@ public class Worker(val id: WorkerId) { * the future, can use result of worker's computations. */ @Suppress("UNUSED_PARAMETER") - public fun schedule(mode: TransferMode, producer: () -> T1, @VolatileLambda job: (T1) -> T2): Future = + public fun execute(mode: TransferMode, producer: () -> T1, @VolatileLambda job: (T1) -> T2): Future = /** * This function is a magical operation, handled by lowering in the compiler, and replaced with call to - * scheduleImpl(worker, mode, producer, job) + * executeImpl(worker, mode, producer, job) * but first ensuring that `job` parameter doesn't capture any state. */ throw RuntimeException("Shall not be called directly") @@ -61,38 +64,6 @@ public class Worker(val id: WorkerId) { override public fun equals(other: Any?): Boolean = (other is Worker) && (id == other.id) override public fun hashCode(): Int = id -} - -/** - * Start new scheduling primitive, such as thread, to accept new tasks via `schedule` interface. - * Typically new worker may be needed for computations offload to another core, for IO it may be - * better to use non-blocking IO combined with more lightweight coroutines. - */ -public fun startWorker(): Worker = Worker(startInternal()) - -// Private APIs. -@kotlin.native.internal.ExportForCompiler -internal fun scheduleImpl(worker: Worker, mode: TransferMode, producer: () -> Any?, - job: CPointer>): Future = - Future(scheduleInternal(worker.id, mode.value, producer, job)) - -@SymbolName("Kotlin_Worker_startInternal") -external internal fun startInternal(): WorkerId - -@SymbolName("Kotlin_Worker_requestTerminationWorkerInternal") -external internal fun requestTerminationInternal(id: WorkerId, processScheduledJobs: Boolean): FutureId - -@SymbolName("Kotlin_Worker_scheduleInternal") -external internal fun scheduleInternal( - id: WorkerId, mode: Int, producer: () -> Any?, job: CPointer>): FutureId - -@ExportForCppRuntime -internal fun ThrowWorkerUnsupported(): Unit = - throw UnsupportedOperationException("Workers are not supported") - -@ExportForCppRuntime -internal fun ThrowWorkerInvalidState(): Unit = - throw IllegalStateException("Illegal transfer state") -@ExportForCppRuntime -internal fun WorkerLaunchpad(function: () -> Any?) = function() + override public fun toString(): String = "worker $id" +} \ No newline at end of file diff --git a/runtime/src/main/kotlin/kotlin/random/Random.kt b/runtime/src/main/kotlin/kotlin/random/Random.kt index b1a2f32be9d..4f9abdd104f 100644 --- a/runtime/src/main/kotlin/kotlin/random/Random.kt +++ b/runtime/src/main/kotlin/kotlin/random/Random.kt @@ -19,13 +19,13 @@ public abstract class NativeRandom { * Random generator seed value. */ var seed: Long - get() = _seed.get() + get() = _seed.value set(value) = update(mult(value)) private fun mult(value: Long) = (value xor MULTIPLIER) and ((1L shl 48) - 1) - private fun update(seed: Long) { - _seed.compareAndSwap(_seed.get(), seed) + private fun update(seed: Long): Unit { + _seed.value = seed } /** diff --git a/runtime/src/main/kotlin/kotlin/text/regex/AbstractCharClass.kt b/runtime/src/main/kotlin/kotlin/text/regex/AbstractCharClass.kt index f9269590619..08f19d41450 100644 --- a/runtime/src/main/kotlin/kotlin/text/regex/AbstractCharClass.kt +++ b/runtime/src/main/kotlin/kotlin/text/regex/AbstractCharClass.kt @@ -87,10 +87,10 @@ internal abstract class AbstractCharClass : SpecialToken() { get() = this - private val surrogates_ = AtomicReference() + private val surrogates_ = AtomicReference(null) val surrogates: AbstractCharClass get() { - surrogates_.get()?.let { + surrogates_.value?.let { return it } val result = object : AbstractCharClass() { @@ -104,15 +104,15 @@ internal abstract class AbstractCharClass : SpecialToken() { } } result.setNegative(this.altSurrogates) - surrogates_.compareAndSwap(null, result.freeze()) - return surrogates_.get()!! + surrogates_.compareAndSet(null, result.freeze()) + return surrogates_.value!! } - private val withoutSurrogates_ = AtomicReference() + private val withoutSurrogates_ = AtomicReference(null) val withoutSurrogates: AbstractCharClass get() { - withoutSurrogates_.get()?.let { + withoutSurrogates_.value?.let { return it } val result = object : AbstractCharClass() { @@ -129,8 +129,8 @@ internal abstract class AbstractCharClass : SpecialToken() { } result.setNegative(isNegative()) result.mayContainSupplCodepoints = mayContainSupplCodepoints - withoutSurrogates_ .compareAndSwap(null, result.freeze()) - return withoutSurrogates_.get()!! + withoutSurrogates_ .compareAndSet(null, result.freeze()) + return withoutSurrogates_.value!! } @@ -561,8 +561,8 @@ internal abstract class AbstractCharClass : SpecialToken() { PF("Pf", { CachedCategory(CharCategory.FINAL_QUOTE_PUNCTUATION.value, false) }) } - private val classCache = Array>(CharClasses.values().size, { - AtomicReference() + private val classCache = Array>(CharClasses.values().size, { + AtomicReference(null) }) private val classCacheMap = CharClasses.values().associate { it -> it.regexName to it } @@ -578,9 +578,9 @@ internal abstract class AbstractCharClass : SpecialToken() { fun getPredefinedClass(name: String, negative: Boolean): AbstractCharClass { val charClass = classCacheMap[name] ?: throw PatternSyntaxException("No such character class") - val cachedClass = classCache[charClass.ordinal].get() ?: run { + val cachedClass = classCache[charClass.ordinal].value ?: run { classCache[charClass.ordinal].compareAndSwap(null, charClass.factory().freeze()) - classCache[charClass.ordinal].get()!! + classCache[charClass.ordinal].value!! } return cachedClass.getValue(negative) } diff --git a/samples/globalState/src/main/kotlin/Global.kt b/samples/globalState/src/main/kotlin/Global.kt index 3e62443989c..345455c0447 100644 --- a/samples/globalState/src/main/kotlin/Global.kt +++ b/samples/globalState/src/main/kotlin/Global.kt @@ -50,9 +50,9 @@ fun main(args: Array) { sharedData.f = 0.5f sharedData.string = "Hello Kotlin!".cstr.getPointer(arena) // Here we create detached mutable object, which could be later reattached by another thread. - sharedData.kotlinObject = detachObjectGraph { + sharedData.kotlinObject = DetachedObjectGraph { SharedData("A string", 42, SharedDataMember(2.39)) - } + }.asCPointer() // Here we create shared frozen object reference, val stableRef = StableRef.create(SharedData("Shared", 239, SharedDataMember(2.71)).freeze()) sharedData.frozenKotlinObject = stableRef.asCPointer() @@ -67,12 +67,12 @@ fun main(args: Array) { argC -> initRuntimeIfNeeded() dumpShared("thread2") - val kotlinObject = attachObjectGraph(sharedData.kotlinObject) - val arg = attachObjectGraph(argC) + val kotlinObject = DetachedObjectGraph(sharedData.kotlinObject).attach() + val arg = DetachedObjectGraph(argC).attach() println("thread arg is $arg Kotlin object is $kotlinObject frozen is $globalObject") // Workaround for compiler issue. null as COpaquePointer? - }, detachObjectGraph { SharedDataMember(3.14)} ).ensureUnixCallResult("pthread_create") + }, DetachedObjectGraph { SharedDataMember(3.14)}.asCPointer() ).ensureUnixCallResult("pthread_create") pthread_join(thread.value, null).ensureUnixCallResult("pthread_join") } diff --git a/samples/objc/src/main/kotlin/Window.kt b/samples/objc/src/main/kotlin/Window.kt index 463ea9a526b..3430d8f7aca 100644 --- a/samples/objc/src/main/kotlin/Window.kt +++ b/samples/objc/src/main/kotlin/Window.kt @@ -1,5 +1,4 @@ -import kotlin.native.concurrent.attachObjectGraph -import kotlin.native.concurrent.detachObjectGraph +import kotlin.native.concurrent.* import kotlinx.cinterop.* import platform.AppKit.* import platform.Foundation.* @@ -32,12 +31,12 @@ private class Controller : NSObject() { @ObjCAction fun onClick() { // Execute some async action on button click. - dispatch_async_f(asyncQueue, detachObjectGraph { + dispatch_async_f(asyncQueue, DetachedObjectGraph { Data(clock_gettime_nsec_np(CLOCK_REALTIME.convert())) - }, staticCFunction { + }.asCPointer(), staticCFunction { it -> initRuntimeIfNeeded() - val data = attachObjectGraph(it) + val data = DetachedObjectGraph(it).attach() println("in async: $data") }) } diff --git a/samples/videoplayer/src/main/kotlin/DecoderWorker.kt b/samples/videoplayer/src/main/kotlin/DecoderWorker.kt index f2a07d2beb6..4d8b928ad36 100644 --- a/samples/videoplayer/src/main/kotlin/DecoderWorker.kt +++ b/samples/videoplayer/src/main/kotlin/DecoderWorker.kt @@ -325,19 +325,14 @@ private class Decoder( fun audioVideoSynced() = (audio?.isSynced() ?: true) || done() } -class DecoderWorker : Disposable { +class DecoderWorker(val worker: Worker) : Disposable { // This class must have no other state, but this worker object. // All the real state must be stored on the worker's side. - private val worker: Worker - - constructor() { worker = startWorker() } - constructor(id: WorkerId) { worker = Worker(id) } + constructor() : this(Worker.start()) override fun dispose() { - worker.requestTermination().result() + worker.requestTermination().result } - - val workerId get() = worker.id fun initDecode(context: AVFormatContext, useVideo: Boolean = true, useAudio: Boolean = true): CodecInfo { // Find the first video/audio streams. @@ -362,7 +357,7 @@ class DecoderWorker : Disposable { } // Pack all state and pass it to the worker. - worker.schedule(TransferMode.CHECKED, { + worker.execute(TransferMode.SAFE, { Decoder(context.ptr, videoStreamIndex, audioStreamIndex, videoContext, audioContext) @@ -371,7 +366,7 @@ class DecoderWorker : Disposable { } fun start(videoOutput: VideoOutput, audioOutput: AudioOutput) { - worker.schedule(TransferMode.CHECKED, + worker.execute(TransferMode.SAFE, { Pair( videoOutput.toVideoDecoderOutput(), audioOutput.toAudioDecoderOutput()) @@ -381,27 +376,26 @@ class DecoderWorker : Disposable { } fun stop() { - worker.schedule(TransferMode.CHECKED, { null }) { + worker.execute(TransferMode.SAFE, { null }) { decoder?.run { dispose() decoder = null } - }.result() + }.result } fun done(): Boolean = - worker.schedule(TransferMode.CHECKED, { null }) { decoder?.done() ?: true }.result() + worker.execute(TransferMode.SAFE, { null }) { decoder?.done() ?: true }.result - fun requestDecodeChunk() { - worker.schedule(TransferMode.CHECKED, { null }) { decoder?.decodeIfNeeded() }.result() - } + fun requestDecodeChunk() = + worker.execute(TransferMode.SAFE, { null }) { decoder?.decodeIfNeeded() }.result fun nextVideoFrame(): VideoFrame? = - worker.schedule(TransferMode.CHECKED, { null }) { decoder?.nextVideoFrame() }.result() + worker.execute(TransferMode.SAFE, { null }) { decoder?.nextVideoFrame() }.result fun nextAudioFrame(size: Int): AudioFrame? = - worker.schedule(TransferMode.CHECKED, { size }) { decoder?.nextAudioFrame(it) }.result() + worker.execute(TransferMode.SAFE, { size }) { decoder?.nextAudioFrame(it) }.result fun audioVideoSynced(): Boolean = - worker.schedule(TransferMode.CHECKED, { null }) { decoder?.audioVideoSynced() ?: true }.result() + worker.execute(TransferMode.SAFE, { null }) { decoder?.audioVideoSynced() ?: true }.result } diff --git a/samples/videoplayer/src/main/kotlin/SDLAudio.kt b/samples/videoplayer/src/main/kotlin/SDLAudio.kt index d81be1f96c5..53f4bb6d037 100644 --- a/samples/videoplayer/src/main/kotlin/SDLAudio.kt +++ b/samples/videoplayer/src/main/kotlin/SDLAudio.kt @@ -14,6 +14,7 @@ * limitations under the License. */ +import kotlin.native.concurrent.Worker import kotlinx.cinterop.* import sdl.* import platform.posix.memset @@ -32,7 +33,7 @@ private fun SampleFormat.toSDLFormat(): SDL_AudioFormat? = when (this) { } class SDLAudio(val player: VideoPlayer) : DisposableContainer() { - private val threadData = arena.alloc().ptr + private val workerStable = StableRef.create(player.worker) private var state = State.STOPPED fun start(audio: AudioOutput) { @@ -48,10 +49,9 @@ class SDLAudio(val player: VideoPlayer) : DisposableContainer() { channels = audio.channels.convert() silence = 0u samples = 4096u - userdata = threadData + userdata = workerStable.asCPointer() callback = staticCFunction(::audioCallback) } - threadData.pointed.value = player.workerId val realSpec = alloc() if (SDL_OpenAudio(spec.ptr, realSpec.ptr) < 0) throwSDLError("SDL_OpenAudio") @@ -72,6 +72,7 @@ class SDLAudio(val player: VideoPlayer) : DisposableContainer() { fun stop() { pause() state = state.transition(State.PAUSED, State.STOPPED) { SDL_CloseAudio() } + workerStable.dispose() } } @@ -83,7 +84,7 @@ private fun audioCallback(userdata: COpaquePointer?, buffer: CPointer? // This handler will be invoked in the audio thread, so reinit runtime. kotlin.native.initRuntimeIfNeeded() val decoder = decoder ?: - DecoderWorker(userdata!!.reinterpret().pointed.value).also { decoder = it } + DecoderWorker(userdata!!.asStableRef().get()).also { decoder = it } var outPosition = 0 while (outPosition < length) { val frame = decoder.nextAudioFrame(length - outPosition) diff --git a/samples/videoplayer/src/main/kotlin/VideoPlayer.kt b/samples/videoplayer/src/main/kotlin/VideoPlayer.kt index d1d60ad5ba9..3fd6445ef86 100644 --- a/samples/videoplayer/src/main/kotlin/VideoPlayer.kt +++ b/samples/videoplayer/src/main/kotlin/VideoPlayer.kt @@ -41,15 +41,15 @@ enum class PlayMode { } class VideoPlayer(val requestedSize: Dimensions?) : DisposableContainer() { + private val decoder = disposable { DecoderWorker() } private val video = disposable { SDLVideo() } private val audio = disposable { SDLAudio(this) } private val input = disposable { SDLInput(this) } - private val decoder = disposable { DecoderWorker() } private val now = arena.alloc().ptr private var state = State.STOPPED - val workerId get() = decoder.workerId + val worker get() = decoder.worker var lastFrameTime = 0.0 fun stop() { diff --git a/samples/workers/README.md b/samples/workers/README.md index 1c21ae9fc29..ab8ea65b515 100644 --- a/samples/workers/README.md +++ b/samples/workers/README.md @@ -9,15 +9,13 @@ and connected to other worker. This relies on fact that memory management engine can ensure, that one worker doesn't keep references to certain object and whatever it refers to, and so the object could be safely transferred to another worker. -Workers do not share any state (i.e. globals and Kotlin static objects have different -values in different workers), but share executable code of the program and some -immutable data, such as immutable binary blobs. But Kotlin objects can be transferred +Workers do not share mutable state, but share executable code of the program and some +immutable data, such as immutable blobs. But Kotlin objects can be transferred between workers, as long, as they do not refer to objects, having external references. -The transfer is implemented with the function `schedule()` having the following signature +The transfer is implemented with the function `execute()` having the following signature - fun - schedule(mode: TransferMode, + fun execute(mode: TransferMode, producer: () -> T1, @VolatileLambda job: (T1) -> T2): Future diff --git a/samples/workers/src/main/kotlin/Workers.kt b/samples/workers/src/main/kotlin/Workers.kt index a8d169da7e5..80337fb4722 100644 --- a/samples/workers/src/main/kotlin/Workers.kt +++ b/samples/workers/src/main/kotlin/Workers.kt @@ -5,10 +5,10 @@ data class WorkerResult(val intResult: Int, val stringResult: String) fun main(args: Array) { val COUNT = 5 - val workers = Array(COUNT, { _ -> startWorker()}) + val workers = Array(COUNT, { _ -> Worker.start()}) for (attempt in 1 .. 3) { - val futures = Array(workers.size, { workerIndex -> workers[workerIndex].schedule(TransferMode.CHECKED, { + val futures = Array(workers.size, { workerIndex -> workers[workerIndex].execute(TransferMode.SAFE, { WorkerArgument(workerIndex, "attempt $attempt") }) { input -> var sum = 0 for (i in 0..input.intParam * 1000) { @@ -24,12 +24,13 @@ fun main(args: Array) { ready.forEach { it.consume { result -> if (result.stringResult != "attempt $attempt result") throw Error("Unexpected $result") - consumed++ } + consumed++ + } } } } workers.forEach { - it.requestTermination().consume { _ -> } + it.requestTermination().result } println("OK") }