Skip to content

RHDZMOTA/google-pubsub-scala

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

PUBSUB Scala

Utility code for using Google PubSub with Scala (Akka Streams). This repo follows the SemVer Specification.

  • Current version: 1.0.0

Usage

Note: You need to create a google service account for pubsub to have the access keys for the project.

Configuration variables

You can put the relevant configuration variables in the ENV and create a pubsub config object:

val config: Option[PubSubConfig] = PubSubConfig.fromEnv(
  "PUBSUB_PRIVATE_KEY", "GOOGLE_PROJECT_ID",
  "PUBSUB_API_ID", "GOOGLE_SERVICE_ACCOUNT_EMAIL")

Or pass them directly (not recommended):

val config: PubSubConfig = PubSubConfig(
  "my-private-key", "my-google-ptoject-id",
  "my-pubsub-api-id", "my-google-service-account-email")

Or use a configuration file with the credentials and initialize the object later (e.g. application.conf).

Producer

Example of a producer/publisher:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.rhdzmota.pubsub.{PubSubConfig, PubSubProducer}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Example {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val actorMaterializer = ActorMaterializer()
    
    val pubSubConfig = PubSubConfig.fromEnv(
      "PUBSUB_PRIVATE_KEY", "GOOGLE_PROJECT_ID",
      "PUBSUB_API_ID", "GOOGLE_SERVICE_ACCOUNT_EMAIL")
      
    val exampleTopic = "my-pubsub-topic"
    val exampleData = "Data payload for pubsub"
    val exampleMessageId = "message-id-example"
    val exampleAttributes = Some(Map("key" -> "value"))
    
    val result: Option[Future[Seq[Seq[String]]]] = pubSubConfig.map(config => {
      val pubSubProducer = PubSubProducer(config)
      pubSubProducer.publish(exampleTopic, exampleData,
        exampleMessageId, exampleAttributes)
    })
    
    result match {
      case Some(future) => future.onComplete(println(_))
      case None => println("Missing env variables")
    }
  }
}

Consumer

Example of a consumer/subscriber:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.googlecloud.pubsub.ReceivedMessage
import com.rhdzmota.pubsub.{PubSubConfig, PubSubConsumer}

object Example {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val actorMaterializer = ActorMaterializer()
    
    val pubSubConfig = PubSubConfig.fromEnv(
      "PUBSUB_PRIVATE_KEY", "GOOGLE_PROJECT_ID",
      "PUBSUB_API_ID", "GOOGLE_SERVICE_ACCOUNT_EMAIL")
      
    val exampleSubscription = "my-subscription"
    val printMessageFunction: ReceivedMessage => Unit = 
      (receivedMessage: ReceivedMessage) => println(receivedMessage.message,toString)
      
    val graph = pubSubConfig.map(config => {
      val pubSubConsumer = PubSubConsumer(config)
      pubSubConsumer.subscribe(printMessageFunction)(exampleSubscription)
    })
    
    graph match {
      case Some(runnable) => runnable.run()
      case None => println("Missing env variables")
    }
  }
}