Reads the contents of provided Kafka topics, either the topics in their entirety or up until a consumer groups last committed Offset depending on which LoadTopicStrategy
you provide.
As of version 1.3.0
, data can be loaded either from complete topics using load
or loadAndRun
Since version 1.4.0
the library is cross compiled for scala versions 2.12
and 2.13
Since version 1.6.0
the library is cross compiled for scala versions 2.12
, 2.13
and 3
Add the following to your build.sbt
libraryDependencies += "" %% "kafka-topic-loader" % "<version>"
import{LoadAll, TopicLoader}
import org.apache.kafka.common.serialization.Deserializer}
implicit val as: ActorSystem = ActorSystem()
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer
val stream = TopicLoader.load[String, String]("topic-to-load"), LoadAll)
.mapAsync(1)(_ => ??? /* store records in akka.Actor for example */)
will load the topics, complete the Future[Done]
from the materialised value and then carry on
running, emitting any new records that appear on the topics. An example use-case for this is a REST API that holds the
contents of a Kafka topic in memory. This kind of application doesn't need to commit offsets and can use the Future[Done]
to determine readiness.
object Main extends App {
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
import system.dispatcher
implicit val keyDeserializer: Deserializer[String] = new StringDeserializer
implicit val valueDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer
val state = new SimplifiedState
val (initialLoadingFuture, controlF): (Future[Done], Future[Consumer.Control]) =
.loadAndRun[String, Array[Byte]]("topic-to-load"))
.to(Sink.foreach(record =>, record.value)))
initialLoadingFuture.foreach(_ => state.isAppReady.set(true))
class SimplifiedState {
* API requests may query this state
val store = new ConcurrentHashMap[String, Array[Byte]]()
* A readiness endpoint could be created that queries this
val isAppReady = new AtomicBoolean()
The config in reference.conf
can be overridden by providing your own application.conf
By default, Akka's ConsumerConfig
will inherit the consumer
from the application kafka-topic-loader is running from. To separate the client id of your application and the kafka-topic-loader, provide it in your application.conf
topic-loader {
client-id = "custom-client-id"
You should configure the
to match that of your application, e.g.:
akka.kafka {
consumer.kafka-clients {
bootstrap.servers = ${?KAFKA_BROKERS} = assembler-consumer-group
producer.kafka-clients {
bootstrap.servers = ${?KAFKA_BROKERS}
This is deprecated in favour of a new API for partitioned loading which is coming soon.
Data can also be loaded from specific partitions using fromPartitions
. By loading from specific partitions the topic
loader can be used by multiple application instances with separate streams per set of partitions (see Alpakka kafka and below).
implicit val system = ActorSystem()
val consumerSettings: ConsumerSettings[String, Long] = ???
val doBusinessLogic: ConsumerRecord[String, Long] => Future[Unit] = ???
val stream: Source[ConsumerMessage.CommittableMessage[String, Long], Consumer.Control] =
.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic-to-load"))
.flatMapConcat {
case (topicPartition, source) =>
.fromPartitions(LoadAll,, doBusinessLogic, new LongDeserializer())
.flatMapConcat(_ => source)