Skip to content

Commit ad6d6d0

Browse files
committed
rm tuple2
1 parent 8d5f075 commit ad6d6d0

File tree

7 files changed

+29
-41
lines changed

7 files changed

+29
-41
lines changed

streams-client/src/main/java/no/sysco/testing/kafka/streams/topology/StreamProcessing.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.stream.Collectors;
44
import java.util.stream.Stream;
5-
import no.sysco.testing.kafka.streams.utils.Tuple2;
65
import org.apache.kafka.common.serialization.Serdes;
76
import org.apache.kafka.common.utils.Bytes;
87
import org.apache.kafka.streams.KeyValue;
@@ -18,23 +17,25 @@
1817
public class StreamProcessing {
1918

2019
// stateless
21-
public static Topology topologyUpperCase(final Tuple2<String, String> topics) {
20+
public static Topology topologyUpperCase(final String sourceTopic, final String sinkTopic) {
2221
final StreamsBuilder streamsBuilder = new StreamsBuilder();
2322
final KStream<String, String> sourceStream =
24-
streamsBuilder.stream(topics._1, Consumed.with(Serdes.String(), Serdes.String()));
23+
streamsBuilder.stream(sourceTopic, Consumed.with(Serdes.String(), Serdes.String()));
2524

2625
sourceStream
2726
.mapValues((ValueMapper<String, String>) String::toUpperCase)
28-
.to(topics._2, Produced.with(Serdes.String(), Serdes.String()));
27+
.to(sinkTopic, Produced.with(Serdes.String(), Serdes.String()));
2928
return streamsBuilder.build();
3029
}
3130

3231
// stateful
3332
public static Topology topologyCountAnagram(
34-
final Tuple2<String, String> topics, final String storeName) {
33+
final String sourceTopic,
34+
final String sinkTopic,
35+
final String storeName) {
3536
final StreamsBuilder streamsBuilder = new StreamsBuilder();
3637
final KStream<String, String> sourceStream =
37-
streamsBuilder.stream(topics._1, Consumed.with(Serdes.String(), Serdes.String()));
38+
streamsBuilder.stream(sourceTopic, Consumed.with(Serdes.String(), Serdes.String()));
3839
// 1. [null:"magic"] => ["acgim":"magic"]
3940
// 2. amount with same key
4041
sourceStream
@@ -49,7 +50,7 @@ public static Topology topologyCountAnagram(
4950
.groupByKey()
5051
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName))
5152
.toStream()
52-
.to(topics._2, Produced.with(Serdes.String(), Serdes.Long()));
53+
.to(sinkTopic, Produced.with(Serdes.String(), Serdes.Long()));
5354
return streamsBuilder.build();
5455
}
5556
}

streams-client/src/main/java/no/sysco/testing/kafka/streams/topology/StreamProcessingAvro.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package no.sysco.testing.kafka.streams.topology;
22

33
import no.sysco.testing.kafka.streams.avro.Person;
4-
import no.sysco.testing.kafka.streams.utils.Tuple2;
54
import org.apache.kafka.common.serialization.Serde;
65
import org.apache.kafka.common.serialization.Serdes;
76
import org.apache.kafka.common.utils.Bytes;
@@ -16,9 +15,11 @@ public class StreamProcessingAvro {
1615

1716
// stateless
1817
public static Topology topologyUpperCase(
19-
final Tuple2<String, String> topics, final Serde<Person> personSerdes) {
18+
final String sourceTopic,
19+
final String sinkTopic,
20+
final Serde<Person> personSerdes) {
2021
final StreamsBuilder builder = new StreamsBuilder();
21-
builder.stream(topics._1, Consumed.with(Serdes.String(), personSerdes))
22+
builder.stream(sourceTopic, Consumed.with(Serdes.String(), personSerdes))
2223
// .peek((k, v) -> System.out.printf("%s %s %s\n", v.getId(), v.getName(), v.getLastname()))
2324
.mapValues(
2425
person ->
@@ -27,22 +28,23 @@ public static Topology topologyUpperCase(
2728
.setName(person.getName().toUpperCase())
2829
.setLastname(person.getLastname().toUpperCase())
2930
.build())
30-
.to(topics._2, Produced.with(Serdes.String(), personSerdes));
31+
.to(sinkTopic, Produced.with(Serdes.String(), personSerdes));
3132
return builder.build();
3233
}
3334

3435
// stateful
3536
public static Topology topologyCountUsersWithSameName(
36-
final Tuple2<String, String> topics,
37+
String sourceTopic,
38+
String sinkTopic,
3739
final Serde<Person> personSerdes,
3840
final String storeName) {
3941

4042
final StreamsBuilder builder = new StreamsBuilder();
41-
builder.stream(topics._1, Consumed.with(Serdes.String(), personSerdes))
43+
builder.stream(sourceTopic, Consumed.with(Serdes.String(), personSerdes))
4244
.groupBy((key, value) -> value.getName())
4345
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName))
4446
.toStream()
45-
.to(topics._2, Produced.with(Serdes.String(), Serdes.Long()));
47+
.to(sinkTopic, Produced.with(Serdes.String(), Serdes.Long()));
4648

4749
return builder.build();
4850
}

