Closed
Description
Hi there,
maybe there is a better way to achieve what I want, but with my current wrapper class around MutableMaps that allows coroutine waiting for key additions will at times get stuck and cause 100% cpu load. I'm using the recently added ConflatedBroadcastChannel class to inform all coroutines waiting for a key to arrive on a put operation.
Actually, I would only need a coroutine compatible wait/notify primitives... hints?
Anyway, when I pause the threads during the lockup, I will get a stacktrace that may look like this one:
<15> ForkJoinPool.commonPool-worker-3"@830,030,709,776 in group "main": RUNNING
getNext:121, LockFreeLinkedListNode {kotlinx.coroutines.experimental.internal}
takeFirstReceiveOrPeekClosed:920, AbstractSendChannel {kotlinx.coroutines.experimental.channels}
offerInternal:54, AbstractSendChannel {kotlinx.coroutines.experimental.channels}
offerInternal:39, ConflatedChannel {kotlinx.coroutines.experimental.channels}
offerInternal:268, ConflatedBroadcastChannel$Subscriber {kotlinx.coroutines.experimental.channels}
offerInternal:239, ConflatedBroadcastChannel {kotlinx.coroutines.experimental.channels}
offer:219, ConflatedBroadcastChannel {kotlinx.coroutines.experimental.channels}
put:11, AwaitableMap {com.example}
Any ideas what's going on here?
package com.example
import kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel
class AwaitableMap<K : Any, V : Any>(val delegateMap: MutableMap<K, V>) : MutableMap<K, V> by delegateMap {
val channel = ConflatedBroadcastChannel<K>()
override fun put(key: K, value: V): V? {
val putVal = delegateMap.put(key, value)
channel.offer(key)
return putVal
}
suspend fun await(key: K): V {
return delegateMap[key] ?: channel.open().use {
do {
val fetched = delegateMap[key]
if (fetched != null) {
return@use fetched
}
it.receive()
} while (true)
return@use null
}!!
}
}
Metadata
Metadata
Assignees
Labels
No labels