Skip to content

Commit 87b907e

Browse files
author
Joan Goyeau
committed
Scalafmt
1 parent f54ba7c commit 87b907e

16 files changed

+318
-282
lines changed

build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ buildscript {
2929
classpath 'org.scoverage:gradle-scoverage:2.3.0'
3030
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
3131
classpath 'org.owasp:dependency-check-gradle:3.2.1'
32+
classpath "com.diffplug.spotless:spotless-plugin-gradle:3.10.0"
33+
}
34+
}
35+
36+
apply plugin: "com.diffplug.gradle.spotless"
37+
spotless {
38+
scala {
39+
target 'streams/**/*.scala'
40+
scalafmt('1.5.1').configFile('checkstyle/.scalafmt.conf')
3241
}
3342
}
3443

checkstyle/.scalafmt.conf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
docstrings = JavaDoc
16+
maxColumn = 120
17+
continuationIndent.defnSite = 2
18+
assumeStandardLibraryStripMargin = true
19+
danglingParentheses = true
20+
rewrite.rules = [SortImports, RedundantBraces, RedundantParens, SortModifiers]

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import java.lang.{Iterable => JIterable}
3030
* more expressive, with less boilerplate and more succinct.
3131
* <p>
3232
* For Scala 2.11, most of these conversions need to be invoked explicitly, as Scala 2.11 does not
33-
* have full support for SAM types.
33+
* have full support for SAM types.
3434
*/
3535
object FunctionConversions {
3636

@@ -40,7 +40,7 @@ object FunctionConversions {
4040
}
4141
}
4242

43-
implicit class MapperFromFunction[T, U, VR](val f:(T,U) => VR) extends AnyVal {
43+
implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal {
4444
def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] {
4545
override def apply(key: T, value: U): VR = f(key, value)
4646
}
@@ -49,7 +49,7 @@ object FunctionConversions {
4949
}
5050
}
5151

52-
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
52+
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
5353
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = new KeyValueMapper[K, V, KeyValue[KR, VR]] {
5454
override def apply(key: K, value: V): KeyValue[KR, VR] = {
5555
val (kr, vr) = f(key, value)
@@ -88,7 +88,7 @@ object FunctionConversions {
8888
}
8989
}
9090

91-
implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
91+
implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
9292
def asMerger: Merger[K, VR] = new Merger[K, VR] {
9393
override def apply(aggKey: K, aggOne: VR, aggTwo: VR): VR = f(aggKey, aggOne, aggTwo)
9494
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ object ImplicitConversions {
7777
valueSerde: Serde[V]): Materialized[K, V, S] =
7878
Materialized.`with`[K, V, S](keySerde, valueSerde)
7979

80-
implicit def joinedFromKeyValueOtherSerde[K, V, VO]
81-
(implicit keySerde: Serde[K], valueSerde: Serde[V], otherValueSerde: Serde[VO]): Joined[K, V, VO] =
80+
implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
81+
valueSerde: Serde[V],
82+
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
8283
Joined.`with`(keySerde, valueSerde, otherValueSerde)
8384
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,47 +25,48 @@ import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, S
2525
import org.apache.kafka.streams.kstream.WindowedSerdes
2626

2727
object Serdes {
28-
implicit val String: Serde[String] = JSerdes.String()
29-
implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
30-
implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
31-
implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
28+
implicit val String: Serde[String] = JSerdes.String()
29+
implicit val Long: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
30+
implicit val JavaLong: Serde[java.lang.Long] = JSerdes.Long()
31+
implicit val ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
3232
implicit val Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
33-
implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
34-
implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
35-
implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
36-
implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
37-
implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
38-
implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
33+
implicit val Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
34+
implicit val JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
35+
implicit val Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
36+
implicit val JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
37+
implicit val Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
38+
implicit val JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
3939

4040
implicit def timeWindowedSerde[T]: WindowedSerdes.TimeWindowedSerde[T] = new WindowedSerdes.TimeWindowedSerde[T]()
41-
implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] = new WindowedSerdes.SessionWindowedSerde[T]()
41+
implicit def sessionWindowedSerde[T]: WindowedSerdes.SessionWindowedSerde[T] =
42+
new WindowedSerdes.SessionWindowedSerde[T]()
4243

4344
def fromFn[T >: Null](serializer: T => Array[Byte], deserializer: Array[Byte] => Option[T]): Serde[T] =
4445
JSerdes.serdeFrom(
4546
new Serializer[T] {
46-
override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
47+
override def serialize(topic: String, data: T): Array[Byte] = serializer(data)
4748
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
48-
override def close(): Unit = ()
49+
override def close(): Unit = ()
4950
},
5051
new Deserializer[T] {
51-
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
52+
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(data).orNull
5253
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
53-
override def close(): Unit = ()
54+
override def close(): Unit = ()
5455
}
5556
)
5657

5758
def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
58-
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
59+
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
5960
JSerdes.serdeFrom(
6061
new Serializer[T] {
61-
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
62+
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
6263
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
63-
override def close(): Unit = ()
64+
override def close(): Unit = ()
6465
},
6566
new Deserializer[T] {
66-
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
67+
override def deserialize(topic: String, data: Array[Byte]): T = deserializer(topic, data).orNull
6768
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
68-
override def close(): Unit = ()
69+
override def close(): Unit = ()
6970
}
7071
)
7172
}

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,18 @@ import ImplicitConversions._
3131
import scala.collection.JavaConverters._
3232

3333
/**
34-
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
35-
*/
34+
* Wraps the Java class StreamsBuilder and delegates method calls to the underlying Java object.
35+
*/
3636
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
3737

3838
/**
3939
* Create a [[kstream.KStream]] from the specified topic.
4040
* <p>
41-
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
41+
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
4242
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
4343
* <p>
4444
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
45-
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
45+
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
4646
* {{{
4747
* // Brings all implicit conversions in scope
4848
* import ImplicitConversions._
@@ -88,11 +88,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
8888
/**
8989
* Create a [[kstream.KTable]] from the specified topic.
9090
* <p>
91-
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
91+
* The `implicit Consumed` instance provides the values of `auto.offset.reset` strategy, `TimestampExtractor`,
9292
* key and value deserializers etc. If the implicit is not found in scope, compiler error will result.
9393
* <p>
9494
* A convenient alternative is to have the necessary implicit serdes in scope, which will be implicitly
95-
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
95+
* converted to generate an instance of `Consumed`. @see [[ImplicitConversions]].
9696
* {{{
9797
* // Brings all implicit conversions in scope
9898
* import ImplicitConversions._
@@ -123,8 +123,9 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
123123
* @see #table(String)
124124
* @see `org.apache.kafka.streams.StreamsBuilder#table`
125125
*/
126-
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
127-
(implicit consumed: Consumed[K, V]): KTable[K, V] =
126+
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
127+
implicit consumed: Consumed[K, V]
128+
): KTable[K, V] =
128129
inner.table[K, V](topic, consumed, materialized)
129130

