From 94c587f82470a32908676709c9172cf46ff8a153 Mon Sep 17 00:00:00 2001
From: Vsevolod Tolstopyatov
Date: Wed, 28 Mar 2018 17:32:27 +0300
Subject: [PATCH] Adaptive spinning, parking and load balancing mechanism:
 Spin and park adaptively to reduce CPU consumption.
 Provide a mechanism for offloading work when a worker detects it is overloaded.
 Implement a hand-rolled thread-local xorshift random, which appears to be 15%
 faster than ThreadLocal<Random> (ThreadLocalRandom is unavailable on Java 1.6).
 Use queue size as a metric for work stealing in addition to task deadlines.
---
 .../jmh/kotlin/benchmarks/LaunchBenchmark.kt       |  11 +-
 .../benchmarks/ParametrizedDispatcherBase.kt       |   4 +-
 .../actors/PingPongActorBenchmark.kt               |   4 +-
 .../scheduling/CoroutineScheduler.kt               | 207 +++++++++++++++---
 .../ExperimentalCoroutineDispatcher.kt             |  16 +-
 .../experimental/scheduling/RandomUtils.kt         |  10 -
 .../experimental/scheduling/Tasks.kt               |   9 +-
 .../experimental/scheduling/WorkQueue.kt           | 120 ++++++++----
 .../CoroutineSchedulerStressTest.kt                |  96 ++++++--
 .../scheduling/CoroutineSchedulerTest.kt           |  28 +++
 .../experimental/scheduling/TestTimeSource.kt      |   2 +-
 .../scheduling/WorkQueueStressTest.kt              |  13 +-
 .../experimental/scheduling/WorkQueueTest.kt       |  56 ++++-
 13 files changed, 431 insertions(+), 145 deletions(-)
 delete mode 100644 core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/RandomUtils.kt

diff --git a/benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt
index 23042da7f1..684aa80ed3 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/LaunchBenchmark.kt
@@ -1,25 +1,24 @@
 package benchmarks
 
 import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.scheduling.ForkedMarker
 import org.openjdk.jmh.annotations.*
 import java.util.concurrent.CyclicBarrier
 import java.util.concurrent.TimeUnit
 
 /*
  * Benchmark to measure scheduling overhead in comparison with FJP.
- * LaunchBenchmark.massiveLaunch experimental avgt 30 187.342 ± 20.244 us/op + * LaunchBenchmark.massiveLaunch experimental avgt 30 328.662 ± 52.789 us/op * LaunchBenchmark.massiveLaunch fjp avgt 30 179.762 ± 3.931 us/op */ @Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 100000, time = 1, timeUnit = TimeUnit.SECONDS) -@Fork(value = 3, jvmArgsAppend = ["-XX:+PreserveFramePointer"]) +@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2) @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) open class LaunchBenchmark : ParametrizedDispatcherBase() { - @Param("experimental") + @Param("experimental", "fjp") override var dispatcher: String = "fjp" private val jobsToLaunch = 100 @@ -31,7 +30,7 @@ open class LaunchBenchmark : ParametrizedDispatcherBase() { @Benchmark fun massiveLaunch() { repeat(submitters) { - launch(benchmarkContext + ForkedMarker) { + launch(benchmarkContext) { // Wait until all cores are occupied allLaunched.await() allLaunched.reset() diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt b/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt index c3f1d574a8..b4da2009c5 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt @@ -18,7 +18,7 @@ import kotlin.coroutines.experimental.CoroutineContext abstract class ParametrizedDispatcherBase { abstract var dispatcher: String - lateinit var benchmarkContext: CoroutineContext // coroutinesContext clashes with scope parameter + lateinit var benchmarkContext: CoroutineContext // coroutineContext clashes with scope parameter var closeable: Closeable? 
= null

     @Setup
@@ -26,7 +26,7 @@ abstract class ParametrizedDispatcherBase {
         benchmarkContext = when {
             dispatcher == "fjp" -> CommonPool
             dispatcher == "experimental" -> {
-                ExperimentalCoroutineDispatcher(CORES_COUNT)
+                ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
             }
             dispatcher.startsWith("ftp") -> {
                 newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt
index ffdd391780..9118224464 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/actors/PingPongActorBenchmark.kt
@@ -11,11 +11,13 @@ import kotlin.coroutines.experimental.CoroutineContext
 /*
  * Benchmark                                   (dispatcher)  Mode  Cnt    Score    Error  Units
+ * PingPongActorBenchmark.coresCountPingPongs  experimental  avgt   10  185.066 ± 21.692  ms/op
  * PingPongActorBenchmark.coresCountPingPongs           fjp  avgt   10  200.581 ± 22.669  ms/op
  * PingPongActorBenchmark.coresCountPingPongs         ftp_1  avgt   10  494.334 ± 27.450  ms/op
  * PingPongActorBenchmark.coresCountPingPongs         ftp_2  avgt   10  498.754 ± 27.743  ms/op
  * PingPongActorBenchmark.coresCountPingPongs         ftp_8  avgt   10  804.498 ± 69.826  ms/op
  *
+ * PingPongActorBenchmark.singlePingPong       experimental  avgt   10   45.521 ±  3.281  ms/op
  * PingPongActorBenchmark.singlePingPong                fjp  avgt   10  217.005 ± 18.693  ms/op
  * PingPongActorBenchmark.singlePingPong              ftp_1  avgt   10   57.632 ±  1.835  ms/op
  * PingPongActorBenchmark.singlePingPong              ftp_2  avgt   10  112.723 ±  5.280  ms/op
@@ -30,7 +32,7 @@ import kotlin.coroutines.experimental.CoroutineContext
 open class PingPongActorBenchmark : ParametrizedDispatcherBase() {
     data class Letter(val message: Any?, val sender: SendChannel<Letter>)
 
-    @Param("fjp", "ftp_1", "ftp_2", "ftp_8", "experimental")
+    @Param("experimental")
     override var dispatcher: String = "fjp"
 
     @Benchmark
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
index 48df0b19b1..925f086fbc 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineScheduler.kt
@@ -1,10 +1,12 @@
 package kotlinx.coroutines.experimental.scheduling
 
+import kotlinx.atomicfu.atomic
 import kotlinx.coroutines.experimental.Runnable
 import java.io.Closeable
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.Executor
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.LockSupport
 
 /**
@@ -13,9 +15,27 @@ import java.util.concurrent.locks.LockSupport
 class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
     private val workers: Array<PoolWorker>
 
-    private val globalWorkQueue: Queue<Task> = ConcurrentLinkedQueue()
+    private val globalWorkQueue = ConcurrentLinkedQueue<Task>()
+    private val parkedWorkers = atomic(0)
+    private val random = Random()
+
     @Volatile
-    private var isClosed = false
+    private var isTerminated = false
+
+    companion object {
+        private const val STEAL_ATTEMPTS = 4
+        private const val MAX_SPINS = 1000L
+        private const val MAX_YIELDS = 500L
+        @JvmStatic
+        private val MAX_PARK_TIME_NS = TimeUnit.SECONDS.toNanos(1)
+        @JvmStatic
+        private val MIN_PARK_TIME_NS = (WORK_STEALING_TIME_RESOLUTION_NS / 4).coerceAtLeast(10).coerceAtMost(MAX_PARK_TIME_NS)
+
+        // Local queue 'offer' results
+        private const val ADDED = -1
+        private const val ADDED_WITH_OFFLOADING = 0 // Added to the local queue, but the pool requires an additional worker to keep up
+        private const val NOT_ADDED = 1
+    }
 
     init {
         require(corePoolSize >= 1, { "Expected positive core pool size, but was $corePoolSize" })
@@ -26,41 +46,111 @@ class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
     override fun execute(command: Runnable) = dispatch(command)
 
     override fun close() {
-        isClosed = true
+        isTerminated = true
     }
 
-    fun dispatch(command: Runnable, intensive: Boolean = false) {
-        val task = TimedTask(System.nanoTime(), command)
-        if (!submitToLocalQueue(task, intensive)) {
+    fun dispatch(command: Runnable) {
+        val task = TimedTask(schedulerTimeSource.nanoTime(), command)
+
+        val offerResult = submitToLocalQueue(task)
+        if (offerResult == ADDED) {
+            return
+        }
+
+        if (offerResult == NOT_ADDED) {
             globalWorkQueue.add(task)
         }
+
+        unparkIdleWorker()
     }
 
-    private fun submitToLocalQueue(task: Task, intensive: Boolean): Boolean {
-        val worker = Thread.currentThread() as? PoolWorker ?: return false
-        if (intensive && worker.localQueue.bufferSize > FORKED_TASK_OFFLOAD_THRESHOLD) return false
-        worker.localQueue.offer(task, globalWorkQueue)
-        return true
+    private fun unparkIdleWorker() {
+        // If no threads are parked, don't try to wake anyone up
+        val parked = parkedWorkers.value
+        if (parked == 0) {
+            return
+        }
+
+        // Try to wake up one worker
+        repeat(STEAL_ATTEMPTS) {
+            val victim = workers[random.nextInt(workers.size)]
+            if (victim.isParking) {
+                /*
+                 * Benign data race: the victim can wake up after this check, but before the 'unpark' call succeeds,
+                 * making the first 'park' in its next idle period a no-op
+                 */
+                LockSupport.unpark(victim)
+                return
+            }
+        }
+    }
+
+    private fun submitToLocalQueue(task: Task): Int {
+        val worker = Thread.currentThread() as? PoolWorker ?: return NOT_ADDED
+        if (worker.localQueue.offer(task, globalWorkQueue)) {
+            // We're close to queue capacity, wake up someone to steal
+            if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD) {
+                return ADDED_WITH_OFFLOADING
+            }
+
+            return ADDED
+        }
+
+        return ADDED_WITH_OFFLOADING
+    }
+
+    /**
+     * Returns a string identifying the state of this scheduler for nicer debugging
+     */
+    override fun toString(): String {
+        var parkedWorkers = 0
+        val queueSizes = arrayListOf<Int>()
+        for (worker in workers) {
+            if (worker.isParking) {
+                ++parkedWorkers
+            } else {
+                queueSizes += worker.localQueue.bufferSize
+            }
+        }
+
+        return "${super.toString()}[core pool size = ${workers.size}, " +
+                "parked workers = $parkedWorkers, " +
+                "active workers buffer sizes = $queueSizes, " +
+                "global queue size = ${globalWorkQueue.size}]"
     }
 
-    private inner class PoolWorker(index: Int) : Thread("CoroutinesScheduler-worker-$index") {
+
+    internal inner class PoolWorker(index: Int) : Thread("CoroutinesScheduler-worker-$index") {
         init {
             isDaemon = true
         }
 
         val localQueue: WorkQueue = WorkQueue()
 
+        /**
+         * Time of the last call to [unparkIdleWorker] due to missed task deadlines.
+         * Used as a throttling mechanism to avoid unparking multiple threads when it's not really necessary.
+         */
+        private var lastExhaustionTime = 0L
+
         @Volatile
-        var yields = 0
+        var isParking = false
+
+        @Volatile
+        private var spins = 0L
+        private var yields = 0L
+
+        private var parkTimeNs = MIN_PARK_TIME_NS
+        private var rngState = random.nextInt()
 
         override fun run() {
-            while (!isClosed) {
+            while (!isTerminated) {
                 try {
                     val job = findTask()
                     if (job == null) {
-                        awaitWork()
+                        // Wait for a job with a potential park
+                        idle()
                     } else {
-                        yields = 0
+                        idleReset()
+                        checkExhaustion(job)
                         job.task.run()
                     }
                 } catch (e: Throwable) {
@@ -69,15 +159,77 @@ class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable {
             }
         }
 
-        private fun awaitWork() {
-            // Temporary solution
-            if (++yields > 100000) {
-                LockSupport.parkNanos(WORK_STEALING_TIME_RESOLUTION / 2)
+        private fun checkExhaustion(job: Task) {
+            val parked = parkedWorkers.value
+            if (parked == 0) {
+                return
+            }
+
+            // Check the last exhaustion time to avoid a race between stealing and the next task execution
+            val now = schedulerTimeSource.nanoTime()
+            if (now - job.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS && now - lastExhaustionTime >= WORK_STEALING_TIME_RESOLUTION_NS * 5) {
+                lastExhaustionTime = now
+                unparkIdleWorker()
+            }
+        }
+
+        /*
+         * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
+         * ThreadLocalRandom cannot be used to support Android, and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
+         */
+        internal fun nextInt(upperBound: Int): Int {
+            rngState = rngState xor (rngState shl 13)
+            rngState = rngState xor (rngState ushr 17)
+            rngState = rngState xor (rngState shl 5)
+            val mask = upperBound - 1
+            // Fast path for a power of two bound
+            if (mask and upperBound == 0) {
+                return rngState and mask
             }
+
+            return (rngState and Int.MAX_VALUE) % upperBound
+        }
+
+        private fun idle() {
+            /*
+             * Simple adaptive await of work:
+             * spin on the volatile field with an empty loop in the hope that new work will arrive,
+             * then start yielding to reduce CPU pressure, and finally start adaptive parking.
+             *
+             * The main idea is to avoid parking for as long as possible (otherwise, on asymmetric workloads, throughput
+             * suffers from overly frequent park/unpark calls and from the delay between job submission and the queue check)
+             */
+            when {
+                spins < MAX_SPINS -> ++spins
+                ++yields <= MAX_YIELDS -> Thread.yield()
+                else -> {
+                    if (!isParking) {
+                        isParking = true
+                        parkedWorkers.incrementAndGet()
+                    }
+
+                    if (parkTimeNs < MAX_PARK_TIME_NS) {
+                        parkTimeNs = (parkTimeNs * 1.5).toLong().coerceAtMost(MAX_PARK_TIME_NS)
+                    }
+
+                    LockSupport.parkNanos(parkTimeNs)
+                }
+            }
+        }
+
+        private fun idleReset() {
+            if (isParking) {
+                isParking = false
+                parkTimeNs = MIN_PARK_TIME_NS
+                parkedWorkers.decrementAndGet()
+            }
+
+            spins = 0
+            yields = 0
         }
 
         private fun findTask(): Task? {
-            // TODO explain, probabilistic check with park counter?
+            // TODO probabilistic check if the thread is not idle?
            var task: Task?
= globalWorkQueue.poll() if (task != null) return task @@ -92,16 +244,17 @@ class CoroutineScheduler(private val corePoolSize: Int) : Executor, Closeable { return null } - while (true) { - val worker = workers[RANDOM_PROVIDER().nextInt(workers.size)] + // Probe a couple of workers + repeat(STEAL_ATTEMPTS) { + val worker = workers[nextInt(workers.size)] if (worker !== this) { - worker.localQueue.offloadWork(true) { - localQueue.offer(it, globalWorkQueue) + if (localQueue.trySteal(worker.localQueue, globalWorkQueue)) { + return@repeat } - - return localQueue.poll() } } + + return localQueue.poll() } } } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt index 98dc459fe2..59dd732a20 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/ExperimentalCoroutineDispatcher.kt @@ -3,29 +3,23 @@ package kotlinx.coroutines.experimental.scheduling import kotlinx.coroutines.experimental.* import java.io.Closeable import java.util.concurrent.TimeUnit -import kotlin.coroutines.experimental.AbstractCoroutineContextElement import kotlin.coroutines.experimental.CoroutineContext -/** - * Unstable API and subject to change. - * Context marker which gives scheduler a hint that submitted jobs can be distributed among cores aggressively. - * Usually it's useful for massive jobs submission produced by single coroutine, e.g. data intensive fork-join tasks - * or fan-out notifications for a large number of listeners. 
- */
-object ForkedMarker : AbstractCoroutineContextElement(ForkedKey)
-
-private object ForkedKey : CoroutineContext.Key<ForkedMarker>
 
 class ExperimentalCoroutineDispatcher(threads: Int = Runtime.getRuntime().availableProcessors()) : CoroutineDispatcher(), Delay, Closeable {
     private val coroutineScheduler = CoroutineScheduler(threads)
 
     override fun dispatch(context: CoroutineContext, block: Runnable) {
-        coroutineScheduler.dispatch(block, context[ForkedKey] != null)
+        coroutineScheduler.dispatch(block)
     }
 
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) =
             DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
 
     override fun close() = coroutineScheduler.close()
+
+    override fun toString(): String {
+        return "${super.toString()}[scheduler = $coroutineScheduler]"
+    }
+
 }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/RandomUtils.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/RandomUtils.kt
deleted file mode 100644
index 9ea7158107..0000000000
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/RandomUtils.kt
+++ /dev/null
@@ -1,10 +0,0 @@
-package kotlinx.coroutines.experimental.scheduling
-
-import java.util.*
-
-private val RANDOM = object : ThreadLocal<Random>() {
-    override fun initialValue() = Random()
-}
-
-// Dynamic discovery is not yet supported
-val RANDOM_PROVIDER = { RANDOM.get() }
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
index c4d6719be5..691ea64f0f 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/Tasks.kt
@@ -5,11 +5,12 @@ import java.util.*
 internal typealias Task = TimedTask
 internal typealias GlobalQueue = Queue<Task>
 
-internal val WORK_STEALING_TIME_RESOLUTION = readFromSystemProperties(
-    "kotlinx.coroutines.scheduler.resolution.us", 500L, String::toLongOrNull)
+// 100 us is the default resolution
+internal val WORK_STEALING_TIME_RESOLUTION_NS = readFromSystemProperties(
+    "kotlinx.coroutines.scheduler.resolution.ns", 100000L, String::toLongOrNull)
 
-internal val FORKED_TASK_OFFLOAD_THRESHOLD = readFromSystemProperties(
-    "kotlinx.coroutines.scheduler.fork.threshold", 64L, String::toLongOrNull)
+internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = readFromSystemProperties(
+    "kotlinx.coroutines.scheduler.offload.threshold", 96L, String::toLongOrNull)
 
 internal var schedulerTimeSource: TimeSource = NanoTimeSource
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
index 4e7a9ee92a..f10687ff71 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueue.kt
@@ -1,7 +1,6 @@
 package kotlinx.coroutines.experimental.scheduling
 
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.atomic.AtomicReference
+import kotlinx.atomicfu.atomic
 import java.util.concurrent.atomic.AtomicReferenceArray
 
 internal const val BUFFER_CAPACITY_BASE = 7
@@ -20,68 +19,90 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
  * E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order.
  *
  * Work offloading
- * When queue is full, half of existing tasks is offloaded to global queue which is regularly polled by other pool workers.
- * Offloading occurs in LIFO order for the sake of implementation simplicity: offload should be extremely rare and occurs only in specific use-cases
+ * When the queue is full, half of the existing tasks are offloaded to the global queue, which is regularly polled by other pool workers.
+ * Offloading occurs in LIFO order for the sake of implementation simplicity: offloads should be extremely rare and occur only in specific use cases
  * (e.g. when coroutine starts heavy fork-join-like computation), so fairness is not important.
- * As an alternative, offloading directly to some [CoroutineScheduler.PoolWorker] may be used, but then strategy of selecting most idle worker
+ * As an alternative, offloading directly to some [CoroutineScheduler.PoolWorker] may be used, but then the strategy of selecting any idle worker
  * should be implemented, and the implementation should be aware of multiple producers.
  */
 internal class WorkQueue {
-    internal val bufferSize: Int get() = producerIndex.get() - consumerIndex.get()
+    internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value
 
     private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)
-    private val lastScheduledTask: AtomicReference<Task?> = AtomicReference(null)
+    private val lastScheduledTask = atomic<Task?>(null)
 
-    private val producerIndex: AtomicInteger = AtomicInteger(0)
-    private val consumerIndex: AtomicInteger = AtomicInteger(0)
+    private val producerIndex = atomic(0)
+    private val consumerIndex = atomic(0)
 
     /**
-     * Retrieves and removes task from head of the queue
-     * Invariant: this method is called only by owner of the queue ([pollExternal] is not)
+     * Retrieves and removes a task from the head of the queue
+     * Invariant: this method is called only by the owner of the queue ([pollExternal] is not)
      */
    fun poll(): Task?
{ return lastScheduledTask.getAndSet(null) ?: pollExternal() } /** - * Invariant: this method is called only by owner of the queue + * Invariant: this method is called only by the owner of the queue + * * @param task task to put into local queue - * @param globalQueue fallback queue which is used when local queue is overflown + * @param globalQueue fallback queue which is used when the local queue is overflown + * @return true if no offloading happened, false otherwise */ - fun offer(task: Task, globalQueue: GlobalQueue) { + fun offer(task: Task, globalQueue: GlobalQueue): Boolean { while (true) { - val previous = lastScheduledTask.get() + val previous = lastScheduledTask.value if (lastScheduledTask.compareAndSet(previous, task)) { if (previous != null) { - addLast(previous, globalQueue) + return addLast(previous, globalQueue) } - return + return true } } } /** - * Offloads half of the current buffer to [sink] - * @param byTimer whether task deadline should be checked before offloading + * @return whether any task was stolen */ - inline fun offloadWork(byTimer: Boolean, sink: (Task) -> Unit) { - repeat((bufferSize / 2).coerceAtLeast(1)) { - if (bufferSize == 0) { // try to steal head if buffer is empty - val lastScheduled = lastScheduledTask.get() ?: return - if (!byTimer || schedulerTimeSource.nanoTime() - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION) { - return - } + fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean { + val time = schedulerTimeSource.nanoTime() - if (lastScheduledTask.compareAndSet(lastScheduled, null)) { - sink(lastScheduled) - return - } + if (victim.bufferSize == 0) { + val lastScheduled = victim.lastScheduledTask.value ?: return false + if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) { + return false } - // TODO use batch drain and (if target queue allows) batch insert - val task = pollExternal { !byTimer || schedulerTimeSource.nanoTime() - it.submissionTime >= WORK_STEALING_TIME_RESOLUTION } - ?: return - sink(task) + if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) { + offer(lastScheduled, globalQueue) + return true + } + + return false + } + + /* + * Invariant: time is monotonically increasing (thanks to nanoTime), so we can stop as soon as we find the first task not satisfying a predicate. + * If queue size is larger than QUEUE_SIZE_OFFLOAD_THRESHOLD then unconditionally steal tasks over this limit to prevent possible queue overflow + */ + var stolen = false + repeat((victim.bufferSize / 2).coerceAtLeast(1)) { + val task = victim.pollExternal { time - it.submissionTime >= WORK_STEALING_TIME_RESOLUTION_NS || victim.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD } + ?: return@repeat + stolen = true + offer(task, globalQueue) + } + + return stolen + } + + /** + * Offloads half of the current buffer to [target] + */ + private fun offloadWork(target: GlobalQueue) { + repeat((bufferSize / 2).coerceAtLeast(1)) { + val task = pollExternal() ?: return + target.add(task) } } @@ -90,8 +111,8 @@ internal class WorkQueue { */ private inline fun pollExternal(predicate: (Task) -> Boolean = { true }): Task? 
{
         while (true) {
-            val tailLocal = consumerIndex.get()
-            if (tailLocal - producerIndex.get() == 0) return null
+            val tailLocal = consumerIndex.value
+            if (tailLocal - producerIndex.value == 0) return null
             val index = tailLocal and MASK
             val element = buffer[index] ?: continue
             if (!predicate(element)) {
@@ -105,25 +126,32 @@ internal class WorkQueue {
         }
     }
 
-    // Called only by owner
-    private fun addLast(task: Task, globalQueue: GlobalQueue) {
+    // Called only by the owner
+    private fun addLast(task: Task, globalQueue: GlobalQueue): Boolean {
+        var addedToGlobalQueue = false
+
+        /*
+         * We need the loop here because a race is possible not only on a full queue,
+         * but also on a queue with one element during stealing
+         */
         while (!tryAddLast(task)) {
-            offloadWork(false) {
-                globalQueue.add(it)
-            }
+            offloadWork(globalQueue)
+            addedToGlobalQueue = true
         }
+
+        return !addedToGlobalQueue
     }
 
-    // Called only by owner
+    // Called only by the owner
     private fun tryAddLast(task: Task): Boolean {
         if (bufferSize == BUFFER_CAPACITY - 1) return false
-        val headLocal = producerIndex.get()
+        val headLocal = producerIndex.value
         val nextIndex = headLocal and MASK
 
         /*
-         * If current element is not null then we're racing with consumers for tail. If we skip this check then
-         * consumer can null out current element and it will be lost. If we're racing for tail then
-         * queue is close to overflow => it's fine to offload work to global queue
+         * If the current element is not null, then we're racing with consumers for the tail. If we skip this check,
+         * a consumer can null out the current element and it will be lost. If we're racing for the tail,
+         * the queue is close to overflowing => it's fine to offload work to the global queue
          */
         if (buffer[nextIndex] != null) {
             return false
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt
index c498ba77a7..8d907d5b25 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerStressTest.kt
@@ -1,12 +1,12 @@
 package kotlinx.coroutines.experimental.scheduling
 
-import kotlinx.coroutines.experimental.TestBase
+import kotlinx.coroutines.experimental.*
 import org.junit.After
 import org.junit.Test
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
-import kotlin.coroutines.experimental.CoroutineContext
+import java.util.concurrent.atomic.AtomicLong
 import kotlin.coroutines.experimental.EmptyCoroutineContext
 import kotlin.test.assertEquals
 
@@ -16,6 +16,7 @@ class CoroutineSchedulerStressTest : TestBase() {
     private val observedThreads = ConcurrentHashMap<Thread, HashSet<Int>>()
     private val tasksNum = 1_000_000
     private val processed = AtomicInteger(0)
+    private val finishLatch = CountDownLatch(1)
 
     @After
     fun tearDown() {
@@ -23,36 +24,93 @@ class CoroutineSchedulerStressTest : TestBase() {
     }
 
     @Test
-    fun submitTasks() {
-        stressTest(ForkedMarker)
+    fun testExternalTasksSubmission() {
+        stressTest(CommonPool)
     }
 
     @Test
-    fun submitTasksForked() {
-        stressTest(EmptyCoroutineContext)
+    fun testInternalTasksSubmission() {
+        stressTest(dispatcher)
     }
 
-    private fun stressTest(ctx: CoroutineContext) {
-        val finishLatch = CountDownLatch(1)
+    @Test
+    fun testStealingFromBlocking() {
+        /*
+         * Work-stealing stress test:
+         * one thread submits a pack of tasks, waits until they are completed (to avoid work offloading)
+         * and then repeats, thus never executing its own tasks and relying only on work stealing.
+         */
+        var blockingThread: Thread? = null
+        dispatcher.dispatch(EmptyCoroutineContext, Runnable {
+            // Submit a million tasks
+            blockingThread = Thread.currentThread()
+            var submittedTasks = 0
+            val processedCounter = AtomicLong(0)
+            while (submittedTasks <= tasksNum) {
+                for (i in 1..120) {
+                    if (++submittedTasks > tasksNum) {
+                        // Block the current thread
+                        finishLatch.await()
+                        return@Runnable
+                    }
 
-        for (i in 1..tasksNum) {
-            dispatcher.dispatch(ctx, Runnable {
-                var numbers = observedThreads[Thread.currentThread()]
-                if (numbers == null) {
-                    numbers = hashSetOf()
-                    observedThreads[Thread.currentThread()] = numbers
+                    val defensiveCopy = submittedTasks
+                    dispatcher.dispatch(EmptyCoroutineContext, Runnable {
+                        processTask(defensiveCopy)
+                        processedCounter.incrementAndGet()
+                    })
                 }
 
-                require(numbers.add(i))
-                if (processed.incrementAndGet() == tasksNum) {
-                    finishLatch.countDown()
+                while (processedCounter.get() < 100) {
+                    Thread.yield()
                 }
-            })
-        }
 
+                processedCounter.set(0L)
+            }
+        })
+
+        finishLatch.await()
+
+        require(blockingThread!! !in observedThreads)
+        require(observedThreads.size == Runtime.getRuntime().availableProcessors() - 1)
+        validateResults()
+    }
+
+    private fun stressTest(submissionInitiator: CoroutineDispatcher) {
+        /*
+         * Run 1 million tasks and validate that
+         * 1) All of them are completed successfully
+         * 2) Every thread executed a task at least once
+         */
+        submissionInitiator.dispatch(EmptyCoroutineContext, Runnable {
+            for (i in 1..tasksNum) {
+                dispatcher.dispatch(EmptyCoroutineContext, Runnable {
+                    processTask(i)
+                })
+            }
+        })
 
         finishLatch.await()
         assertEquals(Runtime.getRuntime().availableProcessors(), observedThreads.size)
+        validateResults()
+    }
+
+    private fun processTask(i: Int) {
+        var numbers = observedThreads[Thread.currentThread()]
+        if (numbers == null) {
+            numbers = hashSetOf()
+            observedThreads[Thread.currentThread()] = numbers
+        }
+
+        require(numbers.add(i))
+
+        if (processed.incrementAndGet() == tasksNum) {
+            finishLatch.countDown()
+        }
+    }
+
+    private fun validateResults() {
         val result = observedThreads.values.flatMap { it }.toSet()
         assertEquals((1..tasksNum).toSet(), result)
     }
+
 }
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
index d8dad31c8a..f7289f3350 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/CoroutineSchedulerTest.kt
@@ -116,4 +116,32 @@ class CoroutineSchedulerTest : TestBase() {
             finish(4)
         }
     }
+
+    @Test
+    fun testRngUniformDistribution() {
+        CoroutineScheduler(1).use { scheduler ->
+            val worker = scheduler.PoolWorker(1)
+            testUniformDistribution(worker, 2)
+            testUniformDistribution(worker, 4)
+            testUniformDistribution(worker, 8)
+            testUniformDistribution(worker, 12)
+            testUniformDistribution(worker, 16)
+        }
+    }
+
+    private fun testUniformDistribution(worker: CoroutineScheduler.PoolWorker, bound: Int) {
+        val result = IntArray(bound)
+        val iterations = 10_000_000
+        repeat(iterations) {
+            ++result[worker.nextInt(bound)]
+        }
+
+        val
bucketSize = iterations / bound + for (i in result) { + val ratio = i.toDouble() / bucketSize + // 10% deviation + check(ratio <= 1.1) + check(ratio >= 0.9) + } + } } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/TestTimeSource.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/TestTimeSource.kt index 74f2a67ebc..c8956ac23c 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/TestTimeSource.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/TestTimeSource.kt @@ -4,7 +4,7 @@ internal class TestTimeSource(var time: Long) : TimeSource() { override fun nanoTime() = time - fun step(delta: Long = WORK_STEALING_TIME_RESOLUTION) { + fun step(delta: Long = WORK_STEALING_TIME_RESOLUTION_NS) { time += delta } } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt index 4ba5cc3da9..07f594eaed 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueStressTest.kt @@ -54,11 +54,11 @@ class WorkQueueStressTest : TestBase() { val myQueue = WorkQueue() startLatch.await() while (!producerFinished || producerQueue.bufferSize != 0) { - producerQueue.offloadWork(true, { myQueue.offer(it, stolenTasks[i]) }) + myQueue.trySteal(producerQueue, stolenTasks[i]) } // Drain last element which is not counted in buffer - producerQueue.offloadWork(true, { myQueue.offer(it, stolenTasks[i]) }) + myQueue.trySteal(producerQueue, stolenTasks[i]) stolenTasks[i].addAll(myQueue.drain().map { task(it) }) } } @@ -88,11 +88,10 @@ class WorkQueueStressTest : TestBase() { threads += thread(name = "stealer") { val myQueue = WorkQueue() startLatch.await() - var consumed = 0 - while (consumed != offerIterations) { - producerQueue.offloadWork(true, { - ++consumed - myQueue.offer(it, stolen) }) + while (stolen.size != offerIterations) { + if (!myQueue.trySteal(producerQueue, stolen)) { + stolen.addAll(myQueue.drain().map { task(it) }) + } } stolen.addAll(myQueue.drain().map { task(it) }) } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt index 31d5d7e525..a288436217 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/scheduling/WorkQueueTest.kt @@ -8,7 +8,6 @@ import org.junit.Test import java.util.* import kotlin.test.assertEquals - class WorkQueueTest : TestBase() { private val timeSource = TestTimeSource(0) @@ -16,7 +15,6 @@ class WorkQueueTest : TestBase() { @Before fun setUp() { schedulerTimeSource = timeSource - } @After @@ -45,40 +43,76 @@ class WorkQueueTest : TestBase() { } @Test - fun testTimelyWorkOffload() { + fun testWorkOffloadPrecision() { val queue = WorkQueue() val globalQueue = ArrayDeque() + repeat(128) { require(queue.offer(task(0), globalQueue)) } + require(globalQueue.isEmpty()) + require(!queue.offer(task(0), globalQueue)) + require(globalQueue.size == 63) + } + 
+ @Test + fun testTimelyStealing() { + val victim = WorkQueue() + val globalQueue = ArrayDeque() - (1L..128L).forEach { queue.offer(task(it), globalQueue) } + (1L..96L).forEach { victim.offer(task(it), globalQueue) } timeSource.step() timeSource.step(2) val stealer = WorkQueue() - queue.offloadWork(true, { stealer.offer(it, globalQueue) }) + require(stealer.trySteal(victim, globalQueue)) assertEquals(arrayListOf(2L, 1L), stealer.drain()) - queue.offloadWork(true, { stealer.offer(it, globalQueue) }) + require(!stealer.trySteal(victim, globalQueue)) assertEquals(emptyList(), stealer.drain()) timeSource.step(3) - queue.offloadWork(true, { stealer.offer(it, globalQueue) }) + require(stealer.trySteal(victim, globalQueue)) assertEquals(arrayListOf(5L, 3L, 4L), stealer.drain()) + require(globalQueue.isEmpty()) + } + + @Test + fun testStealingBySize() { + val victim = WorkQueue() + val globalQueue = ArrayDeque() + + (1L..110L).forEach { victim.offer(task(it), globalQueue) } + val stealer = WorkQueue() + require(stealer.trySteal(victim, globalQueue)) + assertEquals((1L..13L).toSet(), stealer.drain().toSet()) + + require(!stealer.trySteal(victim, globalQueue)) + require(stealer.drain().isEmpty()) + + + timeSource.step() + timeSource.step(13) + require(!stealer.trySteal(victim, globalQueue)) + require(stealer.drain().isEmpty()) + + timeSource.step(1) + require(stealer.trySteal(victim, globalQueue)) + assertEquals(arrayListOf(14L), stealer.drain()) + } @Test fun testStealingFromHead() { - val queue = WorkQueue() + val victim = WorkQueue() val globalQueue = ArrayDeque() - (1L..2L).forEach { queue.offer(task(it), globalQueue) } + (1L..2L).forEach { victim.offer(task(it), globalQueue) } timeSource.step() timeSource.step(3) val stealer = WorkQueue() - queue.offloadWork(true, { stealer.offer(it, globalQueue) }) + require(stealer.trySteal(victim, globalQueue)) assertEquals(arrayListOf(1L), stealer.drain()) - queue.offloadWork(true, { stealer.offer(it, globalQueue) }) + require(stealer.trySteal(victim, globalQueue)) assertEquals(arrayListOf(2L), stealer.drain()) } }
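
To make the new machinery easier to follow, the sketches below restate the core
mechanisms of this patch in standalone form. They are illustrative models only,
not part of the diff: every class and function name in them is hypothetical, and
only the constant values copied from the diff above are authoritative.

First, the dispatch decision. CoroutineScheduler.dispatch asks the current
worker's local queue to take the task and then reacts to one of three outcomes;
the enum and the function parameters here are stand-ins for the Int result codes
and private members of the scheduler:

    // Illustrative only: the control flow of CoroutineScheduler.dispatch,
    // with the three offer results spelled out as an enum instead of Int codes.
    enum class OfferResult { ADDED, ADDED_WITH_OFFLOADING, NOT_ADDED }

    class DispatchModel(
        private val submitLocally: (Runnable) -> OfferResult, // stands in for submitToLocalQueue
        private val globalQueue: java.util.Queue<Runnable>,   // stands in for the ConcurrentLinkedQueue
        private val unparkIdleWorker: () -> Unit
    ) {
        fun dispatch(task: Runnable) {
            when (submitLocally(task)) {
                // Fast path: the local queue took the task and has spare capacity
                OfferResult.ADDED -> return
                // Not a pool thread (or the queue refused): go through the global queue
                OfferResult.NOT_ADDED -> {
                    globalQueue.add(task)
                    unparkIdleWorker()
                }
                // The local queue took it but is filling up; wake a potential thief
                OfferResult.ADDED_WITH_OFFLOADING -> unparkIdleWorker()
            }
        }
    }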
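
Second, the adaptive idling. PoolWorker.idle()/idleReset() escalate from
spinning to yielding to parking with exponential backoff; the model below keeps
only that ladder, leaving out the shared parkedWorkers counter and the isParking
flag that the real worker also maintains. Constant values mirror the companion
object of CoroutineScheduler; the IdlePolicy name is a placeholder:

    import java.util.concurrent.locks.LockSupport

    // Illustrative only: the spin -> yield -> park escalation of PoolWorker.idle()
    class IdlePolicy(
        private val maxSpins: Long = 1000,          // mirrors MAX_SPINS
        private val maxYields: Long = 500,          // mirrors MAX_YIELDS
        private val minParkNs: Long = 25_000,       // ~WORK_STEALING_TIME_RESOLUTION_NS / 4
        private val maxParkNs: Long = 1_000_000_000 // mirrors MAX_PARK_TIME_NS (1 second)
    ) {
        private var spins = 0L
        private var yields = 0L
        private var parkTimeNs = minParkNs

        /** One idle iteration: spin first, then yield, then park with 1.5x backoff. */
        fun idleOnce() {
            when {
                spins < maxSpins -> ++spins              // cheapest: loop and re-check for work
                ++yields <= maxYields -> Thread.yield()  // give the CPU away without blocking
                else -> {
                    // Grow the park interval up to the cap, then actually block
                    if (parkTimeNs < maxParkNs) {
                        parkTimeNs = (parkTimeNs * 1.5).toLong().coerceAtMost(maxParkNs)
                    }
                    LockSupport.parkNanos(parkTimeNs)
                }
            }
        }

        /** Called when a task is found: return to the aggressive spinning mode. */
        fun reset() {
            spins = 0
            yields = 0
            parkTimeNs = minParkNs
        }
    }

The design intent is that a worker only pays the park/unpark round trip on
genuinely idle periods, while short gaps between submissions are absorbed by
the spin and yield phases.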
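
Third, the hand-rolled RNG. PoolWorker.nextInt is a per-worker Marsaglia
xorshift32 generator with a fast path for power-of-two bounds; the class below
restates it self-contained so it can be run and eyeballed in isolation. The
XorShiftRandom name and the zero-seed fallback are illustrative only:

    // Illustrative only: a standalone version of the worker-local RNG,
    // assuming the classic xorshift32 triple (13, 17, 5) with logical shifts.
    class XorShiftRandom(seed: Int) {
        // A xorshift state must never be zero, otherwise the sequence stays zero forever
        private var state: Int = if (seed != 0) seed else 0x6B43A9B5

        fun nextInt(upperBound: Int): Int {
            state = state xor (state shl 13)
            state = state xor (state ushr 17)
            state = state xor (state shl 5)
            val mask = upperBound - 1
            // Power-of-two bound: take the low bits directly
            if (mask and upperBound == 0) return state and mask
            // General bound: clear the sign bit, then reduce modulo the bound
            return (state and Int.MAX_VALUE) % upperBound
        }
    }

    fun main() {
        val rng = XorShiftRandom(42)
        val buckets = IntArray(8)
        repeat(1_000_000) { buckets[rng.nextInt(8)]++ }
        // Each bucket should hold roughly 125000 hits, as in testRngUniformDistribution
        println(buckets.joinToString())
    }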
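
Finally, the stealing predicate exercised by testTimelyStealing and
testStealingBySize above: WorkQueue.trySteal takes a task either once it has
waited at least one stealing resolution, or unconditionally when the victim's
buffer has grown past the offload threshold. Constant values mirror the
defaults in Tasks.kt; the canSteal function is an illustrative restatement,
not the patch's API:

    // Illustrative only: the decision rule inside WorkQueue.trySteal
    const val RESOLUTION_NS = 100_000L   // kotlinx.coroutines.scheduler.resolution.ns
    const val OFFLOAD_THRESHOLD = 96     // kotlinx.coroutines.scheduler.offload.threshold

    // A task is eligible for stealing when it has waited at least one resolution
    // (its owner is clearly not keeping up), or when the victim queue has grown
    // past the offload threshold and must be relieved regardless of task age.
    fun canSteal(submissionTimeNs: Long, nowNs: Long, victimBufferSize: Int): Boolean =
        nowNs - submissionTimeNs >= RESOLUTION_NS || victimBufferSize > OFFLOAD_THRESHOLD

    fun main() {
        println(canSteal(0, 50_000, 10))   // false: task too fresh, queue small
        println(canSteal(0, 50_000, 97))   // true: queue over the threshold
        println(canSteal(0, 150_000, 10))  // true: older than one resolution
    }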