-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathCatsKafkaStreamsApp.scala
55 lines (45 loc) · 1.84 KB
/
CatsKafkaStreamsApp.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.kafka.demo
import java.util.concurrent.{ ExecutorService, Executors, TimeUnit }
import cats.Monad
import cats.effect._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.show._
import com.kafka.demo.settings.Settings
import com.kafka.demo.streams.KafkaStreamsRuntime
import log.effect.LogWriter
import log.effect.fs2.SyncLogWriter.log4sLog
import scala.concurrent.ExecutionContext
// sbt "cats-kafka-streams/runMain com.kafka.demo.CatsKafkaStreamsApp"
/**
  * Entry point of the Kafka Streams demo application.
  *
  * Creates a logger through `log4sLog`, runs the application program and maps
  * its outcome to an `ExitCode`. The execution context handed to
  * `IOApp.WithContext` is backed by a cached thread pool that is shut down when
  * the resource is released.
  */
object CatsKafkaStreamsApp extends IOApp.WithContext {

  /**
    * Run Kafka Streams application
    *
    * @param args command-line arguments (not used)
    * @return `ExitCode.Success` when the program completes, `ExitCode.Error`
    *         when it fails with an exception
    */
  override def run(args: List[String]): IO[ExitCode] =
    log4sLog[IO](getClass)
      .flatMap(implicit logger => app[IO].redeemWith(onError[IO], onSuccess[IO]))

  /** Success handler: logs the outcome at info level and yields `ExitCode.Success`. */
  private[this] final def onSuccess[F[_]: Monad: LogWriter]: Unit => F[ExitCode] =
    _ => LogWriter.info("Application succeeded") >> Monad[F].pure(ExitCode.Success)

  /**
    * Failure handler: logs the error together with its cause and yields
    * `ExitCode.Error`.
    *
    * @param e the exception the application failed with
    */
  private[this] final def onError[F[_]: Monad: LogWriter](e: Throwable): F[ExitCode] =
    LogWriter.error("Application failed", e) >> Monad[F].pure(ExitCode.Error)

  /**
    * The application program: loads the settings, logs them and hands them to
    * the Kafka Streams runtime.
    *
    * Each progress message is logged before the step it announces, so a failure
    * while loading the settings is still preceded by a "Load settings ..." line.
    */
  private[this] final def app[F[_]: Async: ContextShift: Timer: LogWriter]: F[Unit] =
    for {
      _        <- LogWriter.info("Load settings ...")
      settings <- Settings.config.load[F]
      _        <- LogWriter.info(settings.show)
      _        <- LogWriter.info("Start application ...")
      _        <- KafkaStreamsRuntime[F].run(settings)
    } yield ()

  /**
    * Execution context backed by a cached thread pool.
    *
    * On release the pool is shut down gracefully and given up to 10 seconds to
    * finish. If tasks are still running after that, the pool is shut down
    * forcibly so that lingering worker threads cannot keep the JVM alive.
    */
  override protected def executionContextResource: Resource[SyncIO, ExecutionContext] = {
    val acquire = SyncIO(Executors.newCachedThreadPool())
    val release: ExecutorService => SyncIO[Unit] = pool =>
      SyncIO {
        // Stop accepting new tasks and let the in-flight ones complete.
        pool.shutdown()
        // awaitTermination returns false on timeout: interrupt whatever is left.
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
          pool.shutdownNow()
          ()
        }
      }
    Resource
      .make(acquire)(release)
      .map(ExecutionContext.fromExecutorService)
  }
}