-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rewrite tests to use new KIP-470 API #301
Conversation
@@ -84,7 +84,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String | |||
final GlobalKTable<Long, Customer> customers = builder.globalTable(CUSTOMERS.name(), | |||
Consumed.with(CUSTOMERS.keySerde(), CUSTOMERS.valueSerde())); | |||
|
|||
final Joined<String, Order, Payment> serdes = Joined | |||
final StreamJoined<String, Order, Payment> serdes = StreamJoined |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KIP-479 cleanup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the KIP-479 cleanups!
@@ -135,7 +135,7 @@ public void stop() { | |||
void sendEmail(EmailTuple details); | |||
} | |||
|
|||
public class EmailTuple { | |||
public static class EmailTuple { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup
@@ -47,13 +47,13 @@ | |||
ORDERS.valueSerde()); | |||
private final Grouped<String, OrderValidation> serdes3 = Grouped | |||
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()); | |||
private final Joined<String, Long, Order> serdes4 = Joined | |||
private final StreamJoined<String, Long, Order> serdes4 = StreamJoined |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KIP-479 cleanup
@@ -72,7 +70,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { | |||
record.put("user", "alice"); | |||
record.put("is_new", true); | |||
record.put("content", "lorem ipsum"); | |||
final List<GenericRecord> inputValues = Collections.singletonList(record); | |||
final List<Object> inputValues = Collections.singletonList(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new KIP-470 API cannot resolve GenericRecord
as type, because KafkaAvroSerializer
uses Object
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, that's a bummer.
* @param valueSerializer The {@link Serializer} corresponding to the value type | ||
* @param <V> Value type of the data records | ||
*/ | ||
static <K, V> void produceValuesSynchronously(final String topic, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the new KIP-470 API, we don't need those helpers any longer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yay!
@@ -58,29 +59,26 @@ public void shouldUppercaseTheInput() { | |||
final String inputTopic = "inputTopic"; | |||
final String outputTopic = "outputTopic"; | |||
final KStream<byte[], String> input = builder.stream(inputTopic); | |||
final KStream<byte[], String> uppercased = input.mapValues(s -> s.toUpperCase()); | |||
final KStream<byte[], String> uppercased = input.mapValues((ValueMapper<String, String>) String::toUpperCase); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup
MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE); | ||
} | ||
|
||
@Test | ||
public void shouldCountPlayEventsBySession() throws Exception { | ||
public void shouldCountPlayEventsBySession() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side cleanup -- this method does not throw
.keyValueStoreBuilder( | ||
Stores.persistentKeyValueStore("WordCountsStore"), | ||
Serdes.String(), | ||
Serdes.Long()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix indentation only
@@ -99,7 +99,7 @@ public void shouldJoinTwoStreams() { | |||
// In this specific example, we don't need to define join serdes explicitly because the key, left value, and | |||
// right value are all of type String, which matches our default serdes configured for the application. However, | |||
// we want to showcase the use of `Joined.with(...)` in case your code needs a different type setup. | |||
Joined.with( | |||
StreamJoined.with( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KIP-479 cleanup
// Compute the total per region by summing the individual click counts per region. | ||
.groupByKey(Grouped.with(stringSerde, longSerde)) | ||
.reduce(Long::sum); | ||
// Join the stream against the table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix indentation only
(regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue, | ||
Materialized.as(storeName)) | ||
(regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue, | ||
Materialized.as(storeName)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix indentation only
* | ||
* @return ProducerRecord containing word as key and count as value | ||
*/ | ||
private ProducerRecord<String, Long> readOutput() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With KIP-470, we don't need those helpers any longer
@@ -61,220 +61,243 @@ | |||
|
|||
@Test | |||
public void shouldSumNumbersOnSameDay() { | |||
final List<MyEvent> inputValues = Arrays.asList( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replacing MyEvent
with new KIP-470 `TestRecord:
- requires to pass in
null
key - required to pass timestamp as
Instant
new MyEvent(7, ZonedDateTime.of(2019, 1, 1, 16, 31, 0, 0, zone)), | ||
dummyEventToForceSuppression() | ||
); | ||
final List<ExpectedResult> expectedValues = Arrays.asList( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replacing ExpectedResult
with simple KeyValue
pair
); | ||
verify(inputValues, expectedValues, zone); | ||
final List<KeyValue<Windowed<Integer>, Integer>> expectedValues = | ||
Collections.singletonList(KeyValue.pair(toWindowed(1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using toWindowed
to get Windowed<K>
for the key passing in key, start/end ts
|
||
} | ||
|
||
private class MyEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MyEvent
replace with TestRecord
from KIP-470
} | ||
} | ||
|
||
private class ExpectedResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExpectedResult
replace with KeyValue
pair
} | ||
} | ||
|
||
private void verifyResults(final List<ExpectedResult> expectedValues, final TopologyTestDriver testDriver) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed any longer -- using KIP-470 API now
} | ||
|
||
private void injectFakeData(final List<MyEvent> inputValues, final TopologyTestDriver testDriver) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed any longer -- using KIP-470 API now
@@ -85,25 +86,28 @@ object IntegrationTestScalaUtils { | |||
def produceValuesSynchronously[V](topic: String, values: Seq[V], driver: TopologyTestDriver) | |||
(implicit valueSerializer: Serializer[V]): Unit = { | |||
import collection.JavaConverters._ | |||
IntegrationTestUtils.produceValuesSynchronously(topic, values.asJava, driver, valueSerializer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we removed those helper, we call the TopologyTestDriver
method directly -- it's still useful to keep the Scala helpers for explicit Serde support
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, thanks @mjsax ! I had one high-level suggestion, and a few smaller comments.
-John
testDriver.createInputTopic(inputTopic, | ||
new IntegrationTestUtils.NothingSerde<>(), | ||
new StringSerializer()) | ||
.pipeValueList(inputValues); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tentatively, I think it might set a better example for users if we establish a pattern like:
try (testDriver = new TopologyTestDriver(...)) {
final TestInputTopic<...> inputTopic = testDriver.createInputTopic(...);
final TestOutputTopic<...> outputTopic = testDriver.createOutputTopic(...);
inputTopic.pipeValueList(inputValues);
assertThat(outputTopic.readKeyValuesToMap()).isEqualTo(expectedOutput);
}
In other words, I feel it leads to more readable tests if we declare the inputs and outputs right up front, and then push data through and assert the results later on.
@@ -72,7 +70,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { | |||
record.put("user", "alice"); | |||
record.put("is_new", true); | |||
record.put("content", "lorem ipsum"); | |||
final List<GenericRecord> inputValues = Collections.singletonList(record); | |||
final List<Object> inputValues = Collections.singletonList(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh, that's a bummer.
topologyTestDriver.createInputTopic(inputTopic, | ||
new IntegrationTestUtils.NothingSerde<>(), | ||
new LongSerializer()) | ||
.pipeValueList(inputValues); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like declaring the same input topic twice may disguise the intent of this test. Do you think declaring it once with a ByteArray value serializer, and then piping in invalid and valid data arrays would be clearer?
* @param valueSerializer The {@link Serializer} corresponding to the value type | ||
* @param <V> Value type of the data records | ||
*/ | ||
static <K, V> void produceValuesSynchronously(final String topic, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yay!
playEventSerializer, | ||
start | ||
); | ||
topologyTestDriver.createInputTopic(SessionWindowsExample.PLAY_EVENTS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can also create the input and output topics in the @Before
method so we can tidy up these tests.
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName()); | ||
// The commit interval for flushing records to state stores and downstream must be lower than | ||
// this integration test's timeout (30 secs) to ensure we observe the expected processing results. | ||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This config has no effect now, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. Seems to be a left over from moving from "EmbeddedKafka" to TDD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -84,7 +84,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String | |||
final GlobalKTable<Long, Customer> customers = builder.globalTable(CUSTOMERS.name(), | |||
Consumed.with(CUSTOMERS.keySerde(), CUSTOMERS.valueSerde())); | |||
|
|||
final Joined<String, Order, Payment> serdes = Joined | |||
final StreamJoined<String, Order, Payment> serdes = StreamJoined |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the KIP-479 cleanups!
@@ -99,7 +99,7 @@ public void shouldJoinTwoStreams() { | |||
// In this specific example, we don't need to define join serdes explicitly because the key, left value, and | |||
// right value are all of type String, which matches our default serdes configured for the application. However, | |||
// we want to showcase the use of `Joined.with(...)` in case your code needs a different type setup. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: update comment to use StreamJoined
as well.
Updated this. (Added some side cleanup to use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much, @mjsax ; this looks awesome!
Rebasing this to new branch |
f9e39b6
to
37d4225
Compare
No description provided.