Skip to content

Commit f035ebf

Browse files
committed
feat: allow to pass a pad selector to connect for filtering and mapping source pads to different sink pads
1 parent e413a6f commit f035ebf

File tree

4 files changed

+40
-21
lines changed

4 files changed

+40
-21
lines changed

examples/src/main/kotlin/dev/silenium/libs/flows/examples/Simple.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ fun main() = runBlocking {
7373
val processor = transformer(MyTransformer(), "transformer")
7474
val sink = sink(MySink(), "sink")
7575

76-
source.connectTo(processor).getOrThrow()
77-
processor.connectTo(sink).getOrThrow()
76+
connect(source to processor)
77+
connect(processor to sink)
7878
}
7979
graph.source<MySource>("source")!!.impl.run()
8080
graph.close()

src/main/kotlin/dev/silenium/libs/flows/api/Extensions.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@ interface FlowGraphConfigScope : FlowGraph {
1010
* Creates a connection job in the [kotlinx.coroutines.CoroutineScope] of the [FlowGraph].
1111
* The job is started immediately.
1212
*/
13-
infix fun <T, P> Source<T, P>.connectTo(sink: Sink<T, P>): Result<Job>
13+
fun <T, P> connect(
14+
pair: Pair<Source<T, P>, Sink<T, P>>,
15+
padSelector: (sourceSinkMap: Map<UInt, UInt>, sourcePads: Map<UInt, P>, sourcePad: UInt, metadata: P) -> UInt? = { _, _, pad, _ -> pad },
16+
): Job
1417

1518
/**
1619
* Configures the [FlowGraph].
1720
* Currently, it does:
1821
* - wait for all connection jobs to be started
1922
*/
20-
suspend fun configure(): Result<Unit>
23+
suspend fun configure(): Result<FlowGraph>
2124
}

src/main/kotlin/dev/silenium/libs/flows/impl/FlowGraphImpl.kt

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package dev.silenium.libs.flows.impl
22

33
import dev.silenium.libs.flows.api.*
44
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.flow.map
56
import kotlin.coroutines.CoroutineContext
67
import kotlin.reflect.KClass
78

@@ -98,25 +99,38 @@ internal class FlowGraphImpl(private val coroutineScope: CoroutineScope) :
9899

99100
internal class FlowGraphConfigScopeImpl(private val flowGraph: FlowGraph) : FlowGraphConfigScope,
100101
FlowGraph by flowGraph {
101-
private val connectionStarted = mutableSetOf<Job>()
102-
103-
override fun <T, P> Source<T, P>.connectTo(sink: Sink<T, P>): Result<Job> {
104-
outputMetadata.forEach { (pad, metadata) ->
105-
sink.configure(pad, metadata).onFailure {
106-
return Result.failure(IllegalStateException("Unable to configure input pad $pad of sink $sink", it))
102+
private val configurationJobs = mutableSetOf<Job>()
103+
104+
override fun <T, P> connect(
105+
pair: Pair<Source<T, P>, Sink<T, P>>,
106+
padSelector: (sourceSinkMap: Map<UInt, UInt>, sourcePads: Map<UInt, P>, sourcePad: UInt, metadata: P) -> UInt?,
107+
): Job {
108+
val (source, sink) = pair
109+
val padMap = mutableMapOf<UInt, UInt>()
110+
for ((sourcePad, metadata) in source.outputMetadata) {
111+
val sinkPad = padSelector(padMap, source.outputMetadata, sourcePad, metadata) ?: continue
112+
padMap[sourcePad] = sinkPad
113+
}
114+
padMap.forEach { (sourcePad, sinkPad) ->
115+
val metadata = source.outputMetadata.getValue(sourcePad)
116+
sink.configure(sinkPad, metadata).onFailure {
117+
throw IllegalStateException("Unable to configure $sink:$sinkPad from $source:$sourcePad", it)
107118
}
108119
}
109120
val started = CompletableDeferred<Unit>()
110121
return launch {
111122
started.complete(Unit)
112-
flow.collect(sink)
123+
source.flow
124+
.map { it.copy(pad = padMap.getValue(it.pad)) }
125+
.collect(sink)
113126
}.also {
114-
connectionStarted.add(started)
115-
}.let { Result.success(it) }
127+
configurationJobs.add(started)
128+
}
116129
}
117130

118-
override suspend fun configure(): Result<Unit> = runCatching {
119-
connectionStarted.joinAll()
131+
override suspend fun configure(): Result<FlowGraph> = runCatching {
132+
configurationJobs.joinAll()
133+
flowGraph
120134
}
121135
}
122136

@@ -136,7 +150,7 @@ internal fun FlowGraph.builder() = FlowGraphConfigScopeImpl(this)
136150
suspend fun FlowGraph(
137151
coroutineContext: CoroutineContext = Dispatchers.Default,
138152
block: FlowGraphConfigScope.() -> Unit,
139-
): FlowGraph = FlowGraphImpl(coroutineContext).builder().apply(block).apply { configure() }
153+
): FlowGraph = FlowGraphImpl(coroutineContext).builder().apply(block).configure().getOrThrow()
140154

141155
/**
142156
* Creates a new [FlowGraph] with the given [coroutineScope] and [block] configuration.
@@ -152,4 +166,4 @@ suspend fun FlowGraph(
152166
suspend fun FlowGraph(
153167
coroutineScope: CoroutineScope,
154168
block: FlowGraphConfigScope.() -> Unit,
155-
): FlowGraph = FlowGraphImpl(coroutineScope).builder().apply(block).apply { configure() }
169+
): FlowGraph = FlowGraphImpl(coroutineScope).builder().apply(block).configure().getOrThrow()

src/test/kotlin/dev/silenium/libs/flows/impl/FlowGraphImplTest.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ class FlowGraphImplTest : FunSpec({
2020
val source = source(BufferSource<Base64Buffer, DataType>(0u to DataType.BASE64), "buffer-source")
2121
val sink = sink(BufferSink<ByteArray, DataType>(), "buffer-sink")
2222
val decoder = transformer(Base64Decoder(), "base64-decoder")
23-
source.connectTo(decoder)
24-
decoder.connectTo(sink)
23+
connect(source to decoder)
24+
connect(decoder to sink) { _, _, sourcePad, _ ->
25+
sourcePad + 1u
26+
}
2527
}
2628
val source = graph.source<BufferSource<Base64Buffer, DataType>>("buffer-source")!!
2729
val sink = graph.sink<BufferSink<ByteArray, DataType>>("buffer-sink")!!
@@ -30,9 +32,9 @@ class FlowGraphImplTest : FunSpec({
3032
val inputBuffer = input.encodeBase64()
3133
source.impl.submit(0u, inputBuffer)
3234
inputBuffer.close()
33-
val result = sink.impl.flow.firstOrNull { 0u in it && it[0u]!!.isNotEmpty() }
35+
val result = sink.impl.flow.firstOrNull { 1u in it && it[1u]!!.isNotEmpty() }
3436

3537
graph.close()
36-
result.shouldNotBeNull()[0u]!!.shouldNotBeEmpty().first().value.decodeToString() shouldBe input
38+
result.shouldNotBeNull()[1u]!!.shouldNotBeEmpty().first().value.decodeToString() shouldBe input
3739
}
3840
})

0 commit comments

Comments
 (0)