Skip to content

Commit c75c7c9

Browse files
committed
feat: (buffersink) use shared flow
1 parent de0e439 commit c75c7c9

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

src/main/kotlin/dev/silenium/libs/flows/buffer/BufferSink.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@ package dev.silenium.libs.flows.buffer
22

33
import dev.silenium.libs.flows.api.FlowItem
44
import dev.silenium.libs.flows.api.Sink
5-
import kotlinx.coroutines.flow.MutableStateFlow
6-
import kotlinx.coroutines.flow.StateFlow
7-
import kotlinx.coroutines.flow.asStateFlow
5+
import kotlinx.coroutines.flow.MutableSharedFlow
6+
import kotlinx.coroutines.flow.SharedFlow
7+
import kotlinx.coroutines.flow.asSharedFlow
88

99
class BufferSink<T, P>(vararg pads: Pair<UInt, P>) : Sink<T, P> {
1010
private val inputMetadata_: MutableMap<UInt, P?> = pads.toMap().toMutableMap()
1111
override val inputMetadata: Map<UInt, P?> by ::inputMetadata_
1212

1313
private val buffer_: MutableMap<UInt, MutableList<FlowItem<T, P>>> = mutableMapOf()
1414
val buffer: Map<UInt, List<FlowItem<T, P>>> by ::buffer_
15-
private val flow_ = MutableStateFlow<Map<UInt, List<FlowItem<T, P>>>>(emptyMap())
16-
val flow: StateFlow<Map<UInt, List<FlowItem<T, P>>>> = flow_.asStateFlow()
15+
private val flow_ = MutableSharedFlow<Map<UInt, List<FlowItem<T, P>>>>(replay = 1)
16+
val flow: SharedFlow<Map<UInt, List<FlowItem<T, P>>>> = flow_.asSharedFlow()
1717

1818
override suspend fun receive(item: FlowItem<T, P>): Result<Unit> {
1919
buffer_.getOrPut(item.pad, ::mutableListOf).add(item)

0 commit comments

Comments
 (0)