ZIO-JMS adapts JMS API to ZIO streams and makes it working more conveniently and seamlessly integrates ZIO
- Add Maven Central Repository to your build script
- Add dependency on the library "io.github.dobrynya" %% "zio-jms" % "0.2". zio-jms is available for Scala 2 & 3
For receiving messages it needs to create a message consumer using utility methods in JmsConsumer object
import zio.{ZIO, Chunk}
import javax.jms.{Connection, Message, JMSException}
import io.github.dobrynya.zio.jms._
val received: ZIO[Connection, JMSException, Chunk[Message]] =
JmsConsumer.consume(Queue("test-queue"))
.take(5)
.collect(onlyText)
.runCollect
You can process a stream of messages transactionally like follows
import zio.{ZIO, Chunk, UIO}
import javax.jms.{Connection, Message, JMSException}
import io.github.dobrynya.zio.jms._
def messageProcessor(message: Message): UIO[Unit] = ???
val received: ZIO[Connection, JMSException, Unit] =
JmsConsumer.consumeTx(Queue("test-queue"))
.take(5)
.tap(transactionalMessage => messageProcessor(transactionalMessage.message) <* transactionalMessage.commit)
.runDrain
Another ability to process input messages without using ZIO streams is more concise. A message is being acknowledged after successful processing
import zio._
import javax.jms.{Connection, JMSException, Message}
import io.github.dobrynya.zio.jms._
import zio.Console.printLine
def someMessageProcessor(message: Message): ZIO[Console, Exception, Unit] =
printLine(s"Received message $message")
val processing: ZIO[Console with Connection, Exception, Unit] =
JmsConsumer.consumeWith(Topic("test-topic"), someMessageProcessor)
In case of possible failures during processing a message I recommend using a transactional consumer which commits a message when it is processed successfully and rolls back when it fails
import zio.{ZIO, IO}
import javax.jms.{Connection, Message, JMSException}
import io.github.dobrynya.zio.jms._
def someMessageProcessor(message: Message): IO[String, Unit] =
IO.fail(s"Error occurred during processing a message $message")
val processing: ZIO[Connection, Any, Unit] =
JmsConsumer.consumeTxWith(Topic("test-topic"), someMessageProcessor)
For sending messages it needs to create sinks providing a destination and a message encoder as follows
import zio.stream.ZStream
import io.github.dobrynya.zio.jms._
val messages = (1 to 100).map(i => s"Message $i")
ZStream.fromIterable(messages).run(JmsProducer.sink(Queue("test-queue"), textMessageEncoder))
The last thing is to provide a connection like follows
import zio._
import javax.jms.{Connection, ConnectionFactory, JMSException}
import io.github.dobrynya.zio.jms._
def connectionFactory: ConnectionFactory = ???
val connectionLayer: Layer[JMSException, Connection] =
ZLayer.fromManaged(connection(connectionFactory))
val consuming = JmsConsumer
.consume(Queue("test-queue"))
.runDrain
.provideSomeLayer(connectionLayer)
import io.github.dobrynya.zio.jms._
import zio.stream._
val request = Queue("request")
val response = Queue("response")
val messages: List[String] = ???
ZStream.fromIterable(messages)
.run(JmsProducer.requestSink(request, response, textMessageEncoder))
import io.github.dobrynya.zio.jms._
import zio.stream._
val request = Queue("request")
JmsConsumer.consumeAndReplyWith(request,
(message, session) => textMessageEncoder(onlyText(message).toUpperCase, session).asSome)
Here is a responder handling input messages and optionally responding to them to a destination specified in
JMSReplyTo
header. It automatically copy JMSCorellationID
header before sending.