kifi/franz

Franz is a simple reactive-ish Scala wrapper around Amazon's SQS persistent message queue.

#Initialization

First you will need an instance of the trait SQSClient. The only currently available implementation is SimpleSQSClient, which has three constructors:

new SimpleSQSClient(
	credentialProvider: com.amazonaws.auth.AWSCredentialsProvider, 
	region: com.amazonaws.regions.Regions
)

SimpleSQSClient(credentials: com.amazonaws.auth.AWSCredentials, region: com.amazonaws.regions.Regions)

SimpleSQSClient(key: String, secret: String, region: com.amazonaws.regions.Regions)

Let's use the third.

import com.amazonaws.regions.Regions
val sqs = SimpleSQSClient(<your aws access key>, <your aws secret key>, Regions.US_WEST_1)

Now that we have an instance of SQSClient, we can get an instance of SQSQueue like so:

val queue = sqs.simple(QueueName(<your queue name>))

#SQSQueue

##Sending

SQSQueue provides two methods for sending messages:

def send(msg: String)(implicit ec: ExecutionContext): Future[MessageId]
def send(msg: play.api.libs.json.JsValue)(implicit ec: ExecutionContext): Future[MessageId]

There is currently no use for the returned MessageId, but you can use the success of the Future as a send confirmation.
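As a rough illustration of treating the Future's outcome as a send confirmation, here is a minimal sketch. The `MessageId` case class and the `StringSender` trait are local stand-ins that only copy the documented `send` signature so the snippet compiles without Franz on the classpath, and `sendConfirmed` is a hypothetical helper, not part of the library.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object SendConfirmationSketch {
  // Stand-ins (assumptions): they mirror the documented signature only.
  // With Franz on the classpath, use its own MessageId and SQSQueue instead.
  final case class MessageId(id: String)
  trait StringSender {
    def send(msg: String)(implicit ec: ExecutionContext): Future[MessageId]
  }

  // Hypothetical helper: completes with true if the send Future succeeded
  // and with false if it failed. The MessageId itself is discarded.
  def sendConfirmed(queue: StringSender, msg: String)(implicit ec: ExecutionContext): Future[Boolean] =
    queue.send(msg).map(_ => true).recover { case NonFatal(_) => false }
}
```

Collapsing the failure into `false` loses the exception, so a real caller may prefer to keep the failed Future and log or retry from there.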

##Receiving

###Direct

SQSQueue provides several methods for getting the next message in the queue:

def nextString(implicit ec: ExecutionContext): Future[Option[SQSStringMessage]]
def nextStringWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[SQSStringMessage]]
def nextStringBatch(maxBatchSize: Int)(implicit ec: ExecutionContext): Future[Seq[SQSStringMessage]]
def nextStringBatchWithLock(maxBatchSize: Int, lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Seq[SQSStringMessage]]

def nextJson(implicit ec: ExecutionContext): Future[Option[SQSJsonMessage]]
def nextJsonWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[SQSJsonMessage]]
def nextJsonBatch(maxBatchSize: Int)(implicit ec: ExecutionContext): Future[Seq[SQSJsonMessage]]
def nextJsonBatchWithLock(maxBatchSize: Int, lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Seq[SQSJsonMessage]]

The returned SQS*Message objects are instances of SQSMessage[String] and SQSMessage[play.api.libs.json.JsValue] respectively. SQSMessage[T] has the fields

val body: T //actual message payload
val attributes: Map[String,String] //raw attributes from com.amazonaws.services.sqs.model.Message
val consume: () => Unit //deletes the message from the queue

The *WithLock methods lock (or rather, hide) the retrieved message(s) in the queue so that no other call will retrieve them during the lock timeout. You need to call consume on the message before the timeout expires in order to permanently remove it from the queue.

If the lock expires, the message will again be available for retrieval, which is useful e.g. in case of an error where consume was never called.
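The lock-then-consume pattern described above might be sketched as follows. The `Message` case class and the `LockingReceiver` trait are local stand-ins that copy the documented shape of SQSMessage[T] and `nextStringWithLock` so the snippet compiles on its own, and `processOne` is a hypothetical helper rather than a Franz method.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object LockedReceiveSketch {
  // Stand-ins (assumptions): same fields and signature as documented above.
  final case class Message[T](body: T, attributes: Map[String, String], consume: () => Unit)
  trait LockingReceiver {
    def nextStringWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[Message[String]]]
  }

  // Hypothetical helper: fetches one message under a lock, runs `handle` on
  // its body, and calls consume only if `handle` returned normally.
  // Completes with true if a message was handled and consumed, false if the
  // queue had nothing to fetch, and fails if `handle` threw.
  def processOne(queue: LockingReceiver, lockTimeout: FiniteDuration)(handle: String => Unit)(
      implicit ec: ExecutionContext): Future[Boolean] =
    queue.nextStringWithLock(lockTimeout).map {
      case Some(msg) =>
        handle(msg.body) // if this throws, consume is skipped and the lock eventually expires
        msg.consume()
        true
      case None => false
    }
}
```

The lock timeout should comfortably exceed the time `handle` needs; otherwise the message can reappear for another consumer while it is still being processed.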

A call to retrieve a message (batch) will long poll for 10 seconds. If there is nothing to fetch an empty sequence or a None will be returned, depending on the method.
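Because an empty result only means that one long poll found nothing, a caller that wants to wait longer has to issue the call again. A minimal sketch of such a retry loop follows; `StringBatchReceiver` is a local stand-in that copies the documented `nextStringBatch` signature (generic in the message type so it compiles without Franz), and `pollUntilNonEmpty` and its `attemptsLeft` parameter are hypothetical.

```scala
import scala.concurrent.{ExecutionContext, Future}

object PollLoopSketch {
  // Stand-in (assumption): M plays the role of SQSStringMessage.
  trait StringBatchReceiver[M] {
    def nextStringBatch(maxBatchSize: Int)(implicit ec: ExecutionContext): Future[Seq[M]]
  }

  // Hypothetical helper: repeats the batch call until it returns at least
  // one message or the given number of attempts is used up, in which case
  // it completes with an empty sequence.
  def pollUntilNonEmpty[M](queue: StringBatchReceiver[M], maxBatchSize: Int, attemptsLeft: Int)(
      implicit ec: ExecutionContext): Future[Seq[M]] =
    if (attemptsLeft <= 0) Future.successful(Seq.empty)
    else
      queue.nextStringBatch(maxBatchSize).flatMap { batch =>
        if (batch.nonEmpty) Future.successful(batch)
        else pollUntilNonEmpty(queue, maxBatchSize, attemptsLeft - 1)
      }
}
```

With the 10 second long poll described above, `attemptsLeft = 6` would wait roughly a minute in the worst case before giving up.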

###Iteratees

For the more functionally inclined, SQSQueue also provides enumerators to be used with your favorite Iteratee:

def stringEnumerator(implicit ec: ExecutionContext) : Enumerator[SQSStringMessage]
def stringEnumeratorWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Enumerator[SQSStringMessage]

def jsonEnumerator(implicit ec: ExecutionContext): Enumerator[SQSJsonMessage]
def jsonEnumeratorWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Enumerator[SQSJsonMessage]

The semantics of retrieval and locking are identical to those of the next* methods.

#FormattedSQSQueue

In addition to SQSQueue, Franz also has a FormattedSQSQueue[T], which has the following methods:

def send(msg: T)(implicit ec: ExecutionContext, f: Writes[T]): Future[MessageId]
def next(implicit ec: ExecutionContext, f: Reads[T]): Future[Option[SQSMessage[T]]]
def nextWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext, f: Reads[T]): Future[Option[SQSMessage[T]]]
def nextBatch(maxBatchSize: Int)(implicit ec: ExecutionContext, f: Reads[T]): Future[Seq[SQSMessage[T]]]
def nextBatchWithLock(maxBatchSize: Int, lockTimeout: FiniteDuration)(implicit ec: ExecutionContext, f: Reads[T]): Future[Seq[SQSMessage[T]]]
def enumerator(implicit ec: ExecutionContext, f: Reads[T]): Enumerator[SQSMessage[T]]
def enumeratorWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext, f: Reads[T]): Enumerator[SQSMessage[T]]

The Reads[T] and Writes[T] are from play.api.libs.json, which lets you have a queue for any type with a Play-style implicit JSON formatter. The semantics of the methods here are identical to the *Json* methods on SQSQueue, except that serialization/deserialization is taken care of for you.

#Limitations

  • Fairly high latency. Not really suitable for things that require immediate action.
  • Message size is limited to ~64KB.
  • FIFO not guaranteed for messages sent close together. (i.e. there is no strict ordering of messages)
  • Multicasting is really cumbersome.
  • No replay. Once a message is consumed, it's gone.
