Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove various obsolete code #4196

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Simplify the TaskContext definition
  • Loading branch information
dkhalanskyjb committed Aug 12, 2024
commit 378806b1be09a41e2f8cdc21a00abaadd3839623
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/common/src/SchedulerTask.common.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
package kotlinx.coroutines

/**
* A [Runnable] that's especially optimized for running in [Dispatchers.Default] on the JVM.
*
* Replacing a [SchedulerTask] with a [Runnable] should not lead to any change in observable behavior.
*
* An arbitrary [Runnable], once it is dispatched by [Dispatchers.Default], gets wrapped into a class that
* stores the submission time, the execution context, etc.
* For [Runnable] instances that we know are only going to be executed in dispatch procedures, we can avoid the
* overhead of separately allocating a wrapper, and instead have the [Runnable] contain the required fields
* on construction.
*
* When running outside the standard dispatchers, these new fields are just dead weight.
*/
internal expect abstract class SchedulerTask internal constructor() : Runnable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really appreciate how you drop off improvements and additional commentary when doing the stewardship. Thanks!

60 changes: 24 additions & 36 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import kotlin.math.*
*
* ### Support for blocking tasks
*
* The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
* The scheduler also supports the notion of [blocking][Task.isBlocking] tasks.
* When executing or enqueuing blocking tasks, the scheduler notifies or creates an additional worker in
* addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
* available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
Expand Down Expand Up @@ -425,7 +425,7 @@ internal class CoroutineScheduler(
block.taskContext = taskContext
return block
}
return TaskImpl(block, nanoTime, taskContext)
return block.asTask(nanoTime, taskContext)
}

// NB: should only be called from 'dispatch' method due to blocking tasks increment
Expand Down Expand Up @@ -514,7 +514,7 @@ internal class CoroutineScheduler(
*/
if (state === WorkerState.TERMINATED) return task
// Do not add CPU tasks in local queue if we are not able to execute it
if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
if (!task.isBlocking && state === WorkerState.BLOCKING) {
return task
}
mayHaveLocalTasks = true
Expand Down Expand Up @@ -810,29 +810,26 @@ internal class CoroutineScheduler(
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}

private fun beforeTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
signalCpuWork()
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { task.isBlocking }
state = WorkerState.BLOCKING
}
}

private fun afterTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
if (task.isBlocking) {
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
signalCpuWork()
}
runSafely(task)
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
}
} else {
runSafely(task)
}
}

Expand Down Expand Up @@ -923,15 +920,6 @@ internal class CoroutineScheduler(
state = WorkerState.TERMINATED
}

// It is invoked by this worker when it finds a task
private fun idleReset(mode: Int) {
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { mode == TASK_PROBABLY_BLOCKING }
state = WorkerState.BLOCKING
}
}

fun findTask(mayHaveLocalTasks: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
/*
Expand Down Expand Up @@ -1013,12 +1001,12 @@ internal class CoroutineScheduler(

enum class WorkerState {
/**
* Has CPU token and either executes [TASK_NON_BLOCKING] task or tries to find one.
* Has CPU token and either executes a [Task.isBlocking]` == false` task or tries to find one.
*/
CPU_ACQUIRED,

/**
* Executing task with [TASK_PROBABLY_BLOCKING].
* Executing task with [Task.isBlocking].
*/
BLOCKING,

Expand Down
35 changes: 17 additions & 18 deletions kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,39 +49,38 @@ internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
internal var schedulerTimeSource: SchedulerTimeSource = NanoTimeSource

/**
* Marker indicating that task is CPU-bound and will not block
* Concurrency context of a task.
*
* Currently, it only signifies whether the task is blocking or non-blocking.
*/
internal const val TASK_NON_BLOCKING = 0
internal typealias TaskContext = Boolean

/**
* Marker indicating that task may potentially block, thus giving scheduler a hint that additional thread may be required
* This would be [TaskContext.toString] if [TaskContext] was a proper class.
*/
internal const val TASK_PROBABLY_BLOCKING = 1
private fun taskContextString(taskContext: TaskContext): String = if (taskContext) "Blocking" else "Non-blocking"

internal interface TaskContext {
val taskMode: Int // TASK_XXX
}
internal const val NonBlockingContext: TaskContext = false

private class TaskContextImpl(override val taskMode: Int): TaskContext

@JvmField
internal val NonBlockingContext: TaskContext = TaskContextImpl(TASK_NON_BLOCKING)

@JvmField
internal val BlockingContext: TaskContext = TaskContextImpl(TASK_PROBABLY_BLOCKING)
internal const val BlockingContext: TaskContext = true

/**
* A scheduler task.
*/
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
internal constructor() : this(0, NonBlockingContext)
internal inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}

internal inline val Task.isBlocking get() = taskContext.taskMode == TASK_PROBABLY_BLOCKING
internal inline val Task.isBlocking get() = taskContext

internal fun Runnable.asTask(submissionTime: Long, taskContext: TaskContext): Task =
TaskImpl(this, submissionTime, taskContext)

// Non-reusable Task implementation to wrap Runnable instances that do not otherwise implement task
internal class TaskImpl(
private class TaskImpl(
@JvmField val block: Runnable,
submissionTime: Long,
taskContext: TaskContext
Expand All @@ -91,7 +90,7 @@ internal class TaskImpl(
}

override fun toString(): String =
"Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, $taskContext]"
"Task[${block.classSimpleName}@${block.hexAddress}, $submissionTime, ${taskContextString(taskContext)}]"
}

// Open for tests
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,4 @@ class RejectedExecutionTest : TestBase() {
if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread")
assertEquals(CoroutineScheduler.WorkerState.BLOCKING, thread.state)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import kotlin.coroutines.*
import kotlin.test.*

class CoroutineSchedulerTest : TestBase() {
private val taskModes = listOf(TASK_NON_BLOCKING, TASK_PROBABLY_BLOCKING)
private val contexts = listOf(NonBlockingContext, BlockingContext)

@Test
fun testModesExternalSubmission() { // Smoke
CoroutineScheduler(1, 1).use {
for (mode in taskModes) {
for (context in contexts) {
val latch = CountDownLatch(1)
it.dispatch(Runnable {
latch.countDown()
}, TaskContextImpl(mode))
}, context)

latch.await()
}
Expand All @@ -28,12 +28,12 @@ class CoroutineSchedulerTest : TestBase() {
@Test
fun testModesInternalSubmission() { // Smoke
CoroutineScheduler(2, 2).use {
val latch = CountDownLatch(taskModes.size)
val latch = CountDownLatch(contexts.size)
it.dispatch(Runnable {
for (mode in taskModes) {
for (context in contexts) {
it.dispatch(Runnable {
latch.countDown()
}, TaskContextImpl(mode))
}, context)
}
})

Expand Down Expand Up @@ -164,6 +164,4 @@ class CoroutineSchedulerTest : TestBase() {
check(ratio >= 0.9)
}
}

private class TaskContextImpl(override val taskMode: Int) : TaskContext
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class WorkQueueTest : TestBase() {
}
}

internal fun task(n: Long) = TaskImpl(Runnable {}, n, NonBlockingContext)
internal fun blockingTask(n: Long) = TaskImpl(Runnable {}, n, BlockingContext)
internal fun task(n: Long) = Runnable {}.asTask(n, NonBlockingContext)
internal fun blockingTask(n: Long) = Runnable {}.asTask(n, BlockingContext)

internal fun WorkQueue.drain(ref: ObjectRef<Task?>): List<Long> {
var task: Task? = poll()
Expand Down