Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit 64d0688

Browse files
committed
Merge branch 'develop'
2 parents 8c983ab + 2bb39d1 commit 64d0688

30 files changed

+817
-311
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ jdk: oraclejdk8
44
scala:
55
- 2.11.11
66
- 2.12.4
7-
sbt_args: -mem 1500
7+
sbt_args: -mem 2000
88
script:
99
- sbt "++ ${TRAVIS_SCALA_VERSION}!" test
1010
cache:

README.md

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The library wraps Java APIs in Scala thereby providing:
77
1. much better type inference in Scala
88
2. less boilerplate in application code
99
3. the usual builder-style composition that developers get with the original Java API
10+
4. complete compile time type safety
1011

1112
The design of the library was inspired by the work started by Alexis Seigneurin in [this repository](https://github.com/aseigneurin/kafka-streams-scala).
1213

@@ -15,24 +16,24 @@ The design of the library was inspired by the work started by Alexis Seigneurin
1516
`kafka-streams-scala` is published and cross-built for Scala `2.11`, and `2.12`, so you can just add the following to your build:
1617

1718
```scala
18-
val kafka_streams_scala_version = "0.1.1"
19+
val kafka_streams_scala_version = "0.2.0"
1920

2021
libraryDependencies ++= Seq("com.lightbend" %%
2122
"kafka-streams-scala" % kafka_streams_scala_version)
2223
```
2324

2425
> Note: `kafka-streams-scala` supports onwards Kafka Streams `1.0.0`.
2526
26-
The API docs for `kafka-streams-scala` is available [here](https://developer.lightbend.com/docs/api/kafka-streams-scala/0.1.1/com/lightbend/kafka/scala/streams) for Scala 2.12 and [here](https://developer.lightbend.com/docs/api/kafka-streams-scala_2.11/0.1.1/#package) for Scala 2.11.
27+
The API docs for `kafka-streams-scala` is available [here](https://developer.lightbend.com/docs/api/kafka-streams-scala/0.2.0/com/lightbend/kafka/scala/streams) for Scala 2.12 and [here](https://developer.lightbend.com/docs/api/kafka-streams-scala_2.11/0.2.0/#package) for Scala 2.11.
2728

2829
## Running the Tests
2930

3031
The library comes with an embedded Kafka server. To run the tests, simply run `sbt testOnly` and all tests will run on the local embedded server.
3132

32-
> The embedded server is started and stopped for every test and takes quite a bit of resources. Hence it's recommended that you allocate more heap space to `sbt` when running the tests. e.g. `sbt -mem 1500`.
33+
> The embedded server is started and stopped for every test and takes quite a bit of resources. Hence it's recommended that you allocate more heap space to `sbt` when running the tests. e.g. `sbt -mem 2000`.
3334
3435
```bash
35-
$ sbt -mem 1500
36+
$ sbt -mem 2000
3637
> +clean
3738
> +test
3839
```
@@ -52,39 +53,56 @@ val clicksPerRegion: KTableS[String, Long] = userClicksStream
5253
.map((_, regionWithClicks) => regionWithClicks)
5354

5455
// Compute the total per region by summing the individual click counts per region.
55-
.groupByKey(Serialized.`with`(stringSerde, longSerde))
56+
.groupByKey
5657
.reduce(_ + _)
5758
```
5859

59-
> **Notes:**
60-
>
61-
> 1. The left quotes around "with" are there because `with` is a Scala keyword. This is the mechanism you use to "escape" a Scala keyword when it's used as a normal identifier in a Java library.
62-
> 2. Note that some methods, like `map`, take a two-argument function, for key-value pairs, rather than the more typical single argument.
60+
## Implicit Serdes
6361

64-
## Better Abstraction
62+
One of the areas where the Java APIs' verbosity can be reduced is through a succinct way to pass serializers and de-serializers to the various functions. The library uses the power of Scala implicits towards this end. The library makes some decisions that help implement more succinct serdes in a type safe manner:
6563

66-
The wrapped Scala APIs also incur less boilerplate by taking advantage of Scala function literals that get converted to Java objects in the implementation of the API. Hence the surface syntax of the client API looks simpler and less noisy.
64+
1. No use of configuration based default serdes. Java APIs allow the user to define default key and value serdes as part of the configuration. This configuration, being implemented as `java.util.Properties` is type-unsafe and hence can result in runtime errors in case the user misses any of the serdes to be specified or plugs in an incorrect serde. `kafka-streams-scala` makes this completely type-safe by allowing all serdes to be specified through Scala implicits.
65+
2. The libraty offers implicit conversions from serdes to `Serialized`, `Produced`, `Consumed` or `Joined`. Hence as a user you just have to pass in the implicit serde and all conversions to `Serialized`, `Produced`, `Consumed` or `Joined` will be taken care of automatically.
6766

68-
Here's an example of a snippet built using the Java API from Scala ..
67+
68+
### Default Serdes
69+
70+
The library offers a module that contains all the default serdes for the primitives. Importing the object will bring in scope all such primitives and helps reduce implicit hell.
6971

7072
```scala
71-
val approximateWordCounts: KStream[String, Long] = textLines
72-
.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava)
73-
.transform(
74-
new TransformerSupplier[Array[Byte], String, KeyValue[String, Long]] {
75-
override def get() = new ProbabilisticCounter
76-
},
77-
cmsStoreName)
78-
approximateWordCounts.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
73+
object DefaultSerdes {
74+
implicit val stringSerde: Serde[String] = Serdes.String()
75+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
76+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
77+
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
78+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
79+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
80+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
81+
}
7982
```
8083

81-
And here's the corresponding snippet using the Scala library. Note how the noise of `TransformerSupplier` has been abstracted out by the function literal syntax of Scala.
84+
### Compile time typesafe
85+
86+
Not only the serdes, but `DefaultSerdes` also brings into scope implicit `Serialized`, `Produced`, `Consumed` and `Joined` instances. So all APIs that accept `Serialized`, `Produced`, `Consumed` or `Joined` will get these instances automatically with an `import DefaultSerdes._`.
87+
88+
Just one import of `DefaultSerdes._` and the following code does not need a bit of `Serialized`, `Produced`, `Consumed` or `Joined` to be specified explicitly or through the default config. **And the best part is that for any missing instances of these you get a compilation error.** ..
8289

8390
```scala
84-
textLines
85-
.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
86-
.transform(() => new ProbabilisticCounter, cmsStoreName)
87-
.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
88-
```
91+
import DefaultSerdes._
92+
93+
val clicksPerRegion: KTableS[String, Long] =
94+
userClicksStream
95+
96+
// Join the stream against the table.
97+
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
98+
99+
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
100+
.map((_, regionWithClicks) => regionWithClicks)
89101

90-
Also, the explicit conversion `asJava` from a Scala `Iterable` to a Java `Iterable` is done for you by the Scala library.
102+
// Compute the total per region by summing the individual click counts per region.
103+
.groupByKey
104+
.reduce(_ + _)
105+
106+
// Write the (continuously updating) results to the output topic.
107+
clicksPerRegion.toStream.to(outputTopic)
108+
```

build.sbt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ name := "kafka-streams-scala"
44

55
organization := "com.lightbend"
66

7-
version := "0.1.2"
7+
version := "0.2.0"
88

99
scalaVersion := Versions.Scala_2_12_Version
1010

@@ -23,7 +23,8 @@ libraryDependencies ++= Seq(
2323
minitest % "test",
2424
minitestLaws % "test",
2525
algebird % "test",
26-
chill % "test"
26+
chill % "test",
27+
avro4s % "test"
2728
)
2829

2930
testFrameworks += new TestFramework("minitest.runner.Framework")
@@ -54,4 +55,4 @@ publishTo := {
5455

5556
publishMavenStyle := true
5657

57-
publishArtifact in Test := false
58+
publishArtifact in Test := true

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ object Dependencies {
2121
val minitestLaws = "io.monix" %% "minitest-laws" % MinitestVersion
2222
val algebird = "com.twitter" %% "algebird-core" % AlgebirdVersion
2323
val chill = "com.twitter" %% "chill" % ChillVersion
24+
val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % Avro4sVersion
2425
}

project/Versions.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,6 @@ object Versions {
99
val JDKVersion = "1.8"
1010
val Scala_2_12_Version = "2.12.4"
1111
val Scala_2_11_Version = "2.11.11"
12+
val Avro4sVersion = "1.8.3"
1213
val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version )
1314
}

project/plugins.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")

scalastyle-config.xml

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
<scalastyle>
2+
<name>Scalastyle standard configuration</name>
3+
<check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
4+
<check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
5+
<parameters>
6+
<parameter name="maxFileLength"><![CDATA[800]]></parameter>
7+
</parameters>
8+
</check>
9+
<check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="false">
10+
<parameters>
11+
<parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors.
12+
// See the LICENCE.txt file distributed with this work for additional
13+
// information regarding copyright ownership.
14+
//
15+
// Licensed under the Apache License, Version 2.0 (the "License");
16+
// you may not use this file except in compliance with the License.
17+
// You may obtain a copy of the License at
18+
//
19+
// http://www.apache.org/licenses/LICENSE-2.0
20+
//
21+
// Unless required by applicable law or agreed to in writing, software
22+
// distributed under the License is distributed on an "AS IS" BASIS,
23+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24+
// See the License for the specific language governing permissions and
25+
// limitations under the License.]]></parameter>
26+
</parameters>
27+
</check>
28+
<check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
29+
<check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
30+
<check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
31+
<check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
32+
<parameters>
33+
<parameter name="maxLineLength"><![CDATA[160]]></parameter>
34+
<parameter name="tabSize"><![CDATA[4]]></parameter>
35+
</parameters>
36+
</check>
37+
<check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
38+
<parameters>
39+
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
40+
</parameters>
41+
</check>
42+
<check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
43+
<parameters>
44+
<parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
45+
</parameters>
46+
</check>
47+
<check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
48+
<parameters>
49+
<parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
50+
</parameters>
51+
</check>
52+
<check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
53+
<check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
54+
<parameters>
55+
<parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
56+
</parameters>
57+
</check>
58+
<check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
59+
<parameters>
60+
<parameter name="maxParameters"><![CDATA[8]]></parameter>
61+
</parameters>
62+
</check>
63+
<check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
64+
<parameters>
65+
<parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
66+
</parameters>
67+
</check>
68+
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
69+
<check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
70+
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
71+
<check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
72+
<check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
73+
<check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
74+
<check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
75+
<check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
76+
<check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
77+
<parameters>
78+
<parameter name="regex"><![CDATA[println]]></parameter>
79+
</parameters>
80+
</check>
81+
<check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
82+
<parameters>
83+
<parameter name="maxTypes"><![CDATA[30]]></parameter>
84+
</parameters>
85+
</check>
86+
<check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
87+
<parameters>
88+
<parameter name="maximum"><![CDATA[10]]></parameter>
89+
</parameters>
90+
</check>
91+
<check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
92+
<check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
93+
<check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="false">
94+
<parameters>
95+
<parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
96+
<parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
97+
</parameters>
98+
</check>
99+
<check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
100+
<parameters>
101+
<parameter name="maxLength"><![CDATA[50]]></parameter>
102+
</parameters>
103+
</check>
104+
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
105+
<parameters>
106+
<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
107+
</parameters>
108+
</check>
109+
<check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
110+
<parameters>
111+
<parameter name="maxMethods"><![CDATA[30]]></parameter>
112+
</parameters>
113+
</check>
114+
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
115+
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
116+
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
117+
</scalastyle>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package com.lightbend.kafka.scala.streams
6+
7+
import org.apache.kafka.common.serialization.{Serde, Serdes}
8+
9+
10+
/**
11+
* Implicit values for default serdes
12+
*/
13+
object DefaultSerdes {
14+
implicit val stringSerde: Serde[String] = Serdes.String()
15+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
16+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
17+
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
18+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
19+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
20+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
21+
}

src/main/scala/com/lightbend/kafka/scala/streams/FunctionConversions.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import org.apache.kafka.streams.KeyValue
88
import org.apache.kafka.streams.kstream._
99

1010
/**
11-
* Implicit classes that offer conversions of Scala function literals to
11+
* Implicit classes that offer conversions of Scala function literals to
1212
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
1313
* more expressive, with less boilerplate and more succinct.
14-
*/
14+
*/
1515
object FunctionConversions {
1616

1717
implicit class PredicateFromFunction[K, V](val test: (K, V) => Boolean) extends AnyVal {

src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
package com.lightbend.kafka.scala.streams
66

77
import org.apache.kafka.streams.kstream._
8-
import org.apache.kafka.streams.KeyValue
8+
import org.apache.kafka.streams.{ KeyValue, Consumed }
9+
import org.apache.kafka.common.serialization.Serde
910

1011
import scala.language.implicitConversions
1112

1213
/**
1314
* Implicit conversions between the Scala wrapper objects and the underlying Java
1415
* objects.
15-
*/
16+
*/
1617
object ImplicitConversions {
1718

1819
implicit def wrapKStream[K, V](inner: KStream[K, V]): KStreamS[K, V] =
@@ -33,7 +34,22 @@ object ImplicitConversions {
3334
implicit def wrapKGroupedTable[K, V](inner: KGroupedTable[K, V]): KGroupedTableS[K, V] =
3435
new KGroupedTableS[K, V](inner)
3536

36-
implicit def Tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
37+
implicit def tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
3738

38-
}
39+
//scalastyle:on null
40+
// we would also like to allow users implicit serdes
41+
// and these implicits will convert them to `Serialized`, `Produced` or `Consumed`
42+
43+
implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
44+
Serialized.`with`(keySerde, valueSerde)
3945

46+
implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] =
47+
Consumed.`with`(keySerde, valueSerde)
48+
49+
implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
50+
Produced.`with`(keySerde, valueSerde)
51+
52+
implicit def joinedFromKVOSerde[K, V, VO](implicit keySerde: Serde[K], valueSerde: Serde[V],
53+
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
54+
Joined.`with`(keySerde, valueSerde, otherValueSerde)
55+
}

src/main/scala/com/lightbend/kafka/scala/streams/KGroupedStreamS.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import FunctionConversions._
1414

1515
/**
1616
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
17-
*/
17+
*/
1818
class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
1919

2020
def count(): KTableS[K, Long] = {
@@ -42,11 +42,16 @@ class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
4242
}
4343

4444
def reduce(reducer: (V, V) => V,
45-
storeName: String): KTableS[K, V] = {
45+
storeName: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): KTableS[K, V] = {
4646

4747
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
4848
// works perfectly with Scala 2.12 though
49-
inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName))
49+
inner.reduce(((v1: V, v2: V) =>
50+
reducer(v1, v2)).asReducer,
51+
Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
52+
.withKeySerde(keySerde)
53+
.withValueSerde(valueSerde)
54+
)
5055
}
5156

5257
def aggregate[VR](initializer: () => VR,

0 commit comments

Comments
 (0)