130131
/**
@@ -139,21 +140,22 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
139140
inner.globalTable(topic, consumed)
140141

141142
/**
142-
* Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
143-
* in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
143+
* Create a `GlobalKTable` from the specified topic. The resulting `GlobalKTable` will be materialized
144+
* in a local `KeyValueStore` configured with the provided instance of `Materialized`. The serializers
144145
* from the implicit `Consumed` instance will be used.
145146
*
146147
* @param topic the topic name
147148
* @param materialized the instance of `Materialized` used to materialize a state store
148149
* @return a `GlobalKTable` for the specified topic
149150
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
150151
*/
151-
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])
152-
(implicit consumed: Consumed[K, V]): GlobalKTable[K, V] =
152+
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
153+
implicit consumed: Consumed[K, V]
154+
): GlobalKTable[K, V] =
153155
inner.globalTable(topic, consumed, materialized)
154156

155157
/**
156-
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
158+
* Adds a state store to the underlying `Topology`. The store must still be "connected" to a `Processor`,
157159
* `Transformer`, or `ValueTransformer` before it can be used.
158160
*
159161
* @param builder the builder used to obtain this state store `StateStore` instance
@@ -164,11 +166,11 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
164166
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = inner.addStateStore(builder)
165167

166168
/**
167-
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
169+
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor, `Transformer`,
168170
* or `ValueTransformer` (in contrast to regular stores).
169171
*
170172
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
171-
*/
173+
*/
172174
def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
173175
topic: String,
174176
consumed: Consumed[_, _],

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
2424
import org.apache.kafka.streams.scala.ImplicitConversions._
2525
import org.apache.kafka.streams.scala.FunctionConversions._
2626

27-
2827
/**
2928
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
3029
*
@@ -41,7 +40,7 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
4140
* The result is written into a local `KeyValueStore` (which is basically an ever-updating materialized view)
4241
* provided by the given `materialized`.
4342
*
44-
* @param materialized an instance of `Materialized` used to materialize a state store.
43+
* @param materialized an instance of `Materialized` used to materialize a state store.
4544
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
4645
* represent the latest (rolling) count (i.e., number of records) for each key
4746
* @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
@@ -55,8 +54,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
5554
/**
5655
* Combine the values of records in this stream by the grouped key.
5756
*
58-
* @param reducer a function `(V, V) => V` that computes a new aggregate result.
59-
* @param materialized an instance of `Materialized` used to materialize a state store.
57+
* @param reducer a function `(V, V) => V` that computes a new aggregate result.
58+
* @param materialized an instance of `Materialized` used to materialize a state store.
6059
* @return a [[KTable]] that contains "update" records with unmodified keys, and values that represent the
6160
* latest (rolling) aggregate for each key
6261
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`

streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
3939
* Count number of records of the original [[KTable]] that got [[KTable#groupBy]] to
4040
* the same key into a new instance of [[KTable]].
4141
*
42-
* @param materialized an instance of `Materialized` used to materialize a state store.
42+
* @param materialized an instance of `Materialized` used to materialize a state store.
4343
* @return a [[KTable]] that contains "update" records with unmodified keys and `Long` values that
4444
* represent the latest (rolling) count (i.e., number of records) for each key
4545
* @see `org.apache.kafka.streams.kstream.KGroupedTable#count`

0 commit comments

Comments
 (0)