This repository was archived by the owner on May 25, 2023. It is now read-only.

Issue 33: Added an additional constructor for passing an instance of StreamsBuilder #34

Merged (5 commits) on Jan 13, 2018
StreamsBuilderS
@@ -1,78 +1,74 @@
/**
 * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
 */

package com.lightbend.kafka.scala.streams

import java.util.regex.Pattern

import com.lightbend.kafka.scala.streams.ImplicitConversions._
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{GlobalKTable, Materialized}
import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
import org.apache.kafka.streams.{Consumed, StreamsBuilder, Topology}

import scala.collection.JavaConverters._

/**
 * Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
 */
class StreamsBuilderS(inner: StreamsBuilder = new StreamsBuilder) {

  def stream[K, V](topic: String): KStreamS[K, V] =
    inner.stream[K, V](topic)

  def stream[K, V](topic: String, consumed: Consumed[K, V]): KStreamS[K, V] =
    inner.stream[K, V](topic, consumed)

  def stream[K, V](topics: List[String]): KStreamS[K, V] =
    inner.stream[K, V](topics.asJava)

  def stream[K, V](topics: List[String], consumed: Consumed[K, V]): KStreamS[K, V] =
    inner.stream[K, V](topics.asJava, consumed)

  def stream[K, V](topicPattern: Pattern): KStreamS[K, V] =
    inner.stream[K, V](topicPattern)

  def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]): KStreamS[K, V] =
    inner.stream[K, V](topicPattern, consumed)

  def table[K, V](topic: String): KTableS[K, V] = inner.table[K, V](topic)

  def table[K, V](topic: String, consumed: Consumed[K, V]): KTableS[K, V] =
    inner.table[K, V](topic, consumed)

  def table[K, V](topic: String, consumed: Consumed[K, V],
    materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
    inner.table[K, V](topic, consumed, materialized)

  def table[K, V](topic: String,
    materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
    inner.table[K, V](topic, materialized)

  def globalTable[K, V](topic: String): GlobalKTable[K, V] =
    inner.globalTable(topic)

  def globalTable[K, V](topic: String, consumed: Consumed[K, V]): GlobalKTable[K, V] =
    inner.globalTable(topic, consumed)

  def globalTable[K, V](topic: String, consumed: Consumed[K, V],
    materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
    inner.globalTable(topic, consumed, materialized)

  def globalTable[K, V](topic: String,
    materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
    inner.globalTable(topic, materialized)

  def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilder = inner.addStateStore(builder)

  def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, sourceName: String, consumed: Consumed[_, _], processorName: String, stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilder =
    inner.addGlobalStore(storeBuilder, topic, sourceName, consumed, processorName, stateUpdateSupplier)

  def build(): Topology = inner.build()
}
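The key change in this file is the constructor: the hard-wired `val inner = new StreamsBuilder` becomes the parameter `inner: StreamsBuilder = new StreamsBuilder`, so existing callers are unaffected while new callers can supply their own instance. A minimal usage sketch (the topic name and `preconfigured` value are illustrative, not code from this PR):

    // Default behaviour, same as before: the wrapper creates its own builder.
    val standalone = new StreamsBuilderS()

    // New behaviour: wrap a StreamsBuilder you already hold, e.g. one
    // configured elsewhere or shared with Java code.
    val preconfigured = new org.apache.kafka.streams.StreamsBuilder
    val wrapped = new StreamsBuilderS(preconfigured)

    // Every call still delegates to the underlying Java builder.
    val lines: KStreamS[String, String] = wrapped.stream[String, String]("input-topic")
    val topology = wrapped.build()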
KafkaStreamsTest
@@ -1,23 +1,18 @@
/**
 * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
 */

package com.lightbend.kafka.scala.streams

import java.util.Properties
import java.util.regex.Pattern

import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.duration._

import ImplicitConversions._
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}

object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {

@@ -45,19 +40,19 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData
    val streamsConfiguration = new Properties()
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}")
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcountgroup")

    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)

    val builder = new StreamsBuilderS()

    val textLines = builder.stream[String, String](inputTopic)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    val wordCounts: KTableS[String, Long] =
      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
        .groupBy((k, v) => v)
        .count()

@@ -70,15 +65,15 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData
    //
    // Step 2: Produce some input data to the input topic.
    //
    val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
    val mvals = sender.batchWriteValue(inputTopic, inputValues)

    //
    // Step 3: Verify the application's output data.
    //
    val listener = MessageListener(brokers, outputTopic, "wordcountgroup",
      classOf[StringDeserializer].getName,
      classOf[LongDeserializer].getName,
      new RecordProcessor
    )

@@ -90,10 +85,11 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData
  }

  class RecordProcessor extends RecordProcessorTrait[String, Long] {
    override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
      // println(s"Get Message $record")
    }
  }

}

trait WordCountTestData {
ProbabilisticCountingScalaIntegrationTest
@@ -1,7 +1,7 @@
/**
 * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
 * Adapted from Confluent Inc. whose copyright is reproduced below.
 */

/*
 * Copyright Confluent Inc.

@@ -22,21 +22,15 @@ package com.lightbend.kafka.scala.streams
import java.util.Properties

import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
import com.lightbend.kafka.scala.streams.ImplicitConversions._
import com.lightbend.kafka.scala.streams.algebird.{CMSStore, CMSStoreBuilder}
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.{Produced, Transformer}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}

/**
 * End-to-end integration test that demonstrates how to probabilistically count items in an input stream.

@@ -71,7 +65,7 @@ trait ProbabilisticCountingScalaIntegrationTestData
  )
}

object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
  with ProbabilisticCountingScalaIntegrationTestData {

  override def setup(): KafkaLocalServer = {

@@ -103,7 +97,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
      p
    }

    val builder = new StreamsBuilderS()

    val cmsStoreName = "cms-store"
    val cmsStoreBuilder = {

@@ -159,15 +153,15 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
    //
    // Step 2: Publish some input text lines.
    //
    val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
    val mvals = sender.batchWriteValue(inputTopic, inputTextLines)

    //
    // Step 3: Verify the application's output data.
    //
    val listener = MessageListener(brokers, outputTopic, "probwordcountgroup",
      classOf[StringDeserializer].getName,
      classOf[LongDeserializer].getName,
      new RecordProcessor
    )

@@ -178,8 +172,9 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
  }

  class RecordProcessor extends RecordProcessorTrait[String, Long] {
    override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
      // println(s"Get Message $record")
    }
  }

}
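This test builds a custom state store (`cmsStoreBuilder`), which hints at another use of the new constructor: the store could be registered on a plain Java StreamsBuilder first and the builder then wrapped. A hedged sketch reusing `cmsStoreBuilder` from the test above (not code from this PR):

    // Hypothetical variant: register the CMS store on a Java builder, then wrap it.
    val javaBuilder = new org.apache.kafka.streams.StreamsBuilder
    javaBuilder.addStateStore(cmsStoreBuilder)
    val builder = new StreamsBuilderS(javaBuilder)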
StreamToTableJoinScalaIntegrationTest
@@ -81,7 +81,7 @@ object StreamToTableJoinScalaIntegrationTest extends TestSuite[KafkaLocalServer]
      p
    }

    val builder = new StreamsBuilderS()

    val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic, Consumed.`with`(stringSerde, longSerde))