Skip to content

Commit c3cc1bf

Browse files
committed
feat: add JobTransformerBase that uses a coroutine for processing
1 parent f7eaf93 commit c3cc1bf

File tree

3 files changed

+62
-15
lines changed

3 files changed

+62
-15
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package dev.silenium.libs.flows.base
2+
3+
import dev.silenium.libs.flows.api.Transformer
4+
import kotlinx.coroutines.CoroutineScope
5+
import kotlinx.coroutines.Job
6+
import kotlinx.coroutines.launch
7+
import kotlinx.coroutines.runBlocking
8+
import java.util.*
9+
10+
abstract class JobTransformerBase<IT, IP, OT, OP>(
11+
protected val coroutineScope: CoroutineScope,
12+
protected val pads: Set<UInt>? = setOf(0u)
13+
) : Transformer<IT, IP, OT, OP>, SourceBase<OT, OP>() {
14+
15+
@OptIn(ExperimentalUnsignedTypes::class)
16+
constructor(coroutineScope: CoroutineScope, vararg inputPads: UInt) : this(coroutineScope, inputPads.toSet())
17+
18+
protected val inputMetadata_ = mutableMapOf<UInt, IP>()
19+
override val inputMetadata: Map<UInt, IP?> = Collections.unmodifiableMap(inputMetadata_)
20+
protected var job: Job? = null
21+
22+
override fun configure(pad: UInt, metadata: IP): Result<Unit> {
23+
inputMetadata_[pad] = metadata
24+
outputMetadata_[pad] = outputMetadata(metadata)
25+
if (pads?.equals(inputMetadata_.keys) == true) {
26+
job = coroutineScope.launch { run() }
27+
}
28+
return Result.success(Unit)
29+
}
30+
31+
override fun close() {
32+
runBlocking { job?.join() }
33+
super.close()
34+
}
35+
36+
abstract fun outputMetadata(inputMetadata: IP): OP
37+
38+
/**
39+
* main processing function
40+
*
41+
* *Note: cancellation must be handled by the implementation
42+
* ([close] only joins the job, it does not cancel it)*
43+
* @see CoroutineScope
44+
*/
45+
protected abstract suspend fun CoroutineScope.run()
46+
}

src/test/kotlin/dev/silenium/libs/flows/base/SourceBaseTest.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import kotlinx.coroutines.flow.toList
1313
import java.nio.ByteBuffer
1414
import kotlin.io.encoding.Base64
1515
import kotlin.io.encoding.ExperimentalEncodingApi
16+
import kotlin.time.Duration.Companion.milliseconds
1617

1718

1819
@OptIn(ExperimentalEncodingApi::class)
@@ -36,6 +37,7 @@ class SourceBaseTest : FunSpec({
3637
decoder.flow.toList()
3738
}
3839
started.await()
40+
delay(100.milliseconds)
3941
val bufs = inputs.map { input ->
4042
val base64 = Base64.encode(input.encodeToByteArray())
4143
val byteBuffer = ByteBuffer.allocateDirect(base64.length)
@@ -46,8 +48,9 @@ class SourceBaseTest : FunSpec({
4648
buf.close()
4749
buf
4850
}
49-
job.cancelAndJoin()
5051
decoder.close()
52+
bufferSource.close()
53+
job.join()
5154
listAsync.await().shouldHaveSize(inputs.size).forEach { item ->
5255
println(item)
5356
item.pad shouldBe 0u
Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
package dev.silenium.libs.flows.test
22

33
import dev.silenium.libs.flows.api.FlowItem
4-
import dev.silenium.libs.flows.api.Transformer
5-
import dev.silenium.libs.flows.base.SourceBase
6-
import kotlinx.coroutines.*
4+
import dev.silenium.libs.flows.base.JobTransformerBase
5+
import kotlinx.coroutines.CoroutineScope
6+
import kotlinx.coroutines.Dispatchers
77
import kotlinx.coroutines.channels.Channel
8-
import java.util.*
8+
import kotlinx.coroutines.delay
99
import kotlin.io.encoding.Base64
1010
import kotlin.io.encoding.ExperimentalEncodingApi
1111
import kotlin.random.Random
1212
import kotlin.random.nextInt
1313
import kotlin.time.Duration.Companion.milliseconds
1414

15-
class Base64Decoder : SourceBase<ByteArray, DataType>(), Transformer<Base64Buffer, DataType, ByteArray, DataType> {
16-
override val inputMetadata: MutableMap<UInt, DataType> =
17-
Collections.synchronizedMap(mutableMapOf<UInt, DataType>())
15+
@OptIn(ExperimentalUnsignedTypes::class)
16+
class Base64Decoder :
17+
JobTransformerBase<Base64Buffer, DataType, ByteArray, DataType>(CoroutineScope(Dispatchers.IO), 0u) {
1818
private val queue = Channel<FlowItem<Base64Buffer, DataType>>(capacity = 4)
1919

2020
@OptIn(ExperimentalEncodingApi::class)
21-
private val processor = CoroutineScope(Dispatchers.Default).async {
21+
override suspend fun CoroutineScope.run() {
22+
println("Base64Decoder.run")
2223
for (item in queue) {
2324
delay(Random.nextInt(1..250).milliseconds)
2425
val array = ByteArray(item.value.buffer.remaining())
@@ -28,11 +29,9 @@ class Base64Decoder : SourceBase<ByteArray, DataType>(), Transformer<Base64Buffe
2829
}
2930
}
3031

31-
override fun configure(pad: UInt, metadata: DataType): Result<Unit> {
32-
check(metadata == DataType.BASE64) { "metadata must be BASE64" }
33-
this.inputMetadata[pad] = metadata
34-
this.outputMetadata_[pad] = DataType.PLAIN
35-
return Result.success(Unit)
32+
override fun outputMetadata(inputMetadata: DataType): DataType {
33+
check(inputMetadata == DataType.BASE64) { "metadata must be BASE64" }
34+
return DataType.PLAIN
3635
}
3736

3837
override suspend fun receive(item: FlowItem<Base64Buffer, DataType>): Result<Unit> = runCatching {
@@ -41,7 +40,6 @@ class Base64Decoder : SourceBase<ByteArray, DataType>(), Transformer<Base64Buffe
4140

4241
override fun close() {
4342
queue.close()
44-
runBlocking { processor.join() }
4543
super.close()
4644
}
4745
}

0 commit comments

Comments
 (0)