Skip to content

Commit

Permalink
use implicit Serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
Seigneurin, Alexis (CONT) authored and Seigneurin, Alexis (CONT) committed May 11, 2017
1 parent 599282f commit c8a4ff0
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,71 +11,39 @@ object KStreamBuilderS {

val inner = new KStreamBuilder

def stream[K, V](topics: String*): KStreamS[K, V] =
inner.stream[K, V](topics: _*)

def stream[K, V](offsetReset: TopologyBuilder.AutoOffsetReset,
topics: String*): KStream[K, V] =
inner.stream[K, V](offsetReset, topics: _*)

def stream[K, V](topicPattern: Pattern): KStreamS[K, V] =
inner.stream[K, V](topicPattern)

def stream[K, V](offsetReset: TopologyBuilder.AutoOffsetReset,
topicPattern: Pattern): KStreamS[K, V] =
inner.stream[K, V](offsetReset, topicPattern)

def stream[K, V](keySerde: Serde[K],
valSerde: Serde[V],
topics: String*): KStreamS[K, V] =
def stream[K, V](topics: String*)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KStreamS[K, V] =
inner.stream[K, V](keySerde, valSerde, topics: _*)

def stream[K, V](offsetReset: TopologyBuilder.AutoOffsetReset,
keySerde: Serde[K],
valSerde: Serde[V],
topics: String*): KStreamS[K, V] =
topics: String*)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KStreamS[K, V] =
inner.stream[K, V](offsetReset, keySerde, valSerde, topics: _*)

def stream[K, V](keySerde: Serde[K],
valSerde: Serde[V],
topicPattern: Pattern): KStreamS[K, V] =
def stream[K, V](topicPattern: Pattern)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KStreamS[K, V] =
inner.stream[K, V](keySerde, valSerde, topicPattern)

def stream[K, V](offsetReset: TopologyBuilder.AutoOffsetReset,
keySerde: Serde[K],
valSerde: Serde[V],
topicPattern: Pattern): KStreamS[K, V] =
topicPattern: Pattern)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KStreamS[K, V] =
inner.stream[K, V](offsetReset, keySerde, valSerde, topicPattern)

def table[K, V](topic: String,
storeName: String): KTableS[K, V] =
inner.table[K, V](topic, storeName)

def table[K, V](offsetReset: TopologyBuilder.AutoOffsetReset,
topic: String,
storeName: String): KTableS[K, V] =
inner.table[K, V](offsetReset, topic, storeName)

def table[K, V](keySerde: Serde[K],
valSerde: Serde[V],
topic: String,
storeName: String): KTableS[K, V] =
storeName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KTableS[K, V] =
inner.table[K, V](keySerde, valSerde, topic, storeName)

def table[K, V](offsetReset: TopologyBuilder.AutoOffsetReset,
keySerde: Serde[K],
valSerde: Serde[V],
topic: String,
storeName: String): KTableS[K, V] =
storeName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KTableS[K, V] =
inner.table[K, V](offsetReset, keySerde, valSerde, topic, storeName)

def globalTable[K, V](topic: String, storeName: String): GlobalKTable[K, V] =
inner.globalTable(topic, storeName)

def globalTable[K, V](keySerde: Serde[K],
valSerde: Serde[V],
topic: String,
storeName: String): GlobalKTable[K, V] =
def globalTable[K, V](topic: String,
storeName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): GlobalKTable[K, V] =
inner.globalTable(keySerde, valSerde, topic, storeName)

def merge[K, V](streams: KStream[K, V]*): KStream[K, V] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,16 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
def print(keySerde: Serde[K], valSerde: Serde[V], streamName: String) =
inner.print(keySerde, valSerde, streamName)

def writeAsText(filePath: String) =
inner.writeAsText(filePath)

def writeAsText(filePath: String, streamName: String) =
inner.writeAsText(filePath, streamName)

def writeAsText(filePath: String, keySerde: Serde[K], valSerde: Serde[V]) =
def writeAsText(filePath: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
inner.writeAsText(filePath, keySerde, valSerde)
}

def writeAsText(filePath: String, streamName: String, keySerde: Serde[K], valSerde: Serde[V]) =
def writeAsText(filePath: String,
streamName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
inner.writeAsText(filePath, streamName, keySerde, valSerde)
}

def foreach(action: (K, V) => Unit): Unit = {
val actionJ: ForeachAction[_ >: K, _ >: V] = (k: K, v: V) => action(k, v)
Expand All @@ -93,51 +92,27 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
.map(kstream => wrapKStream(kstream))
}

def through(topic: String): KStreamS[K, V] =
inner.through(topic)

def through(partitioner: (K, V, Int) => Int,
topic: String): KStreamS[K, V] = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.through(partitionerJ, topic)
}

