Skip to content

Commit 7465a8f

Browse files
committed
Using headOption instead of head for accessing first element in a list.
1 parent 5c15b5d commit 7465a8f

File tree

4 files changed

+15
-6
lines changed

4 files changed

+15
-6
lines changed

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ trait EmbeddedKafka {
9999
val messageStreams =
100100
consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = new StringDecoder)
101101

102-
val messageFuture = Future { messageStreams.head.iterator().next().message() }
102+
val messageFuture = Future {
103+
messageStreams.headOption.getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
104+
}
103105

104106
try {
105107
Await.result(messageFuture, 3 seconds)

src/main/scala/net/manub/embeddedkafka/KafkaUnavailableException.scala

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package net.manub.embeddedkafka
2+
3+
class KafkaUnavailableException extends RuntimeException
4+
5+
class KafkaSpecException(msg: String) extends RuntimeException(msg)

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import akka.actor.{Actor, ActorRef, ActorSystem, Props}
88
import akka.io.Tcp._
99
import akka.io.{IO, Tcp}
1010
import akka.testkit.{ImplicitSender, TestKit}
11-
import kafka.consumer.{ConsumerConfig, Consumer, Whitelist}
11+
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
1212
import kafka.serializer.StringDecoder
1313
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
1414
import org.apache.kafka.common.serialization.StringSerializer
@@ -124,7 +124,12 @@ class EmbeddedKafkaSpec
124124
val messageStreams = consumer.createMessageStreamsByFilter(filter, 1, stringDecoder, stringDecoder)
125125

126126
val eventualMessage = Future {
127-
messageStreams.head.iterator().next().message()
127+
messageStreams
128+
.headOption
129+
.getOrElse(throw new RuntimeException("Unable to retrieve message streams"))
130+
.iterator()
131+
.next()
132+
.message()
128133
}
129134

130135
whenReady(eventualMessage) { msg =>

0 commit comments

Comments
 (0)