ConflatedBroadcastChannel hangs in offer() #66

Closed
@chrisly42

Hi there,

Maybe there is a better way to achieve what I want, but my current wrapper class around MutableMap, which lets coroutines wait for key additions, sometimes gets stuck and causes 100% CPU load. I'm using the recently added ConflatedBroadcastChannel class to notify all coroutines waiting for a key whenever a put operation occurs.

Actually, I only need coroutine-compatible wait/notify primitives... any hints?

Anyway, when I pause the threads during the lockup, I get a stack trace that may look like this one:

<15> ForkJoinPool.commonPool-worker-3"@830,030,709,776 in group "main": RUNNING
getNext:121, LockFreeLinkedListNode {kotlinx.coroutines.experimental.internal}
takeFirstReceiveOrPeekClosed:920, AbstractSendChannel {kotlinx.coroutines.experimental.channels}
offerInternal:54, AbstractSendChannel {kotlinx.coroutines.experimental.channels}
offerInternal:39, ConflatedChannel {kotlinx.coroutines.experimental.channels}
offerInternal:268, ConflatedBroadcastChannel$Subscriber {kotlinx.coroutines.experimental.channels}
offerInternal:239, ConflatedBroadcastChannel {kotlinx.coroutines.experimental.channels}
offer:219, ConflatedBroadcastChannel {kotlinx.coroutines.experimental.channels}
put:11, AwaitableMap {com.example}

Any ideas what's going on here?

package com.example

import kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel

class AwaitableMap<K : Any, V : Any>(val delegateMap: MutableMap<K, V>) : MutableMap<K, V> by delegateMap {

    val channel = ConflatedBroadcastChannel<K>()

    override fun put(key: K, value: V): V? {
        val putVal = delegateMap.put(key, value)
        channel.offer(key)
        return putVal
    }

    suspend fun await(key: K): V {
        return delegateMap[key] ?: channel.open().use {
            do {
                val fetched = delegateMap[key]
                if (fetched != null) {
                    return@use fetched
                }
                it.receive()
            } while (true)
            return@use null
        }!!
    }
}
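
On the wait/notify question above, one possible direction is a set-once slot per key instead of a broadcast channel. The following is only a minimal sketch under stated assumptions: it uses `CompletableDeferred` from a current kotlinx.coroutines release (package `kotlinx.coroutines`, not the `experimental` package used above), the class name `DeferredMap` is hypothetical, and each key can be set only once, unlike `MutableMap.put`, which overwrites. It does not address the hang in `offer()` itself.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import java.util.concurrent.ConcurrentHashMap

// Hypothetical alternative to AwaitableMap: one set-once slot per key.
// Assumption: a key is written at most once; later puts are ignored.
class DeferredMap<K : Any, V : Any> {
    private val slots = ConcurrentHashMap<K, CompletableDeferred<V>>()

    // Returns the slot for the key, creating it atomically if it is missing.
    private fun slot(key: K): CompletableDeferred<V> =
        slots.computeIfAbsent(key) { CompletableDeferred<V>() }

    // Completes the slot and resumes all waiters; false if the key was already set.
    fun put(key: K, value: V): Boolean = slot(key).complete(value)

    // Suspends until a value for the key has been put, then returns it.
    suspend fun await(key: K): V = slot(key).await()
}
```

A waiter would call `map.await(key)` from a coroutine, and a producer would call `map.put(key, value)` from any thread. Because the slot is created by whichever side arrives first, a put that happens before the await is not lost.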
