Using Kotka means a more pleasant experience while using Kafka Streams.
Add a dependency on kotka-streams-extensions
for the basics.
// build.gradle.kts
repositories {
mavenCentral()
}
dependencies {
implementation("dev.adamko.kotka:kotka-streams-extensions:$kotkaVersion")
}
There are three modules. Add a dependency on com.github.adamko-dev:kotka-streams
to get them all
at once
dependencies {
implementation("dev.adamko.kotka:kotka-streams:$kotkaVersion")
}
Contains the basic extension functions to make Kafka Streams more Kotlin-esque.
implementation("dev.adamko.kotka:kotka-streams-extensions:$kotkaVersion")
import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
data class MusicalBand(
val name: String,
val memberNames: List<String>,
)
builder.stream<String, MusicalBand>("musical-bands")
.flatMap("band-member-names-to-band-name") { _: String, band: MusicalBand ->
band.memberNames.map { memberName -> memberName to band.name }
}
.groupByKey(groupedAs("map-of-band-member-to-band-names"))
A light framework for structuring topics and records.
implementation("dev.adamko.kotka:kotka-streams-framework:$kotkaVersion")
Use TopicRecord
to standardise the data on each topic. Records can now easily be converted from
one type, to another.
import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
import dev.adamko.kotka.topicdata.*
data class Animal(
val id: Long,
val name: String,
) : TopicRecord<Long> {
override val topicKey: Long by ::id
}
data class Pet(
val id: Long,
val name: String,
) : TopicRecord<Long> {
override val topicKey: Long by ::id
}
val petUpdates = builder.stream<Long, Animal>("animals")
.mapTopicRecords("convert-animals-to-pets") { _, animal ->
Pet(animal.id, animal.name)
}
Use KeyValueSerdes<K, V>
to define both the key and value serdes for a topic.
A TopicDefinition<K, V>
ties both of these together.
/** All [Pet] updates */
object PetUpdatesTopic : TopicDefinition<Long, Animal> {
override val topicName = "pet-updates"
override val serdes = KeyValueSerdes(Serdes.Long(), PetSerde())
}
petUpdates
.to(
PetUpdatesTopic.topicName,
PetUpdatesTopic.serdes.producer("send-pet-updates-to-pet-update-topic")
)
Use Kotlinx Serialization for topic key/value serdes.
implementation("dev.adamko.kotka:kotka-streams-kotlinx-serialization:$kotkaVersion")
import dev.adamko.kotka.extensions.tables.*
import dev.adamko.kotka.extensions.streams.*
import dev.adamko.kotka.extensions.*
import dev.adamko.kotka.topicdata.*
import dev.adamko.kotka.kxs.*
val jsonMapper = Json {}
@Serializable
data class Sku(
val sku: String
)
@Serializable
data class ShopItem(
val id: Sku,
val name: String,
) : TopicRecord<Sku> {
override val topicKey: Sku by ::id
}
object ShopItemTopic : TopicDefinition<Long, ShopItem> {
override val topicName = "shop-item-updates"
override val serdes = KeyValueSerdes.kxsJson(jsonMapper)
}