Skip to content

Clear log directories on EmbeddedKafka.stop() or end of withRunningKafka method. #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 11, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,14 @@ import kafka.admin.AdminUtils
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZkUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{
KafkaProducer,
ProducerConfig,
ProducerRecord
}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.{
Deserializer,
Serializer,
StringDeserializer,
StringSerializer
}
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
import org.scalatest.Suite

import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, TimeoutException}
import scala.language.{higherKinds, postfixOps}
Expand All @@ -37,40 +29,67 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {

private[this] var factory: Option[ServerCnxnFactory] = None
private[this] var broker: Option[KafkaServer] = None
private[this] val logsDirs = mutable.Buffer.empty[Directory]

/**
* Starts a ZooKeeper instance and a Kafka broker in memory.
* Starts a ZooKeeper instance and a Kafka broker in memory, using temporary directories for storing logs.
* The log directories will be cleaned after calling the [[stop()]] method or on JVM exit, whichever happens earlier.
*
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
def start()(implicit config: EmbeddedKafkaConfig): Unit = {
factory = Option(startZooKeeper(config.zooKeeperPort))
broker = Option(startKafka(config))
val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogsDir = Directory.makeTemp("kafka-logs")

factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
broker = Option(startKafka(config, kafkaLogsDir))

logsDirs ++= Seq(zkLogsDir, kafkaLogsDir)
}

/**
* Starts a Zookeeper instance in memory, storing logs in a specific location.
*
* @param zkLogsDir the path for the Zookeeper logs
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
def startZooKeeper(zkLogsDir: Directory)(
implicit config: EmbeddedKafkaConfig): Unit = {
factory = Option(startZooKeeper(config.zooKeeperPort, zkLogsDir))
}

/**
* Starts a Kafka broker in memory, storing logs in a specific location.
*
* @param kafkaLogDir the path for the Kafka logs
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
def startKafka(kafkaLogDir: Directory)(
implicit config: EmbeddedKafkaConfig): Unit = {
broker = Option(startKafka(config, kafkaLogDir))
}

/**
* Stops the in memory ZooKeeper instance and Kafka broker.
* Stops the in memory ZooKeeper instance and Kafka broker, and deletes the log directories.
*/
def stop(): Unit = {
stopKafka()
stopZooKeeper()
logsDirs.foreach(_.deleteRecursively())
logsDirs.clear()
}

/**
* Stops the in memory Zookeeper instance, preserving the logs directory.
*/
def stopZooKeeper(): Unit = {
factory.foreach(_.shutdown())
factory = None
}

/**
* Stops the in memory Kafka instance, preserving the logs directory.
*/
def stopKafka(): Unit = {
broker.foreach { b =>
b.shutdown()
Expand Down Expand Up @@ -102,15 +121,23 @@ sealed trait EmbeddedKafkaSupport {
*/
def withRunningKafka(body: => Any)(implicit config: EmbeddedKafkaConfig): Any = {

val factory = startZooKeeper(config.zooKeeperPort)
val broker = startKafka(config)
def cleanLogs(directories: Directory*): Unit = {
directories.foreach(_.deleteRecursively())
}

val zkLogsDir = Directory.makeTemp("zookeeper-logs")
val kafkaLogsDir = Directory.makeTemp("kafka")

val factory = startZooKeeper(config.zooKeeperPort, zkLogsDir)
val broker = startKafka(config, kafkaLogsDir)

try {
body
} finally {
broker.shutdown()
broker.awaitShutdown()
factory.shutdown()
cleanLogs(zkLogsDir, kafkaLogsDir)
}
}

Expand Down Expand Up @@ -258,8 +285,7 @@ sealed trait EmbeddedKafkaSupport {
}

def startZooKeeper(zooKeeperPort: Int,
zkLogsDir: Directory = Directory.makeTemp(
"zookeeper-logs")): ServerCnxnFactory = {
zkLogsDir: Directory): ServerCnxnFactory = {
val tickTime = 2000

val zkServer = new ZooKeeperServer(zkLogsDir.toFile.jfile,
Expand All @@ -272,9 +298,8 @@ sealed trait EmbeddedKafkaSupport {
factory
}

def startKafka(
config: EmbeddedKafkaConfig,
kafkaLogDir: Directory = Directory.makeTemp("kafka")): KafkaServer = {
def startKafka(config: EmbeddedKafkaConfig,
kafkaLogDir: Directory): KafkaServer = {
val zkAddress = s"localhost:${config.zooKeeperPort}"

val properties: Properties = new Properties
Expand Down