Merge branch '5.2.3-post' into 5.3.0-post
Michael G. Noll committed Oct 23, 2019
2 parents 4640b2a + ff209c7 commit d69646b
Showing 3 changed files with 48 additions and 64 deletions.
ProbabilisticCountingScalaIntegrationTest.scala
@@ -18,26 +18,22 @@ package io.confluent.examples.streams
 import java.util
 import java.util.Properties
 
-import io.confluent.examples.streams.IntegrationTestUtils.NothingSerde
-import io.confluent.examples.streams.algebird.{CMSStore, CMSStoreBuilder, ProbabilisticCounter}
+import io.confluent.examples.streams.algebird.{CMSStoreBuilder, ProbabilisticCounter}
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
 import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.scala.kstream.KStream
-import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyTestDriver}
+import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
 import org.apache.kafka.test.TestUtils
-import org.assertj.core.api.Assertions.assertThat
 import org.junit._
 import org.scalatest.junit.AssertionsForJUnit
 
-import _root_.scala.collection.JavaConverters._
-
 /**
  * End-to-end integration test that demonstrates how to probabilistically count items in an input stream.
  *
- * This example uses a custom state store implementation, [[CMSStore]], that is backed by a
- * Count-Min Sketch data structure. The algorithm is WordCount.
+ * This example uses a custom state store implementation, [[io.confluent.examples.streams.algebird.CMSStore]],
+ * that is backed by a Count-Min Sketch data structure. The algorithm is WordCount.
  */
 class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit {

@@ -66,73 +62,61 @@ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit {
       ("summit", 1L)
     )
 
-    //
-    // Step 1: Configure and start the processor topology.
-    //
-    val streamsConfiguration: Properties = {
-      val p = new Properties()
-      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "probabilistic-counting-scala-integration-test")
-      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config")
-      // Use a temporary directory for storing state, which will be automatically removed after the test.
-      p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath)
-      p
-    }
-
-    val builder = new StreamsBuilder
-    val cmsStoreName = "cms-store"
-    builder.addStateStore(createCMSStoreBuilder(cmsStoreName))
-
-    // Read the input from Kafka.
-    val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)
-
-    // previously: def transform[K1, V1](transformer: Transformer[K, V, (K1, V1)],
-    val approximateWordCounts: KStream[String, Long] = textLines
-      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
-      .transform(() => new ProbabilisticCounter(cmsStoreName), cmsStoreName)
-
-    // Write the results back to Kafka.
-    approximateWordCounts.to(outputTopic)
+    // Step 1: Create the topology and its configuration
+    val builder: StreamsBuilder = createTopology()
+    val streamsConfiguration = createTopologyConfiguration()
 
     val topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)
 
     try {
-      //
-      // Step 2: Publish some input text lines.
-      //
-      IntegrationTestUtils.produceKeyValuesSynchronously(
-        inputTopic,
-        inputTextLines.map(v => new KeyValue(null, v)).asJava,
-        topologyTestDriver,
-        new NothingSerde[Null],
-        new StringSerializer
-      )
+      // Step 2: Write the input
+      import IntegrationTestScalaUtils._
+      IntegrationTestScalaUtils.produceValuesSynchronously(inputTopic, inputTextLines, topologyTestDriver)
 
-      //
-      // Step 3: Verify the application's output data.
-      //
-      val actualWordCounts =
-        IntegrationTestUtils.drainTableOutput(outputTopic, topologyTestDriver, new StringDeserializer, new LongDeserializer)
+      // Step 3: Validate the output
+      val actualWordCounts: Map[String, Long] =
+        IntegrationTestScalaUtils.drainTableOutput[String, Long](outputTopic, topologyTestDriver)
 
       // Note: This example only processes a small amount of input data, for which the word counts
       // will actually be exact counts. However, for large amounts of input data we would expect to
       // observe approximate counts (where the approximate counts would be >= true exact counts).
-      assertThat(actualWordCounts).isEqualTo(expectedWordCounts.asJava)
+      assert(actualWordCounts === expectedWordCounts)
     } finally {
      topologyTestDriver.close()
    }
  }
 
-  private def createCMSStoreBuilder(cmsStoreName: String): CMSStoreBuilder[String] = {
-    val changelogConfig: util.HashMap[String, String] = {
-      val cfg = new java.util.HashMap[String, String]
-      // The CMSStore's changelog will typically have rather few and small records per partition.
-      // To improve efficiency we thus set a smaller log segment size than Kafka's default of 1GB.
-      val segmentSizeBytes = (20 * 1024 * 1024).toString
-      cfg.put("segment.bytes", segmentSizeBytes)
-      cfg
-    }
-    new CMSStoreBuilder[String](cmsStoreName, Serdes.String())
-      .withLoggingEnabled(changelogConfig)
+  def createTopology(): StreamsBuilder = {
+
+    def createCMSStoreBuilder(cmsStoreName: String): CMSStoreBuilder[String] = {
+      val changelogConfig: util.HashMap[String, String] = {
+        val cfg = new java.util.HashMap[String, String]
+        // The CMSStore's changelog will typically have rather few and small records per partition.
+        // To improve efficiency we thus set a smaller log segment size than Kafka's default of 1GB.
+        val segmentSizeBytes = (20 * 1024 * 1024).toString
+        cfg.put("segment.bytes", segmentSizeBytes)
+        cfg
+      }
+      new CMSStoreBuilder[String](cmsStoreName, Serdes.String()).withLoggingEnabled(changelogConfig)
+    }
+
+    val builder = new StreamsBuilder
+    val cmsStoreName = "cms-store"
+    builder.addStateStore(createCMSStoreBuilder(cmsStoreName))
+    val textLines: KStream[String, String] = builder.stream[String, String](inputTopic)
+    val approximateWordCounts: KStream[String, Long] = textLines
+      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
+      .transform(() => new ProbabilisticCounter(cmsStoreName), cmsStoreName)
+    approximateWordCounts.to(outputTopic)
+    builder
+  }
+
+  def createTopologyConfiguration(): Properties = {
+    val p = new Properties()
+    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "probabilistic-counting-scala-integration-test")
+    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config")
+    // Use a temporary directory for storing state, which will be automatically removed after the test.
+    p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath)
+    p
+  }
 
 }
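The "Note" kept in the test above states the defining property of a Count-Min Sketch: estimates may exceed, but never undershoot, the true frequency. The following is a minimal sketch of that guarantee using the Algebird CMS API that backs this repo's CMSStore; the object name CmsOverestimateDemo and the eps/delta/seed values are illustrative only, and it assumes an Algebird version with an implicit CMSHasher[String] in scope (older versions need import com.twitter.algebird.CMSHasherImplicits._).

import com.twitter.algebird.{CMS, CMSMonoid}

object CmsOverestimateDemo extends App {
  // eps bounds the relative error, delta the probability of exceeding it.
  val cmsMonoid: CMSMonoid[String] = CMS.monoid[String](eps = 0.001, delta = 1E-10, seed = 1)

  val words = Seq("hello", "kafka", "streams", "kafka", "summit", "kafka")
  // Fold each word into a sketch and merge them via the monoid.
  val sketch: CMS[String] = cmsMonoid.sum(words.map(cmsMonoid.create))

  // A CMS never underestimates: estimate >= true count (exact for tiny inputs like this one).
  assert(sketch.frequency("kafka").estimate >= 3L)
  assert(sketch.totalCount == words.size)
}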
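IntegrationTestScalaUtils, used in Steps 2 and 3 above, is a test helper elsewhere in this repository; its implementation is not part of this diff. As a rough sketch only, a produceValuesSynchronously-style helper can be built on the kafka-streams-test-utils API of this release line (ConsumerRecordFactory plus TopologyTestDriver.pipeInput); TestDriverHelpers below is a hypothetical name, not the repo's actual code.

import org.apache.kafka.common.serialization.{ByteArraySerializer, Serde}
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory

object TestDriverHelpers {

  // Pipes plain values (null keys) into a TopologyTestDriver topic,
  // mirroring how the test above feeds inputTextLines into inputTopic.
  def produceValuesSynchronously[V](topic: String,
                                    values: Seq[V],
                                    driver: TopologyTestDriver)
                                   (implicit valueSerde: Serde[V]): Unit = {
    val factory = new ConsumerRecordFactory[Array[Byte], V](
      topic, new ByteArraySerializer, valueSerde.serializer())
    // create(v) uses the factory's default topic and a null key.
    values.foreach(v => driver.pipeInput(factory.create(v)))
  }
}

With the Scala DSL's implicit Serdes in scope (org.apache.kafka.streams.scala.Serdes._), a call like TestDriverHelpers.produceValuesSynchronously(inputTopic, inputTextLines, topologyTestDriver) resolves the value serde automatically, which is what lets the rewritten test drop the explicit StringSerializer and NothingSerde arguments.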
CMSStoreTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.test.{InternalMockProcessorContext, NoOpRecordCollector,
 import org.assertj.core.api.Assertions.assertThat
 import org.junit._
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
 class CMSStoreTest extends AssertionsForJUnit with MockitoSugar {

TopCMSSerdeTest.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serde
 import org.assertj.core.api.Assertions.assertThat
 import org.junit._
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
 class TopCMSSerdeTest extends AssertionsForJUnit with MockitoSugar {

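The only change in these last two files is the import: ScalaTest 3.0 moved MockitoSugar from org.scalatest.mock to org.scalatest.mockito (it later moved again, to org.scalatestplus.mockito in ScalaTest 3.1+). Usage is unchanged, as in this minimal illustration; the spec class and the mocked type here are made up for the example.

import org.mockito.Mockito.when
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar

class MockitoSugarExampleSpec extends FunSuite with MockitoSugar {
  test("mock[T] comes from MockitoSugar, whichever package it is imported from") {
    val list = mock[java.util.List[String]]
    when(list.get(0)).thenReturn("kafka")
    assert(list.get(0) === "kafka")
  }
}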
