Skip to content

Commit

Permalink
Ported to fs2 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
pchlupacek committed Nov 12, 2017
1 parent 24fae2d commit 0102d30
Show file tree
Hide file tree
Showing 20 changed files with 361 additions and 326 deletions.
5 changes: 2 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.0" % "test"
, "org.scalacheck" %% "scalacheck" % "1.13.4" % "test"
, "co.fs2" %% "fs2-core" % "0.9.7"
, "co.fs2" %% "fs2-io" % "0.9.7"

, "co.fs2" %% "fs2-core" % "0.10.0-M8"
, "co.fs2" %% "fs2-io" % "0.10.0-M8"
, "com.spinoco" %% "protocol-kafka" % "0.2.1"

),
Expand Down
144 changes: 75 additions & 69 deletions src/main/scala/spinoco/fs2/kafka/KafkaClient.scala

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/main/scala/spinoco/fs2/kafka/Logger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package spinoco.fs2.kafka

import java.util.logging.LogRecord

import cats.effect.Sync
import fs2._
import fs2.util.Async
import spinoco.fs2.kafka.Logger.Level

/**
Expand Down Expand Up @@ -40,7 +40,7 @@ object Logger {
}


def JDKLogger[F[_]](jdkLogger:java.util.logging.Logger)(implicit F: Async[F]):F[Logger[F]] = F.delay {
def JDKLogger[F[_]](jdkLogger:java.util.logging.Logger)(implicit F: Sync[F]):F[Logger[F]] = F.delay {
new Logger[F] {
def log(level: Logger.Level.Value, msg: => String, throwable: Throwable): F[Unit] = F.delay {
val jdkLevel =
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/spinoco/fs2/kafka/kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package spinoco.fs2

import java.nio.channels.AsynchronousChannelGroup

import cats.effect.Effect
import fs2._
import fs2.util.Async
import scodec.bits.ByteVector
import shapeless.tag
import shapeless.tag._
import spinoco.fs2.kafka.network.BrokerAddress
import spinoco.protocol.kafka.{Offset, PartitionId, ProtocolVersion, TopicName}

import scala.concurrent.ExecutionContext


package object kafka {

Expand Down Expand Up @@ -52,7 +54,7 @@ package object kafka {
ensemble: Set[BrokerAddress]
, protocol: ProtocolVersion.Value
, clientName: String
)(implicit AG: AsynchronousChannelGroup, F:Async[F], S: Scheduler, L: Logger[F]):Stream[F,KafkaClient[F]] =
)(implicit AG: AsynchronousChannelGroup, EC: ExecutionContext, F: Effect[F], S: Scheduler, L: Logger[F]):Stream[F,KafkaClient[F]] =
KafkaClient(ensemble, protocol, clientName)


Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/spinoco/fs2/kafka/network/BrokerAddress.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package spinoco.fs2.kafka.network

import java.net.InetSocketAddress

import fs2.util.Effect
import cats.effect.Sync


/**
Expand All @@ -15,8 +15,8 @@ case class BrokerAddress(
, port: Int
) { self =>

def toInetSocketAddress[F[_]](implicit F: Effect[F]): F[InetSocketAddress] =
F.delay { new InetSocketAddress(self.host, self.port) }
def toInetSocketAddress[F[_]](implicit F: Sync[F]): F[InetSocketAddress] =
F.catchNonFatal { new InetSocketAddress(self.host, self.port) }

}

Expand Down
94 changes: 54 additions & 40 deletions src/main/scala/spinoco/fs2/kafka/network/BrokerConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package spinoco.fs2.kafka.network
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

import cats.Monad
import cats.effect.Effect
import fs2._
import Stream._
import fs2.util.Async.Change
import fs2.util._
import fs2.Stream._
import fs2.async.Ref
import fs2.async.Ref.Change
import scodec.bits.ByteVector
import spinoco.protocol.kafka.Request.{ProduceRequest, RequiredAcks}
import spinoco.protocol.kafka.codec.MessageCodec
import spinoco.protocol.kafka.{ApiKey, RequestMessage, ResponseMessage}

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._


Expand Down Expand Up @@ -43,10 +46,10 @@ object BrokerConnection {
address: InetSocketAddress
, writeTimeout: Option[FiniteDuration] = None
, readMaxChunkSize: Int = 256 * 1024 // 256 Kilobytes
)(implicit AG:AsynchronousChannelGroup, F: Async[F]): Pipe[F, RequestMessage, ResponseMessage] = {
)(implicit AG:AsynchronousChannelGroup, EC: ExecutionContext, F: Effect[F]): Pipe[F, RequestMessage, ResponseMessage] = {
(source: Stream[F,RequestMessage]) =>
fs2.io.tcp.client(address).flatMap { socket =>
eval(F.refOf(Map.empty[Int,RequestMessage])).flatMap { openRequests =>
eval(async.refOf(Map.empty[Int,RequestMessage])).flatMap { openRequests =>
val send = source.through(impl.sendMessages(
openRequests = openRequests
, sendOne = (x) => socket.write(x, writeTimeout)
Expand Down Expand Up @@ -76,7 +79,7 @@ object BrokerConnection {
* @return
*/
def sendMessages[F[_]](
openRequests: Async.Ref[F,Map[Int,RequestMessage]]
openRequests: Ref[F,Map[Int,RequestMessage]]
, sendOne: Chunk[Byte] => F[Unit]
)(implicit F: Monad[F]):Sink[F,RequestMessage] = {
_.evalMap { rm =>
Expand All @@ -97,7 +100,7 @@ object BrokerConnection {


def receiveMessages[F[_]](
openRequests: Async.Ref[F,Map[Int,RequestMessage]]
openRequests: Ref[F,Map[Int,RequestMessage]]
):Pipe[F,Byte,ResponseMessage] = {
_.through(receiveChunks)
.through(decodeReceived(openRequests))
Expand All @@ -114,46 +117,57 @@ object BrokerConnection {
*
* @return
*/
def receiveChunks[F[_]]:Pipe[F,Byte,ByteVector] = {
def go(acc:ByteVector,msgSz:Option[Int]): Handle[F,Byte] => Pull[F,ByteVector,Unit] = {
_.receive { case (ch,h) =>
val bytes = ch.toBytes
val bv = ByteVector.view(bytes.values).drop(bytes.offset).take(bytes.size)

val (acc0, rem0, out) = collectChunks( Vector.empty, msgSz, acc ++ bv)
Pull.outputs(out) >> go(acc0,rem0)(h)
def receiveChunks[F[_]]: Pipe[F,Byte,ByteVector] = {

def go(acc: ByteVector, msgSz: Option[Int], s: Stream[F, Byte]): Pull[F, ByteVector, Unit] = {
s.pull.unconsChunk flatMap {
case Some((ch, tail)) =>
val bs = ch.toBytes
val buff = acc ++ ByteVector.view(bs.values, bs.offset, bs.size)
val (rem, sz, out) = collectChunks(buff, msgSz)

Pull.segment(out) *> go(rem, sz, tail)

case None =>
if (acc.nonEmpty) Pull.fail(new Throwable(s"Input terminated before all data were consumed. Buff: $acc"))
else Pull.done
}
}
_.pull(go(ByteVector.empty,None))

s => go(ByteVector.empty, None, s).stream
}


/**
* Collects chunk of messages received from the broker.
* If any message size is larger then `maxMessageSize` this will fail w/o emitting single value
* returns remainder to be processed when more data are available, Size of required chunk, and resulting stream.
* Collects chunks of messages received.
* Each chunk is forming whole message, that means this looks for the first 4 bytes, that indicates message size,
* then this take up to that size to produce single ByteVector of message content, and emits that
* content it term of Segment. Note that Segment may be empty or may contain multiple characters
*/
@tailrec
def collectChunks[F[_]](
acc:Vector[ByteVector]

def collectChunks(
in: ByteVector
, msgSz:Option[Int]
, received: ByteVector
):(ByteVector,Option[Int],Stream[F,ByteVector]) = {
msgSz match {
case None =>
if (received.size < 4) (received, None, Stream.emits(acc))
else {
val sz = received.take(4).toInt()
collectChunks[F](acc,Some(sz),received.drop(4))
}
):(ByteVector, Option[Int], Segment[ByteVector, Unit]) = {
@tailrec
def go(buff: ByteVector, currSz: Option[Int], acc: Vector[ByteVector]): (ByteVector, Option[Int], Segment[ByteVector, Unit]) = {
currSz match {
case None =>
if (buff.size < 4) (buff, None, Segment.indexedSeq(acc))
else {
val (sz, rem) = buff.splitAt(4)
go(rem, Some(sz.toInt()), acc)
}

case Some(sz) =>
val (chunk, rest) = received.splitAt(sz)
if (rest.nonEmpty) collectChunks[F](acc :+ chunk, None, rest)
else {
if (chunk.size.toInt == sz) (rest,None,Stream.emits(acc :+ chunk))
else (chunk, Some(sz), Stream.emits(acc))
}
case Some(sz) =>
if (buff.size < sz) (buff, Some(sz), Segment.indexedSeq(acc))
else {
val (h,t) = buff.splitAt(sz)
go(t, None, acc :+ h)
}
}
}
go(in, msgSz, Vector.empty)
}


Expand All @@ -172,13 +186,13 @@ object BrokerConnection {
* @return
*/
def decodeReceived[F[_]](
openRequests: Async.Ref[F,Map[Int,RequestMessage]]
openRequests: Ref[F,Map[Int,RequestMessage]]
):Pipe[F,ByteVector,ResponseMessage] = {
_.flatMap { bs =>
if (bs.size < 4) Stream.fail(new Throwable(s"Message chunk does not have correlation id included: $bs"))
else {
val correlationId = bs.take(4).toInt()
eval(openRequests.modify{ _ - correlationId}).flatMap { case Change(m,_) =>
eval(openRequests.modify{ _ - correlationId}).flatMap { case Change(m, _) =>
m.get(correlationId) match {
case None => Stream.fail(new Throwable(s"Received message correlationId for message that does not exists: $correlationId : $bs : $m"))
case Some(req) =>
Expand Down
49 changes: 25 additions & 24 deletions src/test/scala/spinoco/fs2/kafka/DockerSupport.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package spinoco.fs2.kafka

import cats.effect.{Effect, IO}
import cats.syntax.all._

import fs2._
import fs2.Task
import fs2.util.Async
import fs2.util.syntax._
import shapeless.tag
import shapeless.tag.@@

import scala.concurrent.ExecutionContext
import scala.sys.process.{Process, ProcessLogger}

/**
Expand All @@ -17,7 +18,7 @@ object DockerSupport {
sealed trait DockerId

/** Returns version of docker, if that docker is available. **/
def dockerVersion:Task[Option[String]] = Task.delay {
def dockerVersion:IO[Option[String]] = IO {
val output = Process("docker -v").!!
for {
m <- ExtractVersion.findAllMatchIn(output).toList.headOption
Expand All @@ -32,7 +33,7 @@ object DockerSupport {
* @param imageName name of the image inclusive the tag
* @return
*/
def installImageWhenNeeded(imageName:String):Task[Boolean] = Task.delay {
def installImageWhenNeeded(imageName:String):IO[Boolean] = IO {
val current:String= Process(s"docker images $imageName -q").!!
if (current.lines.isEmpty) {
Process(s"docker pull $imageName").!!
Expand All @@ -47,7 +48,7 @@ object DockerSupport {
* @param props Parameters, props to docker `run` command
* @return
*/
def runImage(imageName: String, name: Option[String])(props: String*):Task[String @@ DockerId] = Task.delay {
def runImage(imageName: String, name: Option[String])(props: String*):IO[String @@ DockerId] = IO {
val cmd = s"docker run -d ${ name.map(n => s"--name=$n").mkString } ${props.mkString(" ")} $imageName"
tag[DockerId](Process(cmd).!!.trim)
}
Expand All @@ -57,16 +58,16 @@ object DockerSupport {
* @param imageId Id of image to follow
* @return
*/
def followImageLog(imageId:String @@ DockerId)(implicit F:Async[Task]):Stream[Task,String] = {
def followImageLog(imageId:String @@ DockerId)(implicit F: Effect[IO], ec: ExecutionContext): Stream[IO,String] = {
Stream.eval(async.semaphore(1)) flatMap { semaphore =>
Stream.eval(Async.refOf(false)) flatMap { isDone =>
Stream.eval(async.unboundedQueue[Task,String]).flatMap { q =>
Stream.eval(async.refOf(false)) flatMap { isDone =>
Stream.eval(async.unboundedQueue[IO,String]).flatMap { q =>

def enqueue(s: String): Unit = {
semaphore.increment >>
isDone.get.flatMap { done => if (!done) q.enqueue1(s) else Task.now(()) } >>
semaphore.increment *>
isDone.get.flatMap { done => if (!done) q.enqueue1(s) else IO.unit } *>
semaphore.decrement
} unsafeRun
} unsafeRunSync

val logger = new ProcessLogger {
def buffer[T](f: => T): T = f
Expand All @@ -75,52 +76,52 @@ object DockerSupport {
def err(s: => String): Unit = enqueue(s)
}

Stream.bracket(Task.delay(Process(s"docker logs -f $imageId").run(logger)))(
Stream.bracket(IO(Process(s"docker logs -f $imageId").run(logger)))(
_ => q.dequeue
, p => semaphore.increment >> isDone.modify(_ => true) >> Task.delay(p.destroy()) >> semaphore.decrement
, p => semaphore.increment *> isDone.modify(_ => true) *> IO(p.destroy()) *> semaphore.decrement
)
}}}
}

def runningImages: Task[Set[String @@ DockerId]] = Task.delay {
def runningImages: IO[Set[String @@ DockerId]] = IO {
Process(s"docker ps -q").!!.lines.filter(_.trim.nonEmpty).map(tag[DockerId](_)).toSet
}

def availableImages: Task[Set[String @@ DockerId]] = Task.delay {
def availableImages: IO[Set[String @@ DockerId]] = IO {
Process(s"docker ps -aq").!!.lines.filter(_.trim.nonEmpty).map(tag[DockerId](_)).toSet
}


/**
* Issues a kill to image with given id
*/
def killImage(imageId: String @@ DockerId):Task[Unit] = {
Task.delay { Process(s"docker kill $imageId").!! } >>
def killImage(imageId: String @@ DockerId):IO[Unit] = {
IO { Process(s"docker kill $imageId").!! } *>
runningImages.flatMap { allRun =>
if (allRun.exists(imageId.startsWith)) killImage(imageId)
else Task.now(())
else IO.pure(())
}
}

/**
* Cleans supplied image from the docker
*/
def cleanImage(imageId: String @@ DockerId):Task[Unit] = {
Task.delay { Process(s"docker rm $imageId").!! } >>
def cleanImage(imageId: String @@ DockerId):IO[Unit] = {
IO { Process(s"docker rm $imageId").!! } *>
availableImages.flatMap { allAvail =>
if (allAvail.exists(imageId.startsWith)) cleanImage(imageId)
else Task.now(())
else IO.pure(())
}
}


def createNetwork(name: String, ipSubnet:String = "172.30.0.0/16 "): Task[Unit] = Task.delay {
def createNetwork(name: String, ipSubnet:String = "172.30.0.0/16 "): IO[Unit] = IO {
Process(s"""docker network create --subnet $ipSubnet $name""").!!
()
}


def removeNetwork(name: String): Task[Unit] = Task.delay {
def removeNetwork(name: String): IO[Unit] = IO {
Process(s"""docker network rm $name""").!!
()
}
Expand Down
Loading

0 comments on commit 0102d30

Please sign in to comment.