Merge pull request #15 from Spinoco/feature/fs2-dep-up
Updated the fs2 dependency to RC2
AdamChlupacek authored Jan 25, 2018
2 parents 0102d30 + 28397a2 commit b295ee4
Showing 15 changed files with 165 additions and 170 deletions.
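
The bump from 0.10.0-M8 to 0.10.0-RC2 is mostly mechanical, and the diffs below follow a few recurring renames: Stream.fail and Pull.fail become raiseError, stream compilation moves behind the new compile prefix (runLog becomes compile.toVector, run becomes compile.drain), and sequencing switches from cats' strict *> to >>. A minimal sketch of the renamed surface, not taken from this repository (assumes fs2 0.10.0-RC2 with cats and cats-effect on the classpath):

    import cats.effect.IO
    import cats.implicits._
    import fs2.Stream

    object Rc2Sketch {
      // M8: Stream.fail(err)      RC2: Stream.raiseError(err)
      val failed: Stream[IO, Int] = Stream.raiseError[IO](new Throwable("boom"))

      // M8: s.runLog / s.run      RC2: s.compile.toVector / s.compile.drain
      val collected: IO[Vector[Int]] = Stream.emits(1 to 3).covary[IO].compile.toVector
      val drained: IO[Unit]          = Stream.emits(1 to 3).covary[IO].compile.drain

      // `>>` takes its right-hand side by name, unlike the strict `*>`,
      // so recursive sequencing does not build its continuation eagerly.
      def countdown(n: Int): IO[Unit] =
        if (n <= 0) IO.unit else IO(println(n)) >> countdown(n - 1)
    }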
4 changes: 2 additions & 2 deletions build.sbt
@@ -28,8 +28,8 @@ lazy val commonSettings = Seq(
   libraryDependencies ++= Seq(
     "org.scalatest" %% "scalatest" % "3.0.0" % "test"
     , "org.scalacheck" %% "scalacheck" % "1.13.4" % "test"
-    , "co.fs2" %% "fs2-core" % "0.10.0-M8"
-    , "co.fs2" %% "fs2-io" % "0.10.0-M8"
+    , "co.fs2" %% "fs2-core" % "0.10.0-RC2"
+    , "co.fs2" %% "fs2-io" % "0.10.0-RC2"
     , "com.spinoco" %% "protocol-kafka" % "0.2.1"

   ),
91 changes: 47 additions & 44 deletions src/main/scala/spinoco/fs2/kafka/KafkaClient.scala

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions src/main/scala/spinoco/fs2/kafka/network/BrokerConnection.scala
@@ -92,7 +92,7 @@ object BrokerConnection {
       }
       .flatMap { rm =>
         MessageCodec.requestCodec.encode(rm).fold(
-          err => fail(new Throwable(s"Failed to serialize message: $err : $rm"))
+          err => raiseError(new Throwable(s"Failed to serialize message: $err : $rm"))
           , data => eval(sendOne(Chunk.bytes(data.toByteArray)))
         )
       }
@@ -126,10 +126,10 @@ object BrokerConnection {
         val buff = acc ++ ByteVector.view(bs.values, bs.offset, bs.size)
         val (rem, sz, out) = collectChunks(buff, msgSz)

-        Pull.segment(out) *> go(rem, sz, tail)
+        Pull.segment(out) >> go(rem, sz, tail)

       case None =>
-        if (acc.nonEmpty) Pull.fail(new Throwable(s"Input terminated before all data were consumed. Buff: $acc"))
+        if (acc.nonEmpty) Pull.raiseError(new Throwable(s"Input terminated before all data were consumed. Buff: $acc"))
         else Pull.done
     }
   }
@@ -189,16 +189,16 @@ object BrokerConnection {
     openRequests: Ref[F,Map[Int,RequestMessage]]
   ):Pipe[F,ByteVector,ResponseMessage] = {
     _.flatMap { bs =>
-      if (bs.size < 4) Stream.fail(new Throwable(s"Message chunk does not have correlation id included: $bs"))
+      if (bs.size < 4) Stream.raiseError(new Throwable(s"Message chunk does not have correlation id included: $bs"))
       else {
         val correlationId = bs.take(4).toInt()
         eval(openRequests.modify{ _ - correlationId}).flatMap { case Change(m, _) =>
           m.get(correlationId) match {
-            case None => Stream.fail(new Throwable(s"Received message correlationId for message that does not exists: $correlationId : $bs : $m"))
+            case None => Stream.raiseError(new Throwable(s"Received message correlationId for message that does not exists: $correlationId : $bs : $m"))
             case Some(req) =>
               MessageCodec.responseCodecFor(req.version, ApiKey.forRequest(req.request)).decode(bs.drop(4).bits)
                 .fold(
-                  err => Stream.fail(new Throwable(s"Failed to decode repsonse to request: $err : $req : $bs"))
+                  err => Stream.raiseError(new Throwable(s"Failed to decode repsonse to request: $err : $req : $bs"))
                   , result => Stream.emit(ResponseMessage(correlationId,result.value))
                 )
           }
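
The fail-to-raiseError rename lands in every failure branch of the codec folds above. A stand-alone sketch of the same decode-and-dispatch shape (int32 is a stand-in for the repo's MessageCodec; assumes fs2 0.10.0-RC2 and scodec-core):

    import cats.effect.IO
    import fs2.Stream
    import scodec.bits.BitVector
    import scodec.codecs.int32

    // Decode the payload, surfacing codec failures as stream errors.
    def decodeOne(bits: BitVector): Stream[IO, Int] =
      int32.decode(bits).fold(
        err => Stream.raiseError[IO](new Throwable(s"Failed to decode: $err"))
      , result => Stream.emit(result.value)
      )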
10 changes: 5 additions & 5 deletions src/test/scala/spinoco/fs2/kafka/DockerSupport.scala
@@ -64,8 +64,8 @@ object DockerSupport {
     Stream.eval(async.unboundedQueue[IO,String]).flatMap { q =>

       def enqueue(s: String): Unit = {
-        semaphore.increment *>
-        isDone.get.flatMap { done => if (!done) q.enqueue1(s) else IO.unit } *>
+        semaphore.increment >>
+        isDone.get.flatMap { done => if (!done) q.enqueue1(s) else IO.unit } >>
         semaphore.decrement
       } unsafeRunSync

@@ -78,7 +78,7 @@

       Stream.bracket(IO(Process(s"docker logs -f $imageId").run(logger)))(
         _ => q.dequeue
-        , p => semaphore.increment *> isDone.modify(_ => true) *> IO(p.destroy()) *> semaphore.decrement
+        , p => semaphore.increment >> isDone.modify(_ => true) >> IO(p.destroy()) >> semaphore.decrement
       )
     }}}
   }
@@ -96,7 +96,7 @@ object DockerSupport {
    * Issues a kill to image with given id
    */
   def killImage(imageId: String @@ DockerId):IO[Unit] = {
-    IO { Process(s"docker kill $imageId").!! } *>
+    IO { Process(s"docker kill $imageId").!! } >>
     runningImages.flatMap { allRun =>
       if (allRun.exists(imageId.startsWith)) killImage(imageId)
       else IO.pure(())
@@ -107,7 +107,7 @@ object DockerSupport {
    * Cleans supplied image from the docker
    */
   def cleanImage(imageId: String @@ DockerId):IO[Unit] = {
-    IO { Process(s"docker rm $imageId").!! } *>
+    IO { Process(s"docker rm $imageId").!! } >>
     availableImages.flatMap { allAvail =>
       if (allAvail.exists(imageId.startsWith)) cleanImage(imageId)
       else IO.pure(())
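
killImage and cleanImage above re-check docker state and recurse until the image is actually gone. A stripped-down sketch of that poll-until-gone loop (kill and isRunning are hypothetical stand-ins for the Process calls):

    import cats.effect.IO
    import cats.implicits._

    // Issue the kill, then re-check and retry until the image no longer shows up.
    def killAndWait(id: String)(kill: String => IO[Unit], isRunning: String => IO[Boolean]): IO[Unit] =
      kill(id) >> isRunning(id).flatMap { running =>
        if (running) killAndWait(id)(kill, isRunning) // still up: try again
        else IO.unit
      }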
18 changes: 10 additions & 8 deletions src/test/scala/spinoco/fs2/kafka/Fs2KafkaRuntimeSpec.scala
@@ -96,7 +96,7 @@ class Fs2KafkaRuntimeSpec extends Fs2KafkaClientSpec {
   /** stops and cleans the given image **/
   def stopImage(zkImageId: String @@ DockerId):IO[Unit] = {
     runningImages flatMap { allRunning =>
-      if (allRunning.exists(zkImageId.startsWith)) killImage(zkImageId) *> cleanImage(zkImageId)
+      if (allRunning.exists(zkImageId.startsWith)) killImage(zkImageId) >> cleanImage(zkImageId)
       else availableImages flatMap { allAvailable =>
         if (allAvailable.exists(zkImageId.startsWith)) cleanImage(zkImageId)
         else IO.pure(())
@@ -146,14 +146,15 @@ class Fs2KafkaRuntimeSpec extends Fs2KafkaClientSpec {

   /** process emitting once docker id of zk and kafka in singleton (one node) **/
   def withKafkaSingleton[A](version: KafkaRuntimeRelease.Value)(f: (String @@ DockerId, String @@ DockerId) => Stream[IO, A]):Stream[IO,A] = {
-    Stream.eval(createNetwork("fs2-kafka-network")) *>
+    Stream.eval(createNetwork("fs2-kafka-network")) >>
     Stream.eval(startZk()).flatMap { zkId =>
       awaitZKStarted(zkId) ++ S.sleep_[IO](2.seconds) ++
       Stream.eval(startK(version, 1)).flatMap { kafkaId =>
         (awaitKStarted(version, kafkaId) ++ f(zkId, kafkaId))
           .onFinalize {
-            stopImage(kafkaId) *>
-            stopImage(zkId) *>
+            IO.apply(println("Stopping and deleting")) >>
+            stopImage(kafkaId) >>
+            stopImage(zkId) >>
             removeNetwork("fs2-kafka-network")
           }
       }}
@@ -162,8 +163,8 @@ class Fs2KafkaRuntimeSpec extends Fs2KafkaClientSpec {

   def withKafkaClient[A](version: KafkaRuntimeRelease.Value, protocol: ProtocolVersion.Value)(f: KafkaClient[IO] => Stream[IO, A]): Stream[IO, A] = {
     withKafkaSingleton(version) { (_, kafkaDockerId) =>
-      S.sleep[IO](1.second) *>
-      Stream.eval(createKafkaTopic(kafkaDockerId, testTopicA)) *>
+      S.sleep[IO](1.second) >>
+      Stream.eval(createKafkaTopic(kafkaDockerId, testTopicA)) >>
       KafkaClient[IO](Set(localBroker1_9092), protocol, "test-client") flatMap { kc =>
         awaitLeaderAvailable(kc, testTopicA, part0).drain ++ f(kc)
       }
@@ -265,9 +266,10 @@ class Fs2KafkaRuntimeSpec extends Fs2KafkaClientSpec {
   def publishNMessages(client: KafkaClient[IO],from: Int, to: Int, quorum: Boolean = false): IO[Unit] = {

     Stream.range(from, to).evalMap { idx =>
+      println(s"publishing $idx")
       client.publish1(testTopicA, part0, ByteVector(1), ByteVector(idx), quorum, 10.seconds)
     }
-    .run
+    .compile.drain

   }

@@ -285,7 +287,7 @@ class Fs2KafkaRuntimeSpec extends Fs2KafkaClientSpec {
         case BrokerAddress(_, 9092) => Stream.eval_(killImage(nodes.nodes(tag[Broker](1))))
         case BrokerAddress(_, 9192) => Stream.eval_(killImage(nodes.nodes(tag[Broker](2))))
         case BrokerAddress(_, 9292) => Stream.eval_(killImage(nodes.nodes(tag[Broker](3))))
-        case other => Stream.fail(new Throwable(s"Unexpected broker: $other"))
+        case other => Stream.raiseError(new Throwable(s"Unexpected broker: $other"))
       }
     }
   }
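
withKafkaSingleton ties container teardown to the lifetime of the test stream with onFinalize, so the cleanup runs whether the stream completes, fails, or is interrupted. A minimal sketch of that shape (names hypothetical; assumes fs2 0.10.0-RC2):

    import cats.effect.IO
    import fs2.Stream

    def withResource[A](acquire: IO[String], release: String => IO[Unit])(
      use: String => Stream[IO, A]
    ): Stream[IO, A] =
      Stream.eval(acquire).flatMap { id =>
        use(id).onFinalize(release(id)) // runs on completion, error, or interruption
      }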
10 changes: 5 additions & 5 deletions src/test/scala/spinoco/fs2/kafka/KafkaClientLastOffsetSpec.scala
@@ -15,17 +15,17 @@ class KafkaClientLastOffsetSpec extends Fs2KafkaRuntimeSpec {
   s"Last Offset (single broker)" - {

     "queries when topic is empty" in {
-      ((withKafkaClient(runtime, protocol) { kc =>
+      withKafkaClient(runtime, protocol) { kc =>
         Stream.eval(kc.offsetRangeFor(testTopicA, tag[PartitionId](0)))
-      } runLog ) unsafeRunTimed 30.seconds) shouldBe Some(Vector((offset(0), offset(0))))
+      }.compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some(Vector((offset(0), offset(0))))
     }


     "queries when topic is non-empty" in {
-      ((withKafkaClient(runtime, protocol) { kc =>
-        Stream.eval(kc.publish1(testTopicA, part0, ByteVector(1, 2, 3), ByteVector(5, 6, 7), false, 10.seconds)) *>
+      withKafkaClient(runtime, protocol) { kc =>
+        Stream.eval(kc.publish1(testTopicA, part0, ByteVector(1, 2, 3), ByteVector(5, 6, 7), false, 10.seconds)) >>
         Stream.eval(kc.offsetRangeFor(testTopicA, tag[PartitionId](0)))
-      } runLog ) unsafeRunTimed 30.seconds) shouldBe Some(Vector((offset(0), offset(1))))
+      }.compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some(Vector((offset(0), offset(1))))
     }

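
Note the assertion shape these specs now share: unsafeRunTimed returns None when the effect misses its deadline, so expected values are wrapped in Some and a hung broker fails the test instead of blocking it. A minimal illustration (assumes cats-effect IO):

    import cats.effect.IO
    import scala.concurrent.duration._

    val result: Option[Int] = IO.pure(42).unsafeRunTimed(30.seconds)
    // result == Some(42); an IO that overruns the limit yields None instead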
61 changes: 31 additions & 30 deletions src/test/scala/spinoco/fs2/kafka/KafkaClientPublishSpec.scala
@@ -24,11 +24,11 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } drain
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
-      S.sleep[IO](2.second) *> // wait for message to be accepted
+      S.sleep[IO](2.second) >> // wait for message to be accepted
       kc.subscribe(testTopicA, part0, offset(0l)).take(10)
-    } runLog ) unsafeRunTimed 30.seconds).size shouldBe 10
+    }.compile.toVector.unsafeRunTimed(30.seconds).map(_.size) shouldBe Some(10)

   }

@@ -39,12 +39,13 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } map (Left(_))
     }

-    (((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
       (kc.subscribe(testTopicA, part0, offset(0l)) map (Right(_)))
-    } take 20) runLog) unsafeRunTimed 30.seconds) shouldBe
+    }.take(20).compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some {
       (for { idx <- 0 until 10} yield Left(offset(idx))).toVector ++
       (for { idx <- 0 until 10} yield Right(TopicMessage(offset(idx), ByteVector(1), ByteVector(idx), offset(10)))).toVector
+    }
   }

@@ -55,11 +56,11 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } drain
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
-      S.sleep[IO](3.second) *> // wait for message to be accepted
-      kc.subscribe(testTopicA, part0, offset(0l)) take (100)
-    } runLog ) unsafeRunTimed 30.seconds).size shouldBe 100
+      S.sleep[IO](3.second) >> // wait for message to be accepted
+      kc.subscribe(testTopicA, part0, offset(0l)).take(100)
+    }.compile.toVector.unsafeRunTimed(30.seconds).map(_.size) shouldBe Some(100)

   }

@@ -71,13 +72,13 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } map (Left(_))
     }

-    (((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
       (kc.subscribe(testTopicA, part0, offset(0l)) map (Right(_)))
-    } take 110 ) runLog ) unsafeRunTimed 30.seconds) shouldBe
+    }.take(110).compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some {
       (for { idx <- 0 until 10} yield Left(offset(idx*10))).toVector ++
       (for { idx <- 0 until 100} yield Right(TopicMessage(offset(idx), ByteVector(idx % 10), ByteVector(idx / 10), offset(100)))).toVector
-
+    }
   }

@@ -88,11 +89,11 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } drain
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
-      S.sleep[IO](3.second) *> // wait for message to be accepted
+      S.sleep[IO](3.second) >> // wait for message to be accepted
       kc.subscribe(testTopicA, part0, offset(0l)).take(100)
-    } runLog ) unsafeRunTimed 30.seconds).size shouldBe 100
+    }.compile.toVector.unsafeRunTimed(30.seconds).map(_.size) shouldBe Some(100)

   }

@@ -104,13 +105,13 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } map (Left(_))
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
       ((kc.subscribe(testTopicA, part0, offset(0l)) map (Right(_))) take 100)
-    } runLog ) unsafeRunTimed 30.seconds) shouldBe
+    }.compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some {
       (for { idx <- 0 until 10} yield Left(offset(idx*10))).toVector ++
-      (for { idx <- 0 until 100} yield Right(TopicMessage(offset(idx), ByteVector(idx % 10), ByteVector(idx / 10), offset(100)))).toVector
-
+      (for { idx <- 0 until 100} yield Right(TopicMessage(offset(idx), ByteVector(idx % 10), ByteVector(idx / 10), offset(100)))).toVector
+    }
   }

@@ -121,13 +122,13 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } map (Left(_))
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
       ((kc.subscribe(testTopicA, part0, offset(5l)) map (Right(_))) take 95)
-    } runLog ) unsafeRunTimed 30.seconds) shouldBe
-      ((for { idx <- 0 until 10} yield Left(offset(idx*10))).toVector ++
-      (for { idx <- 0 until 100} yield Right(TopicMessage(offset(idx), ByteVector(idx % 10), ByteVector(idx / 10), offset(100)))).drop(5).toVector)
-
+    }.compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some {
+      (for { idx <- 0 until 10} yield Left(offset(idx*10))).toVector ++
+      (for { idx <- 0 until 100} yield Right(TopicMessage(offset(idx), ByteVector(idx % 10), ByteVector(idx / 10), offset(100)))).drop(5).toVector
+    }
   }

@@ -138,11 +139,11 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } drain
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
-      S.sleep[IO](3.second) *> // wait for message to be accepted
+      S.sleep[IO](3.second) >> // wait for message to be accepted
       kc.subscribe(testTopicA, part0, offset(0l)).take(100)
-    } runLog ) unsafeRunTimed 30.seconds).size shouldBe 100
+    }.compile.toVector.unsafeRunTimed(30.seconds).map(_.size) shouldBe Some(100)

   }

@@ -154,13 +155,13 @@ class KafkaClientPublishSpec extends Fs2KafkaRuntimeSpec {
       } map (Left(_))
     }

-    ((withKafkaClient(runtime, protocol) { kc =>
+    withKafkaClient(runtime, protocol) { kc =>
       publish(kc) ++
       ((kc.subscribe(testTopicA, part0, offset(0l)) map (Right(_))) take 100)
-    } runLog ) unsafeRunTimed 30.seconds ) shouldBe
+    }.compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some {
       (for { idx <- 0 until 10} yield Left(offset(idx*10))).toVector ++
       (for { idx <- 0 until 100} yield Right(TopicMessage(offset(idx), ByteVector(idx % 10), ByteVector(idx / 10), offset(100)))).toVector
-
+    }
   }

 }
24 changes: 12 additions & 12 deletions src/test/scala/spinoco/fs2/kafka/KafkaClientSubscribeSpec.scala
@@ -17,39 +17,39 @@ class KafkaClientSubscribeSpec extends Fs2KafkaRuntimeSpec {

   "subscribe-at-zero" in {

-    ((withKafkaClient(runtime, protocol) { kc =>
-      Stream.eval(publishNMessages(kc, 0, 20)) *>
+    withKafkaClient(runtime, protocol) { kc =>
+      Stream.eval(publishNMessages(kc, 0, 20)) >>
       kc.subscribe(testTopicA, part0, offset(0l)).take(10)
-    } runLog ) unsafeRunTimed 30.seconds) shouldBe Some(generateTopicMessages(0, 10, 20))
+    }.compile.toVector.unsafeRunTimed(30.seconds) shouldBe Some(generateTopicMessages(0, 10, 20))
   }


   "subscribe-at-zero-empty" in {
-    ((withKafkaClient(runtime, protocol) { kc =>
-      Stream(
+    withKafkaClient(runtime, protocol) { kc =>
+      Stream[Stream[IO, TopicMessage]](
         kc.subscribe(testTopicA, part0, offset(0l))
         , S.sleep_[IO](1.second) ++ Stream.eval_(publishNMessages(kc, 0, 20))
       ).joinUnbounded.take(10)
-    } runLog) unsafeRunTimed 30.seconds).map { _.map { _.copy(tail = offset(0)) } } shouldBe Some(generateTopicMessages(0, 10, 0))
+    }.compile.toVector.unsafeRunTimed(30.seconds).map { _.map { _.copy(tail = offset(0)) } } shouldBe Some(generateTopicMessages(0, 10, 0))

   }

   "subscriber before head" in {
-    ((withKafkaClient(runtime, protocol) { kc =>
-      Stream(
+    withKafkaClient(runtime, protocol) { kc =>
+      Stream[Stream[IO, TopicMessage]](
         kc.subscribe(testTopicA, part0, offset(-1l))
         , S.sleep_[IO](1.second) ++ Stream.eval_(publishNMessages(kc, 0, 20))
       ).joinUnbounded.take(10)
-    } runLog) unsafeRunTimed 30.seconds).map { _.map { _.copy(tail = offset(0)) } } shouldBe Some(generateTopicMessages(0, 10, 0))
+    }.compile.toVector.unsafeRunTimed(30.seconds).map { _.map { _.copy(tail = offset(0)) } } shouldBe Some(generateTopicMessages(0, 10, 0))
   }

   "subscriber after head" in {
-    ((withKafkaClient(runtime, protocol) { kc =>
-      Stream(
+    withKafkaClient(runtime, protocol) { kc =>
+      Stream[Stream[IO, TopicMessage]](
         Stream.eval_(publishNMessages(kc, 0, 20)) ++ kc.subscribe(testTopicA, part0, TailOffset)
         , S.sleep_[IO](1.second) ++ Stream.eval_(publishNMessages(kc, 20, 40))
       ).joinUnbounded.take(10)
-    } runLog) unsafeRunTimed 30.seconds).map { _.map { _.copy(tail = offset(0)) }} shouldBe Some(generateTopicMessages(20, 30, 0))
+    }.compile.toVector.unsafeRunTimed(30.seconds).map { _.map { _.copy(tail = offset(0)) }} shouldBe Some(generateTopicMessages(20, 30, 0))

   }
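
The explicit Stream[Stream[IO, TopicMessage]](...) annotations added above pin the outer element type so that joinUnbounded, which merges the inner streams concurrently, resolves cleanly. A minimal sketch (assumes fs2 0.10.0-RC2 with an ExecutionContext in implicit scope):

    import cats.effect.IO
    import fs2.Stream
    import scala.concurrent.ExecutionContext.Implicits.global

    // Both inner streams are run concurrently and their outputs interleaved.
    val merged: Stream[IO, Int] =
      Stream[Stream[IO, Int]](
        Stream.emit(1).covary[IO]
      , Stream.emit(2).covary[IO]
      ).covary[IO].joinUnbounded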
10 changes: 5 additions & 5 deletions src/test/scala/spinoco/fs2/kafka/KafkaClusterPublish.scala
@@ -31,16 +31,16 @@ class KafkaClusterPublish extends Fs2KafkaRuntimeSpec {
       } map (Left(_))
     }

-    ((withKafkaCluster(runtime) flatMap { nodes =>
-      S.sleep[IO](3.second) *>
-      Stream.eval(createKafkaTopic(nodes.broker1DockerId, testTopicA)) *> {
+    withKafkaCluster(runtime).flatMap { nodes =>
+      S.sleep[IO](3.second) >>
+      Stream.eval(createKafkaTopic(nodes.broker1DockerId, testTopicA)) >> {
         KafkaClient[IO](Set(localBroker1_9092), protocol, "test-client") flatMap { kc =>
-          awaitLeaderAvailable(kc, testTopicA, part0) *>
+          awaitLeaderAvailable(kc, testTopicA, part0) >>
           publish(kc) ++
           (kc.subscribe(testTopicA, part0, offset(0l)) map (Right(_)))
         } take 20
       }
-    } runLog ) unsafeRunTimed 100.seconds) shouldBe Some(
+    }.compile.toVector.unsafeRunTimed(100.seconds) shouldBe Some(
       (for { idx <- 0 until 10} yield Left(offset(idx))).toVector ++
       (for { idx <- 0 until 10} yield Right(TopicMessage(offset(idx), ByteVector(1), ByteVector(idx), offset(10)))).toVector
     )