Utility code for using Google PubSub with Scala (Akka Streams). This repo follows the SemVer Specification.
- Current version: 1.0.0
Note: You need to create a google service account for pubsub to have the access keys for the project.
You can put the relevant configuration variables in the ENV and create a pubsub config object:
val config: Option[PubSubConfig] = PubSubConfig.fromEnv(
Or pass them directly (not recommended):
val config: PubSubConfig = PubSubConfig(
"my-private-key", "my-google-ptoject-id",
"my-pubsub-api-id", "my-google-service-account-email")
Or use a configuration file with the credentials and initialize the object later (e.g. application.conf
Example of a producer/publisher:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.rhdzmota.pubsub.{PubSubConfig, PubSubProducer}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Example {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val pubSubConfig = PubSubConfig.fromEnv(
val exampleTopic = "my-pubsub-topic"
val exampleData = "Data payload for pubsub"
val exampleMessageId = "message-id-example"
val exampleAttributes = Some(Map("key" -> "value"))
val result: Option[Future[Seq[Seq[String]]]] = pubSubConfig.map(config => {
val pubSubProducer = PubSubProducer(config)
pubSubProducer.publish(exampleTopic, exampleData,
exampleMessageId, exampleAttributes)
result match {
case Some(future) => future.onComplete(println(_))
case None => println("Missing env variables")
Example of a consumer/subscriber:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.googlecloud.pubsub.ReceivedMessage
import com.rhdzmota.pubsub.{PubSubConfig, PubSubConsumer}
object Example {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val pubSubConfig = PubSubConfig.fromEnv(
val exampleSubscription = "my-subscription"
val printMessageFunction: ReceivedMessage => Unit =
(receivedMessage: ReceivedMessage) => println(receivedMessage.message,toString)
val graph = pubSubConfig.map(config => {
val pubSubConsumer = PubSubConsumer(config)
graph match {
case Some(runnable) => runnable.run()
case None => println("Missing env variables")