diff --git a/build.sbt b/build.sbt index d9ab651..d3c751a 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ organization := "com.kifi" name := "franz" -version := "0.3.11" +version := "0.3.12-SNAPSHOT" crossScalaVersions := Seq("2.10.4", "2.11.5") diff --git a/src/main/scala/com/kifi/franz/FakeSQSQueue.scala b/src/main/scala/com/kifi/franz/FakeSQSQueue.scala index e8bc3a1..69bb4ab 100644 --- a/src/main/scala/com/kifi/franz/FakeSQSQueue.scala +++ b/src/main/scala/com/kifi/franz/FakeSQSQueue.scala @@ -17,7 +17,7 @@ trait FakeSQSQueue[T] extends SQSQueue[T] { override def initQueueUrl(): String = "" - override def send(msg: T, messageAttributes: Option[Map[String,String]]): Future[MessageId] = Future.successful(MessageId("")) + override def send(msg: T, messageAttributes: Option[Map[String,String]], delay: Option[Int]): Future[MessageId] = Future.successful(MessageId("")) override protected def nextBatchRequestWithLock(maxBatchSize: Int, lockTimeout: FiniteDuration): Future[Seq[SQSMessage[T]]] = Future.successful(Seq.empty) diff --git a/src/main/scala/com/kifi/franz/SQSQueue.scala b/src/main/scala/com/kifi/franz/SQSQueue.scala index 605e5fc..515417a 100644 --- a/src/main/scala/com/kifi/franz/SQSQueue.scala +++ b/src/main/scala/com/kifi/franz/SQSQueue.scala @@ -61,11 +61,17 @@ trait SQSQueue[T]{ send (msg, None) } - def send(msg: T, messageAttributes: Option[Map[String, String]] = None): Future[MessageId] = { + def send(msg: T, delay:Int): Future[MessageId] = { + send (msg, None, Some(delay)) + } + + def send(msg: T, messageAttributes: Option[Map[String, String]] = None, delay:Option[Int] = None): Future[MessageId] = { val request = new SendMessageRequest request.setMessageBody(msg) request.setQueueUrl(queueUrl) - + delay.map{ d => + request.setDelaySeconds(d) + } // foreach on an Option unfolds Some, and skips if None messageAttributes.foreach { ma => ma.foreach { case (k,v) =>