This repository contains an Akka Streams Source
and Sink
implementation that can be used with Google's Cloud Pub/Sub messaging middleware. The Cloud Pub/Sub sinks and sources provided in this repository have been in production at Qubit for several months and is used to process a Cloud Pub/Sub stream that averages over a billion messages a day in real time.
Currently only Scala 2.11 artifacts are published.
SBT:
"com.qubit" % "akka-cloudpubsub_2.11" % "1.0.0"
Maven:
<dependency>
<groupId>com.qubit</groupId>
<artifactId>akka-cloudpubsub_2.11</artifactId>
<version>1.0.0</version>
</dependency>
From source:
sbt localInstall
PubSubSource
is backpressure aware and only reads from Pub/Sub when there's demand downstream. By default, messages are acked every 10 seconds or when the ack buffer is full (default size 100). The ack interval, buffer size and several other properties are configurable via attributes.
Creating a source reading from the foo-sub
subscription in the foo-project
with the default settings:
Source.fromGraph(new PubSubSource(PubSubSubscription("foo-project", "foo-sub")))
Overriding the ack interval to 30 seconds:
Source.fromGraph(new PubSubSource(PubSubSubscription("foo-project", "foo-sub")), 30.seconds)
Customise the ack interval, buffer size, pull timeout, max retries and retry jitter:
val attributes = Attributes(List(
PubSubPullTimeoutAttribute(60.seconds),
PubSubStageBufferSizeAttribute(200),
PubSubStageMaxRetriesAttribute(100),
PubSubStageRetryJitterAttribute(1, 5)))
Source.fromGraph(new PubSubSource(PubSubSubscription("foo-project", "foo-sub"), 30.seconds)).withAttributes(attributes)
PubSubSink
flushes messages either when the buffer is full (size is 100 by default) or after the messages have been in the buffer for maxDelay
(30 seconds by default).
Creating a sink writing to foo-topic
topic in the foo-project
with the default settings:
Sink.fromGraph(new PubSubSink(PubSubTopic("foo-project", "foo-topic")))
Overriding the max delay to 10 seconds:
Sink.fromGraph(new PubSubSink(PubSubTopic("foo-project", "foo-topic")), 10.seconds)
Customise the max delay, buffer size, publish timeout, max retries and retry jitter:
val attributes = Attributes(List(
PubSubStageBufferSizeAttribute(200),
PubSubStageMaxRetriesAttribute(100),
PubSubStageRetryJitterAttribute(1, 5),
PubSubPublishTimeoutAttribute(60.seconds)))
Sink.fromGraph(new PubSubSink(PubSubTopic("foo-project", "foo-topic"))).withAttributes(attributes)
The sink and the source both make use of a custom client library implemented using the gRPC API definitions published by Google. This client can be used as a standalone Pub/Sub client even if you are not interested in using Akka streams.
In almost all cases, you should be using the RetryingPubSubClient
which supports customizable retry policies using the Atmos library. Custom retry policies and retry execution contexts need to be provided as implicit parameters to the client when it is created. See com.qubit.pubsub.client.retry.RetryPolicyDefaults
for the default values used for these parameters.
Creating the default client:
val client = RetryingPubSubClient(PubSubGrpcClient())
Overriding the Pub/Sub endpoint and transport security (for tests using the Pub/Sub emulator):
val config = PubSubApiConfig(apiHost = "localhost", apiPort = "8897", tlsEnabled = false)
val client = RetryingPubSubClient(PubSubGrpcClient(config))
The integration tests require the Pub/Sub emulator to be running.
On a new terminal window:
mkdir -p /tmp/pubsub && gcloud beta emulators pubsub start --data-dir /tmp/pubsub
On another terminal window:
Change directory to source root and
eval $(gcloud beta emulators pubsub env-init --data-dir /tmp/pubsub)
sbt test it:test
Ensure that credentials and PGP settings are correctly configured as documented at http://www.scala-sbt.org/release/docs/Using-Sonatype.html
sbt release
sbt releaseSonatype
- Please make sure that the code is formatted using scalafmt using the provided formatting configuration
- Add tests to exercise the new additions/modifications
- Create an issue on Github before submitting your pull request