def through(keySerde: Serde[K],
valSerde: Serde[V],
topic: String): KStreamS[K, V] = {
def through(topic: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KStreamS[K, V] = {
inner.through(keySerde, valSerde, topic)
}

def through(keySerde: Serde[K],
valSerde: Serde[V],
partitioner: (K, V, Int) => Int,
topic: String): KStreamS[K, V] = {
def through(partitioner: (K, V, Int) => Int,
topic: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KStreamS[K, V] = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.through(keySerde, valSerde, partitionerJ, topic)
}

def to(topic: String) =
inner.to(topic)

def to(partitioner: (K, V, Int) => Int,
topic: String) = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.to(partitionerJ, topic)
}

def to(keySerde: Serde[K],
valSerde: Serde[V],
topic: String) = {
def to(topic: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
inner.to(keySerde, valSerde, topic)
}

def to(keySerde: Serde[K],
valSerde: Serde[V],
partitioner: (K, V, Int) => Int,
topic: String) = {
def to(partitioner: (K, V, Int) => Int,
topic: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.to(keySerde, valSerde, partitionerJ, topic)
Expand Down Expand Up @@ -178,12 +153,8 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
inner.process(processorSupplierJ, stateStoreNames: _*)
}

def groupByKey: KGroupedStreamS[K, V] =
inner.groupByKey()

def groupByKey(keySerde: Serde[K], valSerde: Serde[V]): KGroupedStreamS[K, V] = {
def groupByKey(implicit keySerde: Serde[K], valSerde: Serde[V]): KGroupedStreamS[K, V] =
inner.groupByKey(keySerde, valSerde)
}

def groupBy[KR](selector: (K, V) => KR): KGroupedStreamS[KR, V] = {
val selectorJ: KeyValueMapper[K, V, KR] = (k: K, v: V) => selector(k, v)
Expand Down
61 changes: 17 additions & 44 deletions src/main/scala/com/seigneurin/kafka/streams/scala/api/KTableS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ class KTableS[K, V](val inner: KTable[K, V]) {
def print(keySerde: Serde[K], valSerde: Serde[V], streamName: String) =
inner.print(keySerde, valSerde, streamName)

def writeAsText(filePath: String) =
inner.writeAsText(filePath)

def writeAsText(filePath: String, streamName: String) =
inner.writeAsText(filePath, streamName)

def writeAsText(filePath: String, keySerde: Serde[K], valSerde: Serde[V]) =
def writeAsText(filePath: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
inner.writeAsText(filePath, keySerde, valSerde)
}

def writeAsText(filePath: String, streamName: String, keySerde: Serde[K], valSerde: Serde[V]) =
def writeAsText(filePath: String,
streamName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
inner.writeAsText(filePath, streamName, keySerde, valSerde)
}

def foreach(action: (K, V) => Unit): Unit = {
val actionJ: ForeachAction[_ >: K, _ >: V] = (k: K, v: V) => action(k, v)
Expand All @@ -62,54 +61,28 @@ class KTableS[K, V](val inner: KTable[K, V]) {
}

def through(topic: String,
storeName: String): KTableS[K, V] =
inner.through(topic, storeName)

def through(partitioner: (K, V, Int) => Int,
topic: String,
storeName: String): KTableS[K, V] = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.through(partitionerJ, topic, storeName)
}

def through(keySerde: Serde[K],
valSerde: Serde[V],
topic: String,
storeName: String): KTableS[K, V] = {
storeName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KTableS[K, V] = {
inner.through(keySerde, valSerde, topic, storeName)
}

def through(keySerde: Serde[K],
valSerde: Serde[V],
partitioner: (K, V, Int) => Int,
def through(partitioner: (K, V, Int) => Int,
topic: String,
storeName: String): KTableS[K, V] = {
storeName: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]): KTableS[K, V] = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.through(keySerde, valSerde, partitionerJ, topic, storeName)
}

def to(topic: String) =
inner.to(topic)

def to(partitioner: (K, V, Int) => Int,
topic: String) = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.to(partitionerJ, topic)
}

def to(keySerde: Serde[K],
valSerde: Serde[V],
topic: String) = {
def to(topic: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
inner.to(keySerde, valSerde, topic)
}

def to(keySerde: Serde[K],
valSerde: Serde[V],
partitioner: (K, V, Int) => Int,
topic: String) = {
def to(partitioner: (K, V, Int) => Int,
topic: String)
(implicit keySerde: Serde[K], valSerde: Serde[V]) = {
val partitionerJ: StreamPartitioner[K, V] =
(key: K, value: V, numPartitions: Int) => partitioner(key, value, numPartitions)
inner.to(keySerde, valSerde, partitionerJ, topic)
Expand Down

0 comments on commit c8a4ff0

Please sign in to comment.