streams-client/src/main/java/no/sysco/testing/kafka/streams/topology/StreamProcessingLowLvlAvro.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package no.sysco.testing.kafka.streams.topology;
22

33
import no.sysco.testing.kafka.streams.avro.Person;
4-
import no.sysco.testing.kafka.streams.utils.Tuple2;
54
import org.apache.kafka.common.serialization.Serde;
65
import org.apache.kafka.common.serialization.Serdes;
76
import org.apache.kafka.streams.KeyValue;
@@ -18,14 +17,17 @@ public class StreamProcessingLowLvlAvro {
1817

1918
// stateful
2019
public static Topology topologyDedupByUserId(
21-
final Tuple2<String, String> topics, final Serde<Person> personSerdes, final String idStore) {
20+
final String sourceTopic,
21+
final String sinkTopic,
22+
final Serde<Person> personSerdes,
23+
final String idStore) {
2224

2325
final StreamsBuilder builder = new StreamsBuilder();
2426
builder
2527
.addStateStore(
2628
Stores.keyValueStoreBuilder(
2729
Stores.persistentKeyValueStore(idStore), Serdes.String(), personSerdes))
28-
.stream(topics._1, Consumed.with(Serdes.String(), personSerdes))
30+
.stream(sourceTopic, Consumed.with(Serdes.String(), personSerdes))
2931
.transform(
3032
() ->
3133
new Transformer<String, Person, KeyValue<String, Person>>() {
@@ -56,7 +58,7 @@ public KeyValue<String, Person> transform(String key, Person value) {
5658
public void close() {}
5759
},
5860
idStore)
59-
.to(topics._2, Produced.with(Serdes.String(), personSerdes));
61+
.to(sinkTopic, Produced.with(Serdes.String(), personSerdes));
6062

6163
return builder.build();
6264
}

streams-client/src/main/java/no/sysco/testing/kafka/streams/utils/Tuple2.java

Lines changed: 0 additions & 11 deletions
This file was deleted.

streams-client/src/test/java/no/sysco/testing/kafka/streams/topology/StreamProcessingAvroTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.Optional;
1212
import java.util.Properties;
1313
import no.sysco.testing.kafka.streams.avro.Person;
14-
import no.sysco.testing.kafka.streams.utils.Tuple2;
1514
import org.apache.kafka.clients.consumer.ConsumerRecord;
1615
import org.apache.kafka.clients.producer.ProducerRecord;
1716
import org.apache.kafka.common.serialization.Serdes;
@@ -80,7 +79,7 @@ public void testTopologyAvro_statelessProcessors() throws IOException, RestClien
8079
serde.configure(schema, false);
8180
// get topology
8281
final Topology topology =
83-
StreamProcessingAvro.topologyUpperCase(new Tuple2<>(topicIn, topicOut), serde);
82+
StreamProcessingAvro.topologyUpperCase(topicIn, topicOut, serde);
8483
testDriver = new TopologyTestDriver(topology, properties);
8584

8685
final ConsumerRecordFactory<String, Person> factory =
@@ -128,8 +127,7 @@ public void testTopologyAvro_statefulProcessors() throws IOException, RestClient
128127
serde.configure(schema, false);
129128
// get topology
130129
final Topology topology =
131-
StreamProcessingAvro.topologyCountUsersWithSameName(
132-
new Tuple2<>(topicIn, topicOut), serde, storeName);
130+
StreamProcessingAvro.topologyCountUsersWithSameName(topicIn, topicOut, serde, storeName);
133131
testDriver = new TopologyTestDriver(topology, properties);
134132

135133
final ConsumerRecordFactory<String, Person> factory =

streams-client/src/test/java/no/sysco/testing/kafka/streams/topology/StreamProcessingLowLvlAvroTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.Optional;
1212
import java.util.Properties;
1313
import no.sysco.testing.kafka.streams.avro.Person;
14-
import no.sysco.testing.kafka.streams.utils.Tuple2;
1514
import org.apache.kafka.clients.consumer.ConsumerRecord;
1615
import org.apache.kafka.clients.producer.ProducerRecord;
1716
import org.apache.kafka.common.serialization.Serdes;
@@ -72,8 +71,7 @@ public void testTopologyLowLvlAvro_statefulProcessors() throws IOException, Rest
7271
serde.configure(schema, false);
7372
// get topology
7473
final Topology topology =
75-
StreamProcessingLowLvlAvro.topologyDedupByUserId(
76-
new Tuple2<>(topicIn, topicOut), serde, storeName);
74+
StreamProcessingLowLvlAvro.topologyDedupByUserId(topicIn, topicOut, serde, storeName);
7775
testDriver = new TopologyTestDriver(topology, properties);
7876

7977
final ConsumerRecordFactory<String, Person> factory =
@@ -112,8 +110,7 @@ public void testTopologyLowLvlAvro_statefulProcessors_invalidInput()
112110
serde.configure(schema, false);
113111
// get topology
114112
final Topology topology =
115-
StreamProcessingLowLvlAvro.topologyDedupByUserId(
116-
new Tuple2<>(topicIn, topicOut), serde, storeName);
113+
StreamProcessingLowLvlAvro.topologyDedupByUserId(topicIn, topicOut, serde, storeName);
117114
testDriver = new TopologyTestDriver(topology, properties);
118115

119116
final ConsumerRecordFactory<String, Person> factory =

streams-client/src/test/java/no/sysco/testing/kafka/streams/topology/StreamProcessingTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.util.Arrays;
44
import java.util.Optional;
55
import java.util.Properties;
6-
import no.sysco.testing.kafka.streams.utils.Tuple2;
76
import org.apache.kafka.clients.consumer.ConsumerRecord;
87
import org.apache.kafka.clients.producer.ProducerRecord;
98
import org.apache.kafka.common.serialization.LongDeserializer;
@@ -47,7 +46,7 @@ public void tearDown() {
4746
@Test
4847
public void testTopology_statelessProcessors_Uppercase() {
4948
// Arrange
50-
final Topology topology = StreamProcessing.topologyUpperCase(new Tuple2<>(topicIn, topicOut));
49+
final Topology topology = StreamProcessing.topologyUpperCase(topicIn, topicOut);
5150
testDriver = new TopologyTestDriver(topology, properties);
5251
final ConsumerRecordFactory<String, String> factory =
5352
new ConsumerRecordFactory<>(topicIn, new StringSerializer(), new StringSerializer());
@@ -75,7 +74,7 @@ public void testTopology_statefullProcessors_Anagram() {
7574
// Arrange
7675
final String storeName = "count-storage";
7776
final Topology topology =
78-
StreamProcessing.topologyCountAnagram(new Tuple2<>(topicIn, topicOut), storeName);
77+
StreamProcessing.topologyCountAnagram(topicIn, topicOut, storeName);
7978
// setup TopologyTestDriver
8079
testDriver = new TopologyTestDriver(topology, properties);
8180
final ConsumerRecordFactory<String, String> factory =

0 commit comments

Comments
 (0)