@@ -8,22 +8,14 @@ import kafka.admin.AdminUtils
8
8
import kafka .server .{KafkaConfig , KafkaServer }
9
9
import kafka .utils .ZkUtils
10
10
import org .apache .kafka .clients .consumer .KafkaConsumer
11
- import org .apache .kafka .clients .producer .{
12
- KafkaProducer ,
13
- ProducerConfig ,
14
- ProducerRecord
15
- }
11
+ import org .apache .kafka .clients .producer .{KafkaProducer , ProducerConfig , ProducerRecord }
16
12
import org .apache .kafka .common .KafkaException
17
- import org .apache .kafka .common .serialization .{
18
- Deserializer ,
19
- Serializer ,
20
- StringDeserializer ,
21
- StringSerializer
22
- }
13
+ import org .apache .kafka .common .serialization .{Deserializer , Serializer , StringDeserializer , StringSerializer }
23
14
import org .apache .zookeeper .server .{ServerCnxnFactory , ZooKeeperServer }
24
15
import org .scalatest .Suite
25
16
26
17
import scala .collection .JavaConversions .mapAsJavaMap
18
+ import scala .collection .mutable
27
19
import scala .concurrent .duration ._
28
20
import scala .concurrent .{ExecutionContext , TimeoutException }
29
21
import scala .language .{higherKinds , postfixOps }
@@ -37,40 +29,67 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
37
29
38
30
private [this ] var factory : Option [ServerCnxnFactory ] = None
39
31
private [this ] var broker : Option [KafkaServer ] = None
32
+ private [this ] val logsDirs = mutable.Buffer .empty[Directory ]
40
33
41
34
/**
42
- * Starts a ZooKeeper instance and a Kafka broker in memory.
35
+ * Starts a ZooKeeper instance and a Kafka broker in memory, using temporary directories for storing logs.
36
+ * The log directories will be cleaned after calling the [[stop() ]] method or on JVM exit, whichever happens earlier.
43
37
*
44
38
* @param config an implicit [[EmbeddedKafkaConfig ]]
45
39
*/
46
40
def start ()(implicit config : EmbeddedKafkaConfig ): Unit = {
47
- factory = Option (startZooKeeper(config.zooKeeperPort))
48
- broker = Option (startKafka(config))
41
+ val zkLogsDir = Directory .makeTemp(" zookeeper-logs" )
42
+ val kafkaLogsDir = Directory .makeTemp(" kafka-logs" )
43
+
44
+ factory = Option (startZooKeeper(config.zooKeeperPort, zkLogsDir))
45
+ broker = Option (startKafka(config, kafkaLogsDir))
46
+
47
+ logsDirs ++= Seq (zkLogsDir, kafkaLogsDir)
49
48
}
50
49
50
+ /**
51
+ * Starts a Zookeeper instance in memory, storing logs in a specific location.
52
+ *
53
+ * @param zkLogsDir the path for the Zookeeper logs
54
+ * @param config an implicit [[EmbeddedKafkaConfig ]]
55
+ */
51
56
def startZooKeeper (zkLogsDir : Directory )(
52
57
implicit config : EmbeddedKafkaConfig ): Unit = {
53
58
factory = Option (startZooKeeper(config.zooKeeperPort, zkLogsDir))
54
59
}
55
60
61
+ /**
62
+ * Starts a Kafka broker in memory, storing logs in a specific location.
63
+ *
64
+ * @param kafkaLogDir the path for the Kafka logs
65
+ * @param config an implicit [[EmbeddedKafkaConfig ]]
66
+ */
56
67
def startKafka (kafkaLogDir : Directory )(
57
68
implicit config : EmbeddedKafkaConfig ): Unit = {
58
69
broker = Option (startKafka(config, kafkaLogDir))
59
70
}
60
71
61
72
/**
62
- * Stops the in memory ZooKeeper instance and Kafka broker.
73
+ * Stops the in memory ZooKeeper instance and Kafka broker, and deletes the log directories .
63
74
*/
64
75
def stop (): Unit = {
65
76
stopKafka()
66
77
stopZooKeeper()
78
+ logsDirs.foreach(_.deleteRecursively())
79
+ logsDirs.clear()
67
80
}
68
81
82
+ /**
83
+ * Stops the in memory Zookeeper instance, preserving the logs directory.
84
+ */
69
85
def stopZooKeeper (): Unit = {
70
86
factory.foreach(_.shutdown())
71
87
factory = None
72
88
}
73
89
90
+ /**
91
+ * Stops the in memory Kafka instance, preserving the logs directory.
92
+ */
74
93
def stopKafka (): Unit = {
75
94
broker.foreach { b =>
76
95
b.shutdown()
@@ -102,15 +121,23 @@ sealed trait EmbeddedKafkaSupport {
102
121
*/
103
122
def withRunningKafka (body : => Any )(implicit config : EmbeddedKafkaConfig ): Any = {
104
123
105
- val factory = startZooKeeper(config.zooKeeperPort)
106
- val broker = startKafka(config)
124
+ def cleanLogs (directories : Directory * ): Unit = {
125
+ directories.foreach(_.deleteRecursively())
126
+ }
127
+
128
+ val zkLogsDir = Directory .makeTemp(" zookeeper-logs" )
129
+ val kafkaLogsDir = Directory .makeTemp(" kafka" )
130
+
131
+ val factory = startZooKeeper(config.zooKeeperPort, zkLogsDir)
132
+ val broker = startKafka(config, kafkaLogsDir)
107
133
108
134
try {
109
135
body
110
136
} finally {
111
137
broker.shutdown()
112
138
broker.awaitShutdown()
113
139
factory.shutdown()
140
+ cleanLogs(zkLogsDir, kafkaLogsDir)
114
141
}
115
142
}
116
143
@@ -258,8 +285,7 @@ sealed trait EmbeddedKafkaSupport {
258
285
}
259
286
260
287
def startZooKeeper (zooKeeperPort : Int ,
261
- zkLogsDir : Directory = Directory .makeTemp(
262
- " zookeeper-logs" )): ServerCnxnFactory = {
288
+ zkLogsDir : Directory ): ServerCnxnFactory = {
263
289
val tickTime = 2000
264
290
265
291
val zkServer = new ZooKeeperServer (zkLogsDir.toFile.jfile,
@@ -272,9 +298,8 @@ sealed trait EmbeddedKafkaSupport {
272
298
factory
273
299
}
274
300
275
- def startKafka (
276
- config : EmbeddedKafkaConfig ,
277
- kafkaLogDir : Directory = Directory .makeTemp(" kafka" )): KafkaServer = {
301
+ def startKafka (config : EmbeddedKafkaConfig ,
302
+ kafkaLogDir : Directory ): KafkaServer = {
278
303
val zkAddress = s " localhost: ${config.zooKeeperPort}"
279
304
280
305
val properties : Properties = new Properties
0 commit comments