From b286646ad206f0729ce00bac416b5841096fecd3 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> Date: Mon, 12 Aug 2024 12:54:09 +0200 Subject: [PATCH] Remove various obsolete code (#4196) Removed some code that's no longer relevant: * Backward compatibility for Ktor 1.0.0, * Some code that was more complex than needed to support that, * A JDK 6 workaround, * Other small things. --- .../api/kotlinx-coroutines-core.api | 17 -- .../common/src/SchedulerTask.common.kt | 21 +- .../common/src/internal/DispatchedTask.kt | 16 +- .../src/internal/LocalAtomics.common.kt | 6 +- .../jsAndWasmShared/src/SchedulerTask.kt | 10 - kotlinx-coroutines-core/jvm/src/Executors.kt | 11 +- .../jvm/src/SchedulerTask.kt | 9 - .../jvm/src/flow/internal/FlowExceptions.kt | 1 - .../jvm/src/internal/Concurrent.kt | 19 -- .../jvm/src/internal/FastServiceLoader.kt | 8 +- .../jvm/src/scheduling/CoroutineScheduler.kt | 60 ++--- .../jvm/src/scheduling/Deprecated.kt | 208 ------------------ .../jvm/src/scheduling/Tasks.kt | 46 ++-- .../jvm/test/MutexCancellationStressTest.kt | 4 +- .../jvm/test/RejectedExecutionTest.kt | 4 +- .../test/scheduling/CoroutineSchedulerTest.kt | 16 +- .../jvm/test/scheduling/WorkQueueTest.kt | 4 +- .../native/src/SchedulerTask.kt | 9 - 18 files changed, 81 insertions(+), 388 deletions(-) delete mode 100644 kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index af4c0baa84..1f5b05a59d 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1258,23 +1258,6 @@ public final class kotlinx/coroutines/intrinsics/CancellableKt { public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V } -public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher { - public synthetic fun (II)V - public synthetic fun (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V - public fun (IIJLjava/lang/String;)V - public synthetic fun (IIJLjava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V - public fun (IILjava/lang/String;)V - public synthetic fun (IILjava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V - public final fun blocking (I)Lkotlinx/coroutines/CoroutineDispatcher; - public static synthetic fun blocking$default (Lkotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher;IILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher; - public fun close ()V - public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun getExecutor ()Ljava/util/concurrent/Executor; - public final fun limited (I)Lkotlinx/coroutines/CoroutineDispatcher; - public fun toString ()Ljava/lang/String; -} - public final class kotlinx/coroutines/selects/OnTimeoutKt { public static final fun onTimeout (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V public static final fun onTimeout-8Mi8wO0 (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V diff --git a/kotlinx-coroutines-core/common/src/SchedulerTask.common.kt b/kotlinx-coroutines-core/common/src/SchedulerTask.common.kt index 3eea46c4a7..e950dcb538 100644 --- a/kotlinx-coroutines-core/common/src/SchedulerTask.common.kt +++ b/kotlinx-coroutines-core/common/src/SchedulerTask.common.kt @@ -1,11 +1,16 @@ package kotlinx.coroutines +/** + * A [Runnable] that's especially optimized for running in [Dispatchers.Default] on the JVM. + * + * Replacing a [SchedulerTask] with a [Runnable] should not lead to any change in observable behavior. + * + * An arbitrary [Runnable], once it is dispatched by [Dispatchers.Default], gets wrapped into a class that + * stores the submission time, the execution context, etc. + * For [Runnable] instances that we know are only going to be executed in dispatch procedures, we can avoid the + * overhead of separately allocating a wrapper, and instead have the [Runnable] contain the required fields + * on construction. + * + * When running outside the standard dispatchers, these new fields are just dead weight. + */ internal expect abstract class SchedulerTask internal constructor() : Runnable - -internal expect interface SchedulerTaskContext - -@Suppress("EXTENSION_SHADOWED_BY_MEMBER") -internal expect val SchedulerTask.taskContext: SchedulerTaskContext - -@Suppress("EXTENSION_SHADOWED_BY_MEMBER") -internal expect inline fun SchedulerTaskContext.afterTask() diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index e892cd9743..309685bb7c 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -76,7 +76,6 @@ internal abstract class DispatchedTask internal constructor( final override fun run() { assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching - val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation @@ -107,8 +106,7 @@ internal abstract class DispatchedTask internal constructor( // This instead of runCatching to have nicer stacktrace and debug experience fatalException = e } finally { - val result = runCatching { taskContext.afterTask() } - handleFatalException(fatalException, result.exceptionOrNull()) + fatalException?.let { handleFatalException(it) } } } @@ -130,15 +128,9 @@ internal abstract class DispatchedTask internal constructor( * Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of * a failed coroutine, but such exceptions should be reported anyway. */ - internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) { - if (exception === null && finallyException === null) return - if (exception !== null && finallyException !== null) { - exception.addSuppressed(finallyException) - } - - val cause = exception ?: finallyException + internal fun handleFatalException(exception: Throwable) { val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " + - "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!) + "Please read KDoc to 'handleFatalException' method and report this incident to maintainers", exception) handleCoroutineException(this.delegate.context, reason) } } @@ -203,7 +195,7 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( * This exception doesn't happen normally, only if we have a bug in implementation. * Report it as a fatal exception. */ - handleFatalException(e, null) + handleFatalException(e) } finally { eventLoop.decrementUseCount(unconfined = true) } diff --git a/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt index 7124f74a49..aea07bed0b 100644 --- a/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt @@ -5,13 +5,11 @@ package kotlinx.coroutines.internal * where atomicfu doesn't support its tranformations. * * Have `Local` prefix to avoid AFU clashes during star-imports + * + * TODO: remove after https://youtrack.jetbrains.com/issue/KT-62423/ */ internal expect class LocalAtomicInt(value: Int) { fun get(): Int fun set(value: Int) fun decrementAndGet(): Int } - -internal inline var LocalAtomicInt.value - get() = get() - set(value) = set(value) diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/SchedulerTask.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/SchedulerTask.kt index 111a9fc5e1..24b2311268 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/SchedulerTask.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/SchedulerTask.kt @@ -1,13 +1,3 @@ package kotlinx.coroutines internal actual abstract class SchedulerTask : Runnable - -internal actual interface SchedulerTaskContext { } - -private object TaskContext: SchedulerTaskContext { } - -internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = TaskContext - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun SchedulerTaskContext.afterTask() {} - diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 16232bfa7c..8ba3f18a24 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -117,13 +117,12 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { - /* - * Attempts to reflectively (to be Java 6 compatible) invoke - * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup - * internal scheduler queue on cancellation. - */ init { - removeFutureOnCancel(executor) + /* Attempt to invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to clean up + * the internal scheduler queue on cancellation. */ + if (executor is ScheduledThreadPoolExecutor) { + executor.removeOnCancelPolicy = true + } } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt b/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt index 5f81dd33d3..ca1ab87f68 100644 --- a/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt +++ b/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt @@ -3,12 +3,3 @@ package kotlinx.coroutines import kotlinx.coroutines.scheduling.* internal actual typealias SchedulerTask = Task - -internal actual typealias SchedulerTaskContext = TaskContext - -@Suppress("EXTENSION_SHADOWED_BY_MEMBER") -internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = taskContext - -@Suppress("NOTHING_TO_INLINE", "EXTENSION_SHADOWED_BY_MEMBER") -internal actual inline fun SchedulerTaskContext.afterTask() = - afterTask() diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt index e6f3453359..d8c46d2a76 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt @@ -1,7 +1,6 @@ package kotlinx.coroutines.flow.internal import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* internal actual class AbortFlowException actual constructor( @JvmField @Transient actual val owner: Any diff --git a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt index d9a9fe2471..0a931f5f5d 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt @@ -1,8 +1,6 @@ package kotlinx.coroutines.internal -import java.lang.reflect.* import java.util.* -import java.util.concurrent.* import kotlin.concurrent.withLock as withLockJvm @Suppress("ACTUAL_WITHOUT_EXPECT") @@ -22,20 +20,3 @@ internal actual annotation class BenignDataRace() @Suppress("NOTHING_TO_INLINE") // So that R8 can completely remove ConcurrentKt class internal actual inline fun identitySet(expectedSize: Int): MutableSet = Collections.newSetFromMap(IdentityHashMap(expectedSize)) - -private val REMOVE_FUTURE_ON_CANCEL: Method? = try { - ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java) -} catch (e: Throwable) { - null -} - -@Suppress("NAME_SHADOWING") -internal fun removeFutureOnCancel(executor: Executor): Boolean { - try { - val executor = executor as? ScheduledThreadPoolExecutor ?: return false - (REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true) - return true - } catch (e: Throwable) { - return false // failed to setRemoveOnCancelPolicy, assume it does not removes future on cancel - } -} diff --git a/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt b/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt index 7c83d06820..eb2c4869ae 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt @@ -1,11 +1,11 @@ package kotlinx.coroutines.internal +import kotlinx.coroutines.CoroutineExceptionHandler import java.io.* import java.net.* import java.util.* import java.util.jar.* import java.util.zip.* -import kotlin.collections.ArrayList /** * Don't use JvmField here to enable R8 optimizations via "assumenosideeffects" @@ -68,7 +68,7 @@ internal object FastServiceLoader { // Also search for test-module factory createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) } result - } catch (e: Throwable) { + } catch (_: Throwable) { // Fallback to the regular SL in case of any unexpected exception load(clz, clz.classLoader) } @@ -85,7 +85,7 @@ internal object FastServiceLoader { return try { val clz = Class.forName(serviceClass, true, baseClass.classLoader) baseClass.cast(clz.getDeclaredConstructor().newInstance()) - } catch (e: ClassNotFoundException) { // Do not fail if TestMainDispatcherFactory is not found + } catch (_: ClassNotFoundException) { // Do not fail if TestMainDispatcherFactory is not found null } } @@ -93,7 +93,7 @@ internal object FastServiceLoader { private fun load(service: Class, loader: ClassLoader): List { return try { loadProviders(service, loader) - } catch (e: Throwable) { + } catch (_: Throwable) { // Fallback to default service loader ServiceLoader.load(service, loader).toList() } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index c07b7ab4f1..3c22116b66 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -73,7 +73,7 @@ import kotlin.math.* * * ### Support for blocking tasks * - * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks. + * The scheduler also supports the notion of [blocking][Task.isBlocking] tasks. * When executing or enqueuing blocking tasks, the scheduler notifies or creates an additional worker in * addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created) * available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains @@ -425,7 +425,7 @@ internal class CoroutineScheduler( block.taskContext = taskContext return block } - return TaskImpl(block, nanoTime, taskContext) + return block.asTask(nanoTime, taskContext) } // NB: should only be called from 'dispatch' method due to blocking tasks increment @@ -514,7 +514,7 @@ internal class CoroutineScheduler( */ if (state === WorkerState.TERMINATED) return task // Do not add CPU tasks in local queue if we are not able to execute it - if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) { + if (!task.isBlocking && state === WorkerState.BLOCKING) { return task } mayHaveLocalTasks = true @@ -810,29 +810,26 @@ internal class CoroutineScheduler( private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK private fun executeTask(task: Task) { - val taskMode = task.mode - idleReset(taskMode) - beforeTask(taskMode) - runSafely(task) - afterTask(taskMode) - } - - private fun beforeTask(taskMode: Int) { - if (taskMode == TASK_NON_BLOCKING) return - // Always notify about new work when releasing CPU-permit to execute some blocking task - if (tryReleaseCpu(WorkerState.BLOCKING)) { - signalCpuWork() + terminationDeadline = 0L // reset deadline for termination + if (state == WorkerState.PARKING) { + assert { task.isBlocking } + state = WorkerState.BLOCKING } - } - - private fun afterTask(taskMode: Int) { - if (taskMode == TASK_NON_BLOCKING) return - decrementBlockingTasks() - val currentState = state - // Shutdown sequence of blocking dispatcher - if (currentState !== WorkerState.TERMINATED) { - assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState" - state = WorkerState.DORMANT + if (task.isBlocking) { + // Always notify about new work when releasing CPU-permit to execute some blocking task + if (tryReleaseCpu(WorkerState.BLOCKING)) { + signalCpuWork() + } + runSafely(task) + decrementBlockingTasks() + val currentState = state + // Shutdown sequence of blocking dispatcher + if (currentState !== WorkerState.TERMINATED) { + assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState" + state = WorkerState.DORMANT + } + } else { + runSafely(task) } } @@ -923,15 +920,6 @@ internal class CoroutineScheduler( state = WorkerState.TERMINATED } - // It is invoked by this worker when it finds a task - private fun idleReset(mode: Int) { - terminationDeadline = 0L // reset deadline for termination - if (state == WorkerState.PARKING) { - assert { mode == TASK_PROBABLY_BLOCKING } - state = WorkerState.BLOCKING - } - } - fun findTask(mayHaveLocalTasks: Boolean): Task? { if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks) /* @@ -1013,12 +1001,12 @@ internal class CoroutineScheduler( enum class WorkerState { /** - * Has CPU token and either executes [TASK_NON_BLOCKING] task or tries to find one. + * Has CPU token and either executes a [Task.isBlocking]` == false` task or tries to find one. */ CPU_ACQUIRED, /** - * Executing task with [TASK_PROBABLY_BLOCKING]. + * Executing task with [Task.isBlocking]. */ BLOCKING, diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt deleted file mode 100644 index 2fd3173543..0000000000 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt +++ /dev/null @@ -1,208 +0,0 @@ -@file:Suppress("unused") - -package kotlinx.coroutines.scheduling -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import java.util.concurrent.* -import kotlin.coroutines.* - -/** - * This API was "public @InternalApi" and leaked into Ktor enabled-by-default sources. - * Since then, we refactored scheduler sources and its API and decided to get rid of it in - * its current shape. - * - * To preserve backwards compatibility with Ktor 1.x, previous version of the code is - * extracted here as is and isolated from the rest of code base, so R8 can get rid of it. - * - * It should be removed after Ktor 3.0.0 (EOL of Ktor 1.x) around 2022. - */ -@PublishedApi -internal open class ExperimentalCoroutineDispatcher( - private val corePoolSize: Int, - private val maxPoolSize: Int, - private val idleWorkerKeepAliveNs: Long, - private val schedulerName: String = "CoroutineScheduler" -) : ExecutorCoroutineDispatcher() { - public constructor( - corePoolSize: Int = CORE_POOL_SIZE, - maxPoolSize: Int = MAX_POOL_SIZE, - schedulerName: String = DEFAULT_SCHEDULER_NAME - ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) - - @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN) - public constructor( - corePoolSize: Int = CORE_POOL_SIZE, - maxPoolSize: Int = MAX_POOL_SIZE - ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS) - - override val executor: Executor - get() = coroutineScheduler - - // This is variable for test purposes, so that we can reinitialize from clean state - private var coroutineScheduler = createScheduler() - - override fun dispatch(context: CoroutineContext, block: Runnable): Unit = - try { - coroutineScheduler.dispatch(block) - } catch (e: RejectedExecutionException) { - // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved - // for testing purposes, so we don't have to worry about cancelling the affected Job here. - DefaultExecutor.dispatch(context, block) - } - - override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = - try { - coroutineScheduler.dispatch(block, tailDispatch = true) - } catch (e: RejectedExecutionException) { - // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved - // for testing purposes, so we don't have to worry about cancelling the affected Job here. - DefaultExecutor.dispatchYield(context, block) - } - - override fun close(): Unit = coroutineScheduler.close() - - override fun toString(): String { - return "${super.toString()}[scheduler = $coroutineScheduler]" - } - - /** - * Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block. - * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], - * giving it additional hints to adjust its behaviour. - * - * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. - */ - fun blocking(parallelism: Int = 16): CoroutineDispatcher { - require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } - return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING) - } - - /** - * Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks. - * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], - * giving it additional hints to adjust its behaviour. - * - * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. - */ - fun limited(parallelism: Int): CoroutineDispatcher { - require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } - require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" } - return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING) - } - - internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { - try { - coroutineScheduler.dispatch(block, context, tailDispatch) - } catch (e: RejectedExecutionException) { - // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved - // for testing purposes, so we don't have to worry about cancelling the affected Job here. - // TaskContext shouldn't be lost here to properly invoke before/after task - DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) - } - } - - private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) -} - -private class LimitingDispatcher( - private val dispatcher: ExperimentalCoroutineDispatcher, - private val parallelism: Int, - private val name: String?, - override val taskMode: Int -) : ExecutorCoroutineDispatcher(), TaskContext, Executor { - - private val queue = ConcurrentLinkedQueue() - private val inFlightTasks = atomic(0) - - override val executor: Executor - get() = this - - override fun execute(command: Runnable) = dispatch(command, false) - - override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher") - - override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) - - private fun dispatch(block: Runnable, tailDispatch: Boolean) { - var taskToSchedule = block - while (true) { - // Commit in-flight tasks slot - val inFlight = inFlightTasks.incrementAndGet() - - // Fast path, if parallelism limit is not reached, dispatch task and return - if (inFlight <= parallelism) { - dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) - return - } - - // Parallelism limit is reached, add task to the queue - queue.add(taskToSchedule) - - /* - * We're not actually scheduled anything, so rollback committed in-flight task slot: - * If the amount of in-flight tasks is still above the limit, do nothing - * If the amount of in-flight tasks is lesser than parallelism, then - * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue - * to avoid starvation. - * - * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: - * - * T1: submit task, start execution, R == 1 - * T2: commit slot for next task, R == 2 - * T1: finish T1, R == 1 - * T2: submit next task to local queue, decrement R, R == 0 - * Without retries, task from T2 will be stuck in the local queue - */ - if (inFlightTasks.decrementAndGet() >= parallelism) { - return - } - - taskToSchedule = queue.poll() ?: return - } - } - - override fun dispatchYield(context: CoroutineContext, block: Runnable) { - dispatch(block, tailDispatch = true) - } - - override fun toString(): String { - return name ?: "${super.toString()}[dispatcher = $dispatcher]" - } - - /** - * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any. - * - * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid - * non-blocking continuations starvation. - * E.g. for - * ``` - * foo() - * blocking() - * bar() - * ``` - * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task - */ - override fun afterTask() { - var next = queue.poll() - // If we have pending tasks in current blocking context, dispatch first - if (next != null) { - dispatcher.dispatchWithContext(next, this, true) - return - } - inFlightTasks.decrementAndGet() - - /* - * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue. - * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: - * T1: submit task, start execution, R == 1 - * T2: commit slot for next task, R == 2 - * T1: finish T1, poll queue (it's still empty), R == 2 - * T2: submit next task to the local queue, decrement R, R == 1 - * T1: decrement R, finish. R == 0 - * - * The task from T2 is stuck is the local queue - */ - next = queue.poll() ?: return - dispatch(next, true) - } -} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index b82d31b445..bdf6335d98 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -49,58 +49,48 @@ internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos( internal var schedulerTimeSource: SchedulerTimeSource = NanoTimeSource /** - * Marker indicating that task is CPU-bound and will not block + * Concurrency context of a task. + * + * Currently, it only signifies whether the task is blocking or non-blocking. */ -internal const val TASK_NON_BLOCKING = 0 +internal typealias TaskContext = Boolean /** - * Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required + * This would be [TaskContext.toString] if [TaskContext] was a proper class. */ -internal const val TASK_PROBABLY_BLOCKING = 1 - -internal interface TaskContext { - val taskMode: Int // TASK_XXX - fun afterTask() -} +private fun taskContextString(taskContext: TaskContext): String = if (taskContext) "Blocking" else "Non-blocking" -private class TaskContextImpl(override val taskMode: Int): TaskContext { - override fun afterTask() { - // Nothing for non-blocking context - } -} +internal const val NonBlockingContext: TaskContext = false -@JvmField -internal val NonBlockingContext: TaskContext = TaskContextImpl(TASK_NON_BLOCKING) - -@JvmField -internal val BlockingContext: TaskContext = TaskContextImpl(TASK_PROBABLY_BLOCKING) +internal const val BlockingContext: TaskContext = true +/** + * A scheduler task. + */ internal abstract class Task( @JvmField var submissionTime: Long, @JvmField var taskContext: TaskContext ) : Runnable { internal constructor() : this(0, NonBlockingContext) - internal inline val mode: Int get() = taskContext.taskMode // TASK_XXX } -internal inline val Task.isBlocking get() = taskContext.taskMode == TASK_PROBABLY_BLOCKING +internal inline val Task.isBlocking get() = taskContext + +internal fun Runnable.asTask(submissionTime: Long, taskContext: TaskContext): Task = + TaskImpl(this, submissionTime, taskContext) // Non-reusable Task implementation to wrap Runnable instances that do not otherwise implement task -internal class TaskImpl( +private class TaskImpl( @JvmField val block: Runnable, submissionTime: Long, taskContext: TaskContext ) : Task(submissionTime, taskContext) { override fun run() { - try { - block.run() - } finally { - taskContext.afterTask() - } + block.run() } override fun toString(): String = - "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]" + "Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, ${taskContextString(taskContext)}]" } // Open for tests diff --git a/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt b/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt index dc096fe716..3528702a05 100644 --- a/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/MutexCancellationStressTest.kt @@ -55,7 +55,7 @@ class MutexCancellationStressTest : TestBase() { delay(500) // If we've caught the completion after delay, then there is a chance no progress were made whatsoever, bail out if (completed.get()) return@launch - val c = counterLocal.map { it.value } + val c = counterLocal.map { it.get() } for (i in 0 until mutexJobNumber) { assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i, last observed state: ${c[i]}" } } @@ -76,7 +76,7 @@ class MutexCancellationStressTest : TestBase() { cancellationJob.join() mutexJobs.forEach { it.join() } checkProgressJob.join() - assertEquals(counter, counterLocal.sumOf { it.value }) + assertEquals(counter, counterLocal.sumOf { it.get() }) dispatcher.close() } } diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt index 770a854acc..cf72b31481 100644 --- a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -87,7 +87,6 @@ class RejectedExecutionTest : TestBase() { @Test fun testRejectOnDelay() = runTest { - if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs expect(1) executor.acceptTasks = 1 // accept one task assertFailsWith { @@ -109,7 +108,6 @@ class RejectedExecutionTest : TestBase() { @Test fun testRejectWithTimeout() = runTest { - if (!removeFutureOnCancel(executor)) return@runTest // Skip this test on old JDKs expect(1) executor.acceptTasks = 1 // accept one task assertFailsWith { @@ -165,4 +163,4 @@ class RejectedExecutionTest : TestBase() { if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread") assertEquals(CoroutineScheduler.WorkerState.BLOCKING, thread.state) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt index edb3799639..042ea2f7d8 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt @@ -9,16 +9,16 @@ import kotlin.coroutines.* import kotlin.test.* class CoroutineSchedulerTest : TestBase() { - private val taskModes = listOf(TASK_NON_BLOCKING, TASK_PROBABLY_BLOCKING) + private val contexts = listOf(NonBlockingContext, BlockingContext) @Test fun testModesExternalSubmission() { // Smoke CoroutineScheduler(1, 1).use { - for (mode in taskModes) { + for (context in contexts) { val latch = CountDownLatch(1) it.dispatch(Runnable { latch.countDown() - }, TaskContextImpl(mode)) + }, context) latch.await() } @@ -28,12 +28,12 @@ class CoroutineSchedulerTest : TestBase() { @Test fun testModesInternalSubmission() { // Smoke CoroutineScheduler(2, 2).use { - val latch = CountDownLatch(taskModes.size) + val latch = CountDownLatch(contexts.size) it.dispatch(Runnable { - for (mode in taskModes) { + for (context in contexts) { it.dispatch(Runnable { latch.countDown() - }, TaskContextImpl(mode)) + }, context) } }) @@ -164,8 +164,4 @@ class CoroutineSchedulerTest : TestBase() { check(ratio >= 0.9) } } - - private class TaskContextImpl(override val taskMode: Int) : TaskContext { - override fun afterTask() {} - } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt index e9fbc6beae..08ed5ca113 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt @@ -78,8 +78,8 @@ class WorkQueueTest : TestBase() { } } -internal fun task(n: Long) = TaskImpl(Runnable {}, n, NonBlockingContext) -internal fun blockingTask(n: Long) = TaskImpl(Runnable {}, n, BlockingContext) +internal fun task(n: Long) = Runnable {}.asTask(n, NonBlockingContext) +internal fun blockingTask(n: Long) = Runnable {}.asTask(n, BlockingContext) internal fun WorkQueue.drain(ref: ObjectRef): List { var task: Task? = poll() diff --git a/kotlinx-coroutines-core/native/src/SchedulerTask.kt b/kotlinx-coroutines-core/native/src/SchedulerTask.kt index 229302b075..24b2311268 100644 --- a/kotlinx-coroutines-core/native/src/SchedulerTask.kt +++ b/kotlinx-coroutines-core/native/src/SchedulerTask.kt @@ -1,12 +1,3 @@ package kotlinx.coroutines internal actual abstract class SchedulerTask : Runnable - -internal actual interface SchedulerTaskContext { } - -private object TaskContext: SchedulerTaskContext { } - -internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = TaskContext - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun SchedulerTaskContext.afterTask